Optimisations to the pdu queues
authorMark Hills <mark.hills@framestore.com>
Tue, 7 Jan 2014 10:23:46 +0000 (10:23 +0000)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Fri, 4 Apr 2014 00:51:04 +0000 (17:51 -0700)
When making many concurrent requests (as is likely in any performance
criticial application), the use of SLIST_REMOVE and SLIST_ADD_END are
a severe bottleneck because of their linear search.

I considered using a double-linked list but it was unnecessary to
allocate the additional memory for each list entry.

Instead, continue to use a single-linked list but retain:

* a pointer to the end of the list; and
* a pointer to the previous entry during a linear search.

The former would makes append operations O(1) time, and the latter
does the same for removal. We can do this because removal only happens
within the linear search, and there is no random access to the queue.

include/libnfs-private.h
lib/init.c
lib/pdu.c
lib/socket.c

index 07ce9a8f96b2b9d3edf355cbbc56cb0a4994cee5..5d4c47b25447049a273017276f2789cb553aae4e 100644 (file)
@@ -65,6 +65,13 @@ struct rpc_fragment {
 #define RPC_CONTEXT_MAGIC 0xc6e46435
 #define RPC_PARAM_UNDEFINED -1
 
+/*
+ * Queue is singly-linked but we hold on to the tail
+ */
+struct rpc_queue {
+       struct rpc_pdu *head, *tail;
+};
+
 struct rpc_context {
        uint32_t magic;
        int fd;
@@ -82,9 +89,9 @@ struct rpc_context {
        char *encodebuf;
        int encodebuflen;
 
-       struct rpc_pdu *outqueue;
+       struct rpc_queue outqueue;
        struct sockaddr_storage udp_src;
-       struct rpc_pdu *waitpdu;
+       struct rpc_queue waitpdu;
 
        uint32_t inpos;
        uint32_t insize;
@@ -126,6 +133,10 @@ struct rpc_pdu {
        uint32_t zdr_decode_bufsize;
 };
 
+void rpc_reset_queue(struct rpc_queue *q);
+void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu);
+void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu);
+
 struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_bufsize);
 void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
 int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
index 36d4880006d9f244b804fdf0a8b2631c2a220924..0b7142586916d8a4196bfb753e6309d908d57622 100644 (file)
@@ -83,6 +83,8 @@ struct rpc_context *rpc_init_context(void)
        rpc->uid = getuid();
        rpc->gid = getgid();
 #endif
+       rpc_reset_queue(&rpc->outqueue);
+       rpc_reset_queue(&rpc->waitpdu);
 
        return rpc;
 }
@@ -155,20 +157,23 @@ char *rpc_get_error(struct rpc_context *rpc)
 
 void rpc_error_all_pdus(struct rpc_context *rpc, char *error)
 {
-       struct rpc_pdu *pdu;
+       struct rpc_pdu *pdu, *next;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
-       while((pdu = rpc->outqueue) != NULL) {
+       while ((pdu = rpc->outqueue.head) != NULL) {
                pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
-               SLIST_REMOVE(&rpc->outqueue, pdu);
+               rpc->outqueue.head = pdu->next;
                rpc_free_pdu(rpc, pdu);
        }
-       while((pdu = rpc->waitpdu) != NULL) {
+       rpc->outqueue.tail = NULL;
+
+       while((pdu = rpc->waitpdu.head) != NULL) {
                pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
-               SLIST_REMOVE(&rpc->waitpdu, pdu);
+               rpc->waitpdu.head = pdu->next;
                rpc_free_pdu(rpc, pdu);
        }
+       rpc->waitpdu.tail = NULL;
 }
 
 static void rpc_free_fragment(struct rpc_fragment *fragment)
@@ -186,7 +191,7 @@ void rpc_free_all_fragments(struct rpc_context *rpc)
        while (rpc->fragments != NULL) {
              struct rpc_fragment *fragment = rpc->fragments;
 
-             SLIST_REMOVE(&rpc->fragments, fragment);
+             rpc->fragments = fragment->next;
              rpc_free_fragment(fragment);
        }
 }
@@ -220,14 +225,14 @@ void rpc_destroy_context(struct rpc_context *rpc)
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
-       while((pdu = rpc->outqueue) != NULL) {
+       while((pdu = rpc->outqueue.head) != NULL) {
                pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
-               SLIST_REMOVE(&rpc->outqueue, pdu);
+               rpc->outqueue.head = pdu->next;
                rpc_free_pdu(rpc, pdu);
        }
-       while((pdu = rpc->waitpdu) != NULL) {
+       while((pdu = rpc->waitpdu.head) != NULL) {
                pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
-               SLIST_REMOVE(&rpc->waitpdu, pdu);
+               rpc->outqueue.head = pdu->next;
                rpc_free_pdu(rpc, pdu);
        }
 
index 5def0bc5d9041363b2bc833b5f50677750e5c517..2f4761d20a2b87825d52c1c1f2df786ceb14c464 100644 (file)
--- a/lib/pdu.c
+++ b/lib/pdu.c
 #include "libnfs-raw.h"
 #include "libnfs-private.h"
 
+void rpc_reset_queue(struct rpc_queue *q)
+{
+       q->head = NULL;
+       q->tail = NULL;
+}
+
+/*
+ * Push to the tail end of the queue
+ */
+void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+       if (q->head == NULL)
+               q->head = pdu;
+       else
+               q->tail->next = pdu;
+       q->tail = pdu;
+}
+
+/*
+ * Push to the front/head of the queue
+ */
+void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+       pdu->next = q->head;
+       q->head = pdu;
+       if (q->tail == NULL)
+               q->tail = pdu;
+}
+
 struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize)
 {
        struct rpc_pdu *pdu;
@@ -135,7 +164,7 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
                        rpc_free_pdu(rpc, pdu);
                        return -1;
                }
