[socket] - disable linger by setting SO_LINGER to 0 seconds
[deb_libnfs.git] / lib / socket.c
1 /*
2 Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 2.1 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with this program; if not, see <http://www.gnu.org/licenses/>.
16 */
17 #ifdef HAVE_CONFIG_H
18 #include "config.h"
19 #endif
20
21 #ifdef AROS
22 #include "aros_compat.h"
23 #endif
24
25 #ifdef WIN32
26 #include "win32_compat.h"
27 #endif
28
29 #ifdef HAVE_ARPA_INET_H
30 #include <arpa/inet.h>
31 #endif
32
33 #ifdef HAVE_POLL_H
34 #include <poll.h>
35 #endif
36
37 #ifdef HAVE_UNISTD_H
38 #include <unistd.h>
39 #endif
40
41 #ifdef HAVE_SYS_IOCTL_H
42 #include <sys/ioctl.h>
43 #endif
44
45 #ifdef HAVE_SYS_SOCKET_H
46 #include <sys/socket.h>
47 #endif
48
49 #ifdef HAVE_NETINET_TCP_H
50 #include <netinet/tcp.h>
51 #endif
52
53 #ifdef HAVE_NETDB_H
54 #include <netdb.h>
55 #endif
56
57 #ifdef HAVE_SYS_FILIO_H
58 #include <sys/filio.h>
59 #endif
60
61 #ifdef HAVE_SYS_SOCKIO_H
62 #include <sys/sockio.h>
63 #endif
64
65 #include <stdio.h>
66 #include <stdlib.h>
67 #include <assert.h>
68 #include <fcntl.h>
69 #include <string.h>
70 #include <errno.h>
71 #include <sys/types.h>
72 #include "libnfs-zdr.h"
73 #include "libnfs.h"
74 #include "libnfs-raw.h"
75 #include "libnfs-private.h"
76 #include "slist.h"
77
78 #ifdef WIN32
79 //has to be included after stdlib!!
80 #include "win32_errnowrapper.h"
81 #endif
82
83 static int rpc_reconnect_requeue(struct rpc_context *rpc);
84 static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s);
85
/*
 * Put the descriptor fd into non-blocking mode.
 *
 * This is best effort: the function has no way to report failure, so
 * errors from the underlying calls are ignored. What it must not do is
 * make things worse, which is why a failed F_GETFL aborts the operation
 * instead of feeding its -1 result back into F_SETFL.
 */
static void set_nonblocking(int fd)
{
#if defined(WIN32)
	long nonblocking = 1;

	(void)ioctl(fd, FIONBIO, &nonblocking);
#else
	int flags = fcntl(fd, F_GETFL, 0);

	/* -1 means the query failed (e.g. bad descriptor); or-ing
	 * O_NONBLOCK into -1 would request every status flag at once. */
	if (flags == -1) {
		return;
	}
	(void)fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#endif
}
97
/*
 * Enable SO_LINGER on fd with a timeout of zero seconds.
 * The result of setsockopt() is not inspected.
 */
static void set_nolinger(int fd)
{
	struct linger opt;

	opt.l_linger = 0;	/* zero second timeout */
	opt.l_onoff = 1;	/* option switched on */

	setsockopt(fd, SOL_SOCKET, SO_LINGER, &opt, sizeof(opt));
}
105
106 #ifdef HAVE_NETINET_TCP_H
/*
 * Set an integer-valued TCP-level socket option.
 *
 * sockfd  - socket to configure
 * optname - option name from <netinet/tcp.h> (TCP_NODELAY, TCP_SYNCNT, ...)
 * value   - integer value for the option
 *
 * Returns the result of setsockopt(): 0 on success, -1 on failure with
 * errno set.
 *
 * The option level is IPPROTO_TCP, which is the level the TCP options
 * are specified for and is available on every platform this file
 * already creates TCP sockets on. This avoids both the Linux-only
 * SOL_TCP constant and a getprotobyname("tcp") lookup, which is not
 * reentrant and fails when the protocols database is unavailable.
 */
