503935c0a059e4dfd2da33eb356b3628f484b4e5
2 Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 2.1 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public License
15 along with this program; if not, see <http://www.gnu.org/licenses/>.
30 #include <arpa/inet.h>
31 #ifdef HAVE_SYS_FILIO_H
32 #include <sys/filio.h>
34 #include <sys/ioctl.h>
35 #include <sys/types.h>
36 #include <sys/socket.h>
39 #include "libnfs-raw.h"
40 #include "libnfs-private.h"
43 static int rpc_disconnect_requeue(struct rpc_context
*rpc
);
45 static void set_nonblocking(int fd
)
48 v
= fcntl(fd
, F_GETFL
, 0);
49 fcntl(fd
, F_SETFL
, v
| O_NONBLOCK
);
52 int rpc_get_fd(struct rpc_context
*rpc
)
57 int rpc_which_events(struct rpc_context
*rpc
)
59 int events
= rpc
->is_connected
? POLLIN
: POLLOUT
;
61 if (rpc
->is_udp
!= 0) {
62 /* for udp sockets we only wait for pollin */
72 static int rpc_write_to_socket(struct rpc_context
*rpc
)
80 rpc_set_error(rpc
, "trying to write but not connected");
84 while (rpc
->outqueue
!= NULL
) {
87 total
= rpc
->outqueue
->outdata
.size
;
89 count
= write(rpc
->fd
, rpc
->outqueue
->outdata
.data
+ rpc
->outqueue
->written
, total
- rpc
->outqueue
->written
);
91 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
94 rpc_set_error(rpc
, "Error when writing to socket :%s(%d)", strerror(errno
), errno
);
98 rpc
->outqueue
->written
+= count
;
99 if (rpc
->outqueue
->written
== total
) {
100 struct rpc_pdu
*pdu
= rpc
->outqueue
;
102 SLIST_REMOVE(&rpc
->outqueue
, pdu
);
103 SLIST_ADD_END(&rpc
->waitpdu
, pdu
);
109 static int rpc_read_from_socket(struct rpc_context
*rpc
)
116 if (ioctl(rpc
->fd
, FIONREAD
, &available
) != 0) {
117 rpc_set_error(rpc
, "Ioctl FIONREAD returned error : %d. Closing socket.", errno
);
120 if (available
== 0) {
121 rpc_set_error(rpc
, "Socket has been closed");
127 socklen_t socklen
= sizeof(rpc
->udp_src
);
129 buf
= malloc(available
);
131 rpc_set_error(rpc
, "Failed to malloc buffer for recvfrom");
134 count
= recvfrom(rpc
->fd
, buf
, available
, MSG_DONTWAIT
, (struct sockaddr
*)&rpc
->udp_src
, &socklen
);
136 rpc_set_error(rpc
, "Failed recvfrom: %s", strerror(errno
));
139 if (rpc_process_pdu(rpc
, buf
, count
) != 0) {
140 rpc_set_error(rpc
, "Invalid/garbage pdu received from server. Ignoring PDU");
148 /* read record marker, 4 bytes at the beginning of every pdu */
149 if (rpc
->inbuf
== NULL
) {
151 rpc
->inbuf
= malloc(rpc
->insize
);
152 if (rpc
->inbuf
== NULL
) {
153 rpc_set_error(rpc
, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno
);
157 if (rpc
->inpos
< 4) {
158 size
= 4 - rpc
->inpos
;
160 count
= read(rpc
->fd
, rpc
->inbuf
+ rpc
->inpos
, size
);
162 if (errno
== EINTR
) {
165 rpc_set_error(rpc
, "Read from socket failed, errno:%d. Closing socket.", errno
);
172 if (available
== 0) {
176 pdu_size
= rpc_get_pdu_size(rpc
->inbuf
);
177 if (rpc
->insize
< pdu_size
) {
180 buf
= malloc(pdu_size
);
182 rpc_set_error(rpc
, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size
, errno
);
185 memcpy(buf
, rpc
->inbuf
, rpc
->insize
);
188 rpc
->insize
= rpc_get_pdu_size(rpc
->inbuf
);
192 if (size
> rpc
->insize
- rpc
->inpos
) {
193 size
= rpc
->insize
- rpc
->inpos
;
196 count
= read(rpc
->fd
, rpc
->inbuf
+ rpc
->inpos
, size
);
198 if (errno
== EINTR
) {
201 rpc_set_error(rpc
, "Read from socket failed, errno:%d. Closing socket.", errno
);
207 if (rpc
->inpos
== rpc
->insize
) {
208 if (rpc_process_pdu(rpc
, rpc
->inbuf
, pdu_size
) != 0) {
209 rpc_set_error(rpc
, "Invalid/garbage pdu received from server. Closing socket");
223 int rpc_service(struct rpc_context
*rpc
, int revents
)
225 if (revents
& POLLERR
) {
227 socklen_t err_size
= sizeof(err
);
229 if (getsockopt(rpc
->fd
, SOL_SOCKET
, SO_ERROR
,
230 &err
, &err_size
) != 0 || err
!= 0) {
234 rpc_set_error(rpc
, "rpc_service: socket error "
238 rpc_set_error(rpc
, "rpc_service: POLLERR, "
239 "Unknown socket error.");
241 rpc
->connect_cb(rpc
, RPC_STATUS_ERROR
, rpc
->error_string
, rpc
->connect_data
);
244 if (revents
& POLLHUP
) {
245 rpc_set_error(rpc
, "Socket failed with POLLHUP");
246 rpc
->connect_cb(rpc
, RPC_STATUS_ERROR
, rpc
->error_string
, rpc
->connect_data
);
250 if (rpc
->is_connected
== 0 && rpc
->fd
!= -1 && revents
&POLLOUT
) {
252 socklen_t err_size
= sizeof(err
);
254 if (getsockopt(rpc
->fd
, SOL_SOCKET
, SO_ERROR
,
255 &err
, &err_size
) != 0 || err
!= 0) {
259 rpc_set_error(rpc
, "rpc_service: socket error "
260 "%s(%d) while connecting.",
262 rpc
->connect_cb(rpc
, RPC_STATUS_ERROR
,
263 NULL
, rpc
->connect_data
);
267 rpc
->is_connected
= 1;
268 rpc
->connect_cb(rpc
, RPC_STATUS_SUCCESS
, NULL
, rpc
->connect_data
);
272 if (revents
& POLLIN
) {
273 if (rpc_read_from_socket(rpc
) != 0) {
274 rpc_disconnect_requeue(rpc
);
279 if (revents
& POLLOUT
&& rpc
->outqueue
!= NULL
) {
280 if (rpc_write_to_socket(rpc
) != 0) {
281 rpc_set_error(rpc
, "write to socket failed");
290 int rpc_connect_async(struct rpc_context
*rpc
, const char *server
, int port
, rpc_cb cb
, void *private_data
)
292 struct sockaddr_storage s
;
293 struct sockaddr_in
*sin
= (struct sockaddr_in
*)&s
;
297 rpc_set_error(rpc
, "Trying to connect while already connected");
301 if (rpc
->is_udp
!= 0) {
302 rpc_set_error(rpc
, "Trying to connect on UDP socket");
306 sin
->sin_family
= AF_INET
;
307 sin
->sin_port
= htons(port
);
308 if (inet_pton(AF_INET
, server
, &sin
->sin_addr
) != 1) {
309 rpc_set_error(rpc
, "Not a valid server ip address");
313 switch (s
.ss_family
) {
315 socksize
= sizeof(struct sockaddr_in
);
316 #ifdef HAVE_SOCKADDR_LEN
317 sin
->sin_len
= socksize
;
319 rpc
->fd
= socket(AF_INET
, SOCK_STREAM
, 0);
324 rpc_set_error(rpc
, "Failed to open socket");
328 rpc
->connect_cb
= cb
;
329 rpc
->connect_data
= private_data
;
331 set_nonblocking(rpc
->fd
);
333 if (connect(rpc
->fd
, (struct sockaddr
*)&s
, socksize
) != 0 && errno
!= EINPROGRESS
) {
334 rpc_set_error(rpc
, "connect() to server failed");
341 int rpc_disconnect(struct rpc_context
*rpc
, char *error
)
348 rpc
->is_connected
= 0;
350 rpc_error_all_pdus(rpc
, error
);
355 /* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue */
356 static int rpc_disconnect_requeue(struct rpc_context
*rpc
)
365 rpc
->is_connected
= 0;
367 /* socket is closed so we will not get any replies to any commands
368 * in flight. Move them all over from the waitpdu queue back to the out queue
370 for (pdu
=rpc
->waitpdu
; pdu
; pdu
=pdu
->next
) {
371 SLIST_REMOVE(&rpc
->waitpdu
, pdu
);
372 SLIST_ADD(&rpc
->outqueue
, pdu
);
379 int rpc_bind_udp(struct rpc_context
*rpc
, char *addr
, int port
)
381 struct addrinfo
*ai
= NULL
;
384 if (rpc
->is_udp
== 0) {
385 rpc_set_error(rpc
, "Cant not bind UDP. Not UDP context");
389 snprintf(service
, 6, "%d", port
);
390 if (getaddrinfo(addr
, service
, NULL
, &ai
) != 0) {
391 rpc_set_error(rpc
, "Invalid address:%s. "
392 "Can not resolv into IPv4/v6 structure.");
396 switch(ai
->ai_family
) {
398 rpc
->fd
= socket(ai
->ai_family
, SOCK_DGRAM
, 0);
400 rpc_set_error(rpc
, "Failed to create UDP socket: %s", strerror(errno
));
405 if (bind(rpc
->fd
, (struct sockaddr
*)ai
->ai_addr
, sizeof(struct sockaddr_in
)) != 0) {
406 rpc_set_error(rpc
, "Failed to bind to UDP socket: %s",strerror(errno
));
412 rpc_set_error(rpc
, "Can not handle UPD sockets of family %d yet", ai
->ai_family
);
422 int rpc_set_udp_destination(struct rpc_context
*rpc
, char *addr
, int port
, int is_broadcast
)
424 struct addrinfo
*ai
= NULL
;
427 if (rpc
->is_udp
== 0) {
428 rpc_set_error(rpc
, "Can not set destination sockaddr. Not UDP context");
432 snprintf(service
, 6, "%d", port
);
433 if (getaddrinfo(addr
, service
, NULL
, &ai
) != 0) {
434 rpc_set_error(rpc
, "Invalid address:%s. "
435 "Can not resolv into IPv4/v6 structure.");
441 rpc
->udp_dest
= NULL
;
443 rpc
->udp_dest
= malloc(ai
->ai_addrlen
);
444 if (rpc
->udp_dest
== NULL
) {
445 rpc_set_error(rpc
, "Out of memory. Failed to allocate sockaddr structure");
448 memcpy(rpc
->udp_dest
, ai
->ai_addr
, ai
->ai_addrlen
);
451 rpc
->is_broadcast
= is_broadcast
;
452 setsockopt(rpc
->fd
, SOL_SOCKET
, SO_BROADCAST
, &is_broadcast
, sizeof(is_broadcast
));
457 struct sockaddr
*rpc_get_recv_sockaddr(struct rpc_context
*rpc
)
459 return (struct sockaddr
*)&rpc
->udp_src
;