Track waiting requests in a hash table, by xid
authorMark Hills <mark.hills@framestore.com>
Thu, 20 Feb 2014 14:02:08 +0000 (14:02 +0000)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Fri, 4 Apr 2014 00:52:21 +0000 (17:52 -0700)
NFS servers can respond to requests in any order, and they do. In our
tests there is also some clustering to the responses; it could be
because eg. requests are served synchronously if the data is in the cache.

Introduce a hash table so that we are able to find the pdu quickly in
all cases, assuming random distribution of the responses.

include/libnfs-private.h
lib/init.c
lib/pdu.c
lib/socket.c

index 5d4c47b25447049a273017276f2789cb553aae4e..2e4ccc05f227d8c2919fd9b2c447e96cd1b98678 100644 (file)
@@ -72,6 +72,8 @@ struct rpc_queue {
        struct rpc_pdu *head, *tail;
 };
 
+#define HASHES 1024
+
 struct rpc_context {
        uint32_t magic;
        int fd;
@@ -91,7 +93,7 @@ struct rpc_context {
 
        struct rpc_queue outqueue;
        struct sockaddr_storage udp_src;
-       struct rpc_queue waitpdu;
+       struct rpc_queue waitpdu[HASHES];
 
        uint32_t inpos;
        uint32_t insize;
@@ -136,6 +138,7 @@ struct rpc_pdu {
 void rpc_reset_queue(struct rpc_queue *q);
 void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu);
 void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu);
+unsigned int rpc_hash_xid(uint32_t xid);
 
 struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_bufsize);
 void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
index 0b7142586916d8a4196bfb753e6309d908d57622..d65520354f80ea2457c6bf633a85b4bfe9ee18db 100644 (file)
@@ -49,6 +49,7 @@ struct rpc_context *rpc_init_context(void)
 {
        struct rpc_context *rpc;
        static uint32_t salt = 0;
+       unsigned int i;
 
        rpc = malloc(sizeof(struct rpc_context));
        if (rpc == NULL) {
@@ -84,7 +85,8 @@ struct rpc_context *rpc_init_context(void)
        rpc->gid = getgid();
 #endif
        rpc_reset_queue(&rpc->outqueue);
-       rpc_reset_queue(&rpc->waitpdu);
+       for (i = 0; i < HASHES; i++)
+               rpc_reset_queue(&rpc->waitpdu[i]);
 
        return rpc;
 }
@@ -158,6 +160,7 @@ char *rpc_get_error(struct rpc_context *rpc)
 void rpc_error_all_pdus(struct rpc_context *rpc, char *error)
 {
        struct rpc_pdu *pdu, *next;
+       unsigned int i;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
@@ -168,12 +171,16 @@ void rpc_error_all_pdus(struct rpc_context *rpc, char *error)
        }
        rpc->outqueue.tail = NULL;
 
-       while((pdu = rpc->waitpdu.head) != NULL) {
-               pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
-               rpc->waitpdu.head = pdu->next;
-               rpc_free_pdu(rpc, pdu);
+       for (i = 0; i < HASHES; i++) {
+               struct rpc_queue *q = &rpc->waitpdu[i];
+
+               while((pdu = q->head) != NULL) {
+                       pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
+                       q->head = pdu->next;
+                       rpc_free_pdu(rpc, pdu);
+               }
+               q->tail = NULL;
        }
-       rpc->waitpdu.tail = NULL;
 }
 
 static void rpc_free_fragment(struct rpc_fragment *fragment)
@@ -222,6 +229,7 @@ int rpc_add_fragment(struct rpc_context *rpc, char *data, uint64_t size)
 void rpc_destroy_context(struct rpc_context *rpc)
 {
        struct rpc_pdu *pdu;
+       unsigned int i;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
@@ -230,10 +238,15 @@ void rpc_destroy_context(struct rpc_context *rpc)
                rpc->outqueue.head = pdu->next;
                rpc_free_pdu(rpc, pdu);
        }
-       while((pdu = rpc->waitpdu.head) != NULL) {
-               pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
-               rpc->outqueue.head = pdu->next;
-               rpc_free_pdu(rpc, pdu);
+
+       for (i = 0; i < HASHES; i++) {
+               struct rpc_queue *q = &rpc->waitpdu[i];
+
+               while((pdu = q->head) != NULL) {
+                       pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
+                       rpc->outqueue.head = pdu->next;
+                       rpc_free_pdu(rpc, pdu);
+               }
        }
 
        rpc_free_all_fragments(rpc);
index 05c2aededd70dac5c03612808603549fcd6e9e11..4d487b71b0c692408dd606881294eaa229dde9b0 100644 (file)
--- a/lib/pdu.c
+++ b/lib/pdu.c
@@ -79,6 +79,11 @@ void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu)
                q->tail = pdu;
 }
 