int set_tcp_sockopt(int sockfd, int optname, int value)
{
	return setsockopt(sockfd, IPPROTO_TCP, optname, (char *)&value, sizeof(value));
}
124 #endif
125
126 int rpc_get_fd(struct rpc_context *rpc)
127 {
128 assert(rpc->magic == RPC_CONTEXT_MAGIC);
129
130 return rpc->fd;
131 }
132
133 int rpc_which_events(struct rpc_context *rpc)
134 {
135 int events;
136
137 assert(rpc->magic == RPC_CONTEXT_MAGIC);
138
139 events = rpc->is_connected ? POLLIN : POLLOUT;
140
141 if (rpc->is_udp != 0) {
142 /* for udp sockets we only wait for pollin */
143 return POLLIN;
144 }
145
146 if (rpc->outqueue) {
147 events |= POLLOUT;
148 }
149 return events;
150 }
151
152 static int rpc_write_to_socket(struct rpc_context *rpc)
153 {
154 int32_t count;
155
156 assert(rpc->magic == RPC_CONTEXT_MAGIC);
157
158 if (rpc->fd == -1) {
159 rpc_set_error(rpc, "trying to write but not connected");
160 return -1;
161 }
162
163 while (rpc->outqueue != NULL) {
164 int64_t total;
165
166 total = rpc->outqueue->outdata.size;
167
168 count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
169 if (count == -1) {
170 if (errno == EAGAIN || errno == EWOULDBLOCK) {
171 return 0;
172 }
173 rpc_set_error(rpc, "Error when writing to socket :%s(%d)", strerror(errno), errno);
174 return -1;
175 }
176
177 rpc->outqueue->written += count;
178 if (rpc->outqueue->written == total) {
179 struct rpc_pdu *pdu = rpc->outqueue;
180
181 SLIST_REMOVE(&rpc->outqueue, pdu);
182 SLIST_ADD_END(&rpc->waitpdu, pdu);
183 }
184 }
185 return 0;
186 }
187
188 static int rpc_read_from_socket(struct rpc_context *rpc)
189 {
190 int available;
191 int size;
192 int pdu_size;
193 int32_t count;
194
195 assert(rpc->magic == RPC_CONTEXT_MAGIC);
196
197 if (ioctl(rpc->fd, FIONREAD, &available) != 0) {
198 rpc_set_error(rpc, "Ioctl FIONREAD returned error : %d. Closing socket.", errno);
199 return -1;
200 }
201
202 if (available == 0) {
203 rpc_set_error(rpc, "Socket has been closed");
204 return -1;
205 }
206
207 if (rpc->is_udp) {
208 char *buf;
209 socklen_t socklen = sizeof(rpc->udp_src);
210
211 buf = malloc(available);
212 if (buf == NULL) {
213 rpc_set_error(rpc, "Failed to malloc buffer for recvfrom");
214 return -1;
215 }
216 count = recvfrom(rpc->fd, buf, available, MSG_DONTWAIT, (struct sockaddr *)&rpc->udp_src, &socklen);
217 if (count < 0) {
218 rpc_set_error(rpc, "Failed recvfrom: %s", strerror(errno));
219 free(buf);
220 return -1;
221 }
222 if (rpc_process_pdu(rpc, buf, count) != 0) {
223 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Ignoring PDU");
224 free(buf);
225 return -1;
226 }
227 free(buf);
228 return 0;
229 }
230
231 /* read record marker, 4 bytes at the beginning of every pdu */
232 if (rpc->inbuf == NULL) {
233 rpc->insize = 4;
234 rpc->inbuf = malloc(rpc->insize);
235 if (rpc->inbuf == NULL) {
236 rpc_set_error(rpc, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno);
237 return -1;
238 }
239 }
240 if (rpc->inpos < 4) {
241 size = 4 - rpc->inpos;
242
243 count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, 0);
244 if (count == -1) {
245 if (errno == EINTR) {
246 return 0;
247 }
248 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
249 return -1;
250 }
251 available -= count;
252 rpc->inpos += count;
253 }
254
255 if (available == 0) {
256 return 0;
257 }
258
259 pdu_size = rpc_get_pdu_size(rpc->inbuf);
260 if (rpc->insize < pdu_size) {
261 unsigned char *buf;
262
263 buf = malloc(pdu_size);
264 if (buf == NULL) {
265 rpc_set_error(rpc, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size, errno);
266 return -1;
267 }
268 memcpy(buf, rpc->inbuf, rpc->insize);
269 free(rpc->inbuf);
270 rpc->inbuf = buf;
271 rpc->insize = rpc_get_pdu_size(rpc->inbuf);
272 }
273
274 size = available;
275 if (size > rpc->insize - rpc->inpos) {
276 size = rpc->insize - rpc->inpos;
277 }
278
279 count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, 0);
280 if (count == -1) {
281 if (errno == EINTR) {
282 return 0;
283 }
284 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
285 return -1;
286 }
287 available -= count;
288 rpc->inpos += count;
289
290 if (rpc->inpos == rpc->insize) {
291 char *buf = rpc->inbuf;
292
293 rpc->inbuf = NULL;
294 rpc->insize = 0;
295 rpc->inpos = 0;
296
297 if (rpc_process_pdu(rpc, buf, pdu_size) != 0) {
298 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
299 return -1;
300 }
301 free(buf);
302 }
303
304 return 0;
305 }
306
307
308
309 int rpc_service(struct rpc_context *rpc, int revents)
310 {
311 assert(rpc->magic == RPC_CONTEXT_MAGIC);
312
313 if (revents & POLLERR) {
314 #ifdef WIN32
315 char err = 0;
316 #else
317 int err = 0;
318 #endif
319 socklen_t err_size = sizeof(err);
320
321 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
322 (char *)&err, &err_size) != 0 || err != 0) {
323 if (err == 0) {
324 err = errno;
325 }
326 rpc_set_error(rpc, "rpc_service: socket error "
327 "%s(%d).",
328 strerror(err), err);
329 } else {
330 rpc_set_error(rpc, "rpc_service: POLLERR, "
331 "Unknown socket error.");
332 }
333 if (rpc->connect_cb != NULL) {
334 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
335 }
336 return -1;
337 }
338 if (revents & POLLHUP) {
339 rpc_set_error(rpc, "Socket failed with POLLHUP");
340 if (rpc->connect_cb != NULL) {
341 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
342 }
343 return -1;
344 }
345
346 if (rpc->is_connected == 0 && rpc->fd != -1 && revents&POLLOUT) {
347 int err = 0;
348 socklen_t err_size = sizeof(err);
349
350 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
351 (char *)&err, &err_size) != 0 || err != 0) {
352 if (err == 0) {
353 err = errno;
354 }
355 rpc_set_error(rpc, "rpc_service: socket error "
356 "%s(%d) while connecting.",
357 strerror(err), err);
358 if (rpc->connect_cb != NULL) {
359 rpc->connect_cb(rpc, RPC_STATUS_ERROR,
360 NULL, rpc->connect_data);
361 }
362 return -1;
363 }
364
365 rpc->is_connected = 1;
366 if (rpc->connect_cb != NULL) {
367 rpc->connect_cb(rpc, RPC_STATUS_SUCCESS, NULL, rpc->connect_data);
368 }
369 return 0;
370 }
371
372 if (revents & POLLIN) {
373 if (rpc_read_from_socket(rpc) != 0) {
374 rpc_reconnect_requeue(rpc);
375 return 0;
376 }
377 }
378
379 if (revents & POLLOUT && rpc->outqueue != NULL) {
380 if (rpc_write_to_socket(rpc) != 0) {
381 rpc_set_error(rpc, "write to socket failed");
382 return -1;
383 }
384 }
385
386 return 0;
387 }
388
389 void rpc_set_autoreconnect(struct rpc_context *rpc)
390 {
391 assert(rpc->magic == RPC_CONTEXT_MAGIC);
392
393 rpc->auto_reconnect = 1;
394 }
395
396 void rpc_unset_autoreconnect(struct rpc_context *rpc)
397 {
398 assert(rpc->magic == RPC_CONTEXT_MAGIC);
399
400 rpc->auto_reconnect = 0;
401 }
402
403 void rpc_set_tcp_syncnt(struct rpc_context *rpc, int v)
404 {
405 assert(rpc->magic == RPC_CONTEXT_MAGIC);
406
407 rpc->tcp_syncnt = v;
408 }
409
410 #ifndef TCP_SYNCNT
411 #define TCP_SYNCNT 7
412 #endif
413
414 static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s)
415 {
416 int socksize;
417
418 assert(rpc->magic == RPC_CONTEXT_MAGIC);
419
420 switch (s->ss_family) {
421 case AF_INET:
422 socksize = sizeof(struct sockaddr_in);
423 rpc->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
424 #ifdef HAVE_NETINET_TCP_H
425 if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
426 set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
427 }
428 #endif
429 break;
430 case AF_INET6:
431 socksize = sizeof(struct sockaddr_in6);
432 rpc->fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
433 #ifdef HAVE_NETINET_TCP_H
434 if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
435 set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
436 }
437 #endif
438 break;
439 default:
440 rpc_set_error(rpc, "Can not handle AF_FAMILY:%d", s->ss_family);
441 return -1;
442 }
443
444 if (rpc->fd == -1) {
445 rpc_set_error(rpc, "Failed to open socket");
446 return -1;
447 }
448
449 /* Some systems allow you to set capabilities on an executable
450 * to allow the file to be executed with privilege to bind to
451 * privileged system ports, even if the user is not root.
452 *
453 * Opportunistically try to bind the socket to a low numbered
454 * system port in the hope that the user is either root or the
455 * executable has the CAP_NET_BIND_SERVICE.
456 *
457 * As soon as we fail the bind() with EACCES we know we will never
458 * be able to bind to a system port so we terminate the loop.
459 *
460 * On linux, use
461 * sudo setcap 'cap_net_bind_service=+ep' /path/executable
462 * to make the executable able to bind to a system port.
463 *
464 * On Windows, there is no concept of privileged ports. Thus
465 * binding will usually succeed.
466 */
467 {
468 struct sockaddr_storage ss;
469 static int portOfs = 0;
470 const int firstPort = 512; /* >= 512 according to Sun docs */
471 const int portCount = IPPORT_RESERVED - firstPort;
472 int startOfs, port, rc;
473
474 if (portOfs == 0) {
475 portOfs = time(NULL) % 400;
476 }
477 startOfs = portOfs;
478 do {
479 rc = -1;
480 port = htons(firstPort + portOfs);
481 portOfs = (portOfs + 1) % portCount;
482
483 /* skip well-known ports */
484 if (!getservbyport(port, "tcp")) {
485 memset(&ss, 0, sizeof(ss));
486
487 switch (s->ss_family) {
488 case AF_INET:
489 ((struct sockaddr_in *)&ss)->sin_port = port;
490 ((struct sockaddr_in *)&ss)->sin_family = AF_INET;
491 #ifdef HAVE_SOCKADDR_LEN
492 ((struct sockaddr_in *)&ss)->sin_len = sizeof(struct sockaddr_in);
493 #endif
494 break;
495 case AF_INET6:
496 ((struct sockaddr_in6 *)&ss)->sin6_port = port;
497 ((struct sockaddr_in6 *)&ss)->sin6_family = AF_INET6;
498 #ifdef HAVE_SOCKADDR_LEN
499 ((struct sockaddr_in6 *)&ss)->sin6_len = sizeof(struct sockaddr_in);
500 #endif
501 break;
502 }
503
504 rc = bind(rpc->fd, (struct sockaddr *)&ss, socksize);
505 #if !defined(WIN32)
506 /* we got EACCES, so don't try again */
507 if (rc != 0 && errno == EACCES)
508 break;
509 #endif
510 }
511 } while (rc != 0 && portOfs != startOfs);
512 }
513
514 set_nonblocking(rpc->fd);
515 set_nolinger(rpc->fd);
516
517 if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS) {
518 rpc_set_error(rpc, "connect() to server failed. %s(%d)", strerror(errno), errno);
519 return -1;
520 }
521
522 return 0;
523 }
524
525 int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
526 {
527 struct addrinfo *ai = NULL;
528
529 assert(rpc->magic == RPC_CONTEXT_MAGIC);
530
531 if (rpc->fd != -1) {
532 rpc_set_error(rpc, "Trying to connect while already connected");
533 return -1;
534 }
535
536 if (rpc->is_udp != 0) {
537 rpc_set_error(rpc, "Trying to connect on UDP socket");
538 return -1;
539 }
540
541 rpc->auto_reconnect = 0;
542
543 if (getaddrinfo(server, NULL, NULL, &ai) != 0) {
544 rpc_set_error(rpc, "Invalid address:%s. "
545 "Can not resolv into IPv4/v6 structure.", server);
546 return -1;
547 }
548
549 switch (ai->ai_family) {
550 case AF_INET:
551 ((struct sockaddr_in *)&rpc->s)->sin_family = ai->ai_family;
552 ((struct sockaddr_in *)&rpc->s)->sin_port = htons(port);
553 #ifdef HAVE_SOCKADDR_LEN
554 ((struct sockaddr_in *)&rpc->s)->sin_len = sizeof(struct sockaddr_in);
555 #endif
556 break;
557 case AF_INET6:
558 ((struct sockaddr_in6 *)&rpc->s)->sin6_family = ai->ai_family;
559 ((struct sockaddr_in6 *)&rpc->s)->sin6_port = htons(port);
560 #ifdef HAVE_SOCKADDR_LEN
561 ((struct sockaddr_in6 *)&rpc->s)->sin6_len = sizeof(struct sockaddr_in6);
562 #endif
563 break;
564 }
565
566 rpc->connect_cb = cb;
567 rpc->connect_data = private_data;
568
569 freeaddrinfo(ai);
570
571 if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
572 return -1;
573 }
574
575 return 0;
576 }
577
578 int rpc_disconnect(struct rpc_context *rpc, char *error)
579 {
580 assert(rpc->magic == RPC_CONTEXT_MAGIC);
581
582 rpc_unset_autoreconnect(rpc);
583
584 if (rpc->fd != -1) {
585 close(rpc->fd);
586 }
587 rpc->fd = -1;
588
589 rpc->is_connected = 0;
590
591 rpc_error_all_pdus(rpc, error);
592
593 return 0;
594 }
595
596 static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, void *private_data)
597 {
598 assert(rpc->magic == RPC_CONTEXT_MAGIC);
599
600 if (status != RPC_STATUS_SUCCESS) {
601 rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
602 return;
603 }
604
605 rpc->is_connected = 1;
606 rpc->connect_cb = NULL;
607 }
608
609 /* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue and reconnect */
610 static int rpc_reconnect_requeue(struct rpc_context *rpc)
611 {
612 struct rpc_pdu *pdu;
613
614 assert(rpc->magic == RPC_CONTEXT_MAGIC);
615
616 if (rpc->fd != -1) {
617 close(rpc->fd);
618 }
619 rpc->fd = -1;
620
621 rpc->is_connected = 0;
622
623 /* socket is closed so we will not get any replies to any commands
624 * in flight. Move them all over from the waitpdu queue back to the out queue
625 */
626 for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
627 SLIST_REMOVE(&rpc->waitpdu, pdu);
628 SLIST_ADD(&rpc->outqueue, pdu);
629 /* we have to re-send the whole pdu again */
630 pdu->written = 0;
631 }
632
633 if (rpc->auto_reconnect != 0) {
634 rpc->connect_cb = reconnect_cb;
635
636 if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
637 rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
638 return -1;
639 }
640 }
641
642 return 0;
643 }
644
645
/*
 * Create a UDP socket for the context and bind it to addr:port.
 *
 * rpc  - context; must be a UDP context
 * addr - local address, resolved by getaddrinfo()
 * port - local port, passed to getaddrinfo() as a decimal service string
 *
 * Only AF_INET results are handled.
 *
 * Returns 0 on success, -1 with the context error string set on
 * failure. If bind() fails the socket is closed again and rpc->fd is
 * reset to -1.
 */
