From: Ronnie Sahlberg Date: Fri, 11 Apr 2014 02:59:28 +0000 (-0700) Subject: Merge branch 'xid-hash' X-Git-Tag: upstream/1.9.6^2~41 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=a87bb233a859992cdbe1ef4e92161b45ed8e5c92;hp=0275da678b5e0dfdb694c7287503601dcb8fa54c;p=deb_libnfs.git Merge branch 'xid-hash' --- diff --git a/include/libnfs-private.h b/include/libnfs-private.h index 07ce9a8..2e4ccc0 100644 --- a/include/libnfs-private.h +++ b/include/libnfs-private.h @@ -65,6 +65,15 @@ struct rpc_fragment { #define RPC_CONTEXT_MAGIC 0xc6e46435 #define RPC_PARAM_UNDEFINED -1 +/* + * Queue is singly-linked but we hold on to the tail + */ +struct rpc_queue { + struct rpc_pdu *head, *tail; +}; + +#define HASHES 1024 + struct rpc_context { uint32_t magic; int fd; @@ -82,9 +91,9 @@ struct rpc_context { char *encodebuf; int encodebuflen; - struct rpc_pdu *outqueue; + struct rpc_queue outqueue; struct sockaddr_storage udp_src; - struct rpc_pdu *waitpdu; + struct rpc_queue waitpdu[HASHES]; uint32_t inpos; uint32_t insize; @@ -126,6 +135,11 @@ struct rpc_pdu { uint32_t zdr_decode_bufsize; }; +void rpc_reset_queue(struct rpc_queue *q); +void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu); +void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu); +unsigned int rpc_hash_xid(uint32_t xid); + struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_bufsize); void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu); int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu); diff --git a/lib/init.c b/lib/init.c index 36d4880..d655203 100644 --- a/lib/init.c +++ b/lib/init.c @@ -49,6 +49,7 @@ struct rpc_context *rpc_init_context(void) { struct rpc_context *rpc; static uint32_t salt = 0; + unsigned int i; rpc = malloc(sizeof(struct rpc_context)); if (rpc == NULL) { @@ -83,6 +84,9 @@ struct rpc_context *rpc_init_context(void) rpc->uid = getuid(); rpc->gid = getgid(); #endif + rpc_reset_queue(&rpc->outqueue); + for (i = 0; i < HASHES; i++) + rpc_reset_queue(&rpc->waitpdu[i]); return rpc; } @@ -155,19 +159,27 @@ char *rpc_get_error(struct rpc_context *rpc) void rpc_error_all_pdus(struct rpc_context *rpc, char *error) { - struct rpc_pdu *pdu; + struct rpc_pdu *pdu, *next; + unsigned int i; assert(rpc->magic == RPC_CONTEXT_MAGIC); - while((pdu = rpc->outqueue) != NULL) { + while ((pdu = rpc->outqueue.head) != NULL) { pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data); - SLIST_REMOVE(&rpc->outqueue, pdu); + rpc->outqueue.head = pdu->next; rpc_free_pdu(rpc, pdu); } - while((pdu = rpc->waitpdu) != NULL) { - pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data); - SLIST_REMOVE(&rpc->waitpdu, pdu); - rpc_free_pdu(rpc, pdu); + rpc->outqueue.tail = NULL; + + for (i = 0; i < HASHES; i++) { + struct rpc_queue *q = &rpc->waitpdu[i]; + + while((pdu = q->head) != NULL) { + pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data); + q->head = pdu->next; + rpc_free_pdu(rpc, pdu); + } + q->tail = NULL; } } @@ -186,7 +198,7 @@ void rpc_free_all_fragments(struct rpc_context *rpc) while (rpc->fragments != NULL) { struct rpc_fragment *fragment = rpc->fragments; - SLIST_REMOVE(&rpc->fragments, fragment); + rpc->fragments = fragment->next; rpc_free_fragment(fragment); } } @@ -217,18 +229,24 @@ int rpc_add_fragment(struct rpc_context *rpc, char *data, uint64_t size) void rpc_destroy_context(struct rpc_context *rpc) { struct rpc_pdu *pdu; + unsigned int i; assert(rpc->magic == RPC_CONTEXT_MAGIC); - while((pdu = rpc->outqueue) != NULL) { + while((pdu = rpc->outqueue.head) != NULL) { pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data); - SLIST_REMOVE(&rpc->outqueue, pdu); + rpc->outqueue.head = pdu->next; rpc_free_pdu(rpc, pdu); } - while((pdu = rpc->waitpdu) != NULL) { - pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data); - SLIST_REMOVE(&rpc->waitpdu, pdu); - rpc_free_pdu(rpc, pdu); + + for (i = 0; i < HASHES; i++) { + struct rpc_queue *q = &rpc->waitpdu[i]; + + while((pdu = q->head) != NULL) { + pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data); + rpc->outqueue.head = pdu->next; + rpc_free_pdu(rpc, pdu); + } } rpc_free_all_fragments(rpc); diff --git a/lib/pdu.c b/lib/pdu.c index 5def0bc..4d487b7 100644 --- a/lib/pdu.c +++ b/lib/pdu.c @@ -49,6 +49,41 @@ #include "libnfs-raw.h" #include "libnfs-private.h" +void rpc_reset_queue(struct rpc_queue *q) +{ + q->head = NULL; + q->tail = NULL; +} + +/* + * Push to the tail end of the queue + */ +void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu) +{ + if (q->head == NULL) + q->head = pdu; + else + q->tail->next = pdu; + q->tail = pdu; + pdu->next = NULL; +} + +/* + * Push to the front/head of the queue + */ +void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu) +{ + pdu->next = q->head; + q->head = pdu; + if (q->tail == NULL) + q->tail = pdu; +} + +unsigned int rpc_hash_xid(uint32_t xid) +{ + return (xid * 7919) % HASHES; +} + struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize) { struct rpc_pdu *pdu; @@ -129,13 +164,17 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) /* for udp we dont queue, we just send it straight away */ if (rpc->is_udp != 0) { + unsigned int hash; + // XXX add a rpc->udp_dest_sock_size and get rid of sys/socket.h and netinet/in.h if (sendto(rpc->fd, rpc->encodebuf, size, MSG_DONTWAIT, rpc->udp_dest, sizeof(struct sockaddr_in)) < 0) { rpc_set_error(rpc, "Sendto failed with errno %s", strerror(errno)); rpc_free_pdu(rpc, pdu); return -1; } - SLIST_ADD_END(&rpc->waitpdu, pdu); + + hash = rpc_hash_xid(pdu->xid); + rpc_enqueue(&rpc->waitpdu[hash], pdu); return 0; } @@ -153,7 +192,7 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) } memcpy(pdu->outdata.data, rpc->encodebuf, pdu->outdata.size); - SLIST_ADD_END(&rpc->outqueue, pdu); + rpc_enqueue(&rpc->outqueue, pdu); return 0; } @@ -237,9 +276,11 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR * int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) { - struct rpc_pdu *pdu; + struct rpc_pdu *pdu, *prev_pdu; + struct rpc_queue *q; ZDR zdr; int pos, recordmarker = 0; + unsigned int hash; uint32_t xid; char *reasbuf = NULL; @@ -302,12 +343,26 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) } zdr_setpos(&zdr, pos); - for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) { + /* Look up the transaction in a hash table of our requests */ + hash = rpc_hash_xid(xid); + q = &rpc->waitpdu[hash]; + + /* Follow the hash chain. Linear traverse singly-linked list, + * but track previous entry for optimised removal */ + prev_pdu = NULL; + for (pdu=q->head; pdu; pdu=pdu->next) { if (pdu->xid != xid) { + prev_pdu = pdu; continue; } if (rpc->is_udp == 0 || rpc->is_broadcast == 0) { - SLIST_REMOVE(&rpc->waitpdu, pdu); + /* Singly-linked but we track head and tail */ + if (pdu == q->head) + q->head = pdu->next; + if (pdu == q->tail) + q->tail = prev_pdu; + if (prev_pdu != NULL) + prev_pdu->next = pdu->next; } if (rpc_process_reply(rpc, pdu, &zdr) != 0) { rpc_set_error(rpc, "rpc_procdess_reply failed"); diff --git a/lib/socket.c b/lib/socket.c index aede89a..39bb6a7 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -130,6 +130,11 @@ int rpc_get_fd(struct rpc_context *rpc) return rpc->fd; } +static int rpc_has_queue(struct rpc_queue *q) +{ + return q->head != NULL; +} + int rpc_which_events(struct rpc_context *rpc) { int events; @@ -143,7 +148,7 @@ int rpc_which_events(struct rpc_context *rpc) return POLLIN; } - if (rpc->outqueue) { + if (rpc_has_queue(&rpc->outqueue)) { events |= POLLOUT; } return events; @@ -152,6 +157,7 @@ int rpc_which_events(struct rpc_context *rpc) static int rpc_write_to_socket(struct rpc_context *rpc) { int32_t count; + struct rpc_pdu *pdu; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -160,12 +166,12 @@ static int rpc_write_to_socket(struct rpc_context *rpc) return -1; } - while (rpc->outqueue != NULL) { + while ((pdu = rpc->outqueue.head) != NULL) { int64_t total; - total = rpc->outqueue->outdata.size; + total = pdu->outdata.size; - count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0); + count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0); if (count == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { return 0; @@ -174,12 +180,16 @@ static int rpc_write_to_socket(struct rpc_context *rpc) return -1; } - rpc->outqueue->written += count; - if (rpc->outqueue->written == total) { - struct rpc_pdu *pdu = rpc->outqueue; + pdu->written += count; + if (pdu->written == total) { + unsigned int hash; + + rpc->outqueue.head = pdu->next; + if (pdu->next == NULL) + rpc->outqueue.tail = NULL; - SLIST_REMOVE(&rpc->outqueue, pdu); - SLIST_ADD_END(&rpc->waitpdu, pdu); + hash = rpc_hash_xid(pdu->xid); + rpc_enqueue(&rpc->waitpdu[hash], pdu); } } return 0; @@ -376,7 +386,7 @@ int rpc_service(struct rpc_context *rpc, int revents) } } - if (revents & POLLOUT && rpc->outqueue != NULL) { + if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) { if (rpc_write_to_socket(rpc) != 0) { rpc_set_error(rpc, "write to socket failed"); return -1; @@ -610,6 +620,7 @@ static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, vo static int rpc_reconnect_requeue(struct rpc_context *rpc) { struct rpc_pdu *pdu; + unsigned int i; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -623,11 +634,15 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc) /* socket is closed so we will not get any replies to any commands * in flight. Move them all over from the waitpdu queue back to the out queue */ - for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) { - SLIST_REMOVE(&rpc->waitpdu, pdu); - SLIST_ADD(&rpc->outqueue, pdu); - /* we have to re-send the whole pdu again */ - pdu->written = 0; + for (i = 0; i < HASHES; i++) { + struct rpc_queue *q = &rpc->waitpdu[i]; + + for (pdu=q->head; pdu; pdu=pdu->next) { + rpc_return_to_queue(&rpc->outqueue, pdu); + /* we have to re-send the whole pdu again */ + pdu->written = 0; + } + rpc_reset_queue(q); } if (rpc->auto_reconnect != 0) { @@ -737,14 +752,19 @@ int rpc_queue_length(struct rpc_context *rpc) { int i=0; struct rpc_pdu *pdu; + unsigned int n; assert(rpc->magic == RPC_CONTEXT_MAGIC); - for(pdu = rpc->outqueue; pdu; pdu = pdu->next) { + for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) { i++; } - for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) { - i++; + + for (n = 0; n < HASHES; n++) { + struct rpc_queue *q = &rpc->waitpdu[n]; + + for(pdu = q->head; pdu; pdu = pdu->next) + i++; } return i; }