+unsigned int rpc_hash_xid(uint32_t xid)
+{
+       return (xid * 7919) % HASHES;
+}
+
 struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize)
 {
        struct rpc_pdu *pdu;
@@ -159,13 +164,17 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
 
        /* for udp we dont queue, we just send it straight away */
        if (rpc->is_udp != 0) {
+               unsigned int hash;
+
 // XXX add a rpc->udp_dest_sock_size  and get rid of sys/socket.h and netinet/in.h
                if (sendto(rpc->fd, rpc->encodebuf, size, MSG_DONTWAIT, rpc->udp_dest, sizeof(struct sockaddr_in)) < 0) {
                        rpc_set_error(rpc, "Sendto failed with errno %s", strerror(errno));
                        rpc_free_pdu(rpc, pdu);
                        return -1;
                }
-               rpc_enqueue(&rpc->waitpdu, pdu);
+
+               hash = rpc_hash_xid(pdu->xid);
+               rpc_enqueue(&rpc->waitpdu[hash], pdu);
                return 0;
        }
 
@@ -268,8 +277,10 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR *
 int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
 {
        struct rpc_pdu *pdu, *prev_pdu;
+       struct rpc_queue *q;
        ZDR zdr;
        int pos, recordmarker = 0;
+       unsigned int hash;
        uint32_t xid;
        char *reasbuf = NULL;
 
@@ -332,20 +343,24 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
        }
        zdr_setpos(&zdr, pos);
 
-       /* Linear traverse singly-linked list, but track previous
-        * entry for optimised removal */
+       /* Look up the transaction in a hash table of our requests */
+       hash = rpc_hash_xid(xid);
+       q = &rpc->waitpdu[hash];
+
+       /* Follow the hash chain.  Linear traverse singly-linked list,
+        * but track previous entry for optimised removal */
        prev_pdu = NULL;
-       for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
+       for (pdu=q->head; pdu; pdu=pdu->next) {
                if (pdu->xid != xid) {
                        prev_pdu = pdu;
                        continue;
                }
                if (rpc->is_udp == 0 || rpc->is_broadcast == 0) {
                        /* Singly-linked but we track head and tail */
-                       if (pdu == rpc->waitpdu.head)
-                               rpc->waitpdu.head = pdu->next;
-                       if (pdu == rpc->waitpdu.tail)
-                               rpc->waitpdu.tail = prev_pdu;
+                       if (pdu == q->head)
+                               q->head = pdu->next;
+                       if (pdu == q->tail)
+                               q->tail = prev_pdu;
                        if (prev_pdu != NULL)
                                prev_pdu->next = pdu->next;
                }
index c518603c5ef7896378bbd72e67ace1dda42dcfa4..6bf3cfff23d2d65dfe6120201c33b211de114c9e 100644 (file)
@@ -174,10 +174,14 @@ static int rpc_write_to_socket(struct rpc_context *rpc)
 
                pdu->written += count;
                if (pdu->written == total) {
+                       unsigned int hash;
+
                        rpc->outqueue.head = pdu->next;
                        if (pdu->next == NULL)
                                rpc->outqueue.tail = NULL;
-                       rpc_enqueue(&rpc->waitpdu, pdu);
+
+                       hash = rpc_hash_xid(pdu->xid);
+                       rpc_enqueue(&rpc->waitpdu[hash], pdu);
                }
        }
        return 0;
@@ -607,6 +611,7 @@ static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, vo
 static int rpc_reconnect_requeue(struct rpc_context *rpc)
 {
        struct rpc_pdu *pdu;
+       unsigned int i;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
@@ -620,12 +625,16 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc)
        /* socket is closed so we will not get any replies to any commands
         * in flight. Move them all over from the waitpdu queue back to the out queue
         */
-       for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
-               rpc_return_to_queue(&rpc->outqueue, pdu);
-               /* we have to re-send the whole pdu again */
-               pdu->written = 0;
+       for (i = 0; i < HASHES; i++) {
+               struct rpc_queue *q = &rpc->waitpdu[i];
+
+               for (pdu=q->head; pdu; pdu=pdu->next) {
+                       rpc_return_to_queue(&rpc->outqueue, pdu);
+                       /* we have to re-send the whole pdu again */
+                       pdu->written = 0;
+               }
+               rpc_reset_queue(q);
        }
-       rpc_reset_queue(&rpc->waitpdu);
 
        if (rpc->auto_reconnect != 0) {
                rpc->connect_cb  = reconnect_cb;
@@ -734,14 +743,19 @@ int rpc_queue_length(struct rpc_context *rpc)
 {
        int i=0;
        struct rpc_pdu *pdu;
+       unsigned int n;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
        for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
                i++;
        }
-       for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) {
-               i++;
+
+       for (n = 0; n < HASHES; n++) {
+               struct rpc_queue *q = &rpc->waitpdu[n];
+
+               for(pdu = q->head; pdu; pdu = pdu->next)
+                       i++;
        }
        return i;
 }