Imported Upstream version 1.9.4
[deb_libnfs.git] / lib / socket.c
index 055109a73c9f7b4be1f69d73f0552feb5c2d51ff..f973f0178b923a0dab3f369fea14a04b076e85ab 100644 (file)
@@ -80,7 +80,6 @@
 #include "win32_errnowrapper.h"
 #endif
 
-
 static int rpc_reconnect_requeue(struct rpc_context *rpc);
 static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s);
 
@@ -96,6 +95,14 @@ static void set_nonblocking(int fd)
 #endif //FIXME
 }
 
+static void set_nolinger(int fd)
+{
+       struct linger lng;
+       lng.l_onoff = 1;
+       lng.l_linger = 0;
+       setsockopt(fd, SOL_SOCKET, SO_LINGER, &lng, sizeof(lng));
+}
+
 #ifdef HAVE_NETINET_TCP_H
 int set_tcp_sockopt(int sockfd, int optname, int value)
 {
@@ -123,6 +130,11 @@ int rpc_get_fd(struct rpc_context *rpc)
        return rpc->fd;
 }
 
+static int rpc_has_queue(struct rpc_queue *q)
+{
+       return q->head != NULL;
+}
+
 int rpc_which_events(struct rpc_context *rpc)
 {
        int events;
@@ -136,7 +148,7 @@ int rpc_which_events(struct rpc_context *rpc)
                return POLLIN;
        }
 
