Optimisations to the pdu queues
[deb_libnfs.git] / lib / socket.c
index d025e070e017091e1f796178c139e8ff44c7cf48..c518603c5ef7896378bbd72e67ace1dda42dcfa4 100644 (file)
@@ -122,6 +122,11 @@ int rpc_get_fd(struct rpc_context *rpc)
        return rpc->fd;
 }
 
+static int rpc_has_queue(struct rpc_queue *q)
+{
+       return q->head != NULL;
+}
+
 int rpc_which_events(struct rpc_context *rpc)
 {
        int events;
@@ -135,7 +140,7 @@ int rpc_which_events(struct rpc_context *rpc)
                return POLLIN;
        }
 
-       if (rpc->outqueue) {
+       if (rpc_has_queue(&rpc->outqueue)) {
                events |= POLLOUT;
        }
        return events;
@@ -144,6 +149,7 @@ int rpc_which_events(struct rpc_context *rpc)
 static int rpc_write_to_socket(struct rpc_context *rpc)
 {
        int32_t count;
+       struct rpc_pdu *pdu;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
@@ -152,12 +158,12 @@ static int rpc_write_to_socket(struct rpc_context *rpc)
                return -1;
        }
 
-       while (rpc->outqueue != NULL) {
+       while ((pdu = rpc->outqueue.head) != NULL) {
                int64_t total;
 
-               total = rpc->outqueue->outdata.size;
+               total = pdu->outdata.size;
 
-               count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
+               count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
                if (count == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                                return 0;
@@ -166,12 +172,12 @@ static int rpc_write_to_socket(struct rpc_context *rpc)
                        return -1;
                }
 
-               rpc->outqueue->written += count;
-               if (rpc->outqueue->written == total) {
-                       struct rpc_pdu *pdu = rpc->outqueue;
-
-                       SLIST_REMOVE(&rpc->outqueue, pdu);
-                       SLIST_ADD_END(&rpc->waitpdu, pdu);
+               pdu->written += count;
+               if (pdu->written == total) {
+                       rpc->outqueue.head = pdu->next;
+                       if (pdu->next == NULL)
+                               rpc->outqueue.tail = NULL;
+                       rpc_enqueue(&rpc->waitpdu, pdu);
                }
        }
        return 0;
@@ -368,7 +374,7 @@ int rpc_service(struct rpc_context *rpc, int revents)
                }
        }
 
-       if (revents & POLLOUT && rpc->outqueue != NULL) {
+       if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
                if (rpc_write_to_socket(rpc) != 0) {
                        rpc_set_error(rpc, "write to socket failed");
                        return -1;
@@ -614,12 +620,12 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc)
        /* socket is closed so we will not get any replies to any commands
         * in flight. Move them all over from the waitpdu queue back to the out queue
         */
-       for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
-               SLIST_REMOVE(&rpc->waitpdu, pdu);
-               SLIST_ADD(&rpc->outqueue, pdu);
+       for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
+               rpc_return_to_queue(&rpc->outqueue, pdu);
                /* we have to re-send the whole pdu again */
                pdu->written = 0;
        }
+       rpc_reset_queue(&rpc->waitpdu);
 
        if (rpc->auto_reconnect != 0) {
                rpc->connect_cb  = reconnect_cb;
@@ -731,10 +737,10 @@ int rpc_queue_length(struct rpc_context *rpc)
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
-       for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
+       for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
                i++;
        }
-       for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
+       for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) {
                i++;
        }
        return i;