Print usage when missing mandatory arguments
[deb_libnfs.git] / lib / socket.c
1 /*
2 Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 2.1 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with this program; if not, see <http://www.gnu.org/licenses/>.
16 */
17 #ifdef HAVE_CONFIG_H
18 #include "config.h"
19 #endif
20
21 #ifdef AROS
22 #include "aros_compat.h"
23 #endif
24
25 #ifdef WIN32
26 #include "win32_compat.h"
27 #endif
28
29 #ifdef HAVE_ARPA_INET_H
30 #include <arpa/inet.h>
31 #endif
32
33 #ifdef HAVE_POLL_H
34 #include <poll.h>
35 #endif
36
37 #ifdef HAVE_UNISTD_H
38 #include <unistd.h>
39 #endif
40
41 #ifdef HAVE_SYS_IOCTL_H
42 #include <sys/ioctl.h>
43 #endif
44
45 #ifdef HAVE_SYS_SOCKET_H
46 #include <sys/socket.h>
47 #endif
48
49 #ifdef HAVE_NETINET_IN_H
50 #include <netinet/in.h>
51 #endif
52
53 #ifdef HAVE_NETINET_TCP_H
54 #include <netinet/tcp.h>
55 #endif
56
57 #ifdef HAVE_NETDB_H
58 #include <netdb.h>
59 #endif
60
61 #ifdef HAVE_SYS_FILIO_H
62 #include <sys/filio.h>
63 #endif
64
65 #ifdef HAVE_SYS_SOCKIO_H
66 #include <sys/sockio.h>
67 #endif
68
69 #include <stdio.h>
70 #include <stdlib.h>
71 #include <assert.h>
72 #include <fcntl.h>
73 #include <string.h>
74 #include <errno.h>
75 #include <time.h>
76 #include <sys/types.h>
77 #include "libnfs-zdr.h"
78 #include "libnfs.h"
79 #include "libnfs-raw.h"
80 #include "libnfs-private.h"
81 #include "slist.h"
82
83 #ifdef WIN32
84 //has to be included after stdlib!!
85 #include "win32_errnowrapper.h"
86 #endif
87
88 static int rpc_reconnect_requeue(struct rpc_context *rpc);
89 static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s);
90
91 static void set_nonblocking(int fd)
92 {
93 int v = 0;
94 #if defined(WIN32)
95 long nonblocking=1;
96 v = ioctl(fd, FIONBIO, &nonblocking);
97 #else
98 v = fcntl(fd, F_GETFL, 0);
99 fcntl(fd, F_SETFL, v | O_NONBLOCK);
100 #endif //FIXME
101 }
102
103 static void set_nolinger(int fd)
104 {
105 struct linger lng;
106 lng.l_onoff = 1;
107 lng.l_linger = 0;
108 setsockopt(fd, SOL_SOCKET, SO_LINGER, &lng, sizeof(lng));
109 }
110
111 #ifdef HAVE_NETINET_TCP_H
112 int set_tcp_sockopt(int sockfd, int optname, int value)
113 {
114 int level;
115
116 #if defined(__FreeBSD__) || defined(__sun) || (defined(__APPLE__) && defined(__MACH__))
117 struct protoent *buf;
118
119 if ((buf = getprotobyname("tcp")) != NULL)
120 level = buf->p_proto;
121 else
122 return -1;
123 #else
124 level = SOL_TCP;
125 #endif
126
127 return setsockopt(sockfd, level, optname, (char *)&value, sizeof(value));
128 }
129 #endif
130
131 int rpc_get_fd(struct rpc_context *rpc)
132 {
133 assert(rpc->magic == RPC_CONTEXT_MAGIC);
134
135 return rpc->fd;
136 }
137
138 static int rpc_has_queue(struct rpc_queue *q)
139 {
140 return q->head != NULL;
141 }
142
143 int rpc_which_events(struct rpc_context *rpc)
144 {
145 int events;
146
147 assert(rpc->magic == RPC_CONTEXT_MAGIC);
148
149 events = rpc->is_connected ? POLLIN : POLLOUT;
150
151 if (rpc->is_udp != 0) {
152 /* for udp sockets we only wait for pollin */
153 return POLLIN;
154 }
155
156 if (rpc_has_queue(&rpc->outqueue)) {
157 events |= POLLOUT;
158 }
159 return events;
160 }
161
162 static int rpc_write_to_socket(struct rpc_context *rpc)
163 {
164 int32_t count;
165 struct rpc_pdu *pdu;
166
167 assert(rpc->magic == RPC_CONTEXT_MAGIC);
168
169 if (rpc->fd == -1) {
170 rpc_set_error(rpc, "trying to write but not connected");
171 return -1;
172 }
173
174 while ((pdu = rpc->outqueue.head) != NULL) {
175 int64_t total;
176
177 total = pdu->outdata.size;
178
179 count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
180 if (count == -1) {
181 if (errno == EAGAIN || errno == EWOULDBLOCK) {
182 return 0;
183 }
184 rpc_set_error(rpc, "Error when writing to socket :%s(%d)", strerror(errno), errno);
185 return -1;
186 }
187
188 pdu->written += count;
189 if (pdu->written == total) {
190 unsigned int hash;
191
192 rpc->outqueue.head = pdu->next;
193 if (pdu->next == NULL)
194 rpc->outqueue.tail = NULL;
195
196 hash = rpc_hash_xid(pdu->xid);
197 rpc_enqueue(&rpc->waitpdu[hash], pdu);
198 }
199 }
200 return 0;
201 }
202
203 static int rpc_read_from_socket(struct rpc_context *rpc)
204 {
205 int available;
206 int size;
207 int pdu_size;
208 int32_t count;
209
210 assert(rpc->magic == RPC_CONTEXT_MAGIC);
211
212 if (ioctl(rpc->fd, FIONREAD, &available) != 0) {
213 rpc_set_error(rpc, "Ioctl FIONREAD returned error : %d. Closing socket.", errno);
214 return -1;
215 }
216
217 if (available == 0) {
218 rpc_set_error(rpc, "Socket has been closed");
219 return -1;
220 }
221
222 if (rpc->is_udp) {
223 char *buf;
224 socklen_t socklen = sizeof(rpc->udp_src);
225
226 buf = malloc(available);
227 if (buf == NULL) {
228 rpc_set_error(rpc, "Failed to malloc buffer for recvfrom");
229 return -1;
230 }
231 count = recvfrom(rpc->fd, buf, available, MSG_DONTWAIT, (struct sockaddr *)&rpc->udp_src, &socklen);
232 if (count < 0) {
233 rpc_set_error(rpc, "Failed recvfrom: %s", strerror(errno));
234 free(buf);
235 return -1;
236 }
237 if (rpc_process_pdu(rpc, buf, count) != 0) {
238 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Ignoring PDU");
239 free(buf);
240 return -1;
241 }
242 free(buf);
243 return 0;
244 }
245
246 /* read record marker, 4 bytes at the beginning of every pdu */
247 if (rpc->inbuf == NULL) {
248 rpc->insize = 4;
249 rpc->inbuf = malloc(rpc->insize);
250 if (rpc->inbuf == NULL) {
251 rpc_set_error(rpc, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno);
252 return -1;
253 }
254 }
255 if (rpc->inpos < 4) {
256 size = 4 - rpc->inpos;
257
258 count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, 0);
259 if (count == -1) {
260 if (errno == EINTR) {
261 return 0;
262 }
263 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
264 return -1;
265 }
266 available -= count;
267 rpc->inpos += count;
268 }
269
270 if (available == 0) {
271 return 0;
272 }
273
274 pdu_size = rpc_get_pdu_size(rpc->inbuf);
275 if (rpc->insize < pdu_size) {
276 unsigned char *buf;
277
278 buf = malloc(pdu_size);
279 if (buf == NULL) {
280 rpc_set_error(rpc, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size, errno);
281 return -1;
282 }
283 memcpy(buf, rpc->inbuf, rpc->insize);
284 free(rpc->inbuf);
285 rpc->inbuf = buf;
286 rpc->insize = rpc_get_pdu_size(rpc->inbuf);
287 }
288
289 size = available;
290 if (size > rpc->insize - rpc->inpos) {
291 size = rpc->insize - rpc->inpos;
292 }
293
294 count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, 0);
295 if (count == -1) {
296 if (errno == EINTR) {
297 return 0;
298 }
299 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
300 return -1;
301 }
302 available -= count;
303 rpc->inpos += count;
304
305 if (rpc->inpos == rpc->insize) {
306 char *buf = rpc->inbuf;
307
308 rpc->inbuf = NULL;
309 rpc->insize = 0;
310 rpc->inpos = 0;
311
312 if (rpc_process_pdu(rpc, buf, pdu_size) != 0) {
313 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
314 return -1;
315 }
316 free(buf);
317 }
318
319 return 0;
320 }
321
322
323
324 int rpc_service(struct rpc_context *rpc, int revents)
325 {
326 assert(rpc->magic == RPC_CONTEXT_MAGIC);
327
328 if (revents & POLLERR) {
329 #ifdef WIN32
330 char err = 0;
331 #else
332 int err = 0;
333 #endif
334 socklen_t err_size = sizeof(err);
335
336 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
337 (char *)&err, &err_size) != 0 || err != 0) {
338 if (err == 0) {
339 err = errno;
340 }
341 rpc_set_error(rpc, "rpc_service: socket error "
342 "%s(%d).",
343 strerror(err), err);
344 } else {
345 rpc_set_error(rpc, "rpc_service: POLLERR, "
346 "Unknown socket error.");
347 }
348 if (rpc->connect_cb != NULL) {
349 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
350 }
351 return -1;
352 }
353 if (revents & POLLHUP) {
354 rpc_set_error(rpc, "Socket failed with POLLHUP");
355 if (rpc->connect_cb != NULL) {
356 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
357 }
358 return -1;
359 }
360
361 if (rpc->is_connected == 0 && rpc->fd != -1 && revents&POLLOUT) {
362 int err = 0;
363 socklen_t err_size = sizeof(err);
364
365 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
366 (char *)&err, &err_size) != 0 || err != 0) {
367 if (err == 0) {
368 err = errno;
369 }
370 rpc_set_error(rpc, "rpc_service: socket error "
371 "%s(%d) while connecting.",
372 strerror(err), err);
373 if (rpc->connect_cb != NULL) {
374 rpc->connect_cb(rpc, RPC_STATUS_ERROR,
375 NULL, rpc->connect_data);
376 }
377 return -1;
378 }
379
380 rpc->is_connected = 1;
381 if (rpc->connect_cb != NULL) {
382 rpc->connect_cb(rpc, RPC_STATUS_SUCCESS, NULL, rpc->connect_data);
383 }
384 return 0;
385 }
386
387 if (revents & POLLIN) {
388 if (rpc_read_from_socket(rpc) != 0) {
389 rpc_reconnect_requeue(rpc);
390 return 0;
391 }
392 }
393
394 if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
395 if (rpc_write_to_socket(rpc) != 0) {
396 rpc_set_error(rpc, "write to socket failed");
397 return -1;
398 }
399 }
400
401 return 0;
402 }
403
404 void rpc_set_autoreconnect(struct rpc_context *rpc)
405 {
406 assert(rpc->magic == RPC_CONTEXT_MAGIC);
407
408 rpc->auto_reconnect = 1;
409 }
410
411 void rpc_unset_autoreconnect(struct rpc_context *rpc)
412 {
413 assert(rpc->magic == RPC_CONTEXT_MAGIC);
414
415 rpc->auto_reconnect = 0;
416 }
417
418 void rpc_set_tcp_syncnt(struct rpc_context *rpc, int v)
419 {
420 assert(rpc->magic == RPC_CONTEXT_MAGIC);
421
422 rpc->tcp_syncnt = v;
423 }
424
425 #ifndef TCP_SYNCNT
426 #define TCP_SYNCNT 7
427 #endif
428
429 static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s)
430 {
431 int socksize;
432
433 assert(rpc->magic == RPC_CONTEXT_MAGIC);
434
435 switch (s->ss_family) {
436 case AF_INET:
437 socksize = sizeof(struct sockaddr_in);
438 rpc->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
439 #ifdef HAVE_NETINET_TCP_H
440 if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
441 set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
442 }
443 #endif
444 break;
445 case AF_INET6:
446 socksize = sizeof(struct sockaddr_in6);
447 rpc->fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
448 #ifdef HAVE_NETINET_TCP_H
449 if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
450 set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
451 }
452 #endif
453 break;
454 default:
455 rpc_set_error(rpc, "Can not handle AF_FAMILY:%d", s->ss_family);
456 return -1;
457 }
458
459 if (rpc->fd == -1) {
460 rpc_set_error(rpc, "Failed to open socket");
461 return -1;
462 }
463
464 /* Some systems allow you to set capabilities on an executable
465 * to allow the file to be executed with privilege to bind to
466 * privileged system ports, even if the user is not root.
467 *
468 * Opportunistically try to bind the socket to a low numbered
469 * system port in the hope that the user is either root or the
470 * executable has the CAP_NET_BIND_SERVICE.
471 *
472 * As soon as we fail the bind() with EACCES we know we will never
473 * be able to bind to a system port so we terminate the loop.
474 *
475 * On linux, use
476 * sudo setcap 'cap_net_bind_service=+ep' /path/executable
477 * to make the executable able to bind to a system port.
478 *
479 * On Windows, there is no concept of privileged ports. Thus
480 * binding will usually succeed.
481 */
482 {
483 struct sockaddr_storage ss;
484 static int portOfs = 0;
485 const int firstPort = 512; /* >= 512 according to Sun docs */
486 const int portCount = IPPORT_RESERVED - firstPort;
487 int startOfs, port, rc;
488
489 if (portOfs == 0) {
490 portOfs = time(NULL) % 400;
491 }
492 startOfs = portOfs;
493 do {
494 rc = -1;
495 port = htons(firstPort + portOfs);
496 portOfs = (portOfs + 1) % portCount;
497
498 /* skip well-known ports */
499 if (!getservbyport(port, "tcp")) {
500 memset(&ss, 0, sizeof(ss));
501
502 switch (s->ss_family) {
503 case AF_INET:
504 ((struct sockaddr_in *)&ss)->sin_port = port;
505 ((struct sockaddr_in *)&ss)->sin_family = AF_INET;
506 #ifdef HAVE_SOCKADDR_LEN
507 ((struct sockaddr_in *)&ss)->sin_len = sizeof(struct sockaddr_in);
508 #endif
509 break;
510 case AF_INET6:
511 ((struct sockaddr_in6 *)&ss)->sin6_port = port;
512 ((struct sockaddr_in6 *)&ss)->sin6_family = AF_INET6;
513 #ifdef HAVE_SOCKADDR_LEN
514 ((struct sockaddr_in6 *)&ss)->sin6_len = sizeof(struct sockaddr_in6);
515 #endif
516 break;
517 }
518
519 rc = bind(rpc->fd, (struct sockaddr *)&ss, socksize);
520 #if !defined(WIN32)
521 /* we got EACCES, so don't try again */
522 if (rc != 0 && errno == EACCES)
523 break;
524 #endif
525 }
526 } while (rc != 0 && portOfs != startOfs);
527 }
528
529 set_nonblocking(rpc->fd);
530 set_nolinger(rpc->fd);
531
532 if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS) {
533 rpc_set_error(rpc, "connect() to server failed. %s(%d)", strerror(errno), errno);
534 return -1;
535 }
536
537 return 0;
538 }
539
540 int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
541 {
542 struct addrinfo *ai = NULL;
543
544 assert(rpc->magic == RPC_CONTEXT_MAGIC);
545
546 if (rpc->fd != -1) {
547 rpc_set_error(rpc, "Trying to connect while already connected");
548 return -1;
549 }
550
551 if (rpc->is_udp != 0) {
552 rpc_set_error(rpc, "Trying to connect on UDP socket");
553 return -1;
554 }
555
556 rpc->auto_reconnect = 0;
557
558 if (getaddrinfo(server, NULL, NULL, &ai) != 0) {
559 rpc_set_error(rpc, "Invalid address:%s. "
560 "Can not resolv into IPv4/v6 structure.", server);
561 return -1;
562 }
563
564 switch (ai->ai_family) {
565 case AF_INET:
566 ((struct sockaddr_in *)&rpc->s)->sin_family = ai->ai_family;
567 ((struct sockaddr_in *)&rpc->s)->sin_port = htons(port);
568 ((struct sockaddr_in *)&rpc->s)->sin_addr = ((struct sockaddr_in *)(ai->ai_addr))->sin_addr;
569 #ifdef HAVE_SOCKADDR_LEN
570 ((struct sockaddr_in *)&rpc->s)->sin_len = sizeof(struct sockaddr_in);
571 #endif
572 break;
573 case AF_INET6:
574 ((struct sockaddr_in6 *)&rpc->s)->sin6_family = ai->ai_family;
575 ((struct sockaddr_in6 *)&rpc->s)->sin6_port = htons(port);
576 ((struct sockaddr_in6 *)&rpc->s)->sin6_addr = ((struct sockaddr_in6 *)(ai->ai_addr))->sin6_addr;
577 #ifdef HAVE_SOCKADDR_LEN
578 ((struct sockaddr_in6 *)&rpc->s)->sin6_len = sizeof(struct sockaddr_in6);
579 #endif
580 break;
581 }
582
583 rpc->connect_cb = cb;
584 rpc->connect_data = private_data;
585
586 freeaddrinfo(ai);
587
588 if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
589 return -1;
590 }
591
592 return 0;
593 }
594
595 int rpc_disconnect(struct rpc_context *rpc, char *error)
596 {
597 assert(rpc->magic == RPC_CONTEXT_MAGIC);
598
599 rpc_unset_autoreconnect(rpc);
600
601 if (rpc->fd != -1) {
602 close(rpc->fd);
603 }
604 rpc->fd = -1;
605
606 rpc->is_connected = 0;
607
608 rpc_error_all_pdus(rpc, error);
609
610 return 0;
611 }
612
613 static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, void *private_data)
614 {
615 assert(rpc->magic == RPC_CONTEXT_MAGIC);
616
617 if (status != RPC_STATUS_SUCCESS) {
618 rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
619 return;
620 }
621
622 rpc->is_connected = 1;
623 rpc->connect_cb = NULL;
624 }
625
626 /* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue and reconnect */
627 static int rpc_reconnect_requeue(struct rpc_context *rpc)
628 {
629 struct rpc_pdu *pdu;
630 unsigned int i;
631
632 assert(rpc->magic == RPC_CONTEXT_MAGIC);
633
634 if (rpc->fd != -1) {
635 close(rpc->fd);
636 }
637 rpc->fd = -1;
638
639 rpc->is_connected = 0;
640
641 /* socket is closed so we will not get any replies to any commands
642 * in flight. Move them all over from the waitpdu queue back to the out queue
643 */
644 for (i = 0; i < HASHES; i++) {
645 struct rpc_queue *q = &rpc->waitpdu[i];
646
647 for (pdu=q->head; pdu; pdu=pdu->next) {
648 rpc_return_to_queue(&rpc->outqueue, pdu);
649 /* we have to re-send the whole pdu again */
650 pdu->written = 0;
651 }
652 rpc_reset_queue(q);
653 }
654
655 if (rpc->auto_reconnect != 0) {
656 rpc->connect_cb = reconnect_cb;
657
658 if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
659 rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
660 return -1;
661 }
662 }
663
664 return 0;
665 }
666
667
668 int rpc_bind_udp(struct rpc_context *rpc, char *addr, int port)
669 {
670 struct addrinfo *ai = NULL;
671 char service[6];
672
673 assert(rpc->magic == RPC_CONTEXT_MAGIC);
674
675 if (rpc->is_udp == 0) {
676 rpc_set_error(rpc, "Cant not bind UDP. Not UDP context");
677 return -1;
678 }
679
680 sprintf(service, "%d", port);
681 if (getaddrinfo(addr, service, NULL, &ai) != 0) {
682 rpc_set_error(rpc, "Invalid address:%s. "
683 "Can not resolv into IPv4/v6 structure.", addr);
684 return -1;
685 }
686
687 switch(ai->ai_family) {
688 case AF_INET:
689 rpc->fd = socket(ai->ai_family, SOCK_DGRAM, 0);
690 if (rpc->fd == -1) {
691 rpc_set_error(rpc, "Failed to create UDP socket: %s", strerror(errno));
692 freeaddrinfo(ai);
693 return -1;
694 }
695
696 if (bind(rpc->fd, (struct sockaddr *)ai->ai_addr, sizeof(struct sockaddr_in)) != 0) {
697 rpc_set_error(rpc, "Failed to bind to UDP socket: %s",strerror(errno));
698 freeaddrinfo(ai);
699 return -1;
700 }
701 break;
702 default:
703 rpc_set_error(rpc, "Can not handle UPD sockets of family %d yet", ai->ai_family);
704 freeaddrinfo(ai);
705 return -1;
706 }
707
708 freeaddrinfo(ai);
709
710 return 0;
711 }
712
713 int rpc_set_udp_destination(struct rpc_context *rpc, char *addr, int port, int is_broadcast)
714 {
715 struct addrinfo *ai = NULL;
716 char service[6];
717
718 assert(rpc->magic == RPC_CONTEXT_MAGIC);
719
720 if (rpc->is_udp == 0) {
721 rpc_set_error(rpc, "Can not set destination sockaddr. Not UDP context");
722 return -1;
723 }
724
725 sprintf(service, "%d", port);
726 if (getaddrinfo(addr, service, NULL, &ai) != 0) {
727 rpc_set_error(rpc, "Invalid address:%s. "
728 "Can not resolv into IPv4/v6 structure.", addr);
729 return -1;
730 }
731
732 if (rpc->udp_dest) {
733 free(rpc->udp_dest);
734 rpc->udp_dest = NULL;
735 }
736 rpc->udp_dest = malloc(ai->ai_addrlen);
737 if (rpc->udp_dest == NULL) {
738 rpc_set_error(rpc, "Out of memory. Failed to allocate sockaddr structure");
739 freeaddrinfo(ai);
740 return -1;
741 }
742 memcpy(rpc->udp_dest, ai->ai_addr, ai->ai_addrlen);
743 freeaddrinfo(ai);
744
745 rpc->is_broadcast = is_broadcast;
746 setsockopt(rpc->fd, SOL_SOCKET, SO_BROADCAST, (char *)&is_broadcast, sizeof(is_broadcast));
747
748 return 0;
749 }
750
751 struct sockaddr *rpc_get_recv_sockaddr(struct rpc_context *rpc)
752 {
753 assert(rpc->magic == RPC_CONTEXT_MAGIC);
754
755 return (struct sockaddr *)&rpc->udp_src;
756 }
757
758 int rpc_queue_length(struct rpc_context *rpc)
759 {
760 int i=0;
761 struct rpc_pdu *pdu;
762 unsigned int n;
763
764 assert(rpc->magic == RPC_CONTEXT_MAGIC);
765
766 for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
767 i++;
768 }
769
770 for (n = 0; n < HASHES; n++) {
771 struct rpc_queue *q = &rpc->waitpdu[n];
772
773 for(pdu = q->head; pdu; pdu = pdu->next)
774 i++;
775 }
776 return i;
777 }
778
779 void rpc_set_fd(struct rpc_context *rpc, int fd)
780 {
781 assert(rpc->magic == RPC_CONTEXT_MAGIC);
782
783 rpc->fd = fd;
784 }