aboutsummaryrefslogtreecommitdiff
path: root/libpipe
diff options
context:
space:
mode:
Diffstat (limited to 'libpipe')
-rw-r--r--libpipe/ChangeLog170
-rw-r--r--libpipe/Makefile32
-rw-r--r--libpipe/addr.c29
-rw-r--r--libpipe/dgram.c51
-rw-r--r--libpipe/pipe-funcs.c2
-rw-r--r--libpipe/pipe.c416
-rw-r--r--libpipe/pipe.h381
-rw-r--r--libpipe/pq-funcs.c2
-rw-r--r--libpipe/pq.c434
-rw-r--r--libpipe/pq.h260
-rw-r--r--libpipe/seqpack.c55
-rw-r--r--libpipe/stream.c68
12 files changed, 1900 insertions, 0 deletions
diff --git a/libpipe/ChangeLog b/libpipe/ChangeLog
new file mode 100644
index 00000000..e0dc3ea1
--- /dev/null
+++ b/libpipe/ChangeLog
@@ -0,0 +1,170 @@
+1999-07-10 Roland McGrath <roland@baalperazim.frob.com>
+
+ * pq.c: Add #include <sys/mman.h> for munmap decl.
+
+1999-07-09 Thomas Bushnell, BSG <tb@mit.edu>
+
+ * pq.c (packet_realloc): Use mmap instead of vm_allocate.
+ (packet_read_ports): Likewise.
+ (packet_read): Likewise.
+
+1999-07-03 Thomas Bushnell, BSG <tb@mit.edu>
+
+ * pq.c (free_packets): Use munmap instead of vm_deallocate.
+
+1998-10-20 Roland McGrath <roland@baalperazim.frob.com>
+
+ * pq.c (free_packets): Add braces to silence gcc warning.
+ (packet_realloc): Likewise.
+ * pipe.c (pipe_send): Likewise.
+ (pipe_recv): Likewise.
+
+Mon Oct 21 21:58:03 1996 Thomas Bushnell, n/BSG <thomas@gnu.ai.mit.edu>
+
+ * pipe.h: Add extern inline protection.
+ * pq.h: Likewise.
+ * pipe-funcs.c, pq-funcs.c: New files.
+ * Makefile (SRCS): Add pipe-funcs.c and pq-funcs.c.
+
+Thu Sep 12 16:24:41 1996 Thomas Bushnell, n/BSG <thomas@gnu.ai.mit.edu>
+
+ * Makefile (HURDLIBS): New variable.
+ (libpipe.so): Delete special dependency.
+
+Tue Jul 16 11:33:34 1996 Michael I. Bushnell, p/BSG <mib@gnu.ai.mit.edu>
+
+ * pipe.h (EWOULDBLOCK): Define to work around new libc bug.
+
+Mon Jul 1 17:29:07 1996 Miles Bader <miles@gnu.ai.mit.edu>
+
+ * pq.c (pq_queue): Initialize PACKET->buf_vm_alloced.
+
+Tue Jan 23 12:44:40 1996 Miles Bader <miles@gnu.ai.mit.edu>
+
+ * pq.h (packet_ensure, packet_ensure_efficiently): Use packet_fit().
+ (packet_fit): New function.
+ * pq.c (packet_read): If there's lots of empty space at the
+ beginning of a vm_alloced buffer, deallocate it.
+
+Mon Jan 22 17:12:39 1996 Miles Bader <miles@gnu.ai.mit.edu>
+
+ * pq.c (packet_realloc): Reflect in the new values of BUF_START &
+ BUF_END that we've removed any empty space at the beginning of BUF.
+
+Sat Jan 13 13:56:13 1996 Miles Bader <miles@gnu.ai.mit.edu>
+
+ * pq.h (packet_ensure, packet_ensure_efficiently): Use
+ packet_new_size() instead of packet_size_adjust().
+ (packet_size_adjust): Declaration removed.
+ (packet_new_size): New declaration.
+ * pq.c (packet_size_adjust): Function removed.
+ (packet_new_size): New function.
+
+ * pq.c (packet_read): Re-arrange to be slightly less confusing.
+ Reverse start-past-buf-beginning test that may have leaked memory.
+
+ * pipe.c (pipe_send): For non-blocking writes, avoid writing more
+ than the user requested.
+
+Fri Jan 12 12:15:06 1996 Miles Bader <miles@gnu.ai.mit.edu>
+
+ * pq.c (pq_queue): Initialize the ports_alloced field.
+ (packet_read): When a page-aligned read consumes the whole buffer,
+ but there's a non-page-multiple amount available, don't let
+ buf_len become negative.
+
+Mon Oct 9 14:57:48 1995 Roland McGrath <roland@churchy.gnu.ai.mit.edu>
+
+ * Makefile: Specify shared library dependencies.
+
+Thu Sep 7 09:08:30 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu>
+
+ * pq.c (packet_read): Don't leave PACKET in a fucked up state when
+ it's vm_allocate'd but doesn't a page-multiple amount of data and
+ we're reading everything.
+
+ * pipe.c (_pipe_no_readers): REALLY wake up writers when the pipe
+ breaks.
+
+Fri Sep 1 10:42:03 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu>
+
+ * pipe.c (_pipe_no_readers): Wake up write selects too when the
+ pipe breaks.
+
+Thu Aug 31 14:39:21 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu>
+
+ * pipe.c (pipe_send): Make writes blockable.
+ (pipe_recv): Wakeup blocked writers.
+ (pipe_kick): Function deleted.
+ (pipe_create): Make connection-oriented pipes start out broken.
+ (_pipe_first_reader): New function.
+ (_pipe_first_writer): Don't check whether PIPE is connection-
+ oriented before clearing PIPE_BROKEN, as otherwise it will never
+ be set.
+ (pipe_pair_select): New function.
+ (pipe_multiple_lock): New variable.
+ * pipe.h (pipe_wait): Renamed to `pipe_wait_readable'.
+ (pipe_select): Renamed to `pipe_select_readable'.
+ (pipe_writable, pipe_wait_writable, pipe_select_writable): New funcs.
+ (pipe_acquire_reader): Call _pipe_first_reader if necessary.
+ (_pipe_first_reader): New declaration.
+ (struct pipe): New fields: `write_limit', `write_atomic',
+ `pending_writes', `pending_write_selects'.
+ (struct pipe): `pending_selects' changed to `pending_read_selects'.
+ (pipe_pair_select): New declaration.
+
+Tue Aug 29 14:37:49 1995 Miles Bader <miles@geech.gnu.ai.mit.edu>
+
+ * pipe.c (pipe_send): Use condition_broadcast, not condition_signal.
+ * pipe.h (pipe_select): New function.
+
+ * pipe.h (struct pipe): Remove interrupt_seq_num field.
+ (pipe_wait): Use hurd_condition_wait to detect interrupts instead
+ of previous ad-hoc mechanism.
+
+ * pipe.c (pipe_create): Don't initialize interrupt_seq_num field.
+
+ * pipe.h (pipe_acquire_reader, pipe_acquire_writer,
+ pipe_add_reader, pipe_add_writer): `aquire' -> `acquire'.
+
+Fri Aug 11 18:35:32 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu>
+
+ * pipe.c (pipe_create): Initialize READERS & WRITERS, not REFS.
+ (_pipe_first_writer): New function.
+ (_pipe_no_writers, _pipe_no_writers): New function.
+ (pipe_break): Function deleted.
+
+Wed Aug 9 12:53:05 1995 Miles Bader <miles@geech.gnu.ai.mit.edu>
+
+ * pipe.h (struct pipe): Keep separate ref counts for readers and
+ writers.
+ (pipe_aquire_reader, pipe_aquire_writer): New functions.
+ (pipe_release_reader, pipe_release_writer): New functions.
+ (pipe_add_reader, pipe_add_writer): New functions.
+ (pipe_remove_reader, pipe_remove_writer): New functions.
+ (_pipe_first_writer): New function decl.
+ (_pipe_no_writers, _pipe_no_writers): New function decl.
+ (pipe_aquire, pipe_release): Function deleted.
+ (pipe_break): Function decl deleted.
+
+Tue Aug 1 12:37:27 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu>
+
+ * pipe.c (pipe_recv): Get rid of code to deal with getting the
+ source address from the control packet if there is no data packet,
+ since pipe_write always writes a data packet.
+
+Mon Jul 31 14:50:00 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu>
+
+ * pipe.c (pipe_recv): Assert that a control packet should only
+ have a source address if there is no corresponding data packet.
+ (pipe_send): Change the test to determine whether we should write
+ a control packet, so that we only do so if we need to. Also,
+ don't record the source address in control packets, as it's
+ recorded in the following data packet anyway, and this prevents it
+ from being dealloc'd twice.
+
+Fri Jul 28 23:03:27 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu>
+
+ * pipe.h (stream_pipe_class, dgram_pipe_class, seqpack_pipe_class):
+ Make these declarations extern so they don't fuck up initialization.
+ (seqpacket_pipe_class): Renamed to `seqpack_pipe_class'.
diff --git a/libpipe/Makefile b/libpipe/Makefile
new file mode 100644
index 00000000..b64166a6
--- /dev/null
+++ b/libpipe/Makefile
@@ -0,0 +1,32 @@
+# Makefile for libpipe
+#
+# Copyright (C) 1995, 1996 Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2, or (at
+# your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+dir := libpipe
+makemode := library
+
+libname = libpipe
+installhdrs = pipe.h pq.h
+
+SRCS = pq.c dgram.c pipe.c stream.c seqpack.c addr.c pq-funcs.c pipe-funcs.c
+LCLHDRS = pipe.h pq.h
+
+OBJS = $(SRCS:.c=.o)
+HURDLIBS=threads ports
+
+include ../Makeconf
+
diff --git a/libpipe/addr.c b/libpipe/addr.c
new file mode 100644
index 00000000..3af3ef8f
--- /dev/null
+++ b/libpipe/addr.c
@@ -0,0 +1,29 @@
+/* Default address deallocate function
+
+ Copyright (C) 1995 Free Software Foundation, Inc.
+
+ Written by Miles Bader <miles@gnu.ai.mit.edu>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2, or (at
+ your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
+#include <hurd/ports.h>
+
+/* This routine may be provided by the user, in which case, it should be a
+ function taking a non-NULL source address and deallocating it. It
+ defaults to calling ports_port_deref. */
+void pipe_dealloc_addr (void *addr)
+{
+ ports_port_deref (addr);
+}
diff --git a/libpipe/dgram.c b/libpipe/dgram.c
new file mode 100644
index 00000000..3f3b2ab6
--- /dev/null
+++ b/libpipe/dgram.c
@@ -0,0 +1,51 @@
+/* The SOCK_DGRAM pipe class
+
+ Copyright (C) 1995 Free Software Foundation, Inc.
+
+ Written by Miles Bader <miles@gnu.ai.mit.edu>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2, or (at
+ your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
+#include <sys/socket.h> /* For SOCK_DGRAM */
+
+#include "pipe.h"
+#include "pq.h"
+
+/* See the definition of struct pipe_class in "pipe.h" for documentation. */
+
+static error_t
+dgram_write (struct pq *pq, void *source,
+ char *data, size_t data_len, size_t *amount)
+{
+ struct packet *packet = pq_queue (pq, PACKET_TYPE_DATA, source);
+ if (!packet)
+ return ENOBUFS;
+ else
+ return packet_write (packet, data, data_len, amount);
+}
+
+static error_t
+dgram_read (struct packet *packet, int *dequeue, unsigned *flags,
+ char **data, size_t *data_len, size_t amount)
+{
+ *dequeue = 1;
+ return packet_read (packet, data, data_len, amount);
+}
+
+struct pipe_class _dgram_pipe_class =
+{
+ SOCK_DGRAM, PIPE_CLASS_CONNECTIONLESS, dgram_read, dgram_write
+};
+struct pipe_class *dgram_pipe_class = &_dgram_pipe_class;
diff --git a/libpipe/pipe-funcs.c b/libpipe/pipe-funcs.c
new file mode 100644
index 00000000..450180ee
--- /dev/null
+++ b/libpipe/pipe-funcs.c
@@ -0,0 +1,2 @@
+#define PIPE_EI
+#include "pipe.h"
diff --git a/libpipe/pipe.c b/libpipe/pipe.c
new file mode 100644
index 00000000..914816bc
--- /dev/null
+++ b/libpipe/pipe.c
@@ -0,0 +1,416 @@
+/* Generic one-way pipes
+
+ Copyright (C) 1995, 1998 Free Software Foundation, Inc.
+
+ Written by Miles Bader <miles@gnu.ai.mit.edu>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2, or (at
+ your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
+#include <string.h> /* For bzero() */
+#include <assert.h>
+
+#include <mach/time_value.h>
+#include <mach/mach_host.h>
+
+#include <hurd/hurd_types.h>
+
+#include "pipe.h"
+
+static inline void
+timestamp (time_value_t *stamp)
+{
+ host_get_time (mach_host_self (), stamp);
+}
+
+/* Hold this lock before attempting to lock multiple pipes. */
+struct mutex pipe_multiple_lock = MUTEX_INITIALIZER;
+
+/* ---------------------------------------------------------------- */
+
+#define pipe_is_connless(p) ((p)->class->flags & PIPE_CLASS_CONNECTIONLESS)
+
+/* Creates a new pipe of class CLASS and returns it in RESULT. */
+error_t
+pipe_create (struct pipe_class *class, struct pipe **pipe)
+{
+ struct pipe *new = malloc (sizeof (struct pipe));
+
+ if (new == NULL)
+ return ENOMEM;
+
+ new->readers = 0;
+ new->writers = 0;
+ new->flags = 0;
+ new->class = class;
+ new->write_limit = 16*1024;
+ new->write_atomic = 16*1024;
+
+ bzero (&new->read_time, sizeof (new->read_time));
+ bzero (&new->write_time, sizeof (new->write_time));
+
+ condition_init (&new->pending_reads);
+ condition_init (&new->pending_read_selects);
+ condition_init (&new->pending_writes);
+ condition_init (&new->pending_write_selects);
+ mutex_init (&new->lock);
+
+ pq_create (&new->queue);
+
+ if (! pipe_is_connless (new))
+ new->flags |= PIPE_BROKEN;
+
+ *pipe = new;
+ return 0;
+}
+
+/* Free PIPE and any resources it holds. */
+void
+pipe_free (struct pipe *pipe)
+{
+ pq_free (pipe->queue);
+ free (pipe);
+}
+
+/* Take any actions necessary when PIPE acquires its first writer. */
+void _pipe_first_writer (struct pipe *pipe)
+{
+ if (pipe->readers > 0)
+ pipe->flags &= ~PIPE_BROKEN;
+}
+
+/* Take any actions necessary when PIPE acquires its first reader. */
+void _pipe_first_reader (struct pipe *pipe)
+{
+ if (pipe->writers > 0)
+ pipe->flags &= ~PIPE_BROKEN;
+}
+
+/* Take any actions necessary when PIPE's last reader has gone away. PIPE
+ should be locked. */
+void _pipe_no_readers (struct pipe *pipe)
+{
+ if (pipe->writers == 0)
+ pipe_free (pipe);
+ else
+ {
+ if (! pipe_is_connless (pipe))
+ {
+ pipe->flags |= PIPE_BROKEN;
+ if (pipe->writers)
+ /* Wake up writers for the bad news... */
+ {
+ condition_broadcast (&pipe->pending_writes);
+ condition_broadcast (&pipe->pending_write_selects);
+ }
+ }
+ mutex_unlock (&pipe->lock);
+ }
+}
+
+/* Take any actions necessary when PIPE's last writer has gone away. PIPE
+ should be locked. */
+void _pipe_no_writers (struct pipe *pipe)
+{
+ if (pipe->readers == 0)
+ pipe_free (pipe);
+ else
+ {
+ if (! pipe_is_connless (pipe))
+ {
+ pipe->flags |= PIPE_BROKEN;
+ if (pipe->readers)
+ /* Wake up readers for the bad news... */
+ {
+ condition_broadcast (&pipe->pending_reads);
+ condition_broadcast (&pipe->pending_read_selects);
+ }
+ }
+ mutex_unlock (&pipe->lock);
+ }
+}
+
+/* Return when either RPIPE is available for reading (if SELECT_READ is set
+ in *SELECT_TYPE), or WPIPE is available for writing (if select_write is
+ set in *SELECT_TYPE). *SELECT_TYPE is modified to reflect which (or both)
+ is now available. DATA_ONLY should be true if only data packets should be
+ waited for on RPIPE. Neither RPIPE or WPIPE should be locked when calling
+ this function (unlike most pipe functions). */
+error_t
+pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe,
+ int *select_type, int data_only)
+{
+ error_t err = 0;
+
+ *select_type &= SELECT_READ | SELECT_WRITE;
+
+ if (*select_type == SELECT_READ)
+ {
+ mutex_lock (&rpipe->lock);
+ err = pipe_select_readable (rpipe, data_only);
+ mutex_unlock (&rpipe->lock);
+ }
+ else if (*select_type == SELECT_WRITE)
+ {
+ mutex_lock (&wpipe->lock);
+ err = pipe_select_writable (wpipe);
+ mutex_unlock (&wpipe->lock);
+ }
+ else
+ /* ugh */
+ {
+ int rpipe_blocked, wpipe_blocked;
+ struct condition pending_read_write_select;
+ size_t wlimit = wpipe->write_limit;
+ struct mutex *lock =
+ (wpipe == rpipe ? &rpipe->lock : &pipe_multiple_lock);
+
+ condition_init (&pending_read_write_select);
+ condition_implies (&rpipe->pending_read_selects,
+ &pending_read_write_select);
+ condition_implies (&wpipe->pending_write_selects,
+ &pending_read_write_select);
+
+ mutex_lock (lock);
+ if (rpipe != wpipe)
+ {
+ mutex_lock (&rpipe->lock);
+ mutex_lock (&wpipe->lock);
+ }
+
+ rpipe_blocked =
+ ! ((rpipe->flags & PIPE_BROKEN) || pipe_is_readable (rpipe, data_only));
+ wpipe_blocked =
+ ! ((wpipe->flags & PIPE_BROKEN) || pipe_readable (wpipe, 1) < wlimit);
+ while (!err && rpipe_blocked && wpipe_blocked)
+ {
+ if (rpipe != wpipe)
+ {
+ mutex_unlock (&rpipe->lock);
+ mutex_unlock (&wpipe->lock);
+ }
+ if (hurd_condition_wait (&pending_read_write_select, lock))
+ err = EINTR;
+ if (rpipe != wpipe)
+ {
+ mutex_lock (&rpipe->lock);
+ mutex_lock (&wpipe->lock);
+ }
+ rpipe_blocked =
+ ! ((rpipe->flags & PIPE_BROKEN)
+ || pipe_is_readable (rpipe, data_only));
+ wpipe_blocked =
+ ! ((wpipe->flags & PIPE_BROKEN)
+ || pipe_readable (wpipe, 1) < wlimit);
+ }
+
+ if (!err)
+ {
+ if (rpipe_blocked)
+ *select_type &= ~SELECT_READ;
+ if (wpipe_blocked)
+ *select_type &= ~SELECT_WRITE;
+ }
+
+ if (rpipe != wpipe)
+ {
+ mutex_unlock (&rpipe->lock);
+ mutex_unlock (&wpipe->lock);
+ }
+ mutex_unlock (lock);
+
+ condition_unimplies (&rpipe->pending_read_selects,
+ &pending_read_write_select);
+ condition_unimplies (&wpipe->pending_write_selects,
+ &pending_read_write_select);
+ }
+
+ return err;
+}
+
+/* Writes up to LEN bytes of DATA, to PIPE, which should be locked, and
+ returns the amount written in AMOUNT. If present, the information in
+ CONTROL & PORTS is written in a preceding control packet. If an error is
+ returned, nothing is done. */
+error_t
+pipe_send (struct pipe *pipe, int noblock, void *source,
+ char *data, size_t data_len,
+ char *control, size_t control_len,
+ mach_port_t *ports, size_t num_ports,
+ size_t *amount)
+{
+ error_t err = 0;
+
+ err = pipe_wait_writable (pipe, noblock);
+ if (err)
+ return err;
+
+ if (noblock)
+ {
+ size_t left = pipe->write_limit - pipe_readable (pipe, 1);
+ if (left < data_len)
+ {
+ if (data_len <= pipe->write_atomic)
+ return EWOULDBLOCK;
+ else
+ data_len = left;
+ }
+ }
+
+ if (control_len > 0 || num_ports > 0)
+ /* Write a control packet. */
+ {
+ /* Note that we don't record the source address in control packets, as
+ it's recorded in the following data packet anyway, and this prevents
+ it from being dealloc'd twice; this depends on the fact that we
+ always write a data packet. */
+ struct packet *control_packet =
+ pq_queue (pipe->queue, PACKET_TYPE_CONTROL, NULL);
+
+ if (control_packet == NULL)
+ err = ENOBUFS;
+ else
+ {
+ err = packet_write (control_packet, control, control_len, NULL);
+ if (!err)
+ err = packet_set_ports (control_packet, ports, num_ports);
+ if (err)
+ /* Trash CONTROL_PACKET somehow XXX */;
+ }
+ }
+
+ if (!err)
+ err = (*pipe->class->write)(pipe->queue, source, data, data_len, amount);
+
+ if (!err)
+ {
+ timestamp (&pipe->write_time);
+
+ /* And wakeup anyone that might be interested in it. */
+ condition_broadcast (&pipe->pending_reads);
+ mutex_unlock (&pipe->lock);
+
+ mutex_lock (&pipe->lock); /* Get back the lock on PIPE. */
+ /* Only wakeup selects if there's still data available. */
+ if (pipe_is_readable (pipe, 0))
+ {
+ condition_broadcast (&pipe->pending_read_selects);
+ /* We leave PIPE locked here, assuming the caller will soon unlock
+ it and allow others access. */
+ }
+ }
+
+ return err;
+}
+
+/* Reads up to AMOUNT bytes from PIPE, which should be locked, into DATA, and
+ returns the amount read in DATA_LEN. If NOBLOCK is true, EWOULDBLOCK is
+ returned instead of block when no data is immediately available. If an
+ error is returned, nothing is done. If source isn't NULL, the address of
+ the socket from which the data was sent is returned in it; this may be
+ NULL if it wasn't specified by the sender (which is usually the case with
+ connection-oriented protcols).
+
+ If there is control data waiting (before any data), then the behavior
+ depends on whether this is an `ordinary read' (when CONTROL and PORTS are
+ both NULL), in which case any control data is skipped, or a `msg read', in
+ which case the contents of the first control packet is returned (in
+ CONTROL and PORTS), as well as the first data packet following that (if
+ the control packet is followed by another control packet or no packet in
+ this case, a zero length data buffer is returned; the user should be
+ careful to distinguish this case from EOF (when no control or ports data
+ is returned either). */
+error_t
+pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, void **source,
+ char **data, size_t *data_len, size_t amount,
+ char **control, size_t *control_len,
+ mach_port_t **ports, size_t *num_ports)
+{
+ error_t err;
+ struct packet *packet;
+ struct pq *pq = pipe->queue;
+ /* True if the user isn't asking for any `control' data. */
+ int data_only = (control == NULL && ports == NULL);
+
+ err = pipe_wait_readable (pipe, noblock, data_only);
+ if (err)
+ return err;
+
+ packet = pq_head (pq, PACKET_TYPE_ANY, 0);
+
+ if (data_only)
+ /* The user doesn't want to know about control info, so skip any... */
+ while (packet && packet->type == PACKET_TYPE_CONTROL)
+ packet = pq_next (pq, PACKET_TYPE_ANY, 0);
+ else if (packet && packet->type == PACKET_TYPE_CONTROL)
+ /* Read this control packet first, before looking for a data packet. */
+ {
+ if (control != NULL)
+ packet_read (packet, control, control_len, packet_readable (packet));
+ if (ports != NULL)
+ /* Copy out the port rights being sent. */
+ packet_read_ports (packet, ports, num_ports);
+
+ packet = pq_next (pq, PACKET_TYPE_DATA, NULL);
+ assert (packet); /* pipe_write always writes a data packet. */
+ }
+ else
+ /* No control data... */
+ {
+ if (control_len)
+ *control_len = 0;
+ if (num_ports)
+ *num_ports = 0;
+ }
+
+ if (!err)
+ {
+ if (packet)
+ /* Read some data (PACKET must be a data packet at this point). */
+ {
+ int dq = 1; /* True if we should dequeue this packet. */
+
+ if (source)
+ packet_read_source (packet, source);
+
+ err = (*pipe->class->read)(packet, &dq, flags,
+ data, data_len, amount);
+ if (dq)
+ pq_dequeue (pq);
+ }
+ else
+ /* Return EOF. */
+ *data_len = 0;
+ }
+
+ if (!err && packet)
+ {
+ timestamp (&pipe->read_time);
+
+ /* And wakeup anyone that might be interested in it. */
+ condition_broadcast (&pipe->pending_writes);
+ mutex_unlock (&pipe->lock);
+
+ mutex_lock (&pipe->lock); /* Get back the lock on PIPE. */
+ /* Only wakeup selects if there's still writing space available. */
+ if (pipe_readable (pipe, 1) < pipe->write_limit)
+ {
+ condition_broadcast (&pipe->pending_write_selects);
+ /* We leave PIPE locked here, assuming the caller will soon unlock
+ it and allow others access. */
+ }
+ }
+
+ return err;
+}
diff --git a/libpipe/pipe.h b/libpipe/pipe.h
new file mode 100644
index 00000000..d6c5ae8f
--- /dev/null
+++ b/libpipe/pipe.h
@@ -0,0 +1,381 @@
+/* Generic one-way pipes
+
+ Copyright (C) 1995, 1996 Free Software Foundation, Inc.
+
+ Written by Miles Bader <miles@gnu.ai.mit.edu>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2, or (at
+ your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
+#ifndef __PIPE_H__
+#define __PIPE_H__
+
+#define EWOULDBLOCK EAGAIN /* XXX */
+
+#include <cthreads.h> /* For conditions & mutexes */
+
+#include "pq.h"
+
+#ifndef PIPE_EI
+#define PIPE_EI extern inline
+#endif
+
+
+/* A description of a class of pipes and how to operate on them. */
+struct pipe_class
+{
+ /* What sort of socket this corresponds too. */
+ int sock_type;
+
+ /* Flags, from PIPE_CLASS_*, below. */
+ unsigned flags;
+
+ /* Operations: */
+ /* Read from PACKET into DATA &c, and set *DEQUEUE to true if PACKET should
+ be subsequently discarded. */
+ error_t (*read)(struct packet *packet, int *dequeue, unsigned *flags,
+ char **data, size_t *data_len, size_t amount);
+ /* Write DATA &c into the packet queue PQ. */
+ error_t (*write)(struct pq *pq, void *source,
+ char *data, size_t data_len, size_t *amount);
+};
+
+/* pipe_class flags */
+#define PIPE_CLASS_CONNECTIONLESS 0x1 /* A non-stream protocol. */
+
+/* Some pre-defined pipe_classes. */
+extern struct pipe_class *stream_pipe_class;
+extern struct pipe_class *dgram_pipe_class;
+extern struct pipe_class *seqpack_pipe_class;
+
+/* A unidirectional data pipe; it transfers data from READER to WRITER. */
+struct pipe
+{
+ /* What kind of pipe we are. */
+ struct pipe_class *class;
+
+ /* We use this to keep track of active threads using this pipe, so that
+ while a thread is waiting to read from a pipe and that pipe gets
+ deallocated (say by socket_shutdown), it doesn't actually go away until
+ the reader realizes what happened. It is normally frobbed using
+ pipe_acquire & pipe_release, which do locking as well.. */
+ unsigned readers, writers;
+
+ /* Various flags, from PIPE_* below. */
+ unsigned flags;
+
+ /* Various timestamps for this pipe. */
+ time_value_t read_time;
+ time_value_t write_time;
+
+ struct condition pending_reads;
+ struct condition pending_read_selects;
+
+ struct condition pending_writes;
+ struct condition pending_write_selects;
+
+ /* The maximum number of characters that this pipe will hold without
+ further writes blocking. */
+ size_t write_limit;
+
+ /* Write requests of less than this much are always done atomically. */
+ size_t write_atomic;
+
+ struct mutex lock;
+
+ /* A queue of incoming packets, of type either PACKET_TYPE_DATA or
+ PACKET_TYPE_CONTROL. Each data packet represents one datagram for
+ protocols that maintain record boundaries. Control packets always
+ represent the control information to be returned from one read
+ operation, and will be returned in conjuction with the following data
+ packet (if any). Reads interested only in data just skip control
+ packets until they find a data packet. */
+ struct pq *queue;
+};
+
+/* Pipe flags. */
+#define PIPE_BROKEN 0x1 /* This pipe isn't connected. */
+
+/* Returns the number of characters quickly readable from PIPE. If DATA_ONLY
+ is true, then `control' packets are ignored. */
+PIPE_EI size_t
+pipe_readable (struct pipe *pipe, int data_only)
+{
+ size_t readable = 0;
+ struct pq *pq = pipe->queue;
+ struct packet *packet = pq_head (pq, PACKET_TYPE_ANY, NULL);
+ while (packet)
+ {
+ if (packet->type == PACKET_TYPE_DATA)
+ readable += packet_readable (packet);
+ packet = packet->next;
+ }
+ return readable;
+}
+
+/* Returns true if there's any data available in PIPE. If DATA_ONLY is true,
+ then `control' packets are ignored. Note that this is different than
+ (pipe_readable (PIPE) > 0) in the case where a control packet containing
+ only ports is present. */
+PIPE_EI int
+pipe_is_readable (struct pipe *pipe, int data_only)
+{
+ struct pq *pq = pipe->queue;
+ struct packet *packet = pq_head (pq, PACKET_TYPE_ANY, NULL);
+ if (data_only)
+ while (packet && packet->type == PACKET_TYPE_CONTROL)
+ packet = packet->next;
+ return (packet != NULL);
+}
+
+/* Waits for PIPE to be readable, or an error to occurr. If NOBLOCK is true,
+ this operation will return EWOULDBLOCK instead of blocking when no data is
+ immediately available. If DATA_ONLY is true, then `control' packets are
+ ignored. */
+PIPE_EI error_t
+pipe_wait_readable (struct pipe *pipe, int noblock, int data_only)
+{
+ while (! pipe_is_readable (pipe, data_only) && ! (pipe->flags & PIPE_BROKEN))
+ {
+ if (noblock)
+ return EWOULDBLOCK;
+ if (hurd_condition_wait (&pipe->pending_reads, &pipe->lock))
+ return EINTR;
+ }
+ return 0;
+}
+
+/* Waits for PIPE to be readable, or an error to occurr. This call only
+ returns once threads waiting using pipe_wait_readable have been woken and
+ given a chance to read, and if there is still data available thereafter.
+ If DATA_ONLY is true, then `control' packets are ignored. */
+PIPE_EI error_t
+pipe_select_readable (struct pipe *pipe, int data_only)
+{
+ while (! pipe_is_readable (pipe, data_only) && ! (pipe->flags & PIPE_BROKEN))
+ if (hurd_condition_wait (&pipe->pending_read_selects, &pipe->lock))
+ return EINTR;
+ return 0;
+}
+
+/* Block until data can be written to PIPE. If NOBLOCK is true, then
+ EWOULDBLOCK is returned instead of blocking if this can't be done
+ immediately. */
+PIPE_EI error_t
+pipe_wait_writable (struct pipe *pipe, int noblock)
+{
+ size_t limit = pipe->write_limit;
+ if (pipe->flags & PIPE_BROKEN)
+ return EPIPE;
+ while (pipe_readable (pipe, 1) >= limit)
+ {
+ if (noblock)
+ return EWOULDBLOCK;
+ if (hurd_condition_wait (&pipe->pending_writes, &pipe->lock))
+ return EINTR;
+ if (pipe->flags & PIPE_BROKEN)
+ return EPIPE;
+ }
+ return 0;
+}
+
+/* Block until some data can be written to PIPE. This call only returns once
+ threads waiting using pipe_wait_writable have been woken and given a
+ chance to write, and if there is still space available thereafter. */
+PIPE_EI error_t
+pipe_select_writable (struct pipe *pipe)
+{
+ size_t limit = pipe->write_limit;
+ while (! (pipe->flags & PIPE_BROKEN) && pipe_readable (pipe, 1) >= limit)
+ if (hurd_condition_wait (&pipe->pending_writes, &pipe->lock))
+ return EINTR;
+ return 0;
+}
+
+/* Creates a new pipe of class CLASS and returns it in RESULT. */
+error_t pipe_create (struct pipe_class *class, struct pipe **pipe);
+
+/* Free PIPE and any resources it holds. */
+void pipe_free (struct pipe *pipe);
+
+/* Take any actions necessary when PIPE acquires its first reader. */
+void _pipe_first_reader (struct pipe *pipe);
+
+/* Take any actions necessary when PIPE acquires its first writer. */
+void _pipe_first_writer (struct pipe *pipe);
+
+/* Take any actions necessary when PIPE's last reader has gone away. PIPE
+ should be locked. */
+void _pipe_no_readers (struct pipe *pipe);
+
+/* Take any actions necessary when PIPE's last writer has gone away. PIPE
+ should be locked. */
+void _pipe_no_writers (struct pipe *pipe);
+
+/* Lock PIPE and increment its readers count. */
+PIPE_EI void
+pipe_acquire_reader (struct pipe *pipe)
+{
+ mutex_lock (&pipe->lock);
+ if (pipe->readers++ == 0)
+ _pipe_first_reader (pipe);
+}
+
+/* Lock PIPE and increment its writers count. */
+PIPE_EI void
+pipe_acquire_writer (struct pipe *pipe)
+{
+ mutex_lock (&pipe->lock);
+ if (pipe->writers++ == 0)
+ _pipe_first_writer (pipe);
+}
+
+/* Decrement PIPE's (which should be locked) reader count and unlock it. If
+ there are no more refs to PIPE, it will be destroyed. */
+PIPE_EI void
+pipe_release_reader (struct pipe *pipe)
+{
+ if (--pipe->readers == 0)
+ _pipe_no_readers (pipe);
+ else
+ mutex_unlock (&pipe->lock);
+}
+
+/* Decrement PIPE's (which should be locked) writer count and unlock it. If
+ there are no more refs to PIPE, it will be destroyed. */
+PIPE_EI void
+pipe_release_writer (struct pipe *pipe)
+{
+ if (--pipe->writers == 0)
+ _pipe_no_writers (pipe);
+ else
+ mutex_unlock (&pipe->lock);
+}
+
+/* Increment PIPE's reader count. PIPE should be unlocked. */
+PIPE_EI void
+pipe_add_reader (struct pipe *pipe)
+{
+ pipe_acquire_reader (pipe);
+ mutex_unlock (&pipe->lock);
+}
+
+/* Increment PIPE's writer count. PIPE should be unlocked. */
+PIPE_EI void
+pipe_add_writer (struct pipe *pipe)
+{
+ pipe_acquire_writer (pipe);
+ mutex_unlock (&pipe->lock);
+}
+
+/* Decrement PIPE's (which should be unlocked) reader count and unlock it. If
+ there are no more refs to PIPE, it will be destroyed. */
+PIPE_EI void
+pipe_remove_reader (struct pipe *pipe)
+{
+ mutex_lock (&pipe->lock);
+ pipe_release_reader (pipe);
+}
+
+/* Decrement PIPE's (which should be unlocked) writer count and unlock it. If
+ there are no more refs to PIPE, it will be destroyed. */
+PIPE_EI void
+pipe_remove_writer (struct pipe *pipe)
+{
+ mutex_lock (&pipe->lock);
+ pipe_release_writer (pipe);
+}
+
+/* Empty out PIPE of any data. PIPE should be locked. */
+PIPE_EI void
+pipe_drain (struct pipe *pipe)
+{
+ pq_drain (pipe->queue);
+}
+
+/* Writes up to LEN bytes of DATA, to PIPE, which should be locked, and
+ returns the amount written in AMOUNT. If present, the information in
+ CONTROL & PORTS is written in a preceding control packet. If an error is
+ returned, nothing is done. If non-NULL, SOURCE is recorded as the source
+ of the data, to be provided to any readers of it; if no reader ever reads
+ it, it's deallocated by calling pipe_dealloc_addr. */
+error_t pipe_send (struct pipe *pipe, int noblock, void *source,
+ char *data, size_t data_len,
+ char *control, size_t control_len,
+ mach_port_t *ports, size_t num_ports,
+ size_t *amount);
+
+/* Writes up to LEN bytes of DATA, to PIPE, which should be locked, and
+ returns the amount written in AMOUNT. If an error is returned, nothing is
+ done. If non-NULL, SOURCE is recorded as the source of the data, to be
+ provided to any readers of it; if no reader ever reads it, it's
+ deallocated by calling pipe_dealloc_addr. */
+#define pipe_write(pipe, noblock, source, data, data_len, amount) \
+ pipe_send (pipe, noblock, source, data, data_len, 0, 0, 0, 0, amount)
+
+/* Reads up to AMOUNT bytes from PIPE, which should be locked, into DATA, and
+ returns the amount read in DATA_LEN. If NOBLOCK is true, EWOULDBLOCK is
+ returned instead of block when no data is immediately available. If an
+ error is returned, nothing is done. If source isn't NULL, the
+ corresponding source provided by the sender is returned in it; this may be
+ NULL if it wasn't specified by the sender (which is usually the case with
+ connection-oriented protcols).
+
+ If there is control data waiting (before any data), then the behavior
+ depends on whether this is an `ordinary read' (when CONTROL and PORTS are
+ both NULL), in which case any control data is skipped, or a `msg read', in
+ which case the contents of the first control packet is returned (in
+ CONTROL and PORTS), as well as the first data packet following that (if
+ the control packet is followed by another control packet or no packet in
+ this case, a zero length data buffer is returned; the user should be
+ careful to distinguish this case from EOF (when no control or ports data
+ is returned either). */
+error_t pipe_recv (struct pipe *pipe, int noblock, unsigned *flags,
+ void **source,
+ char **data, size_t *data_len, size_t amount,
+ char **control, size_t *control_len,
+ mach_port_t **ports, size_t *num_ports);
+
+
+/* Reads up to AMOUNT bytes from PIPE, which should be locked, into DATA, and
+ returns the amount read in DATA_LEN. If NOBLOCK is true, EWOULDBLOCK is
+ returned instead of block when no data is immediately available. If an
+ error is returned, nothing is done. If source isn't NULL, the
+ corresponding source provided by the sender is returned in it; this may be
+ NULL if it wasn't specified by the sender (which is usually the case with
+ connection-oriented protcols). */
+#define pipe_read(pipe, noblock, source, data, data_len, amount) \
+ pipe_recv (pipe, noblock, 0, source, data, data_len, amount, 0,0,0,0)
+
+/* Hold this lock before attempting to lock multiple pipes. */
+extern struct mutex pipe_multiple_lock;
+
+/* Return when either RPIPE is available for reading (if SELECT_READ is set
+ in *SELECT_TYPE), or WPIPE is available for writing (if select_write is
+ set in *SELECT_TYPE). *SELECT_TYPE is modified to reflect which (or both)
+ is now available. DATA_ONLY should be true if only data packets should be
+ waited for on RPIPE. Neither RPIPE or WPIPE should be locked when calling
+ this function (unlike most pipe functions). */
+error_t pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe,
+ int *select_type, int data_only);
+
+/* ---------------------------------------------------------------- */
+/* User-provided functions. */
+
+/* This routine may be provided by the user, in which case, it should be a
+ function taking a non-NULL source address and deallocating it. It
+ defaults to calling ports_port_deref. */
+void pipe_dealloc_addr (void *addr);
+
+#endif /* __PIPE_H__ */
diff --git a/libpipe/pq-funcs.c b/libpipe/pq-funcs.c
new file mode 100644
index 00000000..2acecd08
--- /dev/null
+++ b/libpipe/pq-funcs.c
@@ -0,0 +1,2 @@
+#define PQ_EI
+#include "pq.h"
diff --git a/libpipe/pq.c b/libpipe/pq.c
new file mode 100644
index 00000000..07196000
--- /dev/null
+++ b/libpipe/pq.c
@@ -0,0 +1,434 @@
+/* Packet queues
+
+ Copyright (C) 1995, 1996, 1998, 1999 Free Software Foundation, Inc.
+
+ Written by Miles Bader <miles@gnu.ai.mit.edu>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2, or (at
+ your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
+#include <malloc.h>
+#include <string.h>
+#include <stddef.h>
+#include <sys/mman.h>
+
+#include "pq.h"
+
+/* ---------------------------------------------------------------- */
+
+/* Create a new packet queue, returning it in PQ. The only possible error is
+ ENOMEM. */
+error_t
+pq_create (struct pq **pq)
+{
+ *pq = malloc (sizeof (struct pq));
+
+ if (! *pq)
+ return ENOMEM;
+
+ (*pq)->head = (*pq)->tail = 0;
+ (*pq)->free = 0;
+
+ return 0;
+}
+
+/* Free every packet (and its contents) in the linked list rooted at HEAD. */
+static void
+free_packets (struct packet *head)
+{
+ if (head)
+ {
+ struct packet *next = head->next;
+ if (head->ports)
+ free (head->ports);
+ if (head->buf_len > 0)
+ {
+ if (head->buf_vm_alloced)
+ munmap (head->buf, head->buf_len);
+ else
+ free (head->buf);
+ }
+ free (head);
+ free_packets (next);
+ }
+}
+
+/* Frees PQ and any resources it holds, including deallocating any ports in
+ packets left in the queue. */
+void
+pq_free (struct pq *pq)
+{
+ pq_drain (pq);
+ free_packets (pq->free);
+ free (pq);
+}
+
+/* ---------------------------------------------------------------- */
+
+/* Remove the first packet (if any) in PQ, deallocating any resources it
+ holds. True is returned if a packet was found, false otherwise. */
+int
+pq_dequeue (struct pq *pq)
+{
+ extern void pipe_dealloc_addr (void *addr);
+ struct packet *packet = pq->head;
+
+ if (! packet)
+ return 0;
+
+ /* Deallocate any resource in PACKET. */
+ if (packet->num_ports)
+ packet_dealloc_ports (packet);
+ if (packet->source)
+ pipe_dealloc_addr (packet->source);
+
+ pq->head = packet->next;
+ packet->next = pq->free;
+ pq->free = packet;
+ if (pq->head)
+ pq->head->prev = 0;
+ else
+ pq->tail = 0;
+
+ return 1;
+}
+
+/* Empties out PQ. This *will* deallocate any ports in any of the packets. */
+void
+pq_drain (struct pq *pq)
+{
+ while (pq_dequeue (pq))
+ ;
+}
+
+/* Pushes a new packet of type TYPE and source SOURCE onto the tail of the
+ queue, and returns it, or 0 if there was an allocation error. */
+struct packet *
+pq_queue (struct pq *pq, unsigned type, void *source)
+{
+ struct packet *packet = pq->free;
+
+ if (!packet)
+ {
+ packet = malloc (sizeof (struct packet));
+ if (!packet)
+ return 0;
+ packet->buf = 0;
+ packet->buf_len = 0;
+ packet->ports = 0;
+ packet->num_ports = packet->ports_alloced = 0;
+ packet->buf_start = packet->buf_end = packet->buf;
+ packet->buf_vm_alloced = 0;
+ }
+ else
+ pq->free = packet->next;
+
+ packet->type = type;
+ packet->source = source;
+ packet->next = 0;
+ packet->prev = pq->tail;
+ if (pq->tail)
+ pq->tail->next = packet;
+ pq->tail = packet;
+ if (!pq->head)
+ pq->head = packet;
+
+ return packet;
+}
+
+/* ---------------------------------------------------------------- */
+
+/* Returns a legal size to which PACKET can be set allowing enough room for
+ EXTRA bytes more than what's already in it, and perhaps more. */
+size_t
+packet_new_size (struct packet *packet, size_t extra)
+{
+ size_t new_len = (packet->buf_end - packet->buf) + extra;
+ if (packet->buf_vm_alloced || new_len >= PACKET_SIZE_LARGE)
+ /* Round NEW_LEN up to a page boundary (OLD_LEN should already be). */
+ return round_page (new_len);
+ else
+ /* Otherwise, just round up to a multiple of 512 bytes. */
+ return (new_len + 511) & ~511;
+}
+
+/* Try to extend PACKET to be NEW_LEN bytes long, which should be greater
+ than the current packet size. This should be a valid length -- i.e., if
+ it's greater than PACKET_SIZE_LARGE, it should be a mulitple of
+ VM_PAGE_SIZE. If PACKET cannot be extended for some reason, false is
+ returned, otherwise true. */
+int
+packet_extend (struct packet *packet, size_t new_len)
+{
+ size_t old_len = packet->buf_len;
+
+ if (old_len == 0)
+ /* No existing buffer to extend. */
+ return 0;
+
+ if (packet->buf_vm_alloced)
+ /* A vm_alloc'd packet. */
+ {
+ char *extension = packet->buf + old_len;
+ /* Try to allocate memory at the end of our current buffer. */
+ if (vm_allocate (mach_task_self (),
+ (vm_address_t *)&extension, new_len - old_len, 0) != 0)
+ return 0;
+ }
+ else
+ /* A malloc'd packet. */
+ {
+ char *new_buf;
+ char *old_buf = packet->buf;
+
+ if (new_len >= PACKET_SIZE_LARGE)
+ /* The old packet length is malloc'd, but we want to vm_allocate the
+ new length, so we'd have to copy the old contents. */
+ return 0;
+
+ new_buf = realloc (old_buf, new_len);
+ if (! new_buf)
+ return 0;
+
+ packet->buf = new_buf;
+ packet->buf_start = new_buf + (packet->buf_start - old_buf);
+ packet->buf_end = new_buf + (packet->buf_end - old_buf);
+ }
+
+ packet->buf_len = new_len;
+
+ return 1;
+}
+
+/* Reallocate PACKET to have NEW_LEN bytes of buffer space, which should be
+ greater than the current packet size. This should be a valid length --
+ i.e., if it's greater than PACKET_SIZE_LARGE, it should be a multiple of
+ VM_PAGE_SIZE. If an error occurs, PACKET is not modified and the error is
+ returned. */
+error_t
+packet_realloc (struct packet *packet, size_t new_len)
+{
+ error_t err;
+ char *new_buf;
+ char *old_buf = packet->buf;
+ int vm_alloc = (new_len >= PACKET_SIZE_LARGE);
+
+ /* Make a new buffer. */
+ if (vm_alloc)
+ {
+ new_buf = mmap (0, new_len, PROT_READ|PROT_WRITE, MAP_ANON, 0, 0);
+ err = (new_buf == (char *) -1) ? errno : 0;
+ }
+ else
+ {
+ new_buf = malloc (new_len);
+ err = (new_buf ? 0 : ENOMEM);
+ }
+
+ if (! err)
+ {
+ size_t old_len = packet->buf_len;
+ char *start = packet->buf_start, *end = packet->buf_end;
+
+ /* Copy what we must. */
+ if (end != start)
+ /* If there was an operation like vm_move, we could use that in the
+ case where both the old and the new buffers were vm_alloced (on
+ the assumption that creating COW pages is somewhat more costly).
+ But there's not, and bcopy will do vm_copy where it can. Will we
+ still takes faults on the new copy, even though we've deallocated
+ the old one??? XXX */
+ bcopy (start, new_buf, end - start);
+
+ /* And get rid of the old buffer. */
+ if (old_len > 0)
+ {
+ if (packet->buf_vm_alloced)
+ vm_deallocate (mach_task_self (), (vm_address_t)old_buf, old_len);
+ else
+ free (old_buf);
+ }
+
+ packet->buf = new_buf;
+ packet->buf_len = new_len;
+ packet->buf_vm_alloced = vm_alloc;
+ packet->buf_start = new_buf;
+ packet->buf_end = new_buf + (end - start);
+ }
+
+ return err;
+}
+
+/* ---------------------------------------------------------------- */
+
+/* If PACKET has any ports, deallocates them. */
+void
+packet_dealloc_ports (struct packet *packet)
+{
+ unsigned i;
+ for (i = 0; i < packet->num_ports; i++)
+ {
+ mach_port_t port = packet->ports[i];
+ if (port != MACH_PORT_NULL)
+ mach_port_deallocate (mach_task_self (), port);
+ }
+}
+
+/* Sets PACKET's ports to be PORTS, of length NUM_PORTS. ENOMEM is returned
+ if a memory allocation error occurred, otherwise, 0. */
+error_t
+packet_set_ports (struct packet *packet,
+ mach_port_t *ports, size_t num_ports)
+{
+ if (packet->num_ports > 0)
+ packet_dealloc_ports (packet);
+ if (num_ports > packet->ports_alloced)
+ {
+ mach_port_t *new_ports = malloc (sizeof (mach_port_t *) * num_ports);
+ if (! new_ports)
+ return ENOMEM;
+ free (packet->ports);
+ packet->ports_alloced = num_ports;
+ }
+ bcopy (ports, packet->ports, sizeof (mach_port_t *) * num_ports);
+ packet->num_ports = num_ports;
+ return 0;
+}
+
+/* Returns any ports in PACKET in PORTS and NUM_PORTS, and removes them from
+ PACKET. */
+error_t
+packet_read_ports (struct packet *packet,
+ mach_port_t **ports, size_t *num_ports)
+{
+ int length = packet->num_ports * sizeof (mach_port_t *);
+ if (*num_ports < packet->num_ports)
+ {
+ *ports = mmap (0, length, PROT_READ|PROT_WRITE, MAP_ANON, 0, 0);
+ if (*ports == (mach_port_t *) -1)
+ return errno;
+ }
+ *num_ports = packet->num_ports;
+ bcopy (packet->ports, *ports, length);
+ packet->num_ports = 0;
+ return 0;
+}
+
+/* Append the bytes in DATA, of length DATA_LEN, to what's already in PACKET,
+ and return the amount appended in AMOUNT. */
+error_t
+packet_write (struct packet *packet,
+ char *data, size_t data_len, size_t *amount)
+{
+ error_t err = packet_ensure (packet, data_len);
+
+ if (err)
+ return err;
+
+ /* Add the new data. */
+ bcopy (data, packet->buf_end, data_len);
+ packet->buf_end += data_len;
+ *amount = data_len;
+
+ return 0;
+}
+
+/* Removes up to AMOUNT bytes from the beginning of the data in PACKET, and
+ puts it into *DATA, and the amount read into DATA_LEN. If more than the
+ original *DATA_LEN bytes are available, new memory is vm_allocated, and
+ the address and length of this array put into DATA and DATA_LEN. */
+error_t
+packet_read (struct packet *packet,
+ char **data, size_t *data_len, size_t amount)
+{
+ char *start = packet->buf_start;
+ char *end = packet->buf_end;
+
+ if (amount > end - start)
+ amount = end - start;
+
+ if (amount > 0)
+ {
+ char *buf = packet->buf;
+
+ if (packet->buf_vm_alloced && amount >= vm_page_size)
+ /* We can return memory from BUF directly without copying. */
+ {
+ if (buf + vm_page_size <= start)
+ /* BUF_START has been advanced past the start of the buffer
+ (perhaps by a series of small reads); as we're going to assume
+ everything before START is gone, make sure we deallocate any
+ memory on pages before those we return to the user. */
+ vm_deallocate (mach_task_self (),
+ (vm_address_t)buf,
+ trunc_page (start) - (vm_address_t)buf);
+
+ *data = start; /* Return the buffer directly. */
+ start += amount; /* Advance the read point. */
+
+ if (start < end)
+ /* Since returning a partial page actually means returning the
+ whole page, we have to be careful not to grab past the page
+ boundary before the end of the data we want. */
+ {
+ char *non_aligned_start = start;
+ start = (char *)trunc_page (start);
+ amount -= non_aligned_start - start;
+ }
+ else
+ /* This read will be up to the end of the buffer, so we can just
+ consume any space on the page following BUF_END (vm_alloced
+ buffers are always allocated in whole pages). */
+ {
+ start = (char *)round_page (start);
+ packet->buf_end = start; /* Ensure BUF_START <= BUF_END. */
+ }
+
+ /* We've actually consumed the memory at the start of BUF. */
+ packet->buf = start;
+ packet->buf_start = start;
+ packet->buf_len -= start - buf;
+ }
+ else
+ /* Just copy the data the old fashioned way.... */
+ {
+ if (*data_len < amount)
+ *data = mmap (0, amount, PROT_READ|PROT_WRITE, MAP_ANON, 0, 0);
+
+ bcopy (start, *data, amount);
+ start += amount;
+
+ if (start - buf > 2 * PACKET_SIZE_LARGE)
+ /* Get rid of unused space at the beginning of the buffer -- we
+ know it's vm_alloced because of the size, and this will allow
+ the buffer to just slide through memory. Because we wait for
+ a relatively large amount of free space before doing this, and
+ packet_write() would have gotten rid the free space if it
+ didn't require copying much data, it's unlikely that this will
+ happen if it would have been cheaper to just move the packet
+ contents around to make space for the next write. */
+ {
+ vm_size_t dealloc = trunc_page (start) - (vm_address_t)buf;
+ vm_deallocate (mach_task_self (), (vm_address_t)buf, dealloc);
+ packet->buf = buf + dealloc;
+ packet->buf_len -= dealloc;
+ }
+
+ packet->buf_start = start;
+ }
+ }
+ *data_len = amount;
+
+ return 0;
+}
diff --git a/libpipe/pq.h b/libpipe/pq.h
new file mode 100644
index 00000000..b98c8b19
--- /dev/null
+++ b/libpipe/pq.h
@@ -0,0 +1,260 @@
+/* Packet queues
+
+ Copyright (C) 1995, 1996 Free Software Foundation, Inc.
+
+ Written by Miles Bader <miles@gnu.ai.mit.edu>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2, or (at
+ your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
+#ifndef __PQ_H__
+#define __PQ_H__
+
+#include <errno.h>
+#include <stddef.h> /* for size_t */
+#include <string.h>
+#include <mach/mach.h>
+
+#ifndef PQ_EI
+#define PQ_EI extern inline
+#endif
+
+
+struct packet
+{
+ /* The packet type, from PACKET_* below. */
+ unsigned short type;
+
+ /* Where this packet was sent from. */
+ void *source;
+
+ /* Buffer space. */
+ char *buf;
+ size_t buf_len;
+ /* Pointers to the data within BUF. */
+ char *buf_start, *buf_end;
+ /* True if BUF was allocated using vm_allocate rather than malloc; only
+ valid if BUF_LEN > 0. */
+ int buf_vm_alloced;
+
+ /* Port data */
+ mach_port_t *ports;
+ size_t num_ports, ports_alloced;
+
+ /* Next and previous packets within the packet queue we're part of. If
+ PREV is null, we're at the head of the queue, and if NEXT is null, we're
+ at the tail. */
+ struct packet *next, *prev;
+};
+
+#define PACKET_TYPE_ANY 0 /* matches any type of packet */
+#define PACKET_TYPE_DATA 1
+#define PACKET_TYPE_CONTROL 2
+
+/* Sets PACKET's ports to be PORTS, of length NUM_PORTS. ENOMEM is returned
+ if a memory allocation error occurred, otherwise, 0. */
+error_t packet_set_ports (struct packet *packet,
+ mach_port_t *ports, size_t num_ports);
+
+/* If PACKET has any ports, deallocates them. */
+void packet_dealloc_ports (struct packet *packet);
+
+/* Returns the number of bytes of data in PACKET. */
+PQ_EI size_t
+packet_readable (struct packet *packet)
+{
+ return packet->buf_end - packet->buf_start;
+}
+
+/* Append the bytes in DATA, of length DATA_LEN, to what's already in PACKET,
+ and return the amount appended in AMOUNT. */
+error_t packet_write (struct packet *packet,
+ char *data, size_t data_len, size_t *amount);
+
+/* Removes up to AMOUNT bytes from the beginning of the data in PACKET, and
+ puts it into *DATA, and the amount read into DATA_LEN. If more than the
+ original *DATA_LEN bytes are available, new memory is vm_allocated, and
+ the address and length of this array put into DATA and DATA_LEN. */
+error_t packet_read (struct packet *packet,
+ char **data, size_t *data_len, size_t amount);
+
+/* Returns any ports in PACKET in PORTS and NUM_PORTS, and removes them from
+ PACKET. */
+error_t packet_read_ports (struct packet *packet,
+ mach_port_t **ports, size_t *num_ports);
+
+/* Return the source addressd in PACKET in SOURCE, deallocating it from
+ PACKET. */
+PQ_EI void
+packet_read_source (struct packet *packet, void **source)
+{
+ *source = packet->source;
+ packet->source = 0;
+}
+
+/* The packet size above which we start to do things differently to avoid
+ copying around data. */
+#define PACKET_SIZE_LARGE 8192
+
+/* Returns a legal size to which PACKET can be set allowing enough room for
+ EXTRA bytes more than what's already in it, and perhaps more. */
+size_t packet_new_size (struct packet *packet, size_t extra);
+
+/* Try to extend PACKET to be NEW_LEN bytes long, which should be greater
+ than the current packet size. This should be a valid length -- i.e., if
+ it's greater than PAGE_PACKET_SIZE, it should be a mulitple of
+ VM_PAGE_SIZE. If PACKET cannot be extended for some reason, false is
+ returned, otherwise true. */
+int packet_extend (struct packet *packet, size_t new_len);
+
+/* Reallocate PACKET to have NEW_LEN bytes of buffer space, which should be
+ greater than the current packet size. This should be a valid length --
+ i.e., if it's greater than PAGE_PACKET_SIZE, it should be a multiple of
+ VM_PAGE_SIZE. If an error occurs, PACKET is not modified and the error is
+ returned. */
+error_t packet_realloc (struct packet *packet, size_t new_len);
+
+/* Try to make space in PACKET for AMOUNT more bytes without growing the
+ buffer, returning true if we could do it. */
+PQ_EI int
+packet_fit (struct packet *packet, size_t amount)
+{
+ char *buf = packet->buf, *end = packet->buf_end;
+ size_t buf_len = packet->buf_len;
+ size_t left = buf + buf_len - end; /* Free space at the end of the buffer. */
+
+ if (amount > left)
+ {
+ char *start = packet->buf_start;
+ size_t cur_len = end - start; /* Amount of data currently in the buf. */
+
+ if (buf_len - cur_len >= amount
+ && cur_len < PACKET_SIZE_LARGE && cur_len < (buf_len >> 2))
+ /* If we could fit the data in by moving what's already in the
+ buffer, and there's not too much there, and it represents less
+ than 25% of the buffer size, then move the data instead of growing
+ the buffer. */
+ {
+ bcopy (start, buf, cur_len);
+ packet->buf_start = buf;
+ packet->buf_end = buf + cur_len;
+ }
+ else
+ return 0; /* We failed... */
+ }
+
+ return 1;
+}
+
+/* Make sure that PACKET has room for at least AMOUNT more bytes, or return
+ the reason why not. */
+PQ_EI error_t
+packet_ensure (struct packet *packet, size_t amount)
+{
+ if (! packet_fit (packet, amount))
+ /* We must make the buffer bigger. */
+ {
+ size_t new_len = packet_new_size (packet, amount);
+ if (! packet_extend (packet, new_len))
+ return packet_realloc (packet, new_len);
+ }
+ return 0;
+}
+
+/* Make sure that PACKET has room for at least AMOUNT more bytes, *only* if
+ it can be done efficiently, e.g., the packet can be grown in place, rather
+ than moving the contents (or there is little enough data so that copying
+ it is OK). True is returned if room was made, false otherwise. */
+PQ_EI int
+packet_ensure_efficiently (struct packet *packet, size_t amount)
+{
+ if (! packet_fit (packet, amount))
+ {
+ size_t new_len = packet_new_size (packet, amount);
+ if (packet_extend (packet, new_len))
+ return 1;
+ if ((packet->buf_end - packet->buf_start) < PACKET_SIZE_LARGE)
+ return packet_realloc (packet, new_len) == 0;
+ }
+ return 0;
+}
+
+struct pq
+{
+ struct packet *head, *tail; /* Packet queue */
+ struct packet *free; /* Free packets */
+};
+
+/* Pushes a new packet of type TYPE and source SOURCE, and returns it, or
+ NULL if there was an allocation error. SOURCE is returned to readers of
+ the packet, or deallocated by calling pipe_dealloc_addr. */
+struct packet *pq_queue (struct pq *pq, unsigned type, void *source);
+
+/* Returns the tail of the packet queue PQ, which may mean pushing a new
+ packet if TYPE and SOURCE do not match the current tail, or this is the
+ first packet. */
+PQ_EI struct packet *
+pq_tail (struct pq *pq, unsigned type, void *source)
+{
+ struct packet *tail = pq->tail;
+ if (!tail
+ || (type && tail->type != type) || (source && tail->source != source))
+ tail = pq_queue (pq, type, source);
+ return tail;
+}
+
+/* Remove the first packet (if any) in PQ, deallocating any resources it
+ holds. True is returned if a packet was found, false otherwise. */
+int pq_dequeue (struct pq *pq);
+
+/* Returns the next available packet in PQ, without removing it from the
+ queue, or NULL if there is none, or the next packet isn't appropiate.
+ A packet is inappropiate if SOURCE is non-NULL its source field doesn't
+ match it, or TYPE is non-NULL and the packet's type field doesn't match
+ it. */
+PQ_EI struct packet *
+pq_head (struct pq *pq, unsigned type, void *source)
+{
+ struct packet *head = pq->head;
+ if (!head)
+ return 0;
+ if (type && head->type != type)
+ return 0;
+ if (source && head->source != source)
+ return 0;
+ return head;
+}
+
+/* The same as pq_head, but first discards the head of the queue. */
+PQ_EI struct packet *
+pq_next (struct pq *pq, unsigned type, void *source)
+{
+ if (!pq->head)
+ return 0;
+ pq_dequeue (pq);
+ return pq_head (pq, type, source);
+}
+
+/* Dequeues all packets in PQ. */
+void pq_drain (struct pq *pq);
+
+/* Create a new packet queue, returning it in PQ. The only possible error is
+ ENOMEM. */
+error_t pq_create (struct pq **pq);
+
+/* Frees PQ and any resources it holds, including deallocating any ports in
+ packets left in the queue. */
+void pq_free (struct pq *pq);
+
+#endif /* __PQ_H__ */
diff --git a/libpipe/seqpack.c b/libpipe/seqpack.c
new file mode 100644
index 00000000..44a15a03
--- /dev/null
+++ b/libpipe/seqpack.c
@@ -0,0 +1,55 @@
+/* The SOCK_SEQPACKET pipe class
+
+ Copyright (C) 1995 Free Software Foundation, Inc.
+
+ Written by Miles Bader <miles@gnu.ai.mit.edu>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2, or (at
+ your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
+#include <sys/socket.h> /* For SOCK_SEQPACKET */
+
+#include "pipe.h"
+#include "pq.h"
+
+/* See the definition of struct pipe_class in "pipe.h" for documentation. */
+
+/* This type of pipe is the same as a SOCK_STREAM, but maintains record
+ boundaries. */
+
+static error_t
+seqpack_write (struct pq *pq, void *source,
+ char *data, size_t data_len, size_t *amount)
+{
+ struct packet *packet = pq_queue (pq, PACKET_TYPE_DATA, source);
+ if (!packet)
+ return ENOBUFS;
+ else
+ return packet_write (packet, data, data_len, amount);
+}
+
+static error_t
+seqpack_read (struct packet *packet, int *dequeue, unsigned *flags,
+ char **data, size_t *data_len, size_t amount)
+{
+ error_t err = packet_read (packet, data, data_len, amount);
+ *dequeue = (packet_readable (packet) == 0);
+ return err;
+}
+
+struct pipe_class _seqpack_pipe_class =
+{
+ SOCK_SEQPACKET, 0, seqpack_read, seqpack_write
+};
+struct pipe_class *seqpack_pipe_class = &_seqpack_pipe_class;
diff --git a/libpipe/stream.c b/libpipe/stream.c
new file mode 100644
index 00000000..8eb90435
--- /dev/null
+++ b/libpipe/stream.c
@@ -0,0 +1,68 @@
+/* The SOCK_STREAM pipe class
+
+ Copyright (C) 1995 Free Software Foundation, Inc.
+
+ Written by Miles Bader <miles@gnu.ai.mit.edu>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2, or (at
+ your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
+#include <sys/socket.h> /* For SOCK_STREAM */
+
+#include "pipe.h"
+#include "pq.h"
+
+/* See the definition of struct pipe_class in "pipe.h" for an explanation. */
+
+/* This should be in some system header... XXX */
+static inline int page_aligned (vm_offset_t num)
+{
+ return trunc_page (num) == num;
+}
+
+static error_t
+stream_write (struct pq *pq, void *source,
+ char *data, size_t data_len, size_t *amount)
+{
+ struct packet *packet = pq_tail (pq, PACKET_TYPE_DATA, source);
+
+ if (packet_readable (packet) > 0
+ && data_len > PACKET_SIZE_LARGE
+ && (! page_aligned (data - packet->buf_end)
+ || ! packet_ensure_efficiently (packet, data_len)))
+ /* Put a large page-aligned transfer in its own packet, if it's
+ page-aligned `differently' than the end of the current packet, or if
+ the current packet can't be extended in place. */
+ packet = pq_queue (pq, PACKET_TYPE_DATA, source);
+
+ if (!packet)
+ return ENOBUFS;
+ else
+ return packet_write (packet, data, data_len, amount);
+}
+
+static error_t
+stream_read (struct packet *packet, int *dequeue, unsigned *flags,
+ char **data, size_t *data_len, size_t amount)
+{
+ error_t err = packet_read (packet, data, data_len, amount);
+ *dequeue = (packet_readable (packet) == 0);
+ return err;
+}
+
+struct pipe_class _stream_pipe_class =
+{
+ SOCK_STREAM, 0, stream_read, stream_write
+};
+struct pipe_class *stream_pipe_class = &_stream_pipe_class;