2 Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 2.1 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public License
15 along with this program; if not, see <http://www.gnu.org/licenses/>.
18 #include "win32_compat.h"
21 #include <arpa/inet.h>
22 #include <sys/ioctl.h>
23 #include <sys/socket.h>
32 #include "aros_compat.h"
45 #ifdef HAVE_SYS_FILIO_H
46 #include <sys/filio.h>
48 #ifdef HAVE_SYS_SOCKIO_H
49 #include <sys/sockio.h>
51 #include <sys/types.h>
52 #include "libnfs-zdr.h"
54 #include "libnfs-raw.h"
55 #include "libnfs-private.h"
59 //has to be included after stdlib!!
60 #include "win32_errnowrapper.h"
64 static int rpc_reconnect_requeue(struct rpc_context
*rpc
);
65 static int rpc_connect_sockaddr_async(struct rpc_context
*rpc
, struct sockaddr_storage
*s
);
67 static void set_nonblocking(int fd
)
72 v
= ioctlsocket(fd
, FIONBIO
,&nonblocking
);
74 v
= fcntl(fd
, F_GETFL
, 0);
75 fcntl(fd
, F_SETFL
, v
| O_NONBLOCK
);
79 int rpc_get_fd(struct rpc_context
*rpc
)
81 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
86 int rpc_which_events(struct rpc_context
*rpc
)
90 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
92 events
= rpc
->is_connected
? POLLIN
: POLLOUT
;
94 if (rpc
->is_udp
!= 0) {
95 /* for udp sockets we only wait for pollin */
105 static int rpc_write_to_socket(struct rpc_context
*rpc
)
109 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
112 rpc_set_error(rpc
, "trying to write but not connected");
116 while (rpc
->outqueue
!= NULL
) {
119 total
= rpc
->outqueue
->outdata
.size
;
122 count
= send(rpc
->fd
, rpc
->outqueue
->outdata
.data
+ rpc
->outqueue
->written
, total
- rpc
->outqueue
->written
, 0);
124 count
= write(rpc
->fd
, rpc
->outqueue
->outdata
.data
+ rpc
->outqueue
->written
, total
- rpc
->outqueue
->written
);
127 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
130 rpc_set_error(rpc
, "Error when writing to socket :%s(%d)", strerror(errno
), errno
);
134 rpc
->outqueue
->written
+= count
;
135 if (rpc
->outqueue
->written
== total
) {
136 struct rpc_pdu
*pdu
= rpc
->outqueue
;
138 SLIST_REMOVE(&rpc
->outqueue
, pdu
);
139 SLIST_ADD_END(&rpc
->waitpdu
, pdu
);
145 static int rpc_read_from_socket(struct rpc_context
*rpc
)
152 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
154 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
157 if (ioctlsocket(rpc
->fd
, FIONREAD
, &available
) != 0) {
159 if (ioctl(rpc
->fd
, FIONREAD
, &available
) != 0) {
161 rpc_set_error(rpc
, "Ioctl FIONREAD returned error : %d. Closing socket.", errno
);
165 if (available
== 0) {
166 rpc_set_error(rpc
, "Socket has been closed");
172 socklen_t socklen
= sizeof(rpc
->udp_src
);
174 buf
= malloc(available
);
176 rpc_set_error(rpc
, "Failed to malloc buffer for recvfrom");
179 count
= recvfrom(rpc
->fd
, buf
, available
, MSG_DONTWAIT
, (struct sockaddr
*)&rpc
->udp_src
, &socklen
);
181 rpc_set_error(rpc
, "Failed recvfrom: %s", strerror(errno
));
184 if (rpc_process_pdu(rpc
, buf
, count
) != 0) {
185 rpc_set_error(rpc
, "Invalid/garbage pdu received from server. Ignoring PDU");
193 /* read record marker, 4 bytes at the beginning of every pdu */
194 if (rpc
->inbuf
== NULL
) {
196 rpc
->inbuf
= malloc(rpc
->insize
);
197 if (rpc
->inbuf
== NULL
) {
198 rpc_set_error(rpc
, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno
);
202 if (rpc
->inpos
< 4) {
203 size
= 4 - rpc
->inpos
;
206 count
= recv(rpc
->fd
, rpc
->inbuf
+ rpc
->inpos
, size
, 0);
208 count
= read(rpc
->fd
, rpc
->inbuf
+ rpc
->inpos
, size
);
211 if (errno
== EINTR
) {
214 rpc_set_error(rpc
, "Read from socket failed, errno:%d. Closing socket.", errno
);
221 if (available
== 0) {
225 pdu_size
= rpc_get_pdu_size(rpc
->inbuf
);
226 if (rpc
->insize
< pdu_size
) {
229 buf
= malloc(pdu_size
);
231 rpc_set_error(rpc
, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size
, errno
);
234 memcpy(buf
, rpc
->inbuf
, rpc
->insize
);
237 rpc
->insize
= rpc_get_pdu_size(rpc
->inbuf
);
241 if (size
> rpc
->insize
- rpc
->inpos
) {
242 size
= rpc
->insize
- rpc
->inpos
;
246 count
= recv(rpc
->fd
, rpc
->inbuf
+ rpc
->inpos
, size
, 0);
248 count
= read(rpc
->fd
, rpc
->inbuf
+ rpc
->inpos
, size
);
251 if (errno
== EINTR
) {
254 rpc_set_error(rpc
, "Read from socket failed, errno:%d. Closing socket.", errno
);
260 if (rpc
->inpos
== rpc
->insize
) {
261 if (rpc_process_pdu(rpc
, rpc
->inbuf
, pdu_size
) != 0) {
262 rpc_set_error(rpc
, "Invalid/garbage pdu received from server. Closing socket");
276 int rpc_service(struct rpc_context
*rpc
, int revents
)
278 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
280 if (revents
& POLLERR
) {
286 socklen_t err_size
= sizeof(err
);
288 if (getsockopt(rpc
->fd
, SOL_SOCKET
, SO_ERROR
,
289 (char *)&err
, &err_size
) != 0 || err
!= 0) {
293 rpc_set_error(rpc
, "rpc_service: socket error "
297 rpc_set_error(rpc
, "rpc_service: POLLERR, "
298 "Unknown socket error.");
300 if (rpc
->connect_cb
!= NULL
) {
301 rpc
->connect_cb(rpc
, RPC_STATUS_ERROR
, rpc
->error_string
, rpc
->connect_data
);
305 if (revents
& POLLHUP
) {
306 rpc_set_error(rpc
, "Socket failed with POLLHUP");
307 if (rpc
->connect_cb
!= NULL
) {
308 rpc
->connect_cb(rpc
, RPC_STATUS_ERROR
, rpc
->error_string
, rpc
->connect_data
);
313 if (rpc
->is_connected
== 0 && rpc
->fd
!= -1 && revents
&POLLOUT
) {
315 socklen_t err_size
= sizeof(err
);
317 if (getsockopt(rpc
->fd
, SOL_SOCKET
, SO_ERROR
,
318 (char *)&err
, &err_size
) != 0 || err
!= 0) {
322 rpc_set_error(rpc
, "rpc_service: socket error "
323 "%s(%d) while connecting.",
325 if (rpc
->connect_cb
!= NULL
) {
326 rpc
->connect_cb(rpc
, RPC_STATUS_ERROR
,
327 NULL
, rpc
->connect_data
);
332 rpc
->is_connected
= 1;
333 if (rpc
->connect_cb
!= NULL
) {
334 rpc
->connect_cb(rpc
, RPC_STATUS_SUCCESS
, NULL
, rpc
->connect_data
);
339 if (revents
& POLLIN
) {
340 if (rpc_read_from_socket(rpc
) != 0) {
341 rpc_reconnect_requeue(rpc
);
346 if (revents
& POLLOUT
&& rpc
->outqueue
!= NULL
) {
347 if (rpc_write_to_socket(rpc
) != 0) {
348 rpc_set_error(rpc
, "write to socket failed");
356 void rpc_set_autoreconnect(struct rpc_context
*rpc
)
358 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
360 rpc
->auto_reconnect
= 1;
363 void rpc_unset_autoreconnect(struct rpc_context
*rpc
)
365 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
367 rpc
->auto_reconnect
= 0;
370 static int rpc_connect_sockaddr_async(struct rpc_context
*rpc
, struct sockaddr_storage
*s
)
374 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
376 switch (s
->ss_family
) {
378 socksize
= sizeof(struct sockaddr_in
);
379 rpc
->fd
= socket(AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
382 rpc_set_error(rpc
, "Can not handle AF_FAMILY:%d", s
->ss_family
);
387 rpc_set_error(rpc
, "Failed to open socket");
391 /* Some systems allow you to set capabilities on an executable
392 * to allow the file to be executed with privilege to bind to
393 * privileged system ports, even if the user is not root.
395 * Opportunistically try to bind the socket to a low numbered
396 * system port in the hope that the user is either root or the
397 * executable has the CAP_NET_BIND_SERVICE.
399 * As soon as we fail the bind() with EACCES we know we will never
400 * be able to bind to a system port so we terminate the loop.
403 * sudo setcap 'cap_net_bind_service=+ep' /path/executable
404 * to make the executable able to bind to a system port.
406 * On Windows, there is no concept of privileged ports. Thus
407 * binding will usually succeed.
410 struct sockaddr_in sin
;
411 static int portOfs
= 0;
412 const int firstPort
= 512; /* >= 512 according to Sun docs */
413 const int portCount
= IPPORT_RESERVED
- firstPort
;
414 int startOfs
= portOfs
, port
, rc
;
418 port
= htons(firstPort
+ portOfs
);
419 portOfs
= (portOfs
+ 1) % portCount
;
421 /* skip well-known ports */
422 if (!getservbyport(port
, "tcp")) {
423 memset(&sin
, 0, sizeof(sin
));
425 sin
.sin_family
= AF_INET
;
426 sin
.sin_addr
.s_addr
= 0;
428 rc
= bind(rpc
->fd
, (struct sockaddr
*)&sin
, sizeof(struct sockaddr_in
));
430 /* we got EACCES, so don't try again */
431 if (rc
!= 0 && errno
== EACCES
)
435 } while (rc
!= 0 && portOfs
!= startOfs
);
438 set_nonblocking(rpc
->fd
);
440 if (connect(rpc
->fd
, (struct sockaddr
*)s
, socksize
) != 0 && errno
!= EINPROGRESS
) {
441 rpc_set_error(rpc
, "connect() to server failed. %s(%d)", strerror(errno
), errno
);
448 int rpc_connect_async(struct rpc_context
*rpc
, const char *server
, int port
, rpc_cb cb
, void *private_data
)
450 struct sockaddr_in
*sin
= (struct sockaddr_in
*)&rpc
->s
;
452 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
455 rpc_set_error(rpc
, "Trying to connect while already connected");
459 if (rpc
->is_udp
!= 0) {
460 rpc_set_error(rpc
, "Trying to connect on UDP socket");
464 rpc
->auto_reconnect
= 0;
466 sin
->sin_family
= AF_INET
;
467 sin
->sin_port
= htons(port
);
468 if (inet_pton(AF_INET
, server
, &sin
->sin_addr
) != 1) {
469 rpc_set_error(rpc
, "Not a valid server ip address");
474 switch (rpc
->s
.ss_family
) {
476 #ifdef HAVE_SOCKADDR_LEN
477 sin
->sin_len
= sizeof(struct sockaddr_in
);
482 rpc
->connect_cb
= cb
;
483 rpc
->connect_data
= private_data
;
485 if (rpc_connect_sockaddr_async(rpc
, &rpc
->s
) != 0) {
492 int rpc_disconnect(struct rpc_context
*rpc
, char *error
)
494 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
496 rpc_unset_autoreconnect(rpc
);
500 closesocket(rpc
->fd
);
507 rpc
->is_connected
= 0;
509 rpc_error_all_pdus(rpc
, error
);
514 static void reconnect_cb(struct rpc_context
*rpc
, int status
, void *data _U_
, void *private_data
)
516 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
518 if (status
!= RPC_STATUS_SUCCESS
) {
519 rpc_error_all_pdus(rpc
, "RPC ERROR: Failed to reconnect async");
523 rpc
->is_connected
= 1;
524 rpc
->connect_cb
= NULL
;
527 /* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue and reconnect */
528 static int rpc_reconnect_requeue(struct rpc_context
*rpc
)
532 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
536 closesocket(rpc
->fd
);
543 rpc
->is_connected
= 0;
545 /* socket is closed so we will not get any replies to any commands
546 * in flight. Move them all over from the waitpdu queue back to the out queue
548 for (pdu
=rpc
->waitpdu
; pdu
; pdu
=pdu
->next
) {
549 SLIST_REMOVE(&rpc
->waitpdu
, pdu
);
550 SLIST_ADD(&rpc
->outqueue
, pdu
);
551 /* we have to re-send the whole pdu again */
555 if (rpc
->auto_reconnect
!= 0) {
556 rpc
->connect_cb
= reconnect_cb
;
558 if (rpc_connect_sockaddr_async(rpc
, &rpc
->s
) != 0) {
559 rpc_error_all_pdus(rpc
, "RPC ERROR: Failed to reconnect async");
568 int rpc_bind_udp(struct rpc_context
*rpc
, char *addr
, int port
)
570 struct addrinfo
*ai
= NULL
;
573 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
575 if (rpc
->is_udp
== 0) {
576 rpc_set_error(rpc
, "Cant not bind UDP. Not UDP context");
580 sprintf(service
, "%d", port
);
581 if (getaddrinfo(addr
, service
, NULL
, &ai
) != 0) {
582 rpc_set_error(rpc
, "Invalid address:%s. "
583 "Can not resolv into IPv4/v6 structure.");
587 switch(ai
->ai_family
) {
589 rpc
->fd
= socket(ai
->ai_family
, SOCK_DGRAM
, 0);
591 rpc_set_error(rpc
, "Failed to create UDP socket: %s", strerror(errno
));
596 if (bind(rpc
->fd
, (struct sockaddr
*)ai
->ai_addr
, sizeof(struct sockaddr_in
)) != 0) {
597 rpc_set_error(rpc
, "Failed to bind to UDP socket: %s",strerror(errno
));
603 rpc_set_error(rpc
, "Can not handle UPD sockets of family %d yet", ai
->ai_family
);
613 int rpc_set_udp_destination(struct rpc_context
*rpc
, char *addr
, int port
, int is_broadcast
)
615 struct addrinfo
*ai
= NULL
;
618 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
620 if (rpc
->is_udp
== 0) {
621 rpc_set_error(rpc
, "Can not set destination sockaddr. Not UDP context");
625 sprintf(service
, "%d", port
);
626 if (getaddrinfo(addr
, service
, NULL
, &ai
) != 0) {
627 rpc_set_error(rpc
, "Invalid address:%s. "
628 "Can not resolv into IPv4/v6 structure.");
634 rpc
->udp_dest
= NULL
;
636 rpc
->udp_dest
= malloc(ai
->ai_addrlen
);
637 if (rpc
->udp_dest
== NULL
) {
638 rpc_set_error(rpc
, "Out of memory. Failed to allocate sockaddr structure");
642 memcpy(rpc
->udp_dest
, ai
->ai_addr
, ai
->ai_addrlen
);
645 rpc
->is_broadcast
= is_broadcast
;
646 setsockopt(rpc
->fd
, SOL_SOCKET
, SO_BROADCAST
, (char *)&is_broadcast
, sizeof(is_broadcast
));
651 struct sockaddr
*rpc_get_recv_sockaddr(struct rpc_context
*rpc
)
653 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
655 return (struct sockaddr
*)&rpc
->udp_src
;
658 int rpc_queue_length(struct rpc_context
*rpc
)
663 assert(rpc
->magic
== RPC_CONTEXT_MAGIC
);
665 for(pdu
= rpc
->outqueue
; pdu
; pdu
= pdu
->next
) {
668 for(pdu
= rpc
->waitpdu
; pdu
; pdu
= pdu
->next
) {