-               SLIST_ADD_END(&rpc->waitpdu, pdu);
+               rpc_enqueue(&rpc->waitpdu, pdu);
                return 0;
        }
 
@@ -153,7 +182,7 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
        }
 
        memcpy(pdu->outdata.data, rpc->encodebuf, pdu->outdata.size);
-       SLIST_ADD_END(&rpc->outqueue, pdu);
+       rpc_enqueue(&rpc->outqueue, pdu);
 
        return 0;
 }
@@ -237,7 +266,7 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR *
 
 int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
 {
-       struct rpc_pdu *pdu;
+       struct rpc_pdu *pdu, *prev_pdu;
        ZDR zdr;
        int pos, recordmarker = 0;
        uint32_t xid;
@@ -302,12 +331,22 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
        }
        zdr_setpos(&zdr, pos);
 
-       for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
+       /* Linear traverse singly-linked list, but track previous
+        * entry for optimised removal */
+       prev_pdu = NULL;
+       for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
                if (pdu->xid != xid) {
+                       prev_pdu = pdu;
                        continue;
                }
                if (rpc->is_udp == 0 || rpc->is_broadcast == 0) {
-                       SLIST_REMOVE(&rpc->waitpdu, pdu);
+                       /* Singly-linked but we track head and tail */
+                       if (pdu == rpc->waitpdu.head)
+                               rpc->waitpdu.head = pdu->next;
+                       if (pdu == rpc->waitpdu.tail)
+                               rpc->waitpdu.tail = prev_pdu;
+                       if (prev_pdu != NULL)
+                               prev_pdu->next = pdu->next;
                }
                if (rpc_process_reply(rpc, pdu, &zdr) != 0) {
                        rpc_set_error(rpc, "rpc_procdess_reply failed");
index d025e070e017091e1f796178c139e8ff44c7cf48..c518603c5ef7896378bbd72e67ace1dda42dcfa4 100644 (file)
@@ -122,6 +122,11 @@ int rpc_get_fd(struct rpc_context *rpc)
        return rpc->fd;
 }
 
+static int rpc_has_queue(struct rpc_queue *q)
+{
+       return q->head != NULL;
+}
+
 int rpc_which_events(struct rpc_context *rpc)
 {
        int events;
@@ -135,7 +140,7 @@ int rpc_which_events(struct rpc_context *rpc)
                return POLLIN;
        }
 
-       if (rpc->outqueue) {
+       if (rpc_has_queue(&rpc->outqueue)) {
                events |= POLLOUT;
        }
        return events;
@@ -144,6 +149,7 @@ int rpc_which_events(struct rpc_context *rpc)
 static int rpc_write_to_socket(struct rpc_context *rpc)
 {
        int32_t count;
+       struct rpc_pdu *pdu;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
@@ -152,12 +158,12 @@ static int rpc_write_to_socket(struct rpc_context *rpc)
                return -1;
        }
 
-       while (rpc->outqueue != NULL) {
+       while ((pdu = rpc->outqueue.head) != NULL) {
                int64_t total;
 
-               total = rpc->outqueue->outdata.size;
+               total = pdu->outdata.size;
 
-               count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
+               count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
                if (count == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                                return 0;
@@ -166,12 +172,12 @@ static int rpc_write_to_socket(struct rpc_context *rpc)
                        return -1;
                }
 
-               rpc->outqueue->written += count;
-               if (rpc->outqueue->written == total) {
-                       struct rpc_pdu *pdu = rpc->outqueue;
-
-                       SLIST_REMOVE(&rpc->outqueue, pdu);
-                       SLIST_ADD_END(&rpc->waitpdu, pdu);
+               pdu->written += count;
+               if (pdu->written == total) {
+                       rpc->outqueue.head = pdu->next;
+                       if (pdu->next == NULL)
+                               rpc->outqueue.tail = NULL;
+                       rpc_enqueue(&rpc->waitpdu, pdu);
                }
        }
        return 0;
@@ -368,7 +374,7 @@ int rpc_service(struct rpc_context *rpc, int revents)
                }
        }
 
-       if (revents & POLLOUT && rpc->outqueue != NULL) {
+       if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
                if (rpc_write_to_socket(rpc) != 0) {
                        rpc_set_error(rpc, "write to socket failed");
                        return -1;
@@ -614,12 +620,12 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc)
        /* socket is closed so we will not get any replies to any commands
         * in flight. Move them all over from the waitpdu queue back to the out queue
         */
-       for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
-               SLIST_REMOVE(&rpc->waitpdu, pdu);
-               SLIST_ADD(&rpc->outqueue, pdu);
+       for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
+               rpc_return_to_queue(&rpc->outqueue, pdu);
                /* we have to re-send the whole pdu again */
                pdu->written = 0;
        }
+       rpc_reset_queue(&rpc->waitpdu);
 
        if (rpc->auto_reconnect != 0) {
                rpc->connect_cb  = reconnect_cb;
@@ -731,10 +737,10 @@ int rpc_queue_length(struct rpc_context *rpc)
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
-       for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
+       for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
                i++;
        }
-       for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
+       for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) {
                i++;
        }
        return i;