Merge branch 'zdr'
[deb_libnfs.git] / lib / socket.c
index f833df130dacb9ae80597b616decb69816f5020a..83ca50e1765ad365a6f67650edcb01119a133bea 100644 (file)
 #include <netdb.h>
 #endif/*WIN32*/
 
-#if defined(WIN32)
-#include <winsock2.h>
-#include <ws2tcpip.h>
-#include <basetsd.h>
-#define ssize_t SSIZE_T
-#define MSG_DONTWAIT 0
-#else
-#include <unistd.h>
-#include <poll.h>
-#include <arpa/inet.h>
-#include <sys/ioctl.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#endif
-
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
 #include <stdio.h>
 #include <stdlib.h>
+#include <assert.h>
 #include <fcntl.h>
 #include <string.h>
 #include <errno.h>
-#include <rpc/rpc.h>
-#include <rpc/xdr.h>
 #ifdef HAVE_SYS_FILIO_H
 #include <sys/filio.h>
 #endif
 #include <sys/sockio.h>
 #endif
 #include <sys/types.h>
+#include "libnfs-zdr.h"
 #include "libnfs.h"
 #include "libnfs-raw.h"
 #include "libnfs-private.h"
 #include "slist.h"
 
-static int rpc_disconnect_requeue(struct rpc_context *rpc);
+#ifdef WIN32
+//has to be included after stdlib!!
+#include "win32_errnowrapper.h"
+#endif
+
+
+static int rpc_reconnect_requeue(struct rpc_context *rpc);
+static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s);
 
 static void set_nonblocking(int fd)
 {
@@ -78,12 +70,18 @@ static void set_nonblocking(int fd)
 
 int rpc_get_fd(struct rpc_context *rpc)
 {
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
        return rpc->fd;
 }
 
 int rpc_which_events(struct rpc_context *rpc)
 {
-       int events = rpc->is_connected ? POLLIN : POLLOUT;
+       int events;
+
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
+       events = rpc->is_connected ? POLLIN : POLLOUT;
 
        if (rpc->is_udp != 0) {
                /* for udp sockets we only wait for pollin */
@@ -98,18 +96,17 @@ int rpc_which_events(struct rpc_context *rpc)
 
 static int rpc_write_to_socket(struct rpc_context *rpc)
 {
-       ssize_t count;
+       int32_t count;
+
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
-       if (rpc == NULL) {
-               return -1;
-       }
        if (rpc->fd == -1) {
                rpc_set_error(rpc, "trying to write but not connected");
                return -1;
        }
 
        while (rpc->outqueue != NULL) {
-               ssize_t total;
+               int64_t total;
 
                total = rpc->outqueue->outdata.size;
 
@@ -142,7 +139,11 @@ static int rpc_read_from_socket(struct rpc_context *rpc)
        int available;
        int size;
        int pdu_size;
-       ssize_t count;
+       int32_t count;
+
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
 #if defined(WIN32)
        if (ioctlsocket(rpc->fd, FIONREAD, &available) != 0) {
@@ -266,8 +267,14 @@ static int rpc_read_from_socket(struct rpc_context *rpc)
 
 int rpc_service(struct rpc_context *rpc, int revents)
 {
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
        if (revents & POLLERR) {
+#ifdef WIN32
                char err = 0;
+#else
+               int err = 0;
+#endif
                socklen_t err_size = sizeof(err);
 
                if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
@@ -282,12 +289,16 @@ int rpc_service(struct rpc_context *rpc, int revents)
                        rpc_set_error(rpc, "rpc_service: POLLERR, "
                                                "Unknown socket error.");
                }
-               rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
+               if (rpc->connect_cb != NULL) {
+                       rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
+               }
                return -1;
        }
        if (revents & POLLHUP) {
                rpc_set_error(rpc, "Socket failed with POLLHUP");
-               rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
+               if (rpc->connect_cb != NULL) {
+                       rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
+               }
                return -1;
        }
 
@@ -303,19 +314,23 @@ int rpc_service(struct rpc_context *rpc, int revents)
                        rpc_set_error(rpc, "rpc_service: socket error "
                                        "%s(%d) while connecting.",
                                        strerror(err), err);
-                       rpc->connect_cb(rpc, RPC_STATUS_ERROR,
+                       if (rpc->connect_cb != NULL) {
+                               rpc->connect_cb(rpc, RPC_STATUS_ERROR,
                                        NULL, rpc->connect_data);
+                       }
                        return -1;
                }
 
                rpc->is_connected = 1;
-               rpc->connect_cb(rpc, RPC_STATUS_SUCCESS, NULL, rpc->connect_data);
+               if (rpc->connect_cb != NULL) {
+                       rpc->connect_cb(rpc, RPC_STATUS_SUCCESS, NULL, rpc->connect_data);
+               }
                return 0;
        }
 
        if (revents & POLLIN) {
                if (rpc_read_from_socket(rpc) != 0) {
-                       rpc_disconnect_requeue(rpc);
+                       rpc_reconnect_requeue(rpc);
                        return 0;
                }
        }
@@ -330,37 +345,34 @@ int rpc_service(struct rpc_context *rpc, int revents)
        return 0;
 }
 
-int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
+void rpc_set_autoreconnect(struct rpc_context *rpc)
 {
-       struct sockaddr_storage s;
-       struct sockaddr_in *sin = (struct sockaddr_in *)&s;
-       int socksize;
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
-       if (rpc->fd != -1) {
-               rpc_set_error(rpc, "Trying to connect while already connected");
-               return -1;
-       }
+       rpc->auto_reconnect = 1;
+}
 
-       if (rpc->is_udp != 0) {
-               rpc_set_error(rpc, "Trying to connect on UDP socket");
-               return -1;
-       }
+void rpc_unset_autoreconnect(struct rpc_context *rpc)
+{
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
 
-       sin->sin_family = AF_INET;
-       sin->sin_port   = htons(port);
-       if (inet_pton(AF_INET, server, &sin->sin_addr) != 1) {
-               rpc_set_error(rpc, "Not a valid server ip address");
-               return -1;
-       }
+       rpc->auto_reconnect = 0;
+}
+
+static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s)
+{
+       int socksize;
 
-       switch (s.ss_family) {
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
+       switch (s->ss_family) {
        case AF_INET:
                socksize = sizeof(struct sockaddr_in);
-#ifdef HAVE_SOCKADDR_LEN
-               sin->sin_len = socksize;
-#endif
                rpc->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
                break;
+       default:
+               rpc_set_error(rpc, "Can not handle AF_FAMILY:%d", s->ss_family);
+               return -1;
        }
 
        if (rpc->fd == -1) {
@@ -368,11 +380,6 @@ int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc
                return -1;
        }
 
-       rpc->connect_cb  = cb;
-       rpc->connect_data = private_data;
-
-
-#if !defined(WIN32)
        /* Some systems allow you to set capabilities on an executable
         * to allow the file to be executed with privilege to bind to
         * privileged system ports, even if the user is not root.
@@ -387,48 +394,99 @@ int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc
         * On linux, use
         *    sudo setcap 'cap_net_bind_service=+ep' /path/executable
         * to make the executable able to bind to a system port.
+        *
+        * On Windows, there is no concept of privileged ports. Thus
+        * binding will usually succeed.
         */
-       if (1) {
-               int port;
-               int one = 1;
+       {
+               struct sockaddr_in sin;
+               static int portOfs = 0;
+               const int firstPort = 512;      /* >= 512 according to Sun docs */
+               const int portCount = IPPORT_RESERVED - firstPort;
+               int startOfs = portOfs, port, rc;
+
+               do {
+                       rc = -1;
+                       port = htons(firstPort + portOfs);
+                       portOfs = (portOfs + 1) % portCount;
+
+                       /* skip well-known ports */
+                       if (!getservbyport(port, "tcp")) {
+                               memset(&sin, 0, sizeof(sin));
+                               sin.sin_port        = port;
+                               sin.sin_family      = AF_INET;
+                               sin.sin_addr.s_addr = 0;
+
+                               rc = bind(rpc->fd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in));
+#if !defined(WIN32)
+                               /* we got EACCES, so don't try again */
+                               if (rc != 0 && errno == EACCES)
+                                       break;
+#endif
+                       }
+               } while (rc != 0 && portOfs != startOfs);
+       }
 
-               setsockopt(rpc->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
+       set_nonblocking(rpc->fd);
 
-               for (port = 200; port < 500; port++) {
-                       struct sockaddr_in sin;
+       if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS) {
+               rpc_set_error(rpc, "connect() to server failed. %s(%d)", strerror(errno), errno);
+               return -1;
+       }               
 
-                       printf("try port %d\n", port);
-                       memset(&sin, 0, sizeof(sin));
-                       sin.sin_port        = htons(port);
-                       sin.sin_family      = AF_INET;
-                       sin.sin_addr.s_addr = 0;
+       return 0;
+}          
 
-                       if (bind(rpc->fd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) != 0 && errno != EACCES) {
-                               /* we didnt get EACCES, so try again */
-                               continue;
-                       }
-                       break;
-               }
+int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
+{
+       struct sockaddr_in *sin = (struct sockaddr_in *)&rpc->s;
+
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
+       if (rpc->fd != -1) {
+               rpc_set_error(rpc, "Trying to connect while already connected");
+               return -1;
        }
-#endif
 
-       set_nonblocking(rpc->fd);
+       if (rpc->is_udp != 0) {
+               rpc_set_error(rpc, "Trying to connect on UDP socket");
+               return -1;
+       }
 
-#if defined(WIN32)
-       if (connect(rpc->fd, (struct sockaddr *)&s, socksize) == 0 && GetLastError() != WSAEINPROGRESS   )
-#else
-       if (connect(rpc->fd, (struct sockaddr *)&s, socksize) != 0 && errno != EINPROGRESS) 
+       rpc->auto_reconnect = 0;
+
+       sin->sin_family = AF_INET;
+       sin->sin_port   = htons(port);
+       if (inet_pton(AF_INET, server, &sin->sin_addr) != 1) {
+               rpc_set_error(rpc, "Not a valid server ip address");
+               return -1;
+       }
+
+
+       switch (rpc->s.ss_family) {
+       case AF_INET:
+#ifdef HAVE_SOCKADDR_LEN
+               sin->sin_len = sizeof(struct sockaddr_in);
 #endif
-       {
-               rpc_set_error(rpc, "connect() to server failed");
+               break;
+       }
+
+       rpc->connect_cb  = cb;
+       rpc->connect_data = private_data;
+
+       if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
                return -1;
-       }               
+       }
 
        return 0;
 }          
 
 int rpc_disconnect(struct rpc_context *rpc, char *error)
 {
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
+       rpc_unset_autoreconnect(rpc);
+
        if (rpc->fd != -1) {
 #if defined(WIN32)
                closesocket(rpc->fd);
@@ -445,11 +503,26 @@ int rpc_disconnect(struct rpc_context *rpc, char *error)
        return 0;
 }
 
-/* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue */
-static int rpc_disconnect_requeue(struct rpc_context *rpc)
+static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, void *private_data)
+{
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
+       if (status != RPC_STATUS_SUCCESS) {
+               rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
+               return;
+       }
+
+       rpc->is_connected = 1;
+       rpc->connect_cb   = NULL;
+}
+
+/* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue and reconnect */
+static int rpc_reconnect_requeue(struct rpc_context *rpc)
 {
        struct rpc_pdu *pdu;
 
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
        if (rpc->fd != -1) {
 #if defined(WIN32)
                closesocket(rpc->fd);
@@ -467,6 +540,17 @@ static int rpc_disconnect_requeue(struct rpc_context *rpc)
        for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
                SLIST_REMOVE(&rpc->waitpdu, pdu);
                SLIST_ADD(&rpc->outqueue, pdu);
+               /* we have to re-send the whole pdu again */
+               pdu->written = 0;
+       }
+
+       if (rpc->auto_reconnect != 0) {
+               rpc->connect_cb  = reconnect_cb;
+
+               if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
+                       rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
+                       return -1;
+               }
        }
 
        return 0;
@@ -478,6 +562,8 @@ int rpc_bind_udp(struct rpc_context *rpc, char *addr, int port)
        struct addrinfo *ai = NULL;
        char service[6];
 
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
        if (rpc->is_udp == 0) {
                rpc_set_error(rpc, "Cant not bind UDP. Not UDP context");
                return -1;
@@ -521,6 +607,8 @@ int rpc_set_udp_destination(struct rpc_context *rpc, char *addr, int port, int i
        struct addrinfo *ai = NULL;
        char service[6];
 
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
        if (rpc->is_udp == 0) {
                rpc_set_error(rpc, "Can not set destination sockaddr. Not UDP context");
                return -1;
@@ -540,6 +628,7 @@ int rpc_set_udp_destination(struct rpc_context *rpc, char *addr, int port, int i
        rpc->udp_dest = malloc(ai->ai_addrlen);
        if (rpc->udp_dest == NULL) {
                rpc_set_error(rpc, "Out of memory. Failed to allocate sockaddr structure");
+               freeaddrinfo(ai);
                return -1;
        }
        memcpy(rpc->udp_dest, ai->ai_addr, ai->ai_addrlen);
@@ -553,6 +642,8 @@ int rpc_set_udp_destination(struct rpc_context *rpc, char *addr, int port, int i
 
 struct sockaddr *rpc_get_recv_sockaddr(struct rpc_context *rpc)
 {
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
        return (struct sockaddr *)&rpc->udp_src;
 }
 
@@ -561,6 +652,8 @@ int rpc_queue_length(struct rpc_context *rpc)
        int i=0;
        struct rpc_pdu *pdu;
 
+       assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
        for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
                i++;
        }