int rpc_bind_udp(struct rpc_context *rpc, char *addr, int port)
{
	struct addrinfo *ai = NULL;
	/* large enough for any int, so an out-of-range port can not
	 * overflow the buffer; getaddrinfo() rejects invalid services */
	char service[16];

	assert(rpc->magic == RPC_CONTEXT_MAGIC);

	if (rpc->is_udp == 0) {
		rpc_set_error(rpc, "Cant not bind UDP. Not UDP context");
		return -1;
	}

	snprintf(service, sizeof(service), "%d", port);
	if (getaddrinfo(addr, service, NULL, &ai) != 0) {
		rpc_set_error(rpc, "Invalid address:%s. "
			      "Can not resolv into IPv4/v6 structure.", addr);
		return -1;
	}

	switch(ai->ai_family) {
	case AF_INET:
		rpc->fd = socket(ai->ai_family, SOCK_DGRAM, 0);
		if (rpc->fd == -1) {
			rpc_set_error(rpc, "Failed to create UDP socket: %s", strerror(errno));
			freeaddrinfo(ai);
			return -1;
		}

		if (bind(rpc->fd, (struct sockaddr *)ai->ai_addr, sizeof(struct sockaddr_in)) != 0) {
			rpc_set_error(rpc, "Failed to bind to UDP socket: %s",strerror(errno));
			/* do not keep an unbound socket around */
			close(rpc->fd);
			rpc->fd = -1;
			freeaddrinfo(ai);
			return -1;
		}
		break;
	default:
		rpc_set_error(rpc, "Can not handle UPD sockets of family %d yet", ai->ai_family);
		freeaddrinfo(ai);
		return -1;
	}

	freeaddrinfo(ai);

	return 0;
}
690
/*
 * Set the destination address used for the UDP context.
 *
 * rpc          - context; must be a UDP context
 * addr         - destination address, resolved by getaddrinfo()
 * port         - destination port, passed to getaddrinfo() as a decimal
 *                service string
 * is_broadcast - stored in the context and used as the value of the
 *                SO_BROADCAST socket option
 *
 * Any previously stored destination is released and replaced by a copy
 * of the first address returned by getaddrinfo(). The result of the
 * SO_BROADCAST setsockopt() call is not checked.
 *
 * Returns 0 on success, -1 with the context error string set on failure.
 */
