X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=lib%2Fsocket.c;h=3f7f89f90c9a368dc5ff6c48c0ab87f790d4e096;hb=18c94b4633c6157c3e75890c15ea51b73e7bf876;hp=733f1f4768e8a0b41c9a51a0903d73e1d08539c0;hpb=bff8fe460dcf4b25071fff966d86877b30eeec90;p=deb_libnfs.git diff --git a/lib/socket.c b/lib/socket.c index 733f1f4..3f7f89f 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -24,9 +24,11 @@ #ifdef WIN32 #include "win32_compat.h" -#else +#endif + +#ifdef HAVE_ARPA_INET_H #include -#endif/*WIN32*/ +#endif #ifdef HAVE_POLL_H #include @@ -44,6 +46,10 @@ #include #endif +#ifdef HAVE_NETINET_TCP_H +#include +#endif + #ifdef HAVE_NETDB_H #include #endif @@ -74,7 +80,6 @@ #include "win32_errnowrapper.h" #endif - static int rpc_reconnect_requeue(struct rpc_context *rpc); static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s); @@ -90,6 +95,34 @@ static void set_nonblocking(int fd) #endif //FIXME } +static void set_nolinger(int fd) +{ + struct linger lng; + lng.l_onoff = 1; + lng.l_linger = 0; + setsockopt(fd, SOL_SOCKET, SO_LINGER, &lng, sizeof(lng)); +} + +#ifdef HAVE_NETINET_TCP_H +int set_tcp_sockopt(int sockfd, int optname, int value) +{ + int level; + + #if defined(__FreeBSD__) || defined(__sun) || (defined(__APPLE__) && defined(__MACH__)) + struct protoent *buf; + + if ((buf = getprotobyname("tcp")) != NULL) + level = buf->p_proto; + else + return -1; + #else + level = SOL_TCP; + #endif + + return setsockopt(sockfd, level, optname, (char *)&value, sizeof(value)); +} +#endif + int rpc_get_fd(struct rpc_context *rpc) { assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -97,6 +130,11 @@ int rpc_get_fd(struct rpc_context *rpc) return rpc->fd; } +static int rpc_has_queue(struct rpc_queue *q) +{ + return q->head != NULL; +} + int rpc_which_events(struct rpc_context *rpc) { int events; @@ -110,7 +148,7 @@ int rpc_which_events(struct rpc_context *rpc) return POLLIN; } - if (rpc->outqueue) { + if (rpc_has_queue(&rpc->outqueue)) { events |= POLLOUT; } return events; @@ -119,6 +157,7 @@ int rpc_which_events(struct rpc_context *rpc) static int rpc_write_to_socket(struct rpc_context *rpc) { int32_t count; + struct rpc_pdu *pdu; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -127,12 +166,12 @@ static int rpc_write_to_socket(struct rpc_context *rpc) return -1; } - while (rpc->outqueue != NULL) { + while ((pdu = rpc->outqueue.head) != NULL) { int64_t total; - total = rpc->outqueue->outdata.size; + total = pdu->outdata.size; - count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0); + count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0); if (count == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { return 0; @@ -141,12 +180,16 @@ static int rpc_write_to_socket(struct rpc_context *rpc) return -1; } - rpc->outqueue->written += count; - if (rpc->outqueue->written == total) { - struct rpc_pdu *pdu = rpc->outqueue; + pdu->written += count; + if (pdu->written == total) { + unsigned int hash; + + rpc->outqueue.head = pdu->next; + if (pdu->next == NULL) + rpc->outqueue.tail = NULL; - SLIST_REMOVE(&rpc->outqueue, pdu); - SLIST_ADD_END(&rpc->waitpdu, pdu); + hash = rpc_hash_xid(pdu->xid); + rpc_enqueue(&rpc->waitpdu[hash], pdu); } } return 0; @@ -161,8 +204,6 @@ static int rpc_read_from_socket(struct rpc_context *rpc) assert(rpc->magic == RPC_CONTEXT_MAGIC); - assert(rpc->magic == RPC_CONTEXT_MAGIC); - if (ioctl(rpc->fd, FIONREAD, &available) != 0) { rpc_set_error(rpc, "Ioctl FIONREAD returned error : %d. Closing socket.", errno); return -1; @@ -186,6 +227,7 @@ static int rpc_read_from_socket(struct rpc_context *rpc) if (count < 0) { rpc_set_error(rpc, "Failed recvfrom: %s", strerror(errno)); free(buf); + return -1; } if (rpc_process_pdu(rpc, buf, count) != 0) { rpc_set_error(rpc, "Invalid/garbage pdu received from server. Ignoring PDU"); @@ -227,7 +269,7 @@ static int rpc_read_from_socket(struct rpc_context *rpc) pdu_size = rpc_get_pdu_size(rpc->inbuf); if (rpc->insize < pdu_size) { unsigned char *buf; - + buf = malloc(pdu_size); if (buf == NULL) { rpc_set_error(rpc, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size, errno); @@ -256,14 +298,17 @@ static int rpc_read_from_socket(struct rpc_context *rpc) rpc->inpos += count; if (rpc->inpos == rpc->insize) { - if (rpc_process_pdu(rpc, rpc->inbuf, pdu_size) != 0) { - rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket"); - return -1; - } - free(rpc->inbuf); + char *buf = rpc->inbuf; + rpc->inbuf = NULL; rpc->insize = 0; rpc->inpos = 0; + + if (rpc_process_pdu(rpc, buf, pdu_size) != 0) { + rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket"); + return -1; + } + free(buf); } return 0; @@ -341,7 +386,7 @@ int rpc_service(struct rpc_context *rpc, int revents) } } - if (revents & POLLOUT && rpc->outqueue != NULL) { + if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) { if (rpc_write_to_socket(rpc) != 0) { rpc_set_error(rpc, "write to socket failed"); return -1; @@ -365,6 +410,17 @@ void rpc_unset_autoreconnect(struct rpc_context *rpc) rpc->auto_reconnect = 0; } +void rpc_set_tcp_syncnt(struct rpc_context *rpc, int v) +{ + assert(rpc->magic == RPC_CONTEXT_MAGIC); + + rpc->tcp_syncnt = v; +} + +#ifndef TCP_SYNCNT +#define TCP_SYNCNT 7 +#endif + static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s) { int socksize; @@ -375,6 +431,20 @@ static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_s case AF_INET: socksize = sizeof(struct sockaddr_in); rpc->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); +#ifdef HAVE_NETINET_TCP_H + if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) { + set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt); + } +#endif + break; + case AF_INET6: + socksize = sizeof(struct sockaddr_in6); + rpc->fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); +#ifdef HAVE_NETINET_TCP_H + if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) { + set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt); + } +#endif break; default: rpc_set_error(rpc, "Can not handle AF_FAMILY:%d", s->ss_family); @@ -405,7 +475,7 @@ static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_s * binding will usually succeed. */ { - struct sockaddr_in sin; + struct sockaddr_storage ss; static int portOfs = 0; const int firstPort = 512; /* >= 512 according to Sun docs */ const int portCount = IPPORT_RESERVED - firstPort; @@ -422,12 +492,26 @@ static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_s /* skip well-known ports */ if (!getservbyport(port, "tcp")) { - memset(&sin, 0, sizeof(sin)); - sin.sin_port = port; - sin.sin_family = AF_INET; - sin.sin_addr.s_addr = 0; + memset(&ss, 0, sizeof(ss)); + + switch (s->ss_family) { + case AF_INET: + ((struct sockaddr_in *)&ss)->sin_port = port; + ((struct sockaddr_in *)&ss)->sin_family = AF_INET; +#ifdef HAVE_SOCKADDR_LEN + ((struct sockaddr_in *)&ss)->sin_len = sizeof(struct sockaddr_in); +#endif + break; + case AF_INET6: + ((struct sockaddr_in6 *)&ss)->sin6_port = port; + ((struct sockaddr_in6 *)&ss)->sin6_family = AF_INET6; +#ifdef HAVE_SOCKADDR_LEN + ((struct sockaddr_in6 *)&ss)->sin6_len = sizeof(struct sockaddr_in6); +#endif + break; + } - rc = bind(rpc->fd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)); + rc = bind(rpc->fd, (struct sockaddr *)&ss, socksize); #if !defined(WIN32) /* we got EACCES, so don't try again */ if (rc != 0 && errno == EACCES) @@ -438,18 +522,19 @@ static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_s } set_nonblocking(rpc->fd); + set_nolinger(rpc->fd); if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS) { rpc_set_error(rpc, "connect() to server failed. %s(%d)", strerror(errno), errno); return -1; - } + } return 0; -} +} int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data) { - struct sockaddr_in *sin = (struct sockaddr_in *)&rpc->s; + struct addrinfo *ai = NULL; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -465,18 +550,27 @@ int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc rpc->auto_reconnect = 0; - sin->sin_family = AF_INET; - sin->sin_port = htons(port); - if (inet_pton(AF_INET, server, &sin->sin_addr) != 1) { - rpc_set_error(rpc, "Not a valid server ip address"); + if (getaddrinfo(server, NULL, NULL, &ai) != 0) { + rpc_set_error(rpc, "Invalid address:%s. " + "Can not resolv into IPv4/v6 structure.", server); return -1; - } - + } - switch (rpc->s.ss_family) { + switch (ai->ai_family) { case AF_INET: + ((struct sockaddr_in *)&rpc->s)->sin_family = ai->ai_family; + ((struct sockaddr_in *)&rpc->s)->sin_port = htons(port); + ((struct sockaddr_in *)&rpc->s)->sin_addr = ((struct sockaddr_in *)(ai->ai_addr))->sin_addr; +#ifdef HAVE_SOCKADDR_LEN + ((struct sockaddr_in *)&rpc->s)->sin_len = sizeof(struct sockaddr_in); +#endif + break; + case AF_INET6: + ((struct sockaddr_in6 *)&rpc->s)->sin6_family = ai->ai_family; + ((struct sockaddr_in6 *)&rpc->s)->sin6_port = htons(port); + ((struct sockaddr_in6 *)&rpc->s)->sin6_addr = ((struct sockaddr_in6 *)(ai->ai_addr))->sin6_addr; #ifdef HAVE_SOCKADDR_LEN - sin->sin_len = sizeof(struct sockaddr_in); + ((struct sockaddr_in6 *)&rpc->s)->sin6_len = sizeof(struct sockaddr_in6); #endif break; } @@ -484,12 +578,14 @@ int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc rpc->connect_cb = cb; rpc->connect_data = private_data; + freeaddrinfo(ai); + if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) { return -1; } return 0; -} +} int rpc_disconnect(struct rpc_context *rpc, char *error) { @@ -526,6 +622,7 @@ static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, vo static int rpc_reconnect_requeue(struct rpc_context *rpc) { struct rpc_pdu *pdu; + unsigned int i; assert(rpc->magic == RPC_CONTEXT_MAGIC); @@ -539,11 +636,15 @@ static int rpc_reconnect_requeue(struct rpc_context *rpc) /* socket is closed so we will not get any replies to any commands * in flight. Move them all over from the waitpdu queue back to the out queue */ - for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) { - SLIST_REMOVE(&rpc->waitpdu, pdu); - SLIST_ADD(&rpc->outqueue, pdu); - /* we have to re-send the whole pdu again */ - pdu->written = 0; + for (i = 0; i < HASHES; i++) { + struct rpc_queue *q = &rpc->waitpdu[i]; + + for (pdu=q->head; pdu; pdu=pdu->next) { + rpc_return_to_queue(&rpc->outqueue, pdu); + /* we have to re-send the whole pdu again */ + pdu->written = 0; + } + rpc_reset_queue(q); } if (rpc->auto_reconnect != 0) { @@ -574,7 +675,7 @@ int rpc_bind_udp(struct rpc_context *rpc, char *addr, int port) sprintf(service, "%d", port); if (getaddrinfo(addr, service, NULL, &ai) != 0) { rpc_set_error(rpc, "Invalid address:%s. " - "Can not resolv into IPv4/v6 structure."); + "Can not resolv into IPv4/v6 structure.", addr); return -1; } @@ -582,13 +683,13 @@ int rpc_bind_udp(struct rpc_context *rpc, char *addr, int port) case AF_INET: rpc->fd = socket(ai->ai_family, SOCK_DGRAM, 0); if (rpc->fd == -1) { - rpc_set_error(rpc, "Failed to create UDP socket: %s", strerror(errno)); + rpc_set_error(rpc, "Failed to create UDP socket: %s", strerror(errno)); freeaddrinfo(ai); return -1; } if (bind(rpc->fd, (struct sockaddr *)ai->ai_addr, sizeof(struct sockaddr_in)) != 0) { - rpc_set_error(rpc, "Failed to bind to UDP socket: %s",strerror(errno)); + rpc_set_error(rpc, "Failed to bind to UDP socket: %s",strerror(errno)); freeaddrinfo(ai); return -1; } @@ -619,7 +720,7 @@ int rpc_set_udp_destination(struct rpc_context *rpc, char *addr, int port, int i sprintf(service, "%d", port); if (getaddrinfo(addr, service, NULL, &ai) != 0) { rpc_set_error(rpc, "Invalid address:%s. " - "Can not resolv into IPv4/v6 structure."); + "Can not resolv into IPv4/v6 structure.", addr); return -1; } @@ -653,14 +754,26 @@ int rpc_queue_length(struct rpc_context *rpc) { int i=0; struct rpc_pdu *pdu; + unsigned int n; assert(rpc->magic == RPC_CONTEXT_MAGIC); - for(pdu = rpc->outqueue; pdu; pdu = pdu->next) { + for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) { i++; } - for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) { - i++; + + for (n = 0; n < HASHES; n++) { + struct rpc_queue *q = &rpc->waitpdu[n]; + + for(pdu = q->head; pdu; pdu = pdu->next) + i++; } return i; } + +void rpc_set_fd(struct rpc_context *rpc, int fd) +{ + assert(rpc->magic == RPC_CONTEXT_MAGIC); + + rpc->fd = fd; +}