#ifdef WIN32
#include "win32_compat.h"
-#else
+#endif
+
+#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#endif/*WIN32*/
+#endif
#ifdef HAVE_POLL_H
#include <poll.h>
#include <sys/ioctl.h>
#endif
-#include <stdio.h>
-#include <stdlib.h>
-#include <assert.h>
-#include <fcntl.h>
-#include <string.h>
-#include <errno.h>
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+
+#ifdef HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#endif
+
+#ifdef HAVE_NETINET_TCP_H
+#include <netinet/tcp.h>
+#endif
+
+#ifdef HAVE_NETDB_H
+#include <netdb.h>
+#endif
+
#ifdef HAVE_SYS_FILIO_H
#include <sys/filio.h>
#endif
+
#ifdef HAVE_SYS_SOCKIO_H
#include <sys/sockio.h>
#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <fcntl.h>
+#include <string.h>
+#include <errno.h>
+#include <time.h>
#include <sys/types.h>
#include "libnfs-zdr.h"
#include "libnfs.h"
#include "win32_errnowrapper.h"
#endif
-
static int rpc_reconnect_requeue(struct rpc_context *rpc);
static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s);
#endif //FIXME
}
+static void set_nolinger(int fd)
+{
+ struct linger lng;
+ lng.l_onoff = 1;
+ lng.l_linger = 0;
+ setsockopt(fd, SOL_SOCKET, SO_LINGER, &lng, sizeof(lng));
+}
+
+#ifdef HAVE_NETINET_TCP_H
+int set_tcp_sockopt(int sockfd, int optname, int value)
+{
+ int level;
+
+ #if defined(__FreeBSD__) || defined(__sun) || (defined(__APPLE__) && defined(__MACH__))
+ struct protoent *buf;
+
+ if ((buf = getprotobyname("tcp")) != NULL)
+ level = buf->p_proto;
+ else
+ return -1;
+ #else
+ level = SOL_TCP;
+ #endif
+
+ return setsockopt(sockfd, level, optname, (char *)&value, sizeof(value));
+}
+#endif
+
int rpc_get_fd(struct rpc_context *rpc)
{
assert(rpc->magic == RPC_CONTEXT_MAGIC);
return rpc->fd;
}
+static int rpc_has_queue(struct rpc_queue *q)
+{
+ return q->head != NULL;
+}
+
int rpc_which_events(struct rpc_context *rpc)
{
int events;
return POLLIN;
}
- if (rpc->outqueue) {
+ if (rpc_has_queue(&rpc->outqueue)) {
events |= POLLOUT;
}
return events;
static int rpc_write_to_socket(struct rpc_context *rpc)
{
int32_t count;
+ struct rpc_pdu *pdu;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
return -1;
}
- while (rpc->outqueue != NULL) {
+ while ((pdu = rpc->outqueue.head) != NULL) {
int64_t total;
- total = rpc->outqueue->outdata.size;
+ total = pdu->outdata.size;
- count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
+ count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
if (count == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
return -1;
}
- rpc->outqueue->written += count;
- if (rpc->outqueue->written == total) {
- struct rpc_pdu *pdu = rpc->outqueue;
+ pdu->written += count;
+ if (pdu->written == total) {
+ unsigned int hash;
- SLIST_REMOVE(&rpc->outqueue, pdu);
- SLIST_ADD_END(&rpc->waitpdu, pdu);
+ rpc->outqueue.head = pdu->next;
+ if (pdu->next == NULL)
+ rpc->outqueue.tail = NULL;
+
+ hash = rpc_hash_xid(pdu->xid);
+ rpc_enqueue(&rpc->waitpdu[hash], pdu);
}
}
return 0;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- assert(rpc->magic == RPC_CONTEXT_MAGIC);
-
if (ioctl(rpc->fd, FIONREAD, &available) != 0) {
rpc_set_error(rpc, "Ioctl FIONREAD returned error : %d. Closing socket.", errno);
return -1;
if (count < 0) {
rpc_set_error(rpc, "Failed recvfrom: %s", strerror(errno));
free(buf);
+ return -1;
}
if (rpc_process_pdu(rpc, buf, count) != 0) {
rpc_set_error(rpc, "Invalid/garbage pdu received from server. Ignoring PDU");
pdu_size = rpc_get_pdu_size(rpc->inbuf);
if (rpc->insize < pdu_size) {
unsigned char *buf;
-
+
buf = malloc(pdu_size);
if (buf == NULL) {
rpc_set_error(rpc, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size, errno);
rpc->inpos += count;
if (rpc->inpos == rpc->insize) {
- if (rpc_process_pdu(rpc, rpc->inbuf, pdu_size) != 0) {
- rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
- return -1;
- }
- free(rpc->inbuf);
+ char *buf = rpc->inbuf;
+
rpc->inbuf = NULL;
rpc->insize = 0;
rpc->inpos = 0;
+
+ if (rpc_process_pdu(rpc, buf, pdu_size) != 0) {
+ rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
+ return -1;
+ }
+ free(buf);
}
return 0;
}
}
- if (revents & POLLOUT && rpc->outqueue != NULL) {
+ if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
if (rpc_write_to_socket(rpc) != 0) {
rpc_set_error(rpc, "write to socket failed");
return -1;
rpc->auto_reconnect = 0;
}
+void rpc_set_tcp_syncnt(struct rpc_context *rpc, int v)
+{
+ assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
+ rpc->tcp_syncnt = v;
+}
+
+#ifndef TCP_SYNCNT
+#define TCP_SYNCNT 7
+#endif
+
static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s)
{
int socksize;
case AF_INET:
socksize = sizeof(struct sockaddr_in);
rpc->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+#ifdef HAVE_NETINET_TCP_H
+ if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
+ set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
+ }
+#endif
+ break;
+ case AF_INET6:
+ socksize = sizeof(struct sockaddr_in6);
+ rpc->fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
+#ifdef HAVE_NETINET_TCP_H
+ if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
+ set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
+ }
+#endif
break;
default:
rpc_set_error(rpc, "Can not handle AF_FAMILY:%d", s->ss_family);
* binding will usually succeed.
*/
{
- struct sockaddr_in sin;
+ struct sockaddr_storage ss;
static int portOfs = 0;
const int firstPort = 512; /* >= 512 according to Sun docs */
const int portCount = IPPORT_RESERVED - firstPort;
/* skip well-known ports */
if (!getservbyport(port, "tcp")) {
- memset(&sin, 0, sizeof(sin));
- sin.sin_port = port;
- sin.sin_family = AF_INET;
- sin.sin_addr.s_addr = 0;
+ memset(&ss, 0, sizeof(ss));
- rc = bind(rpc->fd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in));
+ switch (s->ss_family) {
+ case AF_INET:
+ ((struct sockaddr_in *)&ss)->sin_port = port;
+ ((struct sockaddr_in *)&ss)->sin_family = AF_INET;
+#ifdef HAVE_SOCKADDR_LEN
+ ((struct sockaddr_in *)&ss)->sin_len = sizeof(struct sockaddr_in);
+#endif
+ break;
+ case AF_INET6:
+ ((struct sockaddr_in6 *)&ss)->sin6_port = port;
+ ((struct sockaddr_in6 *)&ss)->sin6_family = AF_INET6;
+#ifdef HAVE_SOCKADDR_LEN
+ ((struct sockaddr_in6 *)&ss)->sin6_len = sizeof(struct sockaddr_in6);
+#endif
+ break;
+ }
+
+ rc = bind(rpc->fd, (struct sockaddr *)&ss, socksize);
#if !defined(WIN32)
/* we got EACCES, so don't try again */
if (rc != 0 && errno == EACCES)
}
set_nonblocking(rpc->fd);
+ set_nolinger(rpc->fd);
if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS) {
rpc_set_error(rpc, "connect() to server failed. %s(%d)", strerror(errno), errno);
return -1;
- }
+ }
return 0;
-}
+}
int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
{
- struct sockaddr_in *sin = (struct sockaddr_in *)&rpc->s;
+ struct addrinfo *ai = NULL;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
rpc->auto_reconnect = 0;
- sin->sin_family = AF_INET;
- sin->sin_port = htons(port);
- if (inet_pton(AF_INET, server, &sin->sin_addr) != 1) {
- rpc_set_error(rpc, "Not a valid server ip address");
+ if (getaddrinfo(server, NULL, NULL, &ai) != 0) {
+ rpc_set_error(rpc, "Invalid address:%s. "
+ "Can not resolv into IPv4/v6 structure.", server);
return -1;
- }
-
+ }
- switch (rpc->s.ss_family) {
+ switch (ai->ai_family) {
case AF_INET:
+ ((struct sockaddr_in *)&rpc->s)->sin_family = ai->ai_family;
+ ((struct sockaddr_in *)&rpc->s)->sin_port = htons(port);
+ ((struct sockaddr_in *)&rpc->s)->sin_addr = ((struct sockaddr_in *)(ai->ai_addr))->sin_addr;
+#ifdef HAVE_SOCKADDR_LEN
+ ((struct sockaddr_in *)&rpc->s)->sin_len = sizeof(struct sockaddr_in);
+#endif
+ break;
+ case AF_INET6:
+ ((struct sockaddr_in6 *)&rpc->s)->sin6_family = ai->ai_family;
+ ((struct sockaddr_in6 *)&rpc->s)->sin6_port = htons(port);
+ ((struct sockaddr_in6 *)&rpc->s)->sin6_addr = ((struct sockaddr_in6 *)(ai->ai_addr))->sin6_addr;
#ifdef HAVE_SOCKADDR_LEN
- sin->sin_len = sizeof(struct sockaddr_in);
+ ((struct sockaddr_in6 *)&rpc->s)->sin6_len = sizeof(struct sockaddr_in6);
#endif
break;
}
rpc->connect_cb = cb;
rpc->connect_data = private_data;
+ freeaddrinfo(ai);
+
if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
return -1;
}
return 0;
-}
+}
int rpc_disconnect(struct rpc_context *rpc, char *error)
{
static int rpc_reconnect_requeue(struct rpc_context *rpc)
{
struct rpc_pdu *pdu;
+ unsigned int i;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
/* socket is closed so we will not get any replies to any commands
* in flight. Move them all over from the waitpdu queue back to the out queue
*/
- for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
- SLIST_REMOVE(&rpc->waitpdu, pdu);
- SLIST_ADD(&rpc->outqueue, pdu);
- /* we have to re-send the whole pdu again */
- pdu->written = 0;
+ for (i = 0; i < HASHES; i++) {
+ struct rpc_queue *q = &rpc->waitpdu[i];
+
+ for (pdu=q->head; pdu; pdu=pdu->next) {
+ rpc_return_to_queue(&rpc->outqueue, pdu);
+ /* we have to re-send the whole pdu again */
+ pdu->written = 0;
+ }
+ rpc_reset_queue(q);
}
if (rpc->auto_reconnect != 0) {
sprintf(service, "%d", port);
if (getaddrinfo(addr, service, NULL, &ai) != 0) {
rpc_set_error(rpc, "Invalid address:%s. "
- "Can not resolv into IPv4/v6 structure.");
+ "Can not resolv into IPv4/v6 structure.", addr);
return -1;
}
case AF_INET:
rpc->fd = socket(ai->ai_family, SOCK_DGRAM, 0);
if (rpc->fd == -1) {
- rpc_set_error(rpc, "Failed to create UDP socket: %s", strerror(errno));
+ rpc_set_error(rpc, "Failed to create UDP socket: %s", strerror(errno));
freeaddrinfo(ai);
return -1;
}
if (bind(rpc->fd, (struct sockaddr *)ai->ai_addr, sizeof(struct sockaddr_in)) != 0) {
- rpc_set_error(rpc, "Failed to bind to UDP socket: %s",strerror(errno));
+ rpc_set_error(rpc, "Failed to bind to UDP socket: %s",strerror(errno));
freeaddrinfo(ai);
return -1;
}
sprintf(service, "%d", port);
if (getaddrinfo(addr, service, NULL, &ai) != 0) {
rpc_set_error(rpc, "Invalid address:%s. "
- "Can not resolv into IPv4/v6 structure.");
+ "Can not resolv into IPv4/v6 structure.", addr);
return -1;
}
{
int i=0;
struct rpc_pdu *pdu;
+ unsigned int n;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
+ for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
i++;
}
- for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
- i++;
+
+ for (n = 0; n < HASHES; n++) {
+ struct rpc_queue *q = &rpc->waitpdu[n];
+
+ for(pdu = q->head; pdu; pdu = pdu->next)
+ i++;
}
return i;
}
+
+void rpc_set_fd(struct rpc_context *rpc, int fd)
+{
+ assert(rpc->magic == RPC_CONTEXT_MAGIC);
+
+ rpc->fd = fd;
+}