X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=lib%2Fpdu.c;h=4d487b71b0c692408dd606881294eaa229dde9b0;hb=8e003243fbec4cff4af3e9ca01ea713065336970;hp=3d8fecaaaefc738467f34253db8fb2dbfb47f5bb;hpb=67a9f57e67c3b2c147969c752532fd55eb99259c;p=deb_libnfs.git diff --git a/lib/pdu.c b/lib/pdu.c index 3d8feca..4d487b7 100644 --- a/lib/pdu.c +++ b/lib/pdu.c @@ -14,17 +14,29 @@ You should have received a copy of the GNU Lesser General Public License along with this program; if not, see . */ +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + #ifdef AROS #include "aros_compat.h" #endif #ifdef WIN32 #include "win32_compat.h" -#else -#include -#include +#endif + +#ifdef HAVE_NETINET_IN_H #include -#endif/*WIN32*/ +#endif + +#ifdef HAVE_SYS_SOCKET_H +#include +#endif + +#ifdef HAVE_STRINGS_H +#include +#endif #include #include @@ -37,6 +49,41 @@ #include "libnfs-raw.h" #include "libnfs-private.h" +void rpc_reset_queue(struct rpc_queue *q) +{ + q->head = NULL; + q->tail = NULL; +} + +/* + * Push to the tail end of the queue + */ +void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu) +{ + if (q->head == NULL) + q->head = pdu; + else + q->tail->next = pdu; + q->tail = pdu; + pdu->next = NULL; +} + +/* + * Push to the front/head of the queue + */ +void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu) +{ + pdu->next = q->head; + q->head = pdu; + if (q->tail == NULL) + q->tail = pdu; +} + +unsigned int rpc_hash_xid(uint32_t xid) +{ + return (xid * 7919) % HASHES; +} + struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize) { struct rpc_pdu *pdu; @@ -71,8 +118,9 @@ struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int versi msg.body.cbody.cred = rpc->auth->ah_cred; msg.body.cbody.verf = rpc->auth->ah_verf; - if (zdr_callmsg(&pdu->zdr, &msg) == 0) { - rpc_set_error(rpc, "zdr_callmsg failed"); + if (zdr_callmsg(rpc, &pdu->zdr, &msg) == 0) { + rpc_set_error(rpc, "zdr_callmsg failed with %s", + rpc_get_error(rpc)); zdr_destroy(&pdu->zdr); free(pdu); return NULL; @@ -101,6 +149,10 @@ void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) free(pdu); } +void rpc_set_next_xid(struct rpc_context *rpc, uint32_t xid) +{ + rpc->xid = xid; +} int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) { @@ -112,13 +164,17 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) /* for udp we dont queue, we just send it straight away */ if (rpc->is_udp != 0) { + unsigned int hash; + // XXX add a rpc->udp_dest_sock_size and get rid of sys/socket.h and netinet/in.h if (sendto(rpc->fd, rpc->encodebuf, size, MSG_DONTWAIT, rpc->udp_dest, sizeof(struct sockaddr_in)) < 0) { rpc_set_error(rpc, "Sendto failed with errno %s", strerror(errno)); rpc_free_pdu(rpc, pdu); return -1; } - SLIST_ADD_END(&rpc->waitpdu, pdu); + + hash = rpc_hash_xid(pdu->xid); + rpc_enqueue(&rpc->waitpdu[hash], pdu); return 0; } @@ -136,7 +192,7 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu) } memcpy(pdu->outdata.data, rpc->encodebuf, pdu->outdata.size); - SLIST_ADD_END(&rpc->outqueue, pdu); + rpc_enqueue(&rpc->outqueue, pdu); return 0; } @@ -164,8 +220,11 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR * } pdu->zdr_decode_buf = malloc(pdu->zdr_decode_bufsize); if (pdu->zdr_decode_buf == NULL) { - rpc_set_error(rpc, "zdr_replymsg failed in portmap_getport_reply"); - pdu->cb(rpc, RPC_STATUS_ERROR, "Failed to allocate buffer for decoding of ZDR reply", pdu->private_data); + rpc_set_error(rpc, "Failed to allocate memory for " + "zdr_encode_buf in rpc_process_reply"); + pdu->cb(rpc, RPC_STATUS_ERROR, "Failed to allocate " + "buffer for decoding of ZDR reply", + pdu->private_data); return 0; } memset(pdu->zdr_decode_buf, 0, pdu->zdr_decode_bufsize); @@ -173,9 +232,11 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR * msg.body.rbody.reply.areply.reply_data.results.where = pdu->zdr_decode_buf; msg.body.rbody.reply.areply.reply_data.results.proc = pdu->zdr_decode_fn; - if (zdr_replymsg(zdr, &msg) == 0) { - rpc_set_error(rpc, "zdr_replymsg failed in portmap_getport_reply"); - pdu->cb(rpc, RPC_STATUS_ERROR, "Message rejected by server", pdu->private_data); + if (zdr_replymsg(rpc, zdr, &msg) == 0) { + rpc_set_error(rpc, "zdr_replymsg failed in rpc_process_reply: " + "%s", rpc_get_error(rpc)); + pdu->cb(rpc, RPC_STATUS_ERROR, "Message rejected by server", + pdu->private_data); if (pdu->zdr_decode_buf != NULL) { free(pdu->zdr_decode_buf); pdu->zdr_decode_buf = NULL; @@ -215,10 +276,12 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR * int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) { - struct rpc_pdu *pdu; + struct rpc_pdu *pdu, *prev_pdu; + struct rpc_queue *q; ZDR zdr; int pos, recordmarker = 0; - unsigned int xid; + unsigned int hash; + uint32_t xid; char *reasbuf = NULL; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -280,12 +343,26 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size) } zdr_setpos(&zdr, pos); - for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) { + /* Look up the transaction in a hash table of our requests */ + hash = rpc_hash_xid(xid); + q = &rpc->waitpdu[hash]; + + /* Follow the hash chain. Linear traverse singly-linked list, + * but track previous entry for optimised removal */ + prev_pdu = NULL; + for (pdu=q->head; pdu; pdu=pdu->next) { if (pdu->xid != xid) { + prev_pdu = pdu; continue; } if (rpc->is_udp == 0 || rpc->is_broadcast == 0) { - SLIST_REMOVE(&rpc->waitpdu, pdu); + /* Singly-linked but we track head and tail */ + if (pdu == q->head) + q->head = pdu->next; + if (pdu == q->tail) + q->tail = prev_pdu; + if (prev_pdu != NULL) + prev_pdu->next = pdu->next; } if (rpc_process_reply(rpc, pdu, &zdr) != 0) { rpc_set_error(rpc, "rpc_procdess_reply failed");