Optimisations to the pdu queues
[deb_libnfs.git] / lib / pdu.c
index 7dc1ee317c6dbefcdaa5845f78efac0561be020a..2f4761d20a2b87825d52c1c1f2df786ceb14c464 100644 (file)
--- a/lib/pdu.c
+++ b/lib/pdu.c
    You should have received a copy of the GNU Lesser General Public License
    along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#ifdef AROS
+#include "aros_compat.h"
+#endif
+
 #ifdef WIN32
 #include "win32_compat.h"
-#ifndef MSG_DONTWAIT
-#define MSG_DONTWAIT 0
 #endif
-#else
+
+#ifdef HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#endif
+
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+
+#ifdef HAVE_STRINGS_H
 #include <strings.h>
-#endif/*WIN32*/
+#endif
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
 #include <assert.h>
 #include <errno.h>
 #include "slist.h"
 #include "libnfs-raw.h"
 #include "libnfs-private.h"
 
+void rpc_reset_queue(struct rpc_queue *q)
+{
+       q->head = NULL;
+       q->tail = NULL;
+}
+
+/*
+ * Push to the tail end of the queue
+ */
+void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+       if (q->head == NULL)
+               q->head = pdu;
+       else
+               q->tail->next = pdu;
+       q->tail = pdu;
+}
+
+/*
+ * Push to the front/head of the queue
+ */
+void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+       pdu->next = q->head;
+       q->head = pdu;
+       if (q->tail == NULL)
+               q->tail = pdu;
+}
+
 struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize)
 {
        struct rpc_pdu *pdu;
@@ -61,17 +103,18 @@ struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int versi
        }
 
        memset(&msg, 0, sizeof(struct rpc_msg));
-       msg.rm_xid = pdu->xid;
-        msg.rm_direction = CALL;
-       msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
-       msg.rm_call.cb_prog = program;
-       msg.rm_call.cb_vers = version;
-       msg.rm_call.cb_proc = procedure;
-       msg.rm_call.cb_cred = rpc->auth->ah_cred;
-       msg.rm_call.cb_verf = rpc->auth->ah_verf;
-
-       if (zdr_callmsg(&pdu->zdr, &msg) == 0) {
-               rpc_set_error(rpc, "zdr_callmsg failed");
+       msg.xid                = pdu->xid;
+        msg.direction          = CALL;
+       msg.body.cbody.rpcvers = RPC_MSG_VERSION;
+       msg.body.cbody.prog    = program;
+       msg.body.cbody.vers    = version;
+       msg.body.cbody.proc    = procedure;
+       msg.body.cbody.cred    = rpc->auth->ah_cred;
+       msg.body.cbody.verf    = rpc->auth->ah_verf;
+
+       if (zdr_callmsg(rpc, &pdu->zdr, &msg) == 0) {
+               rpc_set_error(rpc, "zdr_callmsg failed with %s",
+                             rpc_get_error(rpc));
                zdr_destroy(&pdu->zdr);
                free(pdu);
                return NULL;
@@ -100,6 +143,10 @@ void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
        free(pdu);
 }
 
+void rpc_set_next_xid(struct rpc_context *rpc, uint32_t xid)
+{
+       rpc->xid = xid;
+}
 
 int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
 {
@@ -117,7 +164,7 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
                        rpc_free_pdu(rpc, pdu);
                        return -1;
                }
-               SLIST_ADD_END(&rpc->waitpdu, pdu);
+               rpc_enqueue(&rpc->waitpdu, pdu);
                return 0;
        }
 
@@ -135,7 +182,7 @@ int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu)
        }
 
        memcpy(pdu->outdata.data, rpc->encodebuf, pdu->outdata.size);
-       SLIST_ADD_END(&rpc->outqueue, pdu);
+       rpc_enqueue(&rpc->outqueue, pdu);
 
        return 0;
 }
@@ -156,36 +203,41 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR *
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
        memset(&msg, 0, sizeof(struct rpc_msg));