-       if (rpc->outqueue) {
+       if (rpc_has_queue(&rpc->outqueue)) {
                events |= POLLOUT;
        }
        return events;
@@ -145,6 +157,7 @@ int rpc_which_events(struct rpc_context *rpc)
 static int rpc_write_to_socket(struct rpc_context *rpc)
 {
        int32_t count;
+       struct rpc_pdu *pdu;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
@@ -153,12 +166,12 @@ static int rpc_write_to_socket(struct rpc_context *rpc)
                return -1;
        }
 
-       while (rpc->outqueue != NULL) {
+       while ((pdu = rpc->outqueue.head) != NULL) {
                int64_t total;
 
-               total = rpc->outqueue->outdata.size;
+               total = pdu->outdata.size;
 
-               count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
+               count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
                if (count == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                                return 0;
@@ -167,12 +180,16 @@ static int rpc_write_to_socket(struct rpc_context *rpc)
                        return -1;
                }
 
-               rpc->outqueue->written += count;
-               if (rpc->outqueue->written == total) {
-                       struct rpc_pdu *pdu = rpc->outqueue;
+               pdu->written += count;
+               if (pdu->written == total) {
+                       unsigned int hash;
 
-                       SLIST_REMOVE(&rpc->outqueue, pdu);
-                       SLIST_ADD_END(&rpc->waitpdu, pdu);
+                       rpc->outqueue.head = pdu->next;
+                       if (pdu->next == NULL)
+                               rpc->outqueue.tail = NULL;
+
+                       hash = rpc_hash_xid(pdu->xid);
+                       rpc_enqueue(&rpc->waitpdu[hash], pdu);
                }
        }
        return 0;
@@ -369,7 +386,7 @@ int rpc_service(struct rpc_context *rpc, int revents)
                }
        }
 
-       if (revents & POLLOUT && rpc->outqueue != NULL) {
+       if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
                if (rpc_write_to_socket(rpc) != 0) {
                        rpc_set_error(rpc, "write to socket failed");
                        return -1;
@@ -418,6 +435,15 @@ static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_s
                if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
                        set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
                }
+#endif
+               break;
+       case AF_INET6:
+               socksize = sizeof(struct sockaddr_in6);
+               rpc->fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
+#ifdef HAVE_NETINET_TCP_H
+               if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
+                       set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
+               }
 #endif
                break;
        default:
@@ -449,7 +475,7 @@ static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_s
         * binding will usually succeed.
         */
        {
-               struct sockaddr_in sin;
+               struct sockaddr_storage ss;
                static int portOfs = 0;
                const int firstPort = 512;      /* >= 512 according to Sun docs */
                const int portCount = IPPORT_RESERVED - firstPort;
@@ -466,12 +492,26 @@ static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_s
 
                        /* skip well-known ports */
                        if (!getservbyport(port, "tcp")) {
-                               memset(&sin, 0, sizeof(sin));
-                               sin.sin_port        = port;
-                               sin.sin_family      = AF_INET;
-                               sin.sin_addr.s_addr = 0;
+                               memset(&ss, 0, sizeof(ss));
 
-                               rc = bind(rpc->fd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in));
+                               switch (s->ss_family) {
+                               case AF_INET:
+                                       ((struct sockaddr_in *)&ss)->sin_port = port;
+                                       ((struct sockaddr_in *)&ss)->sin_family      = AF_INET;
+#ifdef HAVE_SOCKADDR_LEN
+                                       ((struct sockaddr_in *)&ss)->sin_len = sizeof(struct sockaddr_in);
+#endif
+                                       break;
+                               case AF_INET6:
+                                       ((struct sockaddr_in6 *)&ss)->sin6_port = port;
+                                       ((struct sockaddr_in6 *)&ss)->sin6_family      = AF_INET6;
+#ifdef HAVE_SOCKADDR_LEN
+                                       ((struct sockaddr_in6 *)&ss)->sin6_len = sizeof(struct sockaddr6_in);
+#endif
+                                       break;
+                               }
+
+                               rc = bind(rpc->fd, (struct sockaddr *)&ss, socksize);
 #if !defined(WIN32)
                                /* we got EACCES, so don't try again */
                                if (rc != 0 && errno == EACCES)
@@ -482,6 +522,7 @@ static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_s
        }
 
        set_nonblocking(rpc->fd);
+       set_nolinger(rpc->fd);
 
        if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS) {
                rpc_set_error(rpc, "connect() to server failed. %s(%d)", strerror(errno), errno);
@@ -493,7 +534,7 @@ static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_s
 
 int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
 {
-       struct sockaddr_in *sin = (struct sockaddr_in *)&rpc->s;
+       struct addrinfo *ai = NULL;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
@@ -509,18 +550,27 @@ int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc
 
        rpc->auto_reconnect = 0;
 
-       sin->sin_family = AF_INET;
-       sin->sin_port   = htons(port);
-       if (inet_pton(AF_INET, server, &sin->sin_addr) != 1) {
-               rpc_set_error(rpc, "Not a valid server ip address");
+       if (getaddrinfo(server, NULL, NULL, &ai) != 0) {
+               rpc_set_error(rpc, "Invalid address:%s. "
+                             "Can not resolv into IPv4/v6 structure.", server);
                return -1;
-       }
-
+       }
 
-       switch (rpc->s.ss_family) {
+       switch (ai->ai_family) {
        case AF_INET:
+               ((struct sockaddr_in *)&rpc->s)->sin_family = ai->ai_family;
+               ((struct sockaddr_in *)&rpc->s)->sin_port   = htons(port);
+               ((struct sockaddr_in *)&rpc->s)->sin_addr   = ((struct sockaddr_in *)(ai->ai_addr))->sin_addr;
+#ifdef HAVE_SOCKADDR_LEN
+               ((struct sockaddr_in *)&rpc->s)->sin_len = sizeof(struct sockaddr_in);
+#endif
+               break;
+       case AF_INET6:
+               ((struct sockaddr_in6 *)&rpc->s)->sin6_family = ai->ai_family;
+               ((struct sockaddr_in6 *)&rpc->s)->sin6_port   = htons(port);
+               ((struct sockaddr_in6 *)&rpc->s)->sin6_addr   = ((struct sockaddr_in6 *)(ai->ai_addr))->sin6_addr;
 #ifdef HAVE_SOCKADDR_LEN
-               sin->sin_len = sizeof(struct sockaddr_in);
+               ((struct sockaddr_in6 *)&rpc->s)->sin6_len = sizeof(struct sockaddr_in6);
 #endif
                break;
        }
@@ -528,6 +578,8 @@ int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc
        rpc->connect_cb  = cb;
        rpc->connect_data = private_data;
 
+       freeaddrinfo(ai);
+
        if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
                return -1;
        }
@@ -570,6 +622,7 @@ static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, vo
 static int rpc_reconnect_requeue(struct rpc_context *rpc)
 {
        struct rpc_pdu *pdu;
+       unsigned int i;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
@@ -583,11 +636,15 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc)
        /* socket is closed so we will not get any replies to any commands
         * in flight. Move them all over from the waitpdu queue back to the out queue
         */
-       for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
-               SLIST_REMOVE(&rpc->waitpdu, pdu);
-               SLIST_ADD(&rpc->outqueue, pdu);
-               /* we have to re-send the whole pdu again */
-               pdu->written = 0;
+       for (i = 0; i < HASHES; i++) {
+               struct rpc_queue *q = &rpc->waitpdu[i];
+
+               for (pdu=q->head; pdu; pdu=pdu->next) {
+                       rpc_return_to_queue(&rpc->outqueue, pdu);
+                       /* we have to re-send the whole pdu again */
+                       pdu->written = 0;
+               }
+               rpc_reset_queue(q);
        }
 
        if (rpc->auto_reconnect != 0) {
@@ -697,14 +754,19 @@ int rpc_queue_length(struct rpc_context *rpc)
 {
        int i=0;
        struct rpc_pdu *pdu;
+       unsigned int n;
 
        assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
-       for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
+       for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
                i++;
        }
-       for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
-               i++;
+
+       for (n = 0; n < HASHES; n++) {
+               struct rpc_queue *q = &rpc->waitpdu[n];
+
+               for(pdu = q->head; pdu; pdu = pdu->next)
+                       i++;
        }
        return i;
 }