X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=lib%2Fpdu.c;h=4d487b71b0c692408dd606881294eaa229dde9b0;hb=6a88df2924732760074db7d3c05713ea4fa7e217;hp=66142607da2af4598b98ed7bffa8fd689456ff49;hpb=aab6538bbd55177221788df5bdb4b61698e00305;p=deb_libnfs.git
diff --git a/lib/pdu.c b/lib/pdu.c
index 6614260..4d487b7 100644
--- a/lib/pdu.c
+++ b/lib/pdu.c
@@ -14,20 +14,33 @@
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, see .
*/
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#ifdef AROS
+#include "aros_compat.h"
+#endif
+
#ifdef WIN32
#include "win32_compat.h"
-#ifndef MSG_DONTWAIT
-#define MSG_DONTWAIT 0
#endif
-#else
+
+#ifdef HAVE_NETINET_IN_H
+#include
+#endif
+
+#ifdef HAVE_SYS_SOCKET_H
+#include
+#endif
+
+#ifdef HAVE_STRINGS_H
#include
-#endif/*WIN32*/
+#endif
#include
#include
#include
-#include
-#include
#include
#include
#include "slist.h"
@@ -36,6 +49,41 @@
#include "libnfs-raw.h"
#include "libnfs-private.h"
+void rpc_reset_queue(struct rpc_queue *q)
+{
+ q->head = NULL;
+ q->tail = NULL;
+}
+
+/*
+ * Push to the tail end of the queue
+ */
+void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+ if (q->head == NULL)
+ q->head = pdu;
+ else
+ q->tail->next = pdu;
+ q->tail = pdu;
+ pdu->next = NULL;
+}
+
+/*
+ * Push to the front/head of the queue
+ */
+void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+ pdu->next = q->head;
+ q->head = pdu;
+ if (q->tail == NULL)
+ q->tail = pdu;
+}
+
+unsigned int rpc_hash_xid(uint32_t xid)
+{
+ return (xid * 7919) % HASHES;
+}
+
struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize)
{
struct rpc_pdu *pdu;
@@ -70,8 +118,9 @@ struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int versi
msg.body.cbody.cred = rpc->auth->ah_cred;
msg.body.cbody.verf = rpc->auth->ah_verf;
- if (zdr_callmsg(&pdu->zdr, &msg) == 0) {
- rpc_set_error(rpc, "zdr_callmsg failed");
+ if (zdr_callmsg(rpc, &pdu->zdr, &msg) == 0) {
+ rpc_set_error(rpc, "zdr_callmsg failed with %s",
+ rpc_get_error(rpc));
zdr_destroy(&pdu->zdr);
free(pdu);
return NULL;
@@ -100,6 +149,10 @@ void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
free(pdu);
}
+void rpc_set_next_xid(struct rpc_context *rpc, uint32_t xid)
+{
+ rpc->xid = xid;
+}
int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
{
@@ -111,13 +164,17 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
/* for udp we dont queue, we just send it straight away */
if (rpc->is_udp != 0) {
+ unsigned int hash;
+
// XXX add a rpc->udp_dest_sock_size and get rid of sys/socket.h and netinet/in.h
if (sendto(rpc->fd, rpc->encodebuf, size, MSG_DONTWAIT, rpc->udp_dest, sizeof(struct sockaddr_in)) < 0) {
rpc_set_error(rpc, "Sendto failed with errno %s", strerror(errno));
rpc_free_pdu(rpc, pdu);
return -1;
}
- SLIST_ADD_END(&rpc->waitpdu, pdu);
+
+ hash = rpc_hash_xid(pdu->xid);
+ rpc_enqueue(&rpc->waitpdu[hash], pdu);
return 0;
}
@@ -135,7 +192,7 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
}
memcpy(pdu->outdata.data, rpc->encodebuf, pdu->outdata.size);
- SLIST_ADD_END(&rpc->outqueue, pdu);
+ rpc_enqueue(&rpc->outqueue, pdu);
return 0;
}
@@ -163,8 +220,11 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR *
}
pdu->zdr_decode_buf = malloc(pdu->zdr_decode_bufsize);
if (pdu->zdr_decode_buf == NULL) {
- rpc_set_error(rpc, "zdr_replymsg failed in portmap_getport_reply");
- pdu->cb(rpc, RPC_STATUS_ERROR, "Failed to allocate buffer for decoding of ZDR reply", pdu->private_data);
+ rpc_set_error(rpc, "Failed to allocate memory for "
+ "zdr_encode_buf in rpc_process_reply");
+ pdu->cb(rpc, RPC_STATUS_ERROR, "Failed to allocate "
+ "buffer for decoding of ZDR reply",
+ pdu->private_data);
return 0;
}
memset(pdu->zdr_decode_buf, 0, pdu->zdr_decode_bufsize);
@@ -172,9 +232,11 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR *
msg.body.rbody.reply.areply.reply_data.results.where = pdu->zdr_decode_buf;
msg.body.rbody.reply.areply.reply_data.results.proc = pdu->zdr_decode_fn;
- if (zdr_replymsg(zdr, &msg) == 0) {
- rpc_set_error(rpc, "zdr_replymsg failed in portmap_getport_reply");
- pdu->cb(rpc, RPC_STATUS_ERROR, "Message rejected by server", pdu->private_data);
+ if (zdr_replymsg(rpc, zdr, &msg) == 0) {
+ rpc_set_error(rpc, "zdr_replymsg failed in rpc_process_reply: "
+ "%s", rpc_get_error(rpc));
+ pdu->cb(rpc, RPC_STATUS_ERROR, "Message rejected by server",
+ pdu->private_data);
if (pdu->zdr_decode_buf != NULL) {
free(pdu->zdr_decode_buf);
pdu->zdr_decode_buf = NULL;
@@ -214,10 +276,12 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR *
int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
{
- struct rpc_pdu *pdu;
+ struct rpc_pdu *pdu, *prev_pdu;
+ struct rpc_queue *q;
ZDR zdr;
int pos, recordmarker = 0;
- unsigned int xid;
+ unsigned int hash;
+ uint32_t xid;
char *reasbuf = NULL;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
@@ -279,12 +343,26 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
}
zdr_setpos(&zdr, pos);
- for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
+ /* Look up the transaction in a hash table of our requests */
+ hash = rpc_hash_xid(xid);
+ q = &rpc->waitpdu[hash];
+
+ /* Follow the hash chain. Linear traverse singly-linked list,
+ * but track previous entry for optimised removal */
+ prev_pdu = NULL;
+ for (pdu=q->head; pdu; pdu=pdu->next) {
if (pdu->xid != xid) {
+ prev_pdu = pdu;
continue;
}
if (rpc->is_udp == 0 || rpc->is_broadcast == 0) {
- SLIST_REMOVE(&rpc->waitpdu, pdu);
+ /* Singly-linked but we track head and tail */
+ if (pdu == q->head)
+ q->head = pdu->next;
+ if (pdu == q->tail)
+ q->tail = prev_pdu;
+ if (prev_pdu != NULL)
+ prev_pdu->next = pdu->next;
}
if (rpc_process_reply(rpc, pdu, &zdr) != 0) {
rpc_set_error(rpc, "rpc_procdess_reply failed");