int rpc_set_udp_destination(struct rpc_context *rpc, char *addr, int port, int is_broadcast)
{
	struct addrinfo *ai = NULL;
	/* large enough for any int, so an out-of-range port can not
	 * overflow the buffer; getaddrinfo() rejects invalid services */
	char service[16];

	assert(rpc->magic == RPC_CONTEXT_MAGIC);

	if (rpc->is_udp == 0) {
		rpc_set_error(rpc, "Can not set destination sockaddr. Not UDP context");
		return -1;
	}

	snprintf(service, sizeof(service), "%d", port);
	if (getaddrinfo(addr, service, NULL, &ai) != 0) {
		rpc_set_error(rpc, "Invalid address:%s. "
			      "Can not resolv into IPv4/v6 structure.", addr);
		return -1;
	}

	/* free(NULL) is a no-op, no guard needed */
	free(rpc->udp_dest);
	rpc->udp_dest = malloc(ai->ai_addrlen);
	if (rpc->udp_dest == NULL) {
		rpc_set_error(rpc, "Out of memory. Failed to allocate sockaddr structure");
		freeaddrinfo(ai);
		return -1;
	}
	memcpy(rpc->udp_dest, ai->ai_addr, ai->ai_addrlen);
	freeaddrinfo(ai);

	rpc->is_broadcast = is_broadcast;
	setsockopt(rpc->fd, SOL_SOCKET, SO_BROADCAST, (char *)&is_broadcast, sizeof(is_broadcast));

	return 0;
}
728
729 struct sockaddr *rpc_get_recv_sockaddr(struct rpc_context *rpc)
730 {
731 assert(rpc->magic == RPC_CONTEXT_MAGIC);
732
733 return (struct sockaddr *)&rpc->udp_src;
734 }
735
736 int rpc_queue_length(struct rpc_context *rpc)
737 {
738 int i=0;
739 struct rpc_pdu *pdu;
740
741 assert(rpc->magic == RPC_CONTEXT_MAGIC);
742
743 for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
744 i++;
745 }
746 for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
747 i++;
748 }
749 return i;
750 }
751
752 void rpc_set_fd(struct rpc_context *rpc, int fd)
753 {
754 assert(rpc->magic == RPC_CONTEXT_MAGIC);
755
756 rpc->fd = fd;
757 }