aboutsummaryrefslogtreecommitdiff
path: root/pflocal/connq.c
diff options
context:
space:
mode:
Diffstat (limited to 'pflocal/connq.c')
-rw-r--r--pflocal/connq.c327
1 files changed, 327 insertions, 0 deletions
diff --git a/pflocal/connq.c b/pflocal/connq.c
new file mode 100644
index 00000000..d86f9a24
--- /dev/null
+++ b/pflocal/connq.c
@@ -0,0 +1,327 @@
+/* Listen queue functions
+
+ Copyright (C) 1995,96,2001,2012 Free Software Foundation, Inc.
+
+ Written by Miles Bader <miles@gnu.org>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2, or (at
+ your option) any later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+
+#include <pthread.h>
+#include <assert.h>
+#include <stdlib.h>
+
+#include "connq.h"
+
+/* A queue for queueing incoming connections. */
+struct connq
+{
+ /* The connection request queue. */
+ struct connq_request *head;
+ struct connq_request **tail;
+ unsigned count;
+ unsigned max;
+
+ /* Threads that have done an accept on this queue wait on this condition. */
+ pthread_cond_t listeners;
+ unsigned num_listeners;
+
+ /* Threads that have done a connect on this queue wait on this condition. */
+ pthread_cond_t connectors;
+ unsigned num_connectors;
+
+ pthread_mutex_t lock;
+};
+
+/* ---------------------------------------------------------------- */
+
+/* A data block allocated by a thread waiting on a connq, which is used to
+ get information from and to the thread. */
+struct connq_request
+{
+ struct connq_request *next;
+
+ /* The socket that's waiting to connect. */
+ struct sock *sock;
+};
+
+static inline void
+connq_request_init (struct connq_request *req, struct sock *sock)
+{
+ req->sock = sock;
+}
+
+/* Enqueue connection request REQ onto CQ. CQ must be locked. */
+static void
+connq_request_enqueue (struct connq *cq, struct connq_request *req)
+{
+ assert (pthread_mutex_trylock (&cq->lock));
+
+ req->next = NULL;
+ *cq->tail = req;
+ cq->tail = &req->next;
+
+ cq->count ++;
+}
+
+/* Dequeue a pending request from CQ. CQ must be locked and must not
+ be empty. */
+static struct connq_request *
+connq_request_dequeue (struct connq *cq)
+{
+ struct connq_request *req;
+
+ assert (pthread_mutex_trylock (&cq->lock));
+ assert (cq->head);
+
+ req = cq->head;
+ cq->head = req->next;
+ if (! cq->head)
+ /* We just dequeued the last element. Fixup the tail pointer. */
+ cq->tail = &cq->head;
+
+ cq->count --;
+
+ return req;
+}
+
+/* ---------------------------------------------------------------- */
+
+/* Create a new listening queue, returning it in CQ. The resulting queue
+ will be of zero length, that is it won't allow connections unless someone
+ is already listening (change this with connq_set_length). */
+error_t
+connq_create (struct connq **cq)
+{
+ struct connq *new = malloc (sizeof (struct connq));
+
+ if (!new)
+ return ENOBUFS;
+
+ new->head = NULL;
+ new->tail = &new->head;
+ new->count = 0;
+ /* By default, don't queue requests. */
+ new->max = 0;
+
+ new->num_listeners = 0;
+ new->num_connectors = 0;
+
+ pthread_mutex_init (&new->lock, NULL);
+ pthread_cond_init (&new->listeners, NULL);
+ pthread_cond_init (&new->connectors, NULL);
+
+ *cq = new;
+ return 0;
+}
+
+/* Destroy a queue. */
+void
+connq_destroy (struct connq *cq)
+{
+ /* Everybody in the queue should hold a reference to the socket
+ containing the queue. */
+ assert (! cq->head);
+ assert (cq->count == 0);
+
+ free (cq);
+}
+
+/* ---------------------------------------------------------------- */
+
+/* Return a connection request on CQ. If SOCK is NULL, the request is
+ left in the queue. If TIMEOUT denotes a value of 0, EWOULDBLOCK is
+ returned when there are no immediate connections available.
+ Otherwise this value is used to limit the wait duration. If TIMEOUT
+ is NULL, the wait duration isn't bounded. */
+error_t
+connq_listen (struct connq *cq, struct timespec *tsp, struct sock **sock)
+{
+ error_t err = 0;
+
+ pthread_mutex_lock (&cq->lock);
+
+ if (tsp && tsp->tv_sec == 0 && tsp->tv_nsec == 0 && cq->count == 0
+ && cq->num_connectors == 0)
+ {
+ pthread_mutex_unlock (&cq->lock);
+ return EWOULDBLOCK;
+ }
+
+ if (! sock && (cq->count > 0 || cq->num_connectors > 0))
+ /* The caller just wants to know if a connection ready. */
+ {
+ pthread_mutex_unlock (&cq->lock);
+ return 0;
+ }
+
+ cq->num_listeners++;
+
+ if (cq->count == 0)
+ /* The request queue is empty. */
+ {
+ assert (! cq->head);
+
+ if (cq->num_connectors > 0)
+ /* Someone is waiting for an acceptor. Signal that we can
+ service their request. */
+ pthread_cond_signal (&cq->connectors);
+
+ do
+ {
+ err = pthread_hurd_cond_timedwait_np (&cq->listeners, &cq->lock, tsp);
+ if (err)
+ {
+ cq->num_listeners--;
+ goto out;
+ }
+ }
+ while (cq->count == 0);
+ }
+
+ assert (cq->head);
+
+ if (sock)
+ /* Dequeue the next request, if desired. */
+ {
+ struct connq_request *req = connq_request_dequeue (cq);
+ *sock = req->sock;
+ free (req);
+ }
+ else if (cq->num_listeners > 0)
+ /* The caller will not actually process this request but someone
+ else could. (This case is rare but possible: it would require
+ one thread to do a select on the socket and a second to do an
+ accept.) */
+ pthread_cond_signal (&cq->listeners);
+ else
+ /* There is no one else to process the request and the connection
+ has now been initiated. This is not actually a problem as even
+ if the current queue limit is 0, the connector will queue the
+ request and another listener (should) eventually come along.
+ (In fact it is very probably as the caller has likely done a
+ select and will now follow up with an accept.) */
+ { /* Do nothing. */ }
+
+ out:
+ pthread_mutex_unlock (&cq->lock);
+ return err;
+}
+
+/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is
+ true, then return EWOULDBLOCK if there are no connections
+ immediately available. On success, this call must be followed up
+ either connq_connect_complete or connq_connect_cancel. */
+error_t
+connq_connect (struct connq *cq, int noblock)
+{
+ pthread_mutex_lock (&cq->lock);
+
+ /* Check for listeners after we've locked CQ for good. */
+
+ if (noblock
+ && cq->count + cq->num_connectors >= cq->max + cq->num_listeners)
+ /* We are in non-blocking mode and would have to wait to secure an
+ entry in the listen queue. */
+ {
+ pthread_mutex_unlock (&cq->lock);
+ return EWOULDBLOCK;
+ }
+
+ cq->num_connectors ++;
+
+ while (cq->count + cq->num_connectors > cq->max + cq->num_listeners)
+ /* The queue is full and there is no immediate listener to service
+ us. Block until we can get a slot. */
+ if (pthread_hurd_cond_wait_np (&cq->connectors, &cq->lock))
+ {
+ cq->num_connectors --;
+ pthread_mutex_unlock (&cq->lock);
+ return EINTR;
+ }
+
+ pthread_mutex_unlock (&cq->lock);
+
+ return 0;
+}
+
+/* Follow up to connq_connect. Completes the connect, SOCK is the new
+ server socket. */
+void
+connq_connect_complete (struct connq *cq, struct sock *sock)
+{
+ struct connq_request *req;
+
+ req = malloc (sizeof (struct connq_request));
+ if (! req)
+ abort ();
+
+ connq_request_init (req, sock);
+
+ pthread_mutex_lock (&cq->lock);
+
+ assert (cq->num_connectors > 0);
+ cq->num_connectors --;
+
+ connq_request_enqueue (cq, req);
+
+ if (cq->num_listeners > 0)
+ /* Wake a listener up. We must consume the listener ref here as
+ someone else might call this function before the listener
+ thread dequeues this request. */
+ {
+ cq->num_listeners --;
+ pthread_cond_signal (&cq->listeners);
+ }
+
+ pthread_mutex_unlock (&cq->lock);
+}
+
+/* Follow up to connq_connect. Cancel the connect. */
+void
+connq_connect_cancel (struct connq *cq)
+{
+ pthread_mutex_lock (&cq->lock);
+
+ assert (cq->num_connectors > 0);
+ cq->num_connectors --;
+
+ if (cq->count + cq->num_connectors >= cq->max + cq->num_listeners)
+ /* A connector is blocked and could use the spot we reserved. */
+ pthread_cond_signal (&cq->connectors);
+
+ pthread_mutex_unlock (&cq->lock);
+}
+
+/* Set CQ's queue length to LENGTH. */
+error_t
+connq_set_length (struct connq *cq, int max)
+{
+ int omax;
+
+ pthread_mutex_lock (&cq->lock);
+ omax = cq->max;
+ cq->max = max;
+
+ if (max > omax && cq->count >= omax && cq->count < max
+ && cq->num_connectors >= cq->num_listeners)
+ /* This is an increase in the number of connection slots which has
+ made some slots available and there are waiting threads. Wake
+ them up. */
+ pthread_cond_broadcast (&cq->listeners);
+
+ pthread_mutex_unlock (&cq->lock);
+
+ return 0;
+}