From 63f36a0923e8aef6ef3f91cd30fba8ddc9f9509a Mon Sep 17 00:00:00 2001 From: Mark Hills Date: Thu, 20 Feb 2014 14:02:08 +0000 Subject: [PATCH] Track waiting requests in a hash table, by xid NFS servers can respond to requests in any order, and they do. In our tests there is also some clustering to the responses; it could be because eg. requests are served synchronously if the data is in the cache. Introduce a hash table so that we are able to find the pdu quickly in all cases, assuming random distribution of the responses. --- include/libnfs-private.h | 5 ++++- lib/init.c | 33 +++++++++++++++++++++++---------- lib/pdu.c | 31 +++++++++++++++++++++++-------- lib/socket.c | 30 ++++++++++++++++++++++-------- 4 files changed, 72 insertions(+), 27 deletions(-) diff --git a/include/libnfs-private.h b/include/libnfs-private.h index 5d4c47b..2e4ccc0 100644 --- a/include/libnfs-private.h +++ b/include/libnfs-private.h @@ -72,6 +72,8 @@ struct rpc_queue { struct rpc_pdu *head, *tail; }; +#define HASHES 1024 + struct rpc_context { uint32_t magic; int fd; @@ -91,7 +93,7 @@ struct rpc_context { struct rpc_queue outqueue; struct sockaddr_storage udp_src; - struct rpc_queue waitpdu; + struct rpc_queue waitpdu[HASHES]; uint32_t inpos; uint32_t insize; @@ -136,6 +138,7 @@ struct rpc_pdu { void rpc_reset_queue(struct rpc_queue *q); void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu); void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu); +unsigned int rpc_hash_xid(uint32_t xid); struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_bufsize); void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu); diff --git a/lib/init.c b/lib/init.c index 0b71425..d655203 100644 --- a/lib/init.c +++ b/lib/init.c @@ -49,6 +49,7 @@ struct rpc_context *rpc_init_context(void) { struct rpc_context *rpc; static uint32_t salt = 0; + unsigned int i; rpc = malloc(sizeof(struct rpc_context)); if (rpc == NULL) { @@ -84,7 +85,8 @@ struct rpc_context *rpc_init_context(void) rpc->gid = getgid(); #endif rpc_reset_queue(&rpc->outqueue); - rpc_reset_queue(&rpc->waitpdu); + for (i = 0; i < HASHES; i++) + rpc_reset_queue(&rpc->waitpdu[i]); return rpc; } @@ -158,6 +160,7 @@ char *rpc_get_error(struct rpc_context *rpc) void rpc_error_all_pdus(struct rpc_context *rpc, char *error) { struct rpc_pdu *pdu, *next; + unsigned int i; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -168,12 +171,16 @@ void rpc_error_all_pdus(struct rpc_context *rpc, char *error) } rpc->outqueue.tail = NULL; - while((pdu = rpc->waitpdu.head) != NULL) { - pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data); - rpc->waitpdu.head = pdu->next; - rpc_free_pdu(rpc, pdu); + for (i = 0; i < HASHES; i++) { + struct rpc_queue *q = &rpc->waitpdu[i]; + + while((pdu = q->head) != NULL) { + pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data); + q->head = pdu->next; + rpc_free_pdu(rpc, pdu); + } + q->tail = NULL; } - rpc->waitpdu.tail = NULL; } static void rpc_free_fragment(struct rpc_fragment *fragment) @@ -222,6 +229,7 @@ int rpc_add_fragment(struct rpc_context *rpc, char *data, uint64_t size) void rpc_destroy_context(struct rpc_context *rpc) { struct rpc_pdu *pdu; + unsigned int i; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -230,10 +238,15 @@ void rpc_destroy_context(struct rpc_context *rpc) rpc->outqueue.head = pdu->next; rpc_free_pdu(rpc, pdu); } - while((pdu = rpc->waitpdu.head) != NULL) { - pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data); - rpc->outqueue.head = pdu->next; - rpc_free_pdu(rpc, pdu); + + for (i = 0; i < HASHES; i++) { + struct rpc_queue *q = &rpc->waitpdu[i]; + + while((pdu = q->head) != NULL) { + pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data); + rpc->outqueue.head = pdu->next; + rpc_free_pdu(rpc, pdu); + } } rpc_free_all_fragments(rpc); diff --git a/lib/pdu.c b/lib/pdu.c index 05c2aed..4d487b7 100644 --- a/lib/pdu.c +++ b/lib/pdu.c @@ -79,6 +79,11 @@ void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu) q->tail = pdu; } +unsigned int rpc_hash_xid(uint32_t xid) +{ + return (xid * 7919) % HASHES; +} + struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize) { struct rpc_pdu *pdu; @@ -159,13 +164,17 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) /* for udp we dont queue, we just send it straight away */ if (rpc->is_udp != 0) { + unsigned int hash; + // XXX add a rpc->udp_dest_sock_size and get rid of sys/socket.h and netinet/in.h if (sendto(rpc->fd, rpc->encodebuf, size, MSG_DONTWAIT, rpc->udp_dest, sizeof(struct sockaddr_in)) < 0) { rpc_set_error(rpc, "Sendto failed with errno %s", strerror(errno)); rpc_free_pdu(rpc, pdu); return -1; } - rpc_enqueue(&rpc->waitpdu, pdu); + + hash = rpc_hash_xid(pdu->xid); + rpc_enqueue(&rpc->waitpdu[hash], pdu); return 0; } @@ -268,8 +277,10 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR * int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) { struct rpc_pdu *pdu, *prev_pdu; + struct rpc_queue *q; ZDR zdr; int pos, recordmarker = 0; + unsigned int hash; uint32_t xid; char *reasbuf = NULL; @@ -332,20 +343,24 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) } zdr_setpos(&zdr, pos); - /* Linear traverse singly-linked list, but track previous - * entry for optimised removal */ + /* Look up the transaction in a hash table of our requests */ + hash = rpc_hash_xid(xid); + q = &rpc->waitpdu[hash]; + + /* Follow the hash chain. Linear traverse singly-linked list, + * but track previous entry for optimised removal */ prev_pdu = NULL; - for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) { + for (pdu=q->head; pdu; pdu=pdu->next) { if (pdu->xid != xid) { prev_pdu = pdu; continue; } if (rpc->is_udp == 0 || rpc->is_broadcast == 0) { /* Singly-linked but we track head and tail */ - if (pdu == rpc->waitpdu.head) - rpc->waitpdu.head = pdu->next; - if (pdu == rpc->waitpdu.tail) - rpc->waitpdu.tail = prev_pdu; + if (pdu == q->head) + q->head = pdu->next; + if (pdu == q->tail) + q->tail = prev_pdu; if (prev_pdu != NULL) prev_pdu->next = pdu->next; } diff --git a/lib/socket.c b/lib/socket.c index c518603..6bf3cff 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -174,10 +174,14 @@ static int rpc_write_to_socket(struct rpc_context *rpc) pdu->written += count; if (pdu->written == total) { + unsigned int hash; + rpc->outqueue.head = pdu->next; if (pdu->next == NULL) rpc->outqueue.tail = NULL; - rpc_enqueue(&rpc->waitpdu, pdu); + + hash = rpc_hash_xid(pdu->xid); + rpc_enqueue(&rpc->waitpdu[hash], pdu); } } return 0; @@ -607,6 +611,7 @@ static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, vo static int rpc_reconnect_requeue(struct rpc_context *rpc) { struct rpc_pdu *pdu; + unsigned int i; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -620,12 +625,16 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc) /* socket is closed so we will not get any replies to any commands * in flight. Move them all over from the waitpdu queue back to the out queue */ - for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) { - rpc_return_to_queue(&rpc->outqueue, pdu); - /* we have to re-send the whole pdu again */ - pdu->written = 0; + for (i = 0; i < HASHES; i++) { + struct rpc_queue *q = &rpc->waitpdu[i]; + + for (pdu=q->head; pdu; pdu=pdu->next) { + rpc_return_to_queue(&rpc->outqueue, pdu); + /* we have to re-send the whole pdu again */ + pdu->written = 0; + } + rpc_reset_queue(q); } - rpc_reset_queue(&rpc->waitpdu); if (rpc->auto_reconnect != 0) { rpc->connect_cb = reconnect_cb; @@ -734,14 +743,19 @@ int rpc_queue_length(struct rpc_context *rpc) { int i=0; struct rpc_pdu *pdu; + unsigned int n; assert(rpc->magic == RPC_CONTEXT_MAGIC); for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) { i++; } - for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) { - i++; + + for (n = 0; n < HASHES; n++) { + struct rpc_queue *q = &rpc->waitpdu[n]; + + for(pdu = q->head; pdu; pdu = pdu->next) + i++; } return i; } -- 2.34.1