From: Mark Hills Date: Tue, 7 Jan 2014 10:23:46 +0000 (+0000) Subject: Optimisations to the pdu queues X-Git-Tag: upstream/1.9.6^2~41^2~2 X-Git-Url: https://git.piment-noir.org/?p=deb_libnfs.git;a=commitdiff_plain;h=aec45c6274bffc92fc1595a95d043e8aae292451 Optimisations to the pdu queues When making many concurrent requests (as is likely in any performance criticial application), the use of SLIST_REMOVE and SLIST_ADD_END are a severe bottleneck because of their linear search. I considered using a double-linked list but it was unnecessary to allocate the additional memory for each list entry. Instead, continue to use a single-linked list but retain: * a pointer to the end of the list; and * a pointer to the previous entry during a linear search. The former would makes append operations O(1) time, and the latter does the same for removal. We can do this because removal only happens within the linear search, and there is no random access to the queue. --- diff --git a/include/libnfs-private.h b/include/libnfs-private.h index 07ce9a8..5d4c47b 100644 --- a/include/libnfs-private.h +++ b/include/libnfs-private.h @@ -65,6 +65,13 @@ struct rpc_fragment { #define RPC_CONTEXT_MAGIC 0xc6e46435 #define RPC_PARAM_UNDEFINED -1 +/* + * Queue is singly-linked but we hold on to the tail + */ +struct rpc_queue { + struct rpc_pdu *head, *tail; +}; + struct rpc_context { uint32_t magic; int fd; @@ -82,9 +89,9 @@ struct rpc_context { char *encodebuf; int encodebuflen; - struct rpc_pdu *outqueue; + struct rpc_queue outqueue; struct sockaddr_storage udp_src; - struct rpc_pdu *waitpdu; + struct rpc_queue waitpdu; uint32_t inpos; uint32_t insize; @@ -126,6 +133,10 @@ struct rpc_pdu { uint32_t zdr_decode_bufsize; }; +void rpc_reset_queue(struct rpc_queue *q); +void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu); +void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu); + struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_bufsize); void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu); int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu); diff --git a/lib/init.c b/lib/init.c index 36d4880..0b71425 100644 --- a/lib/init.c +++ b/lib/init.c @@ -83,6 +83,8 @@ struct rpc_context *rpc_init_context(void) rpc->uid = getuid(); rpc->gid = getgid(); #endif + rpc_reset_queue(&rpc->outqueue); + rpc_reset_queue(&rpc->waitpdu); return rpc; } @@ -155,20 +157,23 @@ char *rpc_get_error(struct rpc_context *rpc) void rpc_error_all_pdus(struct rpc_context *rpc, char *error) { - struct rpc_pdu *pdu; + struct rpc_pdu *pdu, *next; assert(rpc->magic == RPC_CONTEXT_MAGIC); - while((pdu = rpc->outqueue) != NULL) { + while ((pdu = rpc->outqueue.head) != NULL) { pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data); - SLIST_REMOVE(&rpc->outqueue, pdu); + rpc->outqueue.head = pdu->next; rpc_free_pdu(rpc, pdu); } - while((pdu = rpc->waitpdu) != NULL) { + rpc->outqueue.tail = NULL; + + while((pdu = rpc->waitpdu.head) != NULL) { pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data); - SLIST_REMOVE(&rpc->waitpdu, pdu); + rpc->waitpdu.head = pdu->next; rpc_free_pdu(rpc, pdu); } + rpc->waitpdu.tail = NULL; } static void rpc_free_fragment(struct rpc_fragment *fragment) @@ -186,7 +191,7 @@ void rpc_free_all_fragments(struct rpc_context *rpc) while (rpc->fragments != NULL) { struct rpc_fragment *fragment = rpc->fragments; - SLIST_REMOVE(&rpc->fragments, fragment); + rpc->fragments = fragment->next; rpc_free_fragment(fragment); } } @@ -220,14 +225,14 @@ void rpc_destroy_context(struct rpc_context *rpc) assert(rpc->magic == RPC_CONTEXT_MAGIC); - while((pdu = rpc->outqueue) != NULL) { + while((pdu = rpc->outqueue.head) != NULL) { pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data); - SLIST_REMOVE(&rpc->outqueue, pdu); + rpc->outqueue.head = pdu->next; rpc_free_pdu(rpc, pdu); } - while((pdu = rpc->waitpdu) != NULL) { + while((pdu = rpc->waitpdu.head) != NULL) { pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data); - SLIST_REMOVE(&rpc->waitpdu, pdu); + rpc->outqueue.head = pdu->next; rpc_free_pdu(rpc, pdu); } diff --git a/lib/pdu.c b/lib/pdu.c index 5def0bc..2f4761d 100644 --- a/lib/pdu.c +++ b/lib/pdu.c @@ -49,6 +49,35 @@ #include "libnfs-raw.h" #include "libnfs-private.h" +void rpc_reset_queue(struct rpc_queue *q) +{ + q->head = NULL; + q->tail = NULL; +} + +/* + * Push to the tail end of the queue + */ +void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu) +{ + if (q->head == NULL) + q->head = pdu; + else + q->tail->next = pdu; + q->tail = pdu; +} + +/* + * Push to the front/head of the queue + */ +void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu) +{ + pdu->next = q->head; + q->head = pdu; + if (q->tail == NULL) + q->tail = pdu; +} + struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize) { struct rpc_pdu *pdu; @@ -135,7 +164,7 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) rpc_free_pdu(rpc, pdu); return -1; } - SLIST_ADD_END(&rpc->waitpdu, pdu); + rpc_enqueue(&rpc->waitpdu, pdu); return 0; } @@ -153,7 +182,7 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) } memcpy(pdu->outdata.data, rpc->encodebuf, pdu->outdata.size); - SLIST_ADD_END(&rpc->outqueue, pdu); + rpc_enqueue(&rpc->outqueue, pdu); return 0; } @@ -237,7 +266,7 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR * int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) { - struct rpc_pdu *pdu; + struct rpc_pdu *pdu, *prev_pdu; ZDR zdr; int pos, recordmarker = 0; uint32_t xid; @@ -302,12 +331,22 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) } zdr_setpos(&zdr, pos); - for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) { + /* Linear traverse singly-linked list, but track previous + * entry for optimised removal */ + prev_pdu = NULL; + for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) { if (pdu->xid != xid) { + prev_pdu = pdu; continue; } if (rpc->is_udp == 0 || rpc->is_broadcast == 0) { - SLIST_REMOVE(&rpc->waitpdu, pdu); + /* Singly-linked but we track head and tail */ + if (pdu == rpc->waitpdu.head) + rpc->waitpdu.head = pdu->next; + if (pdu == rpc->waitpdu.tail) + rpc->waitpdu.tail = prev_pdu; + if (prev_pdu != NULL) + prev_pdu->next = pdu->next; } if (rpc_process_reply(rpc, pdu, &zdr) != 0) { rpc_set_error(rpc, "rpc_procdess_reply failed"); diff --git a/lib/socket.c b/lib/socket.c index d025e07..c518603 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -122,6 +122,11 @@ int rpc_get_fd(struct rpc_context *rpc) return rpc->fd; } +static int rpc_has_queue(struct rpc_queue *q) +{ + return q->head != NULL; +} + int rpc_which_events(struct rpc_context *rpc) { int events; @@ -135,7 +140,7 @@ int rpc_which_events(struct rpc_context *rpc) return POLLIN; } - if (rpc->outqueue) { + if (rpc_has_queue(&rpc->outqueue)) { events |= POLLOUT; } return events; @@ -144,6 +149,7 @@ int rpc_which_events(struct rpc_context *rpc) static int rpc_write_to_socket(struct rpc_context *rpc) { int32_t count; + struct rpc_pdu *pdu; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -152,12 +158,12 @@ static int rpc_write_to_socket(struct rpc_context *rpc) return -1; } - while (rpc->outqueue != NULL) { + while ((pdu = rpc->outqueue.head) != NULL) { int64_t total; - total = rpc->outqueue->outdata.size; + total = pdu->outdata.size; - count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0); + count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0); if (count == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { return 0; @@ -166,12 +172,12 @@ static int rpc_write_to_socket(struct rpc_context *rpc) return -1; } - rpc->outqueue->written += count; - if (rpc->outqueue->written == total) { - struct rpc_pdu *pdu = rpc->outqueue; - - SLIST_REMOVE(&rpc->outqueue, pdu); - SLIST_ADD_END(&rpc->waitpdu, pdu); + pdu->written += count; + if (pdu->written == total) { + rpc->outqueue.head = pdu->next; + if (pdu->next == NULL) + rpc->outqueue.tail = NULL; + rpc_enqueue(&rpc->waitpdu, pdu); } } return 0; @@ -368,7 +374,7 @@ int rpc_service(struct rpc_context *rpc, int revents) } } - if (revents & POLLOUT && rpc->outqueue != NULL) { + if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) { if (rpc_write_to_socket(rpc) != 0) { rpc_set_error(rpc, "write to socket failed"); return -1; @@ -614,12 +620,12 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc) /* socket is closed so we will not get any replies to any commands * in flight. Move them all over from the waitpdu queue back to the out queue */ - for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) { - SLIST_REMOVE(&rpc->waitpdu, pdu); - SLIST_ADD(&rpc->outqueue, pdu); + for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) { + rpc_return_to_queue(&rpc->outqueue, pdu); /* we have to re-send the whole pdu again */ pdu->written = 0; } + rpc_reset_queue(&rpc->waitpdu); if (rpc->auto_reconnect != 0) { rpc->connect_cb = reconnect_cb; @@ -731,10 +737,10 @@ int rpc_queue_length(struct rpc_context *rpc) assert(rpc->magic == RPC_CONTEXT_MAGIC); - for(pdu = rpc->outqueue; pdu; pdu = pdu->next) { + for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) { i++; } - for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) { + for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) { i++; } return i;