diff options
Diffstat (limited to 'libpipe')
-rw-r--r-- | libpipe/ChangeLog | 170 | ||||
-rw-r--r-- | libpipe/Makefile | 32 | ||||
-rw-r--r-- | libpipe/addr.c | 29 | ||||
-rw-r--r-- | libpipe/dgram.c | 51 | ||||
-rw-r--r-- | libpipe/pipe-funcs.c | 2 | ||||
-rw-r--r-- | libpipe/pipe.c | 416 | ||||
-rw-r--r-- | libpipe/pipe.h | 381 | ||||
-rw-r--r-- | libpipe/pq-funcs.c | 2 | ||||
-rw-r--r-- | libpipe/pq.c | 434 | ||||
-rw-r--r-- | libpipe/pq.h | 260 | ||||
-rw-r--r-- | libpipe/seqpack.c | 55 | ||||
-rw-r--r-- | libpipe/stream.c | 68 |
12 files changed, 1900 insertions, 0 deletions
diff --git a/libpipe/ChangeLog b/libpipe/ChangeLog new file mode 100644 index 00000000..e0dc3ea1 --- /dev/null +++ b/libpipe/ChangeLog @@ -0,0 +1,170 @@ +1999-07-10 Roland McGrath <roland@baalperazim.frob.com> + + * pq.c: Add #include <sys/mman.h> for munmap decl. + +1999-07-09 Thomas Bushnell, BSG <tb@mit.edu> + + * pq.c (packet_realloc): Use mmap instead of vm_allocate. + (packet_read_ports): Likewise. + (packet_read): Likewise. + +1999-07-03 Thomas Bushnell, BSG <tb@mit.edu> + + * pq.c (free_packets): Use munmap instead of vm_deallocate. + +1998-10-20 Roland McGrath <roland@baalperazim.frob.com> + + * pq.c (free_packets): Add braces to silence gcc warning. + (packet_realloc): Likewise. + * pipe.c (pipe_send): Likewise. + (pipe_recv): Likewise. + +Mon Oct 21 21:58:03 1996 Thomas Bushnell, n/BSG <thomas@gnu.ai.mit.edu> + + * pipe.h: Add extern inline protection. + * pq.h: Likewise. + * pipe-funcs.c, pq-funcs.c: New files. + * Makefile (SRCS): Add pipe-funcs.c and pq-funcs.c. + +Thu Sep 12 16:24:41 1996 Thomas Bushnell, n/BSG <thomas@gnu.ai.mit.edu> + + * Makefile (HURDLIBS): New variable. + (libpipe.so): Delete special dependency. + +Tue Jul 16 11:33:34 1996 Michael I. Bushnell, p/BSG <mib@gnu.ai.mit.edu> + + * pipe.h (EWOULDBLOCK): Define to work around new libc bug. + +Mon Jul 1 17:29:07 1996 Miles Bader <miles@gnu.ai.mit.edu> + + * pq.c (pq_queue): Initialize PACKET->buf_vm_alloced. + +Tue Jan 23 12:44:40 1996 Miles Bader <miles@gnu.ai.mit.edu> + + * pq.h (packet_ensure, packet_ensure_efficiently): Use packet_fit(). + (packet_fit): New function. + * pq.c (packet_read): If there's lots of empty space at the + beginning of a vm_alloced buffer, deallocate it. + +Mon Jan 22 17:12:39 1996 Miles Bader <miles@gnu.ai.mit.edu> + + * pq.c (packet_realloc): Reflect in the new values of BUF_START & + BUF_END that we've removed any empty space at the beginning of BUF. + +Sat Jan 13 13:56:13 1996 Miles Bader <miles@gnu.ai.mit.edu> + + * pq.h (packet_ensure, packet_ensure_efficiently): Use + packet_new_size() instead of packet_size_adjust(). + (packet_size_adjust): Declaration removed. + (packet_new_size): New declaration. + * pq.c (packet_size_adjust): Function removed. + (packet_new_size): New function. + + * pq.c (packet_read): Re-arrange to be slightly less confusing. + Reverse start-past-buf-beginning test that may have leaked memory. + + * pipe.c (pipe_send): For non-blocking writes, avoid writing more + than the user requested. + +Fri Jan 12 12:15:06 1996 Miles Bader <miles@gnu.ai.mit.edu> + + * pq.c (pq_queue): Initialize the ports_alloced field. + (packet_read): When a page-aligned read consumes the whole buffer, + but there's a non-page-multiple amount available, don't let + buf_len become negative. + +Mon Oct 9 14:57:48 1995 Roland McGrath <roland@churchy.gnu.ai.mit.edu> + + * Makefile: Specify shared library dependencies. + +Thu Sep 7 09:08:30 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu> + + * pq.c (packet_read): Don't leave PACKET in a fucked up state when + it's vm_allocate'd but doesn't a page-multiple amount of data and + we're reading everything. + + * pipe.c (_pipe_no_readers): REALLY wake up writers when the pipe + breaks. + +Fri Sep 1 10:42:03 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu> + + * pipe.c (_pipe_no_readers): Wake up write selects too when the + pipe breaks. + +Thu Aug 31 14:39:21 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu> + + * pipe.c (pipe_send): Make writes blockable. + (pipe_recv): Wakeup blocked writers. + (pipe_kick): Function deleted. + (pipe_create): Make connection-oriented pipes start out broken. + (_pipe_first_reader): New function. + (_pipe_first_writer): Don't check whether PIPE is connection- + oriented before clearing PIPE_BROKEN, as otherwise it will never + be set. + (pipe_pair_select): New function. + (pipe_multiple_lock): New variable. + * pipe.h (pipe_wait): Renamed to `pipe_wait_readable'. + (pipe_select): Renamed to `pipe_select_readable'. + (pipe_writable, pipe_wait_writable, pipe_select_writable): New funcs. + (pipe_acquire_reader): Call _pipe_first_reader if necessary. + (_pipe_first_reader): New declaration. + (struct pipe): New fields: `write_limit', `write_atomic', + `pending_writes', `pending_write_selects'. + (struct pipe): `pending_selects' changed to `pending_read_selects'. + (pipe_pair_select): New declaration. + +Tue Aug 29 14:37:49 1995 Miles Bader <miles@geech.gnu.ai.mit.edu> + + * pipe.c (pipe_send): Use condition_broadcast, not condition_signal. + * pipe.h (pipe_select): New function. + + * pipe.h (struct pipe): Remove interrupt_seq_num field. + (pipe_wait): Use hurd_condition_wait to detect interrupts instead + of previous ad-hoc mechanism. + + * pipe.c (pipe_create): Don't initialize interrupt_seq_num field. + + * pipe.h (pipe_acquire_reader, pipe_acquire_writer, + pipe_add_reader, pipe_add_writer): `aquire' -> `acquire'. + +Fri Aug 11 18:35:32 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu> + + * pipe.c (pipe_create): Initialize READERS & WRITERS, not REFS. + (_pipe_first_writer): New function. + (_pipe_no_writers, _pipe_no_writers): New function. + (pipe_break): Function deleted. + +Wed Aug 9 12:53:05 1995 Miles Bader <miles@geech.gnu.ai.mit.edu> + + * pipe.h (struct pipe): Keep separate ref counts for readers and + writers. + (pipe_aquire_reader, pipe_aquire_writer): New functions. + (pipe_release_reader, pipe_release_writer): New functions. + (pipe_add_reader, pipe_add_writer): New functions. + (pipe_remove_reader, pipe_remove_writer): New functions. + (_pipe_first_writer): New function decl. + (_pipe_no_writers, _pipe_no_writers): New function decl. + (pipe_aquire, pipe_release): Function deleted. + (pipe_break): Function decl deleted. + +Tue Aug 1 12:37:27 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu> + + * pipe.c (pipe_recv): Get rid of code to deal with getting the + source address from the control packet if there is no data packet, + since pipe_write always writes a data packet. + +Mon Jul 31 14:50:00 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu> + + * pipe.c (pipe_recv): Assert that a control packet should only + have a source address if there is no corresponding data packet. + (pipe_send): Change the test to determine whether we should write + a control packet, so that we only do so if we need to. Also, + don't record the source address in control packets, as it's + recorded in the following data packet anyway, and this prevents it + from being dealloc'd twice. + +Fri Jul 28 23:03:27 1995 Miles Bader <miles@churchy.gnu.ai.mit.edu> + + * pipe.h (stream_pipe_class, dgram_pipe_class, seqpack_pipe_class): + Make these declarations extern so they don't fuck up initialization. + (seqpacket_pipe_class): Renamed to `seqpack_pipe_class'. diff --git a/libpipe/Makefile b/libpipe/Makefile new file mode 100644 index 00000000..b64166a6 --- /dev/null +++ b/libpipe/Makefile @@ -0,0 +1,32 @@ +# Makefile for libpipe +# +# Copyright (C) 1995, 1996 Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2, or (at +# your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +dir := libpipe +makemode := library + +libname = libpipe +installhdrs = pipe.h pq.h + +SRCS = pq.c dgram.c pipe.c stream.c seqpack.c addr.c pq-funcs.c pipe-funcs.c +LCLHDRS = pipe.h pq.h + +OBJS = $(SRCS:.c=.o) +HURDLIBS=threads ports + +include ../Makeconf + diff --git a/libpipe/addr.c b/libpipe/addr.c new file mode 100644 index 00000000..3af3ef8f --- /dev/null +++ b/libpipe/addr.c @@ -0,0 +1,29 @@ +/* Default address deallocate function + + Copyright (C) 1995 Free Software Foundation, Inc. + + Written by Miles Bader <miles@gnu.ai.mit.edu> + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + +#include <hurd/ports.h> + +/* This routine may be provided by the user, in which case, it should be a + function taking a non-NULL source address and deallocating it. It + defaults to calling ports_port_deref. */ +void pipe_dealloc_addr (void *addr) +{ + ports_port_deref (addr); +} diff --git a/libpipe/dgram.c b/libpipe/dgram.c new file mode 100644 index 00000000..3f3b2ab6 --- /dev/null +++ b/libpipe/dgram.c @@ -0,0 +1,51 @@ +/* The SOCK_DGRAM pipe class + + Copyright (C) 1995 Free Software Foundation, Inc. + + Written by Miles Bader <miles@gnu.ai.mit.edu> + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + +#include <sys/socket.h> /* For SOCK_DGRAM */ + +#include "pipe.h" +#include "pq.h" + +/* See the definition of struct pipe_class in "pipe.h" for documentation. */ + +static error_t +dgram_write (struct pq *pq, void *source, + char *data, size_t data_len, size_t *amount) +{ + struct packet *packet = pq_queue (pq, PACKET_TYPE_DATA, source); + if (!packet) + return ENOBUFS; + else + return packet_write (packet, data, data_len, amount); +} + +static error_t +dgram_read (struct packet *packet, int *dequeue, unsigned *flags, + char **data, size_t *data_len, size_t amount) +{ + *dequeue = 1; + return packet_read (packet, data, data_len, amount); +} + +struct pipe_class _dgram_pipe_class = +{ + SOCK_DGRAM, PIPE_CLASS_CONNECTIONLESS, dgram_read, dgram_write +}; +struct pipe_class *dgram_pipe_class = &_dgram_pipe_class; diff --git a/libpipe/pipe-funcs.c b/libpipe/pipe-funcs.c new file mode 100644 index 00000000..450180ee --- /dev/null +++ b/libpipe/pipe-funcs.c @@ -0,0 +1,2 @@ +#define PIPE_EI +#include "pipe.h" diff --git a/libpipe/pipe.c b/libpipe/pipe.c new file mode 100644 index 00000000..914816bc --- /dev/null +++ b/libpipe/pipe.c @@ -0,0 +1,416 @@ +/* Generic one-way pipes + + Copyright (C) 1995, 1998 Free Software Foundation, Inc. + + Written by Miles Bader <miles@gnu.ai.mit.edu> + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + +#include <string.h> /* For bzero() */ +#include <assert.h> + +#include <mach/time_value.h> +#include <mach/mach_host.h> + +#include <hurd/hurd_types.h> + +#include "pipe.h" + +static inline void +timestamp (time_value_t *stamp) +{ + host_get_time (mach_host_self (), stamp); +} + +/* Hold this lock before attempting to lock multiple pipes. */ +struct mutex pipe_multiple_lock = MUTEX_INITIALIZER; + +/* ---------------------------------------------------------------- */ + +#define pipe_is_connless(p) ((p)->class->flags & PIPE_CLASS_CONNECTIONLESS) + +/* Creates a new pipe of class CLASS and returns it in RESULT. */ +error_t +pipe_create (struct pipe_class *class, struct pipe **pipe) +{ + struct pipe *new = malloc (sizeof (struct pipe)); + + if (new == NULL) + return ENOMEM; + + new->readers = 0; + new->writers = 0; + new->flags = 0; + new->class = class; + new->write_limit = 16*1024; + new->write_atomic = 16*1024; + + bzero (&new->read_time, sizeof (new->read_time)); + bzero (&new->write_time, sizeof (new->write_time)); + + condition_init (&new->pending_reads); + condition_init (&new->pending_read_selects); + condition_init (&new->pending_writes); + condition_init (&new->pending_write_selects); + mutex_init (&new->lock); + + pq_create (&new->queue); + + if (! pipe_is_connless (new)) + new->flags |= PIPE_BROKEN; + + *pipe = new; + return 0; +} + +/* Free PIPE and any resources it holds. */ +void +pipe_free (struct pipe *pipe) +{ + pq_free (pipe->queue); + free (pipe); +} + +/* Take any actions necessary when PIPE acquires its first writer. */ +void _pipe_first_writer (struct pipe *pipe) +{ + if (pipe->readers > 0) + pipe->flags &= ~PIPE_BROKEN; +} + +/* Take any actions necessary when PIPE acquires its first reader. */ +void _pipe_first_reader (struct pipe *pipe) +{ + if (pipe->writers > 0) + pipe->flags &= ~PIPE_BROKEN; +} + +/* Take any actions necessary when PIPE's last reader has gone away. PIPE + should be locked. */ +void _pipe_no_readers (struct pipe *pipe) +{ + if (pipe->writers == 0) + pipe_free (pipe); + else + { + if (! pipe_is_connless (pipe)) + { + pipe->flags |= PIPE_BROKEN; + if (pipe->writers) + /* Wake up writers for the bad news... */ + { + condition_broadcast (&pipe->pending_writes); + condition_broadcast (&pipe->pending_write_selects); + } + } + mutex_unlock (&pipe->lock); + } +} + +/* Take any actions necessary when PIPE's last writer has gone away. PIPE + should be locked. */ +void _pipe_no_writers (struct pipe *pipe) +{ + if (pipe->readers == 0) + pipe_free (pipe); + else + { + if (! pipe_is_connless (pipe)) + { + pipe->flags |= PIPE_BROKEN; + if (pipe->readers) + /* Wake up readers for the bad news... */ + { + condition_broadcast (&pipe->pending_reads); + condition_broadcast (&pipe->pending_read_selects); + } + } + mutex_unlock (&pipe->lock); + } +} + +/* Return when either RPIPE is available for reading (if SELECT_READ is set + in *SELECT_TYPE), or WPIPE is available for writing (if select_write is + set in *SELECT_TYPE). *SELECT_TYPE is modified to reflect which (or both) + is now available. DATA_ONLY should be true if only data packets should be + waited for on RPIPE. Neither RPIPE or WPIPE should be locked when calling + this function (unlike most pipe functions). */ +error_t +pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe, + int *select_type, int data_only) +{ + error_t err = 0; + + *select_type &= SELECT_READ | SELECT_WRITE; + + if (*select_type == SELECT_READ) + { + mutex_lock (&rpipe->lock); + err = pipe_select_readable (rpipe, data_only); + mutex_unlock (&rpipe->lock); + } + else if (*select_type == SELECT_WRITE) + { + mutex_lock (&wpipe->lock); + err = pipe_select_writable (wpipe); + mutex_unlock (&wpipe->lock); + } + else + /* ugh */ + { + int rpipe_blocked, wpipe_blocked; + struct condition pending_read_write_select; + size_t wlimit = wpipe->write_limit; + struct mutex *lock = + (wpipe == rpipe ? &rpipe->lock : &pipe_multiple_lock); + + condition_init (&pending_read_write_select); + condition_implies (&rpipe->pending_read_selects, + &pending_read_write_select); + condition_implies (&wpipe->pending_write_selects, + &pending_read_write_select); + + mutex_lock (lock); + if (rpipe != wpipe) + { + mutex_lock (&rpipe->lock); + mutex_lock (&wpipe->lock); + } + + rpipe_blocked = + ! ((rpipe->flags & PIPE_BROKEN) || pipe_is_readable (rpipe, data_only)); + wpipe_blocked = + ! ((wpipe->flags & PIPE_BROKEN) || pipe_readable (wpipe, 1) < wlimit); + while (!err && rpipe_blocked && wpipe_blocked) + { + if (rpipe != wpipe) + { + mutex_unlock (&rpipe->lock); + mutex_unlock (&wpipe->lock); + } + if (hurd_condition_wait (&pending_read_write_select, lock)) + err = EINTR; + if (rpipe != wpipe) + { + mutex_lock (&rpipe->lock); + mutex_lock (&wpipe->lock); + } + rpipe_blocked = + ! ((rpipe->flags & PIPE_BROKEN) + || pipe_is_readable (rpipe, data_only)); + wpipe_blocked = + ! ((wpipe->flags & PIPE_BROKEN) + || pipe_readable (wpipe, 1) < wlimit); + } + + if (!err) + { + if (rpipe_blocked) + *select_type &= ~SELECT_READ; + if (wpipe_blocked) + *select_type &= ~SELECT_WRITE; + } + + if (rpipe != wpipe) + { + mutex_unlock (&rpipe->lock); + mutex_unlock (&wpipe->lock); + } + mutex_unlock (lock); + + condition_unimplies (&rpipe->pending_read_selects, + &pending_read_write_select); + condition_unimplies (&wpipe->pending_write_selects, + &pending_read_write_select); + } + + return err; +} + +/* Writes up to LEN bytes of DATA, to PIPE, which should be locked, and + returns the amount written in AMOUNT. If present, the information in + CONTROL & PORTS is written in a preceding control packet. If an error is + returned, nothing is done. */ +error_t +pipe_send (struct pipe *pipe, int noblock, void *source, + char *data, size_t data_len, + char *control, size_t control_len, + mach_port_t *ports, size_t num_ports, + size_t *amount) +{ + error_t err = 0; + + err = pipe_wait_writable (pipe, noblock); + if (err) + return err; + + if (noblock) + { + size_t left = pipe->write_limit - pipe_readable (pipe, 1); + if (left < data_len) + { + if (data_len <= pipe->write_atomic) + return EWOULDBLOCK; + else + data_len = left; + } + } + + if (control_len > 0 || num_ports > 0) + /* Write a control packet. */ + { + /* Note that we don't record the source address in control packets, as + it's recorded in the following data packet anyway, and this prevents + it from being dealloc'd twice; this depends on the fact that we + always write a data packet. */ + struct packet *control_packet = + pq_queue (pipe->queue, PACKET_TYPE_CONTROL, NULL); + + if (control_packet == NULL) + err = ENOBUFS; + else + { + err = packet_write (control_packet, control, control_len, NULL); + if (!err) + err = packet_set_ports (control_packet, ports, num_ports); + if (err) + /* Trash CONTROL_PACKET somehow XXX */; + } + } + + if (!err) + err = (*pipe->class->write)(pipe->queue, source, data, data_len, amount); + + if (!err) + { + timestamp (&pipe->write_time); + + /* And wakeup anyone that might be interested in it. */ + condition_broadcast (&pipe->pending_reads); + mutex_unlock (&pipe->lock); + + mutex_lock (&pipe->lock); /* Get back the lock on PIPE. */ + /* Only wakeup selects if there's still data available. */ + if (pipe_is_readable (pipe, 0)) + { + condition_broadcast (&pipe->pending_read_selects); + /* We leave PIPE locked here, assuming the caller will soon unlock + it and allow others access. */ + } + } + + return err; +} + +/* Reads up to AMOUNT bytes from PIPE, which should be locked, into DATA, and + returns the amount read in DATA_LEN. If NOBLOCK is true, EWOULDBLOCK is + returned instead of block when no data is immediately available. If an + error is returned, nothing is done. If source isn't NULL, the address of + the socket from which the data was sent is returned in it; this may be + NULL if it wasn't specified by the sender (which is usually the case with + connection-oriented protcols). + + If there is control data waiting (before any data), then the behavior + depends on whether this is an `ordinary read' (when CONTROL and PORTS are + both NULL), in which case any control data is skipped, or a `msg read', in + which case the contents of the first control packet is returned (in + CONTROL and PORTS), as well as the first data packet following that (if + the control packet is followed by another control packet or no packet in + this case, a zero length data buffer is returned; the user should be + careful to distinguish this case from EOF (when no control or ports data + is returned either). */ +error_t +pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, void **source, + char **data, size_t *data_len, size_t amount, + char **control, size_t *control_len, + mach_port_t **ports, size_t *num_ports) +{ + error_t err; + struct packet *packet; + struct pq *pq = pipe->queue; + /* True if the user isn't asking for any `control' data. */ + int data_only = (control == NULL && ports == NULL); + + err = pipe_wait_readable (pipe, noblock, data_only); + if (err) + return err; + + packet = pq_head (pq, PACKET_TYPE_ANY, 0); + + if (data_only) + /* The user doesn't want to know about control info, so skip any... */ + while (packet && packet->type == PACKET_TYPE_CONTROL) + packet = pq_next (pq, PACKET_TYPE_ANY, 0); + else if (packet && packet->type == PACKET_TYPE_CONTROL) + /* Read this control packet first, before looking for a data packet. */ + { + if (control != NULL) + packet_read (packet, control, control_len, packet_readable (packet)); + if (ports != NULL) + /* Copy out the port rights being sent. */ + packet_read_ports (packet, ports, num_ports); + + packet = pq_next (pq, PACKET_TYPE_DATA, NULL); + assert (packet); /* pipe_write always writes a data packet. */ + } + else + /* No control data... */ + { + if (control_len) + *control_len = 0; + if (num_ports) + *num_ports = 0; + } + + if (!err) + { + if (packet) + /* Read some data (PACKET must be a data packet at this point). */ + { + int dq = 1; /* True if we should dequeue this packet. */ + + if (source) + packet_read_source (packet, source); + + err = (*pipe->class->read)(packet, &dq, flags, + data, data_len, amount); + if (dq) + pq_dequeue (pq); + } + else + /* Return EOF. */ + *data_len = 0; + } + + if (!err && packet) + { + timestamp (&pipe->read_time); + + /* And wakeup anyone that might be interested in it. */ + condition_broadcast (&pipe->pending_writes); + mutex_unlock (&pipe->lock); + + mutex_lock (&pipe->lock); /* Get back the lock on PIPE. */ + /* Only wakeup selects if there's still writing space available. */ + if (pipe_readable (pipe, 1) < pipe->write_limit) + { + condition_broadcast (&pipe->pending_write_selects); + /* We leave PIPE locked here, assuming the caller will soon unlock + it and allow others access. */ + } + } + + return err; +} diff --git a/libpipe/pipe.h b/libpipe/pipe.h new file mode 100644 index 00000000..d6c5ae8f --- /dev/null +++ b/libpipe/pipe.h @@ -0,0 +1,381 @@ +/* Generic one-way pipes + + Copyright (C) 1995, 1996 Free Software Foundation, Inc. + + Written by Miles Bader <miles@gnu.ai.mit.edu> + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + +#ifndef __PIPE_H__ +#define __PIPE_H__ + +#define EWOULDBLOCK EAGAIN /* XXX */ + +#include <cthreads.h> /* For conditions & mutexes */ + +#include "pq.h" + +#ifndef PIPE_EI +#define PIPE_EI extern inline +#endif + + +/* A description of a class of pipes and how to operate on them. */ +struct pipe_class +{ + /* What sort of socket this corresponds too. */ + int sock_type; + + /* Flags, from PIPE_CLASS_*, below. */ + unsigned flags; + + /* Operations: */ + /* Read from PACKET into DATA &c, and set *DEQUEUE to true if PACKET should + be subsequently discarded. */ + error_t (*read)(struct packet *packet, int *dequeue, unsigned *flags, + char **data, size_t *data_len, size_t amount); + /* Write DATA &c into the packet queue PQ. */ + error_t (*write)(struct pq *pq, void *source, + char *data, size_t data_len, size_t *amount); +}; + +/* pipe_class flags */ +#define PIPE_CLASS_CONNECTIONLESS 0x1 /* A non-stream protocol. */ + +/* Some pre-defined pipe_classes. */ +extern struct pipe_class *stream_pipe_class; +extern struct pipe_class *dgram_pipe_class; +extern struct pipe_class *seqpack_pipe_class; + +/* A unidirectional data pipe; it transfers data from READER to WRITER. */ +struct pipe +{ + /* What kind of pipe we are. */ + struct pipe_class *class; + + /* We use this to keep track of active threads using this pipe, so that + while a thread is waiting to read from a pipe and that pipe gets + deallocated (say by socket_shutdown), it doesn't actually go away until + the reader realizes what happened. It is normally frobbed using + pipe_acquire & pipe_release, which do locking as well.. */ + unsigned readers, writers; + + /* Various flags, from PIPE_* below. */ + unsigned flags; + + /* Various timestamps for this pipe. */ + time_value_t read_time; + time_value_t write_time; + + struct condition pending_reads; + struct condition pending_read_selects; + + struct condition pending_writes; + struct condition pending_write_selects; + + /* The maximum number of characters that this pipe will hold without + further writes blocking. */ + size_t write_limit; + + /* Write requests of less than this much are always done atomically. */ + size_t write_atomic; + + struct mutex lock; + + /* A queue of incoming packets, of type either PACKET_TYPE_DATA or + PACKET_TYPE_CONTROL. Each data packet represents one datagram for + protocols that maintain record boundaries. Control packets always + represent the control information to be returned from one read + operation, and will be returned in conjuction with the following data + packet (if any). Reads interested only in data just skip control + packets until they find a data packet. */ + struct pq *queue; +}; + +/* Pipe flags. */ +#define PIPE_BROKEN 0x1 /* This pipe isn't connected. */ + +/* Returns the number of characters quickly readable from PIPE. If DATA_ONLY + is true, then `control' packets are ignored. */ +PIPE_EI size_t +pipe_readable (struct pipe *pipe, int data_only) +{ + size_t readable = 0; + struct pq *pq = pipe->queue; + struct packet *packet = pq_head (pq, PACKET_TYPE_ANY, NULL); + while (packet) + { + if (packet->type == PACKET_TYPE_DATA) + readable += packet_readable (packet); + packet = packet->next; + } + return readable; +} + +/* Returns true if there's any data available in PIPE. If DATA_ONLY is true, + then `control' packets are ignored. Note that this is different than + (pipe_readable (PIPE) > 0) in the case where a control packet containing + only ports is present. */ +PIPE_EI int +pipe_is_readable (struct pipe *pipe, int data_only) +{ + struct pq *pq = pipe->queue; + struct packet *packet = pq_head (pq, PACKET_TYPE_ANY, NULL); + if (data_only) + while (packet && packet->type == PACKET_TYPE_CONTROL) + packet = packet->next; + return (packet != NULL); +} + +/* Waits for PIPE to be readable, or an error to occurr. If NOBLOCK is true, + this operation will return EWOULDBLOCK instead of blocking when no data is + immediately available. If DATA_ONLY is true, then `control' packets are + ignored. */ +PIPE_EI error_t +pipe_wait_readable (struct pipe *pipe, int noblock, int data_only) +{ + while (! pipe_is_readable (pipe, data_only) && ! (pipe->flags & PIPE_BROKEN)) + { + if (noblock) + return EWOULDBLOCK; + if (hurd_condition_wait (&pipe->pending_reads, &pipe->lock)) + return EINTR; + } + return 0; +} + +/* Waits for PIPE to be readable, or an error to occurr. This call only + returns once threads waiting using pipe_wait_readable have been woken and + given a chance to read, and if there is still data available thereafter. + If DATA_ONLY is true, then `control' packets are ignored. */ +PIPE_EI error_t +pipe_select_readable (struct pipe *pipe, int data_only) +{ + while (! pipe_is_readable (pipe, data_only) && ! (pipe->flags & PIPE_BROKEN)) + if (hurd_condition_wait (&pipe->pending_read_selects, &pipe->lock)) + return EINTR; + return 0; +} + +/* Block until data can be written to PIPE. If NOBLOCK is true, then + EWOULDBLOCK is returned instead of blocking if this can't be done + immediately. */ +PIPE_EI error_t +pipe_wait_writable (struct pipe *pipe, int noblock) +{ + size_t limit = pipe->write_limit; + if (pipe->flags & PIPE_BROKEN) + return EPIPE; + while (pipe_readable (pipe, 1) >= limit) + { + if (noblock) + return EWOULDBLOCK; + if (hurd_condition_wait (&pipe->pending_writes, &pipe->lock)) + return EINTR; + if (pipe->flags & PIPE_BROKEN) + return EPIPE; + } + return 0; +} + +/* Block until some data can be written to PIPE. This call only returns once + threads waiting using pipe_wait_writable have been woken and given a + chance to write, and if there is still space available thereafter. */ +PIPE_EI error_t +pipe_select_writable (struct pipe *pipe) +{ + size_t limit = pipe->write_limit; + while (! (pipe->flags & PIPE_BROKEN) && pipe_readable (pipe, 1) >= limit) + if (hurd_condition_wait (&pipe->pending_writes, &pipe->lock)) + return EINTR; + return 0; +} + +/* Creates a new pipe of class CLASS and returns it in RESULT. */ +error_t pipe_create (struct pipe_class *class, struct pipe **pipe); + +/* Free PIPE and any resources it holds. */ +void pipe_free (struct pipe *pipe); + +/* Take any actions necessary when PIPE acquires its first reader. */ +void _pipe_first_reader (struct pipe *pipe); + +/* Take any actions necessary when PIPE acquires its first writer. */ +void _pipe_first_writer (struct pipe *pipe); + +/* Take any actions necessary when PIPE's last reader has gone away. PIPE + should be locked. */ +void _pipe_no_readers (struct pipe *pipe); + +/* Take any actions necessary when PIPE's last writer has gone away. PIPE + should be locked. */ +void _pipe_no_writers (struct pipe *pipe); + +/* Lock PIPE and increment its readers count. */ +PIPE_EI void +pipe_acquire_reader (struct pipe *pipe) +{ + mutex_lock (&pipe->lock); + if (pipe->readers++ == 0) + _pipe_first_reader (pipe); +} + +/* Lock PIPE and increment its writers count. */ +PIPE_EI void +pipe_acquire_writer (struct pipe *pipe) +{ + mutex_lock (&pipe->lock); + if (pipe->writers++ == 0) + _pipe_first_writer (pipe); +} + +/* Decrement PIPE's (which should be locked) reader count and unlock it. If + there are no more refs to PIPE, it will be destroyed. */ +PIPE_EI void +pipe_release_reader (struct pipe *pipe) +{ + if (--pipe->readers == 0) + _pipe_no_readers (pipe); + else + mutex_unlock (&pipe->lock); +} + +/* Decrement PIPE's (which should be locked) writer count and unlock it. If + there are no more refs to PIPE, it will be destroyed. */ +PIPE_EI void +pipe_release_writer (struct pipe *pipe) +{ + if (--pipe->writers == 0) + _pipe_no_writers (pipe); + else + mutex_unlock (&pipe->lock); +} + +/* Increment PIPE's reader count. PIPE should be unlocked. */ +PIPE_EI void +pipe_add_reader (struct pipe *pipe) +{ + pipe_acquire_reader (pipe); + mutex_unlock (&pipe->lock); +} + +/* Increment PIPE's writer count. PIPE should be unlocked. */ +PIPE_EI void +pipe_add_writer (struct pipe *pipe) +{ + pipe_acquire_writer (pipe); + mutex_unlock (&pipe->lock); +} + +/* Decrement PIPE's (which should be unlocked) reader count and unlock it. If + there are no more refs to PIPE, it will be destroyed. */ +PIPE_EI void +pipe_remove_reader (struct pipe *pipe) +{ + mutex_lock (&pipe->lock); + pipe_release_reader (pipe); +} + +/* Decrement PIPE's (which should be unlocked) writer count and unlock it. If + there are no more refs to PIPE, it will be destroyed. */ +PIPE_EI void +pipe_remove_writer (struct pipe *pipe) +{ + mutex_lock (&pipe->lock); + pipe_release_writer (pipe); +} + +/* Empty out PIPE of any data. PIPE should be locked. */ +PIPE_EI void +pipe_drain (struct pipe *pipe) +{ + pq_drain (pipe->queue); +} + +/* Writes up to LEN bytes of DATA, to PIPE, which should be locked, and + returns the amount written in AMOUNT. If present, the information in + CONTROL & PORTS is written in a preceding control packet. If an error is + returned, nothing is done. If non-NULL, SOURCE is recorded as the source + of the data, to be provided to any readers of it; if no reader ever reads + it, it's deallocated by calling pipe_dealloc_addr. */ +error_t pipe_send (struct pipe *pipe, int noblock, void *source, + char *data, size_t data_len, + char *control, size_t control_len, + mach_port_t *ports, size_t num_ports, + size_t *amount); + +/* Writes up to LEN bytes of DATA, to PIPE, which should be locked, and + returns the amount written in AMOUNT. If an error is returned, nothing is + done. If non-NULL, SOURCE is recorded as the source of the data, to be + provided to any readers of it; if no reader ever reads it, it's + deallocated by calling pipe_dealloc_addr. */ +#define pipe_write(pipe, noblock, source, data, data_len, amount) \ + pipe_send (pipe, noblock, source, data, data_len, 0, 0, 0, 0, amount) + +/* Reads up to AMOUNT bytes from PIPE, which should be locked, into DATA, and + returns the amount read in DATA_LEN. If NOBLOCK is true, EWOULDBLOCK is + returned instead of block when no data is immediately available. If an + error is returned, nothing is done. If source isn't NULL, the + corresponding source provided by the sender is returned in it; this may be + NULL if it wasn't specified by the sender (which is usually the case with + connection-oriented protcols). + + If there is control data waiting (before any data), then the behavior + depends on whether this is an `ordinary read' (when CONTROL and PORTS are + both NULL), in which case any control data is skipped, or a `msg read', in + which case the contents of the first control packet is returned (in + CONTROL and PORTS), as well as the first data packet following that (if + the control packet is followed by another control packet or no packet in + this case, a zero length data buffer is returned; the user should be + careful to distinguish this case from EOF (when no control or ports data + is returned either). */ +error_t pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, + void **source, + char **data, size_t *data_len, size_t amount, + char **control, size_t *control_len, + mach_port_t **ports, size_t *num_ports); + + +/* Reads up to AMOUNT bytes from PIPE, which should be locked, into DATA, and + returns the amount read in DATA_LEN. If NOBLOCK is true, EWOULDBLOCK is + returned instead of block when no data is immediately available. If an + error is returned, nothing is done. If source isn't NULL, the + corresponding source provided by the sender is returned in it; this may be + NULL if it wasn't specified by the sender (which is usually the case with + connection-oriented protcols). */ +#define pipe_read(pipe, noblock, source, data, data_len, amount) \ + pipe_recv (pipe, noblock, 0, source, data, data_len, amount, 0,0,0,0) + +/* Hold this lock before attempting to lock multiple pipes. */ +extern struct mutex pipe_multiple_lock; + +/* Return when either RPIPE is available for reading (if SELECT_READ is set + in *SELECT_TYPE), or WPIPE is available for writing (if select_write is + set in *SELECT_TYPE). *SELECT_TYPE is modified to reflect which (or both) + is now available. DATA_ONLY should be true if only data packets should be + waited for on RPIPE. Neither RPIPE or WPIPE should be locked when calling + this function (unlike most pipe functions). */ +error_t pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe, + int *select_type, int data_only); + +/* ---------------------------------------------------------------- */ +/* User-provided functions. */ + +/* This routine may be provided by the user, in which case, it should be a + function taking a non-NULL source address and deallocating it. It + defaults to calling ports_port_deref. */ +void pipe_dealloc_addr (void *addr); + +#endif /* __PIPE_H__ */ diff --git a/libpipe/pq-funcs.c b/libpipe/pq-funcs.c new file mode 100644 index 00000000..2acecd08 --- /dev/null +++ b/libpipe/pq-funcs.c @@ -0,0 +1,2 @@ +#define PQ_EI +#include "pq.h" diff --git a/libpipe/pq.c b/libpipe/pq.c new file mode 100644 index 00000000..07196000 --- /dev/null +++ b/libpipe/pq.c @@ -0,0 +1,434 @@ +/* Packet queues + + Copyright (C) 1995, 1996, 1998, 1999 Free Software Foundation, Inc. + + Written by Miles Bader <miles@gnu.ai.mit.edu> + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + +#include <malloc.h> +#include <string.h> +#include <stddef.h> +#include <sys/mman.h> + +#include "pq.h" + +/* ---------------------------------------------------------------- */ + +/* Create a new packet queue, returning it in PQ. The only possible error is + ENOMEM. */ +error_t +pq_create (struct pq **pq) +{ + *pq = malloc (sizeof (struct pq)); + + if (! *pq) + return ENOMEM; + + (*pq)->head = (*pq)->tail = 0; + (*pq)->free = 0; + + return 0; +} + +/* Free every packet (and its contents) in the linked list rooted at HEAD. */ +static void +free_packets (struct packet *head) +{ + if (head) + { + struct packet *next = head->next; + if (head->ports) + free (head->ports); + if (head->buf_len > 0) + { + if (head->buf_vm_alloced) + munmap (head->buf, head->buf_len); + else + free (head->buf); + } + free (head); + free_packets (next); + } +} + +/* Frees PQ and any resources it holds, including deallocating any ports in + packets left in the queue. */ +void +pq_free (struct pq *pq) +{ + pq_drain (pq); + free_packets (pq->free); + free (pq); +} + +/* ---------------------------------------------------------------- */ + +/* Remove the first packet (if any) in PQ, deallocating any resources it + holds. True is returned if a packet was found, false otherwise. */ +int +pq_dequeue (struct pq *pq) +{ + extern void pipe_dealloc_addr (void *addr); + struct packet *packet = pq->head; + + if (! packet) + return 0; + + /* Deallocate any resource in PACKET. */ + if (packet->num_ports) + packet_dealloc_ports (packet); + if (packet->source) + pipe_dealloc_addr (packet->source); + + pq->head = packet->next; + packet->next = pq->free; + pq->free = packet; + if (pq->head) + pq->head->prev = 0; + else + pq->tail = 0; + + return 1; +} + +/* Empties out PQ. This *will* deallocate any ports in any of the packets. */ +void +pq_drain (struct pq *pq) +{ + while (pq_dequeue (pq)) + ; +} + +/* Pushes a new packet of type TYPE and source SOURCE onto the tail of the + queue, and returns it, or 0 if there was an allocation error. */ +struct packet * +pq_queue (struct pq *pq, unsigned type, void *source) +{ + struct packet *packet = pq->free; + + if (!packet) + { + packet = malloc (sizeof (struct packet)); + if (!packet) + return 0; + packet->buf = 0; + packet->buf_len = 0; + packet->ports = 0; + packet->num_ports = packet->ports_alloced = 0; + packet->buf_start = packet->buf_end = packet->buf; + packet->buf_vm_alloced = 0; + } + else + pq->free = packet->next; + + packet->type = type; + packet->source = source; + packet->next = 0; + packet->prev = pq->tail; + if (pq->tail) + pq->tail->next = packet; + pq->tail = packet; + if (!pq->head) + pq->head = packet; + + return packet; +} + +/* ---------------------------------------------------------------- */ + +/* Returns a legal size to which PACKET can be set allowing enough room for + EXTRA bytes more than what's already in it, and perhaps more. */ +size_t +packet_new_size (struct packet *packet, size_t extra) +{ + size_t new_len = (packet->buf_end - packet->buf) + extra; + if (packet->buf_vm_alloced || new_len >= PACKET_SIZE_LARGE) + /* Round NEW_LEN up to a page boundary (OLD_LEN should already be). */ + return round_page (new_len); + else + /* Otherwise, just round up to a multiple of 512 bytes. */ + return (new_len + 511) & ~511; +} + +/* Try to extend PACKET to be NEW_LEN bytes long, which should be greater + than the current packet size. This should be a valid length -- i.e., if + it's greater than PACKET_SIZE_LARGE, it should be a mulitple of + VM_PAGE_SIZE. If PACKET cannot be extended for some reason, false is + returned, otherwise true. */ +int +packet_extend (struct packet *packet, size_t new_len) +{ + size_t old_len = packet->buf_len; + + if (old_len == 0) + /* No existing buffer to extend. */ + return 0; + + if (packet->buf_vm_alloced) + /* A vm_alloc'd packet. */ + { + char *extension = packet->buf + old_len; + /* Try to allocate memory at the end of our current buffer. */ + if (vm_allocate (mach_task_self (), + (vm_address_t *)&extension, new_len - old_len, 0) != 0) + return 0; + } + else + /* A malloc'd packet. */ + { + char *new_buf; + char *old_buf = packet->buf; + + if (new_len >= PACKET_SIZE_LARGE) + /* The old packet length is malloc'd, but we want to vm_allocate the + new length, so we'd have to copy the old contents. */ + return 0; + + new_buf = realloc (old_buf, new_len); + if (! new_buf) + return 0; + + packet->buf = new_buf; + packet->buf_start = new_buf + (packet->buf_start - old_buf); + packet->buf_end = new_buf + (packet->buf_end - old_buf); + } + + packet->buf_len = new_len; + + return 1; +} + +/* Reallocate PACKET to have NEW_LEN bytes of buffer space, which should be + greater than the current packet size. This should be a valid length -- + i.e., if it's greater than PACKET_SIZE_LARGE, it should be a multiple of + VM_PAGE_SIZE. If an error occurs, PACKET is not modified and the error is + returned. */ +error_t +packet_realloc (struct packet *packet, size_t new_len) +{ + error_t err; + char *new_buf; + char *old_buf = packet->buf; + int vm_alloc = (new_len >= PACKET_SIZE_LARGE); + + /* Make a new buffer. */ + if (vm_alloc) + { + new_buf = mmap (0, new_len, PROT_READ|PROT_WRITE, MAP_ANON, 0, 0); + err = (new_buf == (char *) -1) ? errno : 0; + } + else + { + new_buf = malloc (new_len); + err = (new_buf ? 0 : ENOMEM); + } + + if (! err) + { + size_t old_len = packet->buf_len; + char *start = packet->buf_start, *end = packet->buf_end; + + /* Copy what we must. */ + if (end != start) + /* If there was an operation like vm_move, we could use that in the + case where both the old and the new buffers were vm_alloced (on + the assumption that creating COW pages is somewhat more costly). + But there's not, and bcopy will do vm_copy where it can. Will we + still takes faults on the new copy, even though we've deallocated + the old one??? XXX */ + bcopy (start, new_buf, end - start); + + /* And get rid of the old buffer. */ + if (old_len > 0) + { + if (packet->buf_vm_alloced) + vm_deallocate (mach_task_self (), (vm_address_t)old_buf, old_len); + else + free (old_buf); + } + + packet->buf = new_buf; + packet->buf_len = new_len; + packet->buf_vm_alloced = vm_alloc; + packet->buf_start = new_buf; + packet->buf_end = new_buf + (end - start); + } + + return err; +} + +/* ---------------------------------------------------------------- */ + +/* If PACKET has any ports, deallocates them. */ +void +packet_dealloc_ports (struct packet *packet) +{ + unsigned i; + for (i = 0; i < packet->num_ports; i++) + { + mach_port_t port = packet->ports[i]; + if (port != MACH_PORT_NULL) + mach_port_deallocate (mach_task_self (), port); + } +} + +/* Sets PACKET's ports to be PORTS, of length NUM_PORTS. ENOMEM is returned + if a memory allocation error occurred, otherwise, 0. */ +error_t +packet_set_ports (struct packet *packet, + mach_port_t *ports, size_t num_ports) +{ + if (packet->num_ports > 0) + packet_dealloc_ports (packet); + if (num_ports > packet->ports_alloced) + { + mach_port_t *new_ports = malloc (sizeof (mach_port_t *) * num_ports); + if (! new_ports) + return ENOMEM; + free (packet->ports); + packet->ports_alloced = num_ports; + } + bcopy (ports, packet->ports, sizeof (mach_port_t *) * num_ports); + packet->num_ports = num_ports; + return 0; +} + +/* Returns any ports in PACKET in PORTS and NUM_PORTS, and removes them from + PACKET. */ +error_t +packet_read_ports (struct packet *packet, + mach_port_t **ports, size_t *num_ports) +{ + int length = packet->num_ports * sizeof (mach_port_t *); + if (*num_ports < packet->num_ports) + { + *ports = mmap (0, length, PROT_READ|PROT_WRITE, MAP_ANON, 0, 0); + if (*ports == (mach_port_t *) -1) + return errno; + } + *num_ports = packet->num_ports; + bcopy (packet->ports, *ports, length); + packet->num_ports = 0; + return 0; +} + +/* Append the bytes in DATA, of length DATA_LEN, to what's already in PACKET, + and return the amount appended in AMOUNT. */ +error_t +packet_write (struct packet *packet, + char *data, size_t data_len, size_t *amount) +{ + error_t err = packet_ensure (packet, data_len); + + if (err) + return err; + + /* Add the new data. */ + bcopy (data, packet->buf_end, data_len); + packet->buf_end += data_len; + *amount = data_len; + + return 0; +} + +/* Removes up to AMOUNT bytes from the beginning of the data in PACKET, and + puts it into *DATA, and the amount read into DATA_LEN. If more than the + original *DATA_LEN bytes are available, new memory is vm_allocated, and + the address and length of this array put into DATA and DATA_LEN. */ +error_t +packet_read (struct packet *packet, + char **data, size_t *data_len, size_t amount) +{ + char *start = packet->buf_start; + char *end = packet->buf_end; + + if (amount > end - start) + amount = end - start; + + if (amount > 0) + { + char *buf = packet->buf; + + if (packet->buf_vm_alloced && amount >= vm_page_size) + /* We can return memory from BUF directly without copying. */ + { + if (buf + vm_page_size <= start) + /* BUF_START has been advanced past the start of the buffer + (perhaps by a series of small reads); as we're going to assume + everything before START is gone, make sure we deallocate any + memory on pages before those we return to the user. */ + vm_deallocate (mach_task_self (), + (vm_address_t)buf, + trunc_page (start) - (vm_address_t)buf); + + *data = start; /* Return the buffer directly. */ + start += amount; /* Advance the read point. */ + + if (start < end) + /* Since returning a partial page actually means returning the + whole page, we have to be careful not to grab past the page + boundary before the end of the data we want. */ + { + char *non_aligned_start = start; + start = (char *)trunc_page (start); + amount -= non_aligned_start - start; + } + else + /* This read will be up to the end of the buffer, so we can just + consume any space on the page following BUF_END (vm_alloced + buffers are always allocated in whole pages). */ + { + start = (char *)round_page (start); + packet->buf_end = start; /* Ensure BUF_START <= BUF_END. */ + } + + /* We've actually consumed the memory at the start of BUF. */ + packet->buf = start; + packet->buf_start = start; + packet->buf_len -= start - buf; + } + else + /* Just copy the data the old fashioned way.... */ + { + if (*data_len < amount) + *data = mmap (0, amount, PROT_READ|PROT_WRITE, MAP_ANON, 0, 0); + + bcopy (start, *data, amount); + start += amount; + + if (start - buf > 2 * PACKET_SIZE_LARGE) + /* Get rid of unused space at the beginning of the buffer -- we + know it's vm_alloced because of the size, and this will allow + the buffer to just slide through memory. Because we wait for + a relatively large amount of free space before doing this, and + packet_write() would have gotten rid the free space if it + didn't require copying much data, it's unlikely that this will + happen if it would have been cheaper to just move the packet + contents around to make space for the next write. */ + { + vm_size_t dealloc = trunc_page (start) - (vm_address_t)buf; + vm_deallocate (mach_task_self (), (vm_address_t)buf, dealloc); + packet->buf = buf + dealloc; + packet->buf_len -= dealloc; + } + + packet->buf_start = start; + } + } + *data_len = amount; + + return 0; +} diff --git a/libpipe/pq.h b/libpipe/pq.h new file mode 100644 index 00000000..b98c8b19 --- /dev/null +++ b/libpipe/pq.h @@ -0,0 +1,260 @@ +/* Packet queues + + Copyright (C) 1995, 1996 Free Software Foundation, Inc. + + Written by Miles Bader <miles@gnu.ai.mit.edu> + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + +#ifndef __PQ_H__ +#define __PQ_H__ + +#include <errno.h> +#include <stddef.h> /* for size_t */ +#include <string.h> +#include <mach/mach.h> + +#ifndef PQ_EI +#define PQ_EI extern inline +#endif + + +struct packet +{ + /* The packet type, from PACKET_* below. */ + unsigned short type; + + /* Where this packet was sent from. */ + void *source; + + /* Buffer space. */ + char *buf; + size_t buf_len; + /* Pointers to the data within BUF. */ + char *buf_start, *buf_end; + /* True if BUF was allocated using vm_allocate rather than malloc; only + valid if BUF_LEN > 0. */ + int buf_vm_alloced; + + /* Port data */ + mach_port_t *ports; + size_t num_ports, ports_alloced; + + /* Next and previous packets within the packet queue we're part of. If + PREV is null, we're at the head of the queue, and if NEXT is null, we're + at the tail. */ + struct packet *next, *prev; +}; + +#define PACKET_TYPE_ANY 0 /* matches any type of packet */ +#define PACKET_TYPE_DATA 1 +#define PACKET_TYPE_CONTROL 2 + +/* Sets PACKET's ports to be PORTS, of length NUM_PORTS. ENOMEM is returned + if a memory allocation error occurred, otherwise, 0. */ +error_t packet_set_ports (struct packet *packet, + mach_port_t *ports, size_t num_ports); + +/* If PACKET has any ports, deallocates them. */ +void packet_dealloc_ports (struct packet *packet); + +/* Returns the number of bytes of data in PACKET. */ +PQ_EI size_t +packet_readable (struct packet *packet) +{ + return packet->buf_end - packet->buf_start; +} + +/* Append the bytes in DATA, of length DATA_LEN, to what's already in PACKET, + and return the amount appended in AMOUNT. */ +error_t packet_write (struct packet *packet, + char *data, size_t data_len, size_t *amount); + +/* Removes up to AMOUNT bytes from the beginning of the data in PACKET, and + puts it into *DATA, and the amount read into DATA_LEN. If more than the + original *DATA_LEN bytes are available, new memory is vm_allocated, and + the address and length of this array put into DATA and DATA_LEN. */ +error_t packet_read (struct packet *packet, + char **data, size_t *data_len, size_t amount); + +/* Returns any ports in PACKET in PORTS and NUM_PORTS, and removes them from + PACKET. */ +error_t packet_read_ports (struct packet *packet, + mach_port_t **ports, size_t *num_ports); + +/* Return the source addressd in PACKET in SOURCE, deallocating it from + PACKET. */ +PQ_EI void +packet_read_source (struct packet *packet, void **source) +{ + *source = packet->source; + packet->source = 0; +} + +/* The packet size above which we start to do things differently to avoid + copying around data. */ +#define PACKET_SIZE_LARGE 8192 + +/* Returns a legal size to which PACKET can be set allowing enough room for + EXTRA bytes more than what's already in it, and perhaps more. */ +size_t packet_new_size (struct packet *packet, size_t extra); + +/* Try to extend PACKET to be NEW_LEN bytes long, which should be greater + than the current packet size. This should be a valid length -- i.e., if + it's greater than PAGE_PACKET_SIZE, it should be a mulitple of + VM_PAGE_SIZE. If PACKET cannot be extended for some reason, false is + returned, otherwise true. */ +int packet_extend (struct packet *packet, size_t new_len); + +/* Reallocate PACKET to have NEW_LEN bytes of buffer space, which should be + greater than the current packet size. This should be a valid length -- + i.e., if it's greater than PAGE_PACKET_SIZE, it should be a multiple of + VM_PAGE_SIZE. If an error occurs, PACKET is not modified and the error is + returned. */ +error_t packet_realloc (struct packet *packet, size_t new_len); + +/* Try to make space in PACKET for AMOUNT more bytes without growing the + buffer, returning true if we could do it. */ +PQ_EI int +packet_fit (struct packet *packet, size_t amount) +{ + char *buf = packet->buf, *end = packet->buf_end; + size_t buf_len = packet->buf_len; + size_t left = buf + buf_len - end; /* Free space at the end of the buffer. */ + + if (amount > left) + { + char *start = packet->buf_start; + size_t cur_len = end - start; /* Amount of data currently in the buf. */ + + if (buf_len - cur_len >= amount + && cur_len < PACKET_SIZE_LARGE && cur_len < (buf_len >> 2)) + /* If we could fit the data in by moving what's already in the + buffer, and there's not too much there, and it represents less + than 25% of the buffer size, then move the data instead of growing + the buffer. */ + { + bcopy (start, buf, cur_len); + packet->buf_start = buf; + packet->buf_end = buf + cur_len; + } + else + return 0; /* We failed... */ + } + + return 1; +} + +/* Make sure that PACKET has room for at least AMOUNT more bytes, or return + the reason why not. */ +PQ_EI error_t +packet_ensure (struct packet *packet, size_t amount) +{ + if (! packet_fit (packet, amount)) + /* We must make the buffer bigger. */ + { + size_t new_len = packet_new_size (packet, amount); + if (! packet_extend (packet, new_len)) + return packet_realloc (packet, new_len); + } + return 0; +} + +/* Make sure that PACKET has room for at least AMOUNT more bytes, *only* if + it can be done efficiently, e.g., the packet can be grown in place, rather + than moving the contents (or there is little enough data so that copying + it is OK). True is returned if room was made, false otherwise. */ +PQ_EI int +packet_ensure_efficiently (struct packet *packet, size_t amount) +{ + if (! packet_fit (packet, amount)) + { + size_t new_len = packet_new_size (packet, amount); + if (packet_extend (packet, new_len)) + return 1; + if ((packet->buf_end - packet->buf_start) < PACKET_SIZE_LARGE) + return packet_realloc (packet, new_len) == 0; + } + return 0; +} + +struct pq +{ + struct packet *head, *tail; /* Packet queue */ + struct packet *free; /* Free packets */ +}; + +/* Pushes a new packet of type TYPE and source SOURCE, and returns it, or + NULL if there was an allocation error. SOURCE is returned to readers of + the packet, or deallocated by calling pipe_dealloc_addr. */ +struct packet *pq_queue (struct pq *pq, unsigned type, void *source); + +/* Returns the tail of the packet queue PQ, which may mean pushing a new + packet if TYPE and SOURCE do not match the current tail, or this is the + first packet. */ +PQ_EI struct packet * +pq_tail (struct pq *pq, unsigned type, void *source) +{ + struct packet *tail = pq->tail; + if (!tail + || (type && tail->type != type) || (source && tail->source != source)) + tail = pq_queue (pq, type, source); + return tail; +} + +/* Remove the first packet (if any) in PQ, deallocating any resources it + holds. True is returned if a packet was found, false otherwise. */ +int pq_dequeue (struct pq *pq); + +/* Returns the next available packet in PQ, without removing it from the + queue, or NULL if there is none, or the next packet isn't appropiate. + A packet is inappropiate if SOURCE is non-NULL its source field doesn't + match it, or TYPE is non-NULL and the packet's type field doesn't match + it. */ +PQ_EI struct packet * +pq_head (struct pq *pq, unsigned type, void *source) +{ + struct packet *head = pq->head; + if (!head) + return 0; + if (type && head->type != type) + return 0; + if (source && head->source != source) + return 0; + return head; +} + +/* The same as pq_head, but first discards the head of the queue. */ +PQ_EI struct packet * +pq_next (struct pq *pq, unsigned type, void *source) +{ + if (!pq->head) + return 0; + pq_dequeue (pq); + return pq_head (pq, type, source); +} + +/* Dequeues all packets in PQ. */ +void pq_drain (struct pq *pq); + +/* Create a new packet queue, returning it in PQ. The only possible error is + ENOMEM. */ +error_t pq_create (struct pq **pq); + +/* Frees PQ and any resources it holds, including deallocating any ports in + packets left in the queue. */ +void pq_free (struct pq *pq); + +#endif /* __PQ_H__ */ diff --git a/libpipe/seqpack.c b/libpipe/seqpack.c new file mode 100644 index 00000000..44a15a03 --- /dev/null +++ b/libpipe/seqpack.c @@ -0,0 +1,55 @@ +/* The SOCK_SEQPACKET pipe class + + Copyright (C) 1995 Free Software Foundation, Inc. + + Written by Miles Bader <miles@gnu.ai.mit.edu> + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + +#include <sys/socket.h> /* For SOCK_SEQPACKET */ + +#include "pipe.h" +#include "pq.h" + +/* See the definition of struct pipe_class in "pipe.h" for documentation. */ + +/* This type of pipe is the same as a SOCK_STREAM, but maintains record + boundaries. */ + +static error_t +seqpack_write (struct pq *pq, void *source, + char *data, size_t data_len, size_t *amount) +{ + struct packet *packet = pq_queue (pq, PACKET_TYPE_DATA, source); + if (!packet) + return ENOBUFS; + else + return packet_write (packet, data, data_len, amount); +} + +static error_t +seqpack_read (struct packet *packet, int *dequeue, unsigned *flags, + char **data, size_t *data_len, size_t amount) +{ + error_t err = packet_read (packet, data, data_len, amount); + *dequeue = (packet_readable (packet) == 0); + return err; +} + +struct pipe_class _seqpack_pipe_class = +{ + SOCK_SEQPACKET, 0, seqpack_read, seqpack_write +}; +struct pipe_class *seqpack_pipe_class = &_seqpack_pipe_class; diff --git a/libpipe/stream.c b/libpipe/stream.c new file mode 100644 index 00000000..8eb90435 --- /dev/null +++ b/libpipe/stream.c @@ -0,0 +1,68 @@ +/* The SOCK_STREAM pipe class + + Copyright (C) 1995 Free Software Foundation, Inc. + + Written by Miles Bader <miles@gnu.ai.mit.edu> + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2, or (at + your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ + +#include <sys/socket.h> /* For SOCK_STREAM */ + +#include "pipe.h" +#include "pq.h" + +/* See the definition of struct pipe_class in "pipe.h" for an explanation. */ + +/* This should be in some system header... XXX */ +static inline int page_aligned (vm_offset_t num) +{ + return trunc_page (num) == num; +} + +static error_t +stream_write (struct pq *pq, void *source, + char *data, size_t data_len, size_t *amount) +{ + struct packet *packet = pq_tail (pq, PACKET_TYPE_DATA, source); + + if (packet_readable (packet) > 0 + && data_len > PACKET_SIZE_LARGE + && (! page_aligned (data - packet->buf_end) + || ! packet_ensure_efficiently (packet, data_len))) + /* Put a large page-aligned transfer in its own packet, if it's + page-aligned `differently' than the end of the current packet, or if + the current packet can't be extended in place. */ + packet = pq_queue (pq, PACKET_TYPE_DATA, source); + + if (!packet) + return ENOBUFS; + else + return packet_write (packet, data, data_len, amount); +} + +static error_t +stream_read (struct packet *packet, int *dequeue, unsigned *flags, + char **data, size_t *data_len, size_t amount) +{ + error_t err = packet_read (packet, data, data_len, amount); + *dequeue = (packet_readable (packet) == 0); + return err; +} + +struct pipe_class _stream_pipe_class = +{ + SOCK_STREAM, 0, stream_read, stream_write +}; +struct pipe_class *stream_pipe_class = &_stream_pipe_class; |