-       msg.acpted_rply.ar_verf = _null_auth;
+       msg.body.rbody.reply.areply.verf = _null_auth;
        if (pdu->zdr_decode_bufsize > 0) {
                if (pdu->zdr_decode_buf != NULL) {
                        free(pdu->zdr_decode_buf);
                }
                pdu->zdr_decode_buf = malloc(pdu->zdr_decode_bufsize);
                if (pdu->zdr_decode_buf == NULL) {
-                       rpc_set_error(rpc, "zdr_replymsg failed in portmap_getport_reply");
-                       pdu->cb(rpc, RPC_STATUS_ERROR, "Failed to allocate buffer for decoding of ZDR reply", pdu->private_data);
+                       rpc_set_error(rpc, "Failed to allocate memory for "
+                                     "zdr_encode_buf in rpc_process_reply");
+                       pdu->cb(rpc, RPC_STATUS_ERROR, "Failed to allocate "
+                               "buffer for decoding of ZDR reply",
+                               pdu->private_data);
                        return 0;
                }
                memset(pdu->zdr_decode_buf, 0, pdu->zdr_decode_bufsize);
        }
-       msg.acpted_rply.ar_results.where = pdu->zdr_decode_buf;
-       msg.acpted_rply.ar_results.proc  = pdu->zdr_decode_fn;
-
-       if (zdr_replymsg(zdr, &msg) == 0) {
-               rpc_set_error(rpc, "zdr_replymsg failed in portmap_getport_reply");
-               pdu->cb(rpc, RPC_STATUS_ERROR, "Message rejected by server", pdu->private_data);
+       msg.body.rbody.reply.areply.reply_data.results.where = pdu->zdr_decode_buf;
+       msg.body.rbody.reply.areply.reply_data.results.proc  = pdu->zdr_decode_fn;
+
+       if (zdr_replymsg(rpc, zdr, &msg) == 0) {
+               rpc_set_error(rpc, "zdr_replymsg failed in rpc_process_reply: "
+                             "%s", rpc_get_error(rpc));
+               pdu->cb(rpc, RPC_STATUS_ERROR, "Message rejected by server",
+                       pdu->private_data);
                if (pdu->zdr_decode_buf != NULL) {
                        free(pdu->zdr_decode_buf);
                        pdu->zdr_decode_buf = NULL;
                }
                return 0;
        }
-       if (msg.rm_reply.rp_stat != MSG_ACCEPTED) {
+       if (msg.body.rbody.stat != MSG_ACCEPTED) {
                pdu->cb(rpc, RPC_STATUS_ERROR, "RPC Packet not accepted by the server", pdu->private_data);
                return 0;
        }
-       switch (msg.rm_reply.rp_acpt.ar_stat) {
+       switch (msg.body.rbody.reply.areply.stat) {
        case SUCCESS:
                pdu->cb(rpc, RPC_STATUS_SUCCESS, pdu->zdr_decode_buf, pdu->private_data);
                break;
@@ -214,10 +266,10 @@ static int rpc_process_reply(struct rpc_context *rpc, struct rpc_pdu *pdu, ZDR *
 
 int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
 {
-       struct rpc_pdu *pdu;
+       struct rpc_pdu *pdu, *prev_pdu;
        ZDR zdr;
        int pos, recordmarker = 0;
-       unsigned int xid;
+       uint32_t xid;
        char *reasbuf = NULL;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
@@ -279,12 +331,22 @@ int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
        }
        zdr_setpos(&zdr, pos);
 
-       for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
+       /* Linear traverse singly-linked list, but track previous
+        * entry for optimised removal */
+       prev_pdu = NULL;
+       for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
                if (pdu->xid != xid) {
+                       prev_pdu = pdu;
                        continue;
                }
                if (rpc->is_udp == 0 || rpc->is_broadcast == 0) {
-                       SLIST_REMOVE(&rpc->waitpdu, pdu);
+                       /* Singly-linked but we track head and tail */
+                       if (pdu == rpc->waitpdu.head)
+                               rpc->waitpdu.head = pdu->next;
+                       if (pdu == rpc->waitpdu.tail)
+                               rpc->waitpdu.tail = prev_pdu;
+                       if (prev_pdu != NULL)
+                               prev_pdu->next = pdu->next;
                }
                if (rpc_process_reply(rpc, pdu, &zdr) != 0) {
                        rpc_set_error(rpc, "rpc_procdess_reply failed");