2 Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 2.1 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public License
15 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "aros_compat.h"
26 #include "win32_compat.h"
29 #ifdef HAVE_ARPA_INET_H
30 #include <arpa/inet.h>
41 #ifdef HAVE_SYS_IOCTL_H
42 #include <sys/ioctl.h>
45 #ifdef HAVE_SYS_SOCKET_H
46 #include <sys/socket.h>
49 #ifdef HAVE_NETINET_IN_H
50 #include <netinet/in.h>
53 #ifdef HAVE_NETINET_TCP_H
54 #include <netinet/tcp.h>
61 #ifdef HAVE_SYS_FILIO_H
62 #include <sys/filio.h>
65 #ifdef HAVE_SYS_SOCKIO_H
66 #include <sys/sockio.h>
76 #include <sys/types.h>
77 #include "libnfs-zdr.h"
79 #include "libnfs-raw.h"
80 #include "libnfs-private.h"
84 //has to be included after stdlib!!
85 #include "win32_errnowrapper.h"
88 static int rpc_reconnect_requeue(struct rpc_context
*rpc
);
89 static int rpc_connect_sockaddr_async(struct rpc_context
*rpc
, struct sockaddr_storage
*s
);
91 static void set_nonblocking(int fd
)
96 v
= ioctl(fd
, FIONBIO
, &nonblocking
);
98 v
= fcntl(fd
, F_GETFL
, 0);
99 fcntl(fd
, F_SETFL
, v
| O_NONBLOCK
);
103 static void set_nolinger(int fd
)
108 setsockopt(fd
, SOL_SOCKET
, SO_LINGER
, &lng
, sizeof(lng
));
111 #ifdef HAVE_NETINET_TCP_H
112 int set_tcp_sockopt(int sockfd
, int optname
, int value
)
116 #if defined(__FreeBSD__) || defined(__sun) || (defined(__APPLE__) && defined(__MACH__))
117 struct protoent
*buf
;
119 if ((buf
= getprotobyname("tcp")) != NULL
)
120 level
= buf
->p_proto
;
127 return setsockopt(sockfd
, level
, optname
, (char *)&value
, sizeof(value
));
131 int rpc_get_fd(struct rpc_context
*rpc
)
133 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
138 static int rpc_has_queue(struct rpc_queue
*q
)
140 return q
->head
!= NULL
;
143 int rpc_which_events(struct rpc_context
*rpc
)
147 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
149 events
= rpc
->is_connected
? POLLIN
: POLLOUT
;
151 if (rpc
->is_udp
!= 0) {
152 /* for udp sockets we only wait for pollin */
156 if (rpc_has_queue(&rpc
->outqueue
)) {
162 static int rpc_write_to_socket(struct rpc_context
*rpc
)
167 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
170 rpc_set_error(rpc
, "trying to write but not connected");
174 while ((pdu
= rpc
->outqueue
.head
) != NULL
) {
177 total
= pdu
->outdata
.size
;
179 count
= send(rpc
->fd
, pdu
->outdata
.data
+ pdu
->written
, total
- pdu
->written
, 0);
181 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
184 rpc_set_error(rpc
, "Error when writing to socket :%s(%d)", strerror(errno
), errno
);
188 pdu
->written
+= count
;
189 if (pdu
->written
== total
) {
192 rpc
->outqueue
.head
= pdu
->next
;
193 if (pdu
->next
== NULL
)
194 rpc
->outqueue
.tail
= NULL
;
196 hash
= rpc_hash_xid(pdu
->xid
);
197 rpc_enqueue(&rpc
->waitpdu
[hash
], pdu
);
203 static int rpc_read_from_socket(struct rpc_context
*rpc
)
210 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
212 if (ioctl(rpc
->fd
, FIONREAD
, &available
) != 0) {
213 rpc_set_error(rpc
, "Ioctl FIONREAD returned error : %d. Closing socket.", errno
);
217 if (available
== 0) {
218 rpc_set_error(rpc
, "Socket has been closed");
224 socklen_t socklen
= sizeof(rpc
->udp_src
);
226 buf
= malloc(available
);
228 rpc_set_error(rpc
, "Failed to malloc buffer for recvfrom");
231 count
= recvfrom(rpc
->fd
, buf
, available
, MSG_DONTWAIT
, (struct sockaddr
*)&rpc
->udp_src
, &socklen
);
233 rpc_set_error(rpc
, "Failed recvfrom: %s", strerror(errno
));
237 if (rpc_process_pdu(rpc
, buf
, count
) != 0) {
238 rpc_set_error(rpc
, "Invalid/garbage pdu received from server. Ignoring PDU");
246 /* read record marker, 4 bytes at the beginning of every pdu */
247 if (rpc
->inbuf
== NULL
) {
249 rpc
->inbuf
= malloc(rpc
->insize
);
250 if (rpc
->inbuf
== NULL
) {
251 rpc_set_error(rpc
, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno
);
255 if (rpc
->inpos
< 4) {
256 size
= 4 - rpc
->inpos
;
258 count
= recv(rpc
->fd
, rpc
->inbuf
+ rpc
->inpos
, size
, 0);
260 if (errno
== EINTR
) {
263 rpc_set_error(rpc
, "Read from socket failed, errno:%d. Closing socket.", errno
);
270 if (available
== 0) {
274 pdu_size
= rpc_get_pdu_size(rpc
->inbuf
);
275 if (rpc
->insize
< pdu_size
) {
278 buf
= malloc(pdu_size
);
280 rpc_set_error(rpc
, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size
, errno
);
283 memcpy(buf
, rpc
->inbuf
, rpc
->insize
);
286 rpc
->insize
= rpc_get_pdu_size(rpc
->inbuf
);
290 if (size
> rpc
->insize
- rpc
->inpos
) {
291 size
= rpc
->insize
- rpc
->inpos
;
294 count
= recv(rpc
->fd
, rpc
->inbuf
+ rpc
->inpos
, size
, 0);
296 if (errno
== EINTR
) {
299 rpc_set_error(rpc
, "Read from socket failed, errno:%d. Closing socket.", errno
);
305 if (rpc
->inpos
== rpc
->insize
) {
306 char *buf
= rpc
->inbuf
;
312 if (rpc_process_pdu(rpc
, buf
, pdu_size
) != 0) {
313 rpc_set_error(rpc
, "Invalid/garbage pdu received from server. Closing socket");
324 int rpc_service(struct rpc_context
*rpc
, int revents
)
326 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
328 if (revents
& POLLERR
) {
334 socklen_t err_size
= sizeof(err
);
336 if (getsockopt(rpc
->fd
, SOL_SOCKET
, SO_ERROR
,
337 (char *)&err
, &err_size
) != 0 || err
!= 0) {
341 rpc_set_error(rpc
, "rpc_service: socket error "
345 rpc_set_error(rpc
, "rpc_service: POLLERR, "
346 "Unknown socket error.");
348 if (rpc
->connect_cb
!= NULL
) {
349 rpc
->connect_cb(rpc
, RPC_STATUS_ERROR
, rpc
->error_string
, rpc
->connect_data
);
353 if (revents
& POLLHUP
) {
354 rpc_set_error(rpc
, "Socket failed with POLLHUP");
355 if (rpc
->connect_cb
!= NULL
) {
356 rpc
->connect_cb(rpc
, RPC_STATUS_ERROR
, rpc
->error_string
, rpc
->connect_data
);
361 if (rpc
->is_connected
== 0 && rpc
->fd
!= -1 && revents
&POLLOUT
) {
363 socklen_t err_size
= sizeof(err
);
365 if (getsockopt(rpc
->fd
, SOL_SOCKET
, SO_ERROR
,
366 (char *)&err
, &err_size
) != 0 || err
!= 0) {
370 rpc_set_error(rpc
, "rpc_service: socket error "
371 "%s(%d) while connecting.",
373 if (rpc
->connect_cb
!= NULL
) {
374 rpc
->connect_cb(rpc
, RPC_STATUS_ERROR
,
375 NULL
, rpc
->connect_data
);
380 rpc
->is_connected
= 1;
381 if (rpc
->connect_cb
!= NULL
) {
382 rpc
->connect_cb(rpc
, RPC_STATUS_SUCCESS
, NULL
, rpc
->connect_data
);
387 if (revents
& POLLIN
) {
388 if (rpc_read_from_socket(rpc
) != 0) {
389 rpc_reconnect_requeue(rpc
);
394 if (revents
& POLLOUT
&& rpc_has_queue(&rpc
->outqueue
)) {
395 if (rpc_write_to_socket(rpc
) != 0) {
396 rpc_set_error(rpc
, "write to socket failed");
404 void rpc_set_autoreconnect(struct rpc_context
*rpc
)
406 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
408 rpc
->auto_reconnect
= 1;
411 void rpc_unset_autoreconnect(struct rpc_context
*rpc
)
413 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
415 rpc
->auto_reconnect
= 0;
418 void rpc_set_tcp_syncnt(struct rpc_context
*rpc
, int v
)
420 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
429 static int rpc_connect_sockaddr_async(struct rpc_context
*rpc
, struct sockaddr_storage
*s
)
433 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
435 switch (s
->ss_family
) {
437 socksize
= sizeof(struct sockaddr_in
);
438 rpc
->fd
= socket(AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
439 #ifdef HAVE_NETINET_TCP_H
440 if (rpc
->tcp_syncnt
!= RPC_PARAM_UNDEFINED
) {
441 set_tcp_sockopt(rpc
->fd
, TCP_SYNCNT
, rpc
->tcp_syncnt
);
446 socksize
= sizeof(struct sockaddr_in6
);
447 rpc
->fd
= socket(AF_INET6
, SOCK_STREAM
, IPPROTO_TCP
);
448 #ifdef HAVE_NETINET_TCP_H
449 if (rpc
->tcp_syncnt
!= RPC_PARAM_UNDEFINED
) {
450 set_tcp_sockopt(rpc
->fd
, TCP_SYNCNT
, rpc
->tcp_syncnt
);
455 rpc_set_error(rpc
, "Can not handle AF_FAMILY:%d", s
->ss_family
);
460 rpc_set_error(rpc
, "Failed to open socket");
464 /* Some systems allow you to set capabilities on an executable
465 * to allow the file to be executed with privilege to bind to
466 * privileged system ports, even if the user is not root.
468 * Opportunistically try to bind the socket to a low numbered
469 * system port in the hope that the user is either root or the
470 * executable has the CAP_NET_BIND_SERVICE.
472 * As soon as we fail the bind() with EACCES we know we will never
473 * be able to bind to a system port so we terminate the loop.
476 * sudo setcap 'cap_net_bind_service=+ep' /path/executable
477 * to make the executable able to bind to a system port.
479 * On Windows, there is no concept of privileged ports. Thus
480 * binding will usually succeed.
483 struct sockaddr_storage ss
;
484 static int portOfs
= 0;
485 const int firstPort
= 512; /* >= 512 according to Sun docs */
486 const int portCount
= IPPORT_RESERVED
- firstPort
;
487 int startOfs
, port
, rc
;
490 portOfs
= time(NULL
) % 400;
495 port
= htons(firstPort
+ portOfs
);
496 portOfs
= (portOfs
+ 1) % portCount
;
498 /* skip well-known ports */
499 if (!getservbyport(port
, "tcp")) {
500 memset(&ss
, 0, sizeof(ss
));
502 switch (s
->ss_family
) {
504 ((struct sockaddr_in
*)&ss
)->sin_port
= port
;
505 ((struct sockaddr_in
*)&ss
)->sin_family
= AF_INET
;
506 #ifdef HAVE_SOCKADDR_LEN
507 ((struct sockaddr_in
*)&ss
)->sin_len
= sizeof(struct sockaddr_in
);
511 ((struct sockaddr_in6
*)&ss
)->sin6_port
= port
;
512 ((struct sockaddr_in6
*)&ss
)->sin6_family
= AF_INET6
;
513 #ifdef HAVE_SOCKADDR_LEN
514 ((struct sockaddr_in6
*)&ss
)->sin6_len
= sizeof(struct sockaddr_in6
);
519 rc
= bind(rpc
->fd
, (struct sockaddr
*)&ss
, socksize
);
521 /* we got EACCES, so don't try again */
522 if (rc
!= 0 && errno
== EACCES
)
526 } while (rc
!= 0 && portOfs
!= startOfs
);
529 set_nonblocking(rpc
->fd
);
530 set_nolinger(rpc
->fd
);
532 if (connect(rpc
->fd
, (struct sockaddr
*)s
, socksize
) != 0 && errno
!= EINPROGRESS
) {
533 rpc_set_error(rpc
, "connect() to server failed. %s(%d)", strerror(errno
), errno
);
540 int rpc_connect_async(struct rpc_context
*rpc
, const char *server
, int port
, rpc_cb cb
, void *private_data
)
542 struct addrinfo
*ai
= NULL
;
544 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
547 rpc_set_error(rpc
, "Trying to connect while already connected");
551 if (rpc
->is_udp
!= 0) {
552 rpc_set_error(rpc
, "Trying to connect on UDP socket");
556 rpc
->auto_reconnect
= 0;
558 if (getaddrinfo(server
, NULL
, NULL
, &ai
) != 0) {
559 rpc_set_error(rpc
, "Invalid address:%s. "
560 "Can not resolv into IPv4/v6 structure.", server
);
564 switch (ai
->ai_family
) {
566 ((struct sockaddr_in
*)&rpc
->s
)->sin_family
= ai
->ai_family
;
567 ((struct sockaddr_in
*)&rpc
->s
)->sin_port
= htons(port
);
568 ((struct sockaddr_in
*)&rpc
->s
)->sin_addr
= ((struct sockaddr_in
*)(ai
->ai_addr
))->sin_addr
;
569 #ifdef HAVE_SOCKADDR_LEN
570 ((struct sockaddr_in
*)&rpc
->s
)->sin_len
= sizeof(struct sockaddr_in
);
574 ((struct sockaddr_in6
*)&rpc
->s
)->sin6_family
= ai
->ai_family
;
575 ((struct sockaddr_in6
*)&rpc
->s
)->sin6_port
= htons(port
);
576 ((struct sockaddr_in6
*)&rpc
->s
)->sin6_addr
= ((struct sockaddr_in6
*)(ai
->ai_addr
))->sin6_addr
;
577 #ifdef HAVE_SOCKADDR_LEN
578 ((struct sockaddr_in6
*)&rpc
->s
)->sin6_len
= sizeof(struct sockaddr_in6
);
583 rpc
->connect_cb
= cb
;
584 rpc
->connect_data
= private_data
;
588 if (rpc_connect_sockaddr_async(rpc
, &rpc
->s
) != 0) {
595 int rpc_disconnect(struct rpc_context
*rpc
, char *error
)
597 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
599 rpc_unset_autoreconnect(rpc
);
606 rpc
->is_connected
= 0;
608 rpc_error_all_pdus(rpc
, error
);
613 static void reconnect_cb(struct rpc_context
*rpc
, int status
, void *data _U_
, void *private_data
)
615 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
617 if (status
!= RPC_STATUS_SUCCESS
) {
618 rpc_error_all_pdus(rpc
, "RPC ERROR: Failed to reconnect async");
622 rpc
->is_connected
= 1;
623 rpc
->connect_cb
= NULL
;
626 /* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue and reconnect */
627 static int rpc_reconnect_requeue(struct rpc_context
*rpc
)
632 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
639 rpc
->is_connected
= 0;
641 /* socket is closed so we will not get any replies to any commands
642 * in flight. Move them all over from the waitpdu queue back to the out queue
644 for (i
= 0; i
< HASHES
; i
++) {
645 struct rpc_queue
*q
= &rpc
->waitpdu
[i
];
647 for (pdu
=q
->head
; pdu
; pdu
=pdu
->next
) {
648 rpc_return_to_queue(&rpc
->outqueue
, pdu
);
649 /* we have to re-send the whole pdu again */
655 if (rpc
->auto_reconnect
!= 0) {
656 rpc
->connect_cb
= reconnect_cb
;
658 if (rpc_connect_sockaddr_async(rpc
, &rpc
->s
) != 0) {
659 rpc_error_all_pdus(rpc
, "RPC ERROR: Failed to reconnect async");
668 int rpc_bind_udp(struct rpc_context
*rpc
, char *addr
, int port
)
670 struct addrinfo
*ai
= NULL
;
673 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
675 if (rpc
->is_udp
== 0) {
676 rpc_set_error(rpc
, "Cant not bind UDP. Not UDP context");
680 sprintf(service
, "%d", port
);
681 if (getaddrinfo(addr
, service
, NULL
, &ai
) != 0) {
682 rpc_set_error(rpc
, "Invalid address:%s. "
683 "Can not resolv into IPv4/v6 structure.", addr
);
687 switch(ai
->ai_family
) {
689 rpc
->fd
= socket(ai
->ai_family
, SOCK_DGRAM
, 0);
691 rpc_set_error(rpc
, "Failed to create UDP socket: %s", strerror(errno
));
696 if (bind(rpc
->fd
, (struct sockaddr
*)ai
->ai_addr
, sizeof(struct sockaddr_in
)) != 0) {
697 rpc_set_error(rpc
, "Failed to bind to UDP socket: %s",strerror(errno
));
703 rpc_set_error(rpc
, "Can not handle UPD sockets of family %d yet", ai
->ai_family
);
713 int rpc_set_udp_destination(struct rpc_context
*rpc
, char *addr
, int port
, int is_broadcast
)
715 struct addrinfo
*ai
= NULL
;
718 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
720 if (rpc
->is_udp
== 0) {
721 rpc_set_error(rpc
, "Can not set destination sockaddr. Not UDP context");
725 sprintf(service
, "%d", port
);
726 if (getaddrinfo(addr
, service
, NULL
, &ai
) != 0) {
727 rpc_set_error(rpc
, "Invalid address:%s. "
728 "Can not resolv into IPv4/v6 structure.", addr
);
734 rpc
->udp_dest
= NULL
;
736 rpc
->udp_dest
= malloc(ai
->ai_addrlen
);
737 if (rpc
->udp_dest
== NULL
) {
738 rpc_set_error(rpc
, "Out of memory. Failed to allocate sockaddr structure");
742 memcpy(rpc
->udp_dest
, ai
->ai_addr
, ai
->ai_addrlen
);
745 rpc
->is_broadcast
= is_broadcast
;
746 setsockopt(rpc
->fd
, SOL_SOCKET
, SO_BROADCAST
, (char *)&is_broadcast
, sizeof(is_broadcast
));
751 struct sockaddr
*rpc_get_recv_sockaddr(struct rpc_context
*rpc
)
753 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
755 return (struct sockaddr
*)&rpc
->udp_src
;
758 int rpc_queue_length(struct rpc_context
*rpc
)
764 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
766 for(pdu
= rpc
->outqueue
.head
; pdu
; pdu
= pdu
->next
) {
770 for (n
= 0; n
< HASHES
; n
++) {
771 struct rpc_queue
*q
= &rpc
->waitpdu
[n
];
773 for(pdu
= q
->head
; pdu
; pdu
= pdu
->next
)
779 void rpc_set_fd(struct rpc_context
*rpc
, int fd
)
781 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);