Merge branch 'xid-hash'
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Fri, 11 Apr 2014 02:59:28 +0000 (19:59 -0700)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Fri, 11 Apr 2014 02:59:28 +0000 (19:59 -0700)
1  2 
lib/socket.c

diff --combined lib/socket.c
index aede89adf65c56a7050301522b4ca438ed6ba65b,6bf3cfff23d2d65dfe6120201c33b211de114c9e..39bb6a7e97323ca67a372c070ccd5cdecc6d69f0
@@@ -95,14 -95,6 +95,14 @@@ static void set_nonblocking(int fd
  #endif //FIXME
  }
  
 +static void set_nolinger(int fd)
 +{
 +      struct linger lng;
 +      lng.l_onoff = 1;
 +      lng.l_linger = 0;
 +      setsockopt(fd, SOL_SOCKET, SO_LINGER, &lng, sizeof(lng));
 +}
 +
  #ifdef HAVE_NETINET_TCP_H
  int set_tcp_sockopt(int sockfd, int optname, int value)
  {
@@@ -130,6 -122,11 +130,11 @@@ int rpc_get_fd(struct rpc_context *rpc
        return rpc->fd;
  }
  
+ static int rpc_has_queue(struct rpc_queue *q)
+ {
+       return q->head != NULL;
+ }
  int rpc_which_events(struct rpc_context *rpc)
  {
        int events;
                return POLLIN;
        }
  
-       if (rpc->outqueue) {
+       if (rpc_has_queue(&rpc->outqueue)) {
                events |= POLLOUT;
        }
        return events;
  static int rpc_write_to_socket(struct rpc_context *rpc)
  {
        int32_t count;
+       struct rpc_pdu *pdu;
  
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
  
                return -1;
        }
  
-       while (rpc->outqueue != NULL) {
+       while ((pdu = rpc->outqueue.head) != NULL) {
                int64_t total;
  
-               total = rpc->outqueue->outdata.size;
+               total = pdu->outdata.size;
  
-               count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
+               count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
                if (count == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                                return 0;
                        return -1;
                }
  
-               rpc->outqueue->written += count;
-               if (rpc->outqueue->written == total) {
-                       struct rpc_pdu *pdu = rpc->outqueue;
+               pdu->written += count;
+               if (pdu->written == total) {
+                       unsigned int hash;
+                       rpc->outqueue.head = pdu->next;
+                       if (pdu->next == NULL)
+                               rpc->outqueue.tail = NULL;
  
-                       SLIST_REMOVE(&rpc->outqueue, pdu);
-                       SLIST_ADD_END(&rpc->waitpdu, pdu);
+                       hash = rpc_hash_xid(pdu->xid);
+                       rpc_enqueue(&rpc->waitpdu[hash], pdu);
                }
        }
        return 0;
@@@ -376,7 -378,7 +386,7 @@@ int rpc_service(struct rpc_context *rpc
                }
        }
  
-       if (revents & POLLOUT && rpc->outqueue != NULL) {
+       if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
                if (rpc_write_to_socket(rpc) != 0) {
                        rpc_set_error(rpc, "write to socket failed");
                        return -1;
@@@ -512,7 -514,6 +522,7 @@@ static int rpc_connect_sockaddr_async(s
        }
  
        set_nonblocking(rpc->fd);
 +      set_nolinger(rpc->fd);
  
        if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS) {
                rpc_set_error(rpc, "connect() to server failed. %s(%d)", strerror(errno), errno);
@@@ -610,6 -611,7 +620,7 @@@ static void reconnect_cb(struct rpc_con
  static int rpc_reconnect_requeue(struct rpc_context *rpc)
  {
        struct rpc_pdu *pdu;
+       unsigned int i;
  
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
  
        /* socket is closed so we will not get any replies to any commands
         * in flight. Move them all over from the waitpdu queue back to the out queue
         */
-       for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
-               SLIST_REMOVE(&rpc->waitpdu, pdu);
-               SLIST_ADD(&rpc->outqueue, pdu);
-               /* we have to re-send the whole pdu again */
-               pdu->written = 0;
+       for (i = 0; i < HASHES; i++) {
+               struct rpc_queue *q = &rpc->waitpdu[i];
+               for (pdu=q->head; pdu; pdu=pdu->next) {
+                       rpc_return_to_queue(&rpc->outqueue, pdu);
+                       /* we have to re-send the whole pdu again */
+                       pdu->written = 0;
+               }
+               rpc_reset_queue(q);
        }
  
        if (rpc->auto_reconnect != 0) {
@@@ -737,14 -743,19 +752,19 @@@ int rpc_queue_length(struct rpc_contex
  {
        int i=0;
        struct rpc_pdu *pdu;
+       unsigned int n;
  
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
  
-       for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
+       for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
                i++;
        }
-       for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
-               i++;
+       for (n = 0; n < HASHES; n++) {
+               struct rpc_queue *q = &rpc->waitpdu[n];
+               for(pdu = q->head; pdu; pdu = pdu->next)
+                       i++;
        }
        return i;
  }