Track waiting requests in a hash table, by xid
[deb_libnfs.git] / lib / socket.c
1 /*
2 Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 2.1 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with this program; if not, see <http://www.gnu.org/licenses/>.
16 */
17 #ifdef HAVE_CONFIG_H
18 #include "config.h"
19 #endif
20
21 #ifdef AROS
22 #include "aros_compat.h"
23 #endif
24
25 #ifdef WIN32
26 #include "win32_compat.h"
27 #endif
28
29 #ifdef HAVE_ARPA_INET_H
30 #include <arpa/inet.h>
31 #endif
32
33 #ifdef HAVE_POLL_H
34 #include <poll.h>
35 #endif
36
37 #ifdef HAVE_UNISTD_H
38 #include <unistd.h>
39 #endif
40
41 #ifdef HAVE_SYS_IOCTL_H
42 #include <sys/ioctl.h>
43 #endif
44
45 #ifdef HAVE_SYS_SOCKET_H
46 #include <sys/socket.h>
47 #endif
48
49 #ifdef HAVE_NETINET_TCP_H
50 #include <netinet/tcp.h>
51 #endif
52
53 #ifdef HAVE_NETDB_H
54 #include <netdb.h>
55 #endif
56
57 #ifdef HAVE_SYS_FILIO_H
58 #include <sys/filio.h>
59 #endif
60
61 #ifdef HAVE_SYS_SOCKIO_H
62 #include <sys/sockio.h>
63 #endif
64
65 #include <stdio.h>
66 #include <stdlib.h>
67 #include <assert.h>
68 #include <fcntl.h>
69 #include <string.h>
70 #include <errno.h>
71 #include <sys/types.h>
72 #include "libnfs-zdr.h"
73 #include "libnfs.h"
74 #include "libnfs-raw.h"
75 #include "libnfs-private.h"
76 #include "slist.h"
77
78 #ifdef WIN32
79 //has to be included after stdlib!!
80 #include "win32_errnowrapper.h"
81 #endif
82
83 static int rpc_reconnect_requeue(struct rpc_context *rpc);
84 static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s);
85
/*
 * Switch a descriptor to non-blocking mode.
 *
 * The function cannot report failure, so errors are ignored.  On the
 * fcntl() path the flags are only written back when they could be read
 * first: a failed F_GETFL returns -1, and OR-ing that with O_NONBLOCK
 * would otherwise hand an all-bits-set flag word to F_SETFL.
 */
static void set_nonblocking(int fd)
{
#if defined(WIN32)
	long nonblocking = 1;

	(void)ioctl(fd, FIONBIO, &nonblocking);
#else
	int flags = fcntl(fd, F_GETFL, 0);

	if (flags == -1) {
		return;
	}
	(void)fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#endif
}
97
#ifdef HAVE_NETINET_TCP_H
/*
 * Set an integer TCP-level option on a socket.
 *
 * On FreeBSD, Solaris and macOS the protocol level is looked up with
 * getprotobyname("tcp"); everywhere else SOL_TCP is used directly.
 *
 * Returns the result of setsockopt(), or -1 when the protocol lookup fails.
 */
int set_tcp_sockopt(int sockfd, int optname, int value)
{
	int level;

#if defined(__FreeBSD__) || defined(__sun) || (defined(__APPLE__) && defined(__MACH__))
	struct protoent *proto = getprotobyname("tcp");

	if (proto == NULL) {
		return -1;
	}
	level = proto->p_proto;
#else
	level = SOL_TCP;
#endif

	return setsockopt(sockfd, level, optname, (char *)&value, sizeof(value));
}
#endif
117
118 int rpc_get_fd(struct rpc_context *rpc)
119 {
120 assert(rpc->magic == RPC_CONTEXT_MAGIC);
121
122 return rpc->fd;
123 }
124
125 static int rpc_has_queue(struct rpc_queue *q)
126 {
127 return q->head != NULL;
128 }
129
130 int rpc_which_events(struct rpc_context *rpc)
131 {
132 int events;
133
134 assert(rpc->magic == RPC_CONTEXT_MAGIC);
135
136 events = rpc->is_connected ? POLLIN : POLLOUT;
137
138 if (rpc->is_udp != 0) {
139 /* for udp sockets we only wait for pollin */
140 return POLLIN;
141 }
142
143 if (rpc_has_queue(&rpc->outqueue)) {
144 events |= POLLOUT;
145 }
146 return events;
147 }
148
149 static int rpc_write_to_socket(struct rpc_context *rpc)
150 {
151 int32_t count;
152 struct rpc_pdu *pdu;
153
154 assert(rpc->magic == RPC_CONTEXT_MAGIC);
155
156 if (rpc->fd == -1) {
157 rpc_set_error(rpc, "trying to write but not connected");
158 return -1;
159 }
160
161 while ((pdu = rpc->outqueue.head) != NULL) {
162 int64_t total;
163
164 total = pdu->outdata.size;
165
166 count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
167 if (count == -1) {
168 if (errno == EAGAIN || errno == EWOULDBLOCK) {
169 return 0;
170 }
171 rpc_set_error(rpc, "Error when writing to socket :%s(%d)", strerror(errno), errno);
172 return -1;
173 }
174
175 pdu->written += count;
176 if (pdu->written == total) {
177 unsigned int hash;
178
179 rpc->outqueue.head = pdu->next;
180 if (pdu->next == NULL)
181 rpc->outqueue.tail = NULL;
182
183 hash = rpc_hash_xid(pdu->xid);
184 rpc_enqueue(&rpc->waitpdu[hash], pdu);
185 }
186 }
187 return 0;
188 }
189
190 static int rpc_read_from_socket(struct rpc_context *rpc)
191 {
192 int available;
193 int size;
194 int pdu_size;
195 int32_t count;
196
197 assert(rpc->magic == RPC_CONTEXT_MAGIC);
198
199 if (ioctl(rpc->fd, FIONREAD, &available) != 0) {
200 rpc_set_error(rpc, "Ioctl FIONREAD returned error : %d. Closing socket.", errno);
201 return -1;
202 }
203
204 if (available == 0) {
205 rpc_set_error(rpc, "Socket has been closed");
206 return -1;
207 }
208
209 if (rpc->is_udp) {
210 char *buf;
211 socklen_t socklen = sizeof(rpc->udp_src);
212
213 buf = malloc(available);
214 if (buf == NULL) {
215 rpc_set_error(rpc, "Failed to malloc buffer for recvfrom");
216 return -1;
217 }
218 count = recvfrom(rpc->fd, buf, available, MSG_DONTWAIT, (struct sockaddr *)&rpc->udp_src, &socklen);
219 if (count < 0) {
220 rpc_set_error(rpc, "Failed recvfrom: %s", strerror(errno));
221 free(buf);
222 return -1;
223 }
224 if (rpc_process_pdu(rpc, buf, count) != 0) {
225 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Ignoring PDU");
226 free(buf);
227 return -1;
228 }
229 free(buf);
230 return 0;
231 }
232
233 /* read record marker, 4 bytes at the beginning of every pdu */
234 if (rpc->inbuf == NULL) {
235 rpc->insize = 4;
236 rpc->inbuf = malloc(rpc->insize);
237 if (rpc->inbuf == NULL) {
238 rpc_set_error(rpc, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno);
239 return -1;
240 }
241 }
242 if (rpc->inpos < 4) {
243 size = 4 - rpc->inpos;
244
245 count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, 0);
246 if (count == -1) {
247 if (errno == EINTR) {
248 return 0;
249 }
250 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
251 return -1;
252 }
253 available -= count;
254 rpc->inpos += count;
255 }
256
257 if (available == 0) {
258 return 0;
259 }
260
261 pdu_size = rpc_get_pdu_size(rpc->inbuf);
262 if (rpc->insize < pdu_size) {
263 unsigned char *buf;
264
265 buf = malloc(pdu_size);
266 if (buf == NULL) {
267 rpc_set_error(rpc, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size, errno);
268 return -1;
269 }
270 memcpy(buf, rpc->inbuf, rpc->insize);
271 free(rpc->inbuf);
272 rpc->inbuf = buf;
273 rpc->insize = rpc_get_pdu_size(rpc->inbuf);
274 }
275
276 size = available;
277 if (size > rpc->insize - rpc->inpos) {
278 size = rpc->insize - rpc->inpos;
279 }
280
281 count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, 0);
282 if (count == -1) {
283 if (errno == EINTR) {
284 return 0;
285 }
286 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
287 return -1;
288 }
289 available -= count;
290 rpc->inpos += count;
291
292 if (rpc->inpos == rpc->insize) {
293 char *buf = rpc->inbuf;
294
295 rpc->inbuf = NULL;
296 rpc->insize = 0;
297 rpc->inpos = 0;
298
299 if (rpc_process_pdu(rpc, buf, pdu_size) != 0) {
300 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
301 return -1;
302 }
303 free(buf);
304 }
305
306 return 0;
307 }
308
309
310
311 int rpc_service(struct rpc_context *rpc, int revents)
312 {
313 assert(rpc->magic == RPC_CONTEXT_MAGIC);
314
315 if (revents & POLLERR) {
316 #ifdef WIN32
317 char err = 0;
318 #else
319 int err = 0;
320 #endif
321 socklen_t err_size = sizeof(err);
322
323 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
324 (char *)&err, &err_size) != 0 || err != 0) {
325 if (err == 0) {
326 err = errno;
327 }
328 rpc_set_error(rpc, "rpc_service: socket error "
329 "%s(%d).",
330 strerror(err), err);
331 } else {
332 rpc_set_error(rpc, "rpc_service: POLLERR, "
333 "Unknown socket error.");
334 }
335 if (rpc->connect_cb != NULL) {
336 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
337 }
338 return -1;
339 }
340 if (revents & POLLHUP) {
341 rpc_set_error(rpc, "Socket failed with POLLHUP");
342 if (rpc->connect_cb != NULL) {
343 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
344 }
345 return -1;
346 }
347
348 if (rpc->is_connected == 0 && rpc->fd != -1 && revents&POLLOUT) {
349 int err = 0;
350 socklen_t err_size = sizeof(err);
351
352 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
353 (char *)&err, &err_size) != 0 || err != 0) {
354 if (err == 0) {
355 err = errno;
356 }
357 rpc_set_error(rpc, "rpc_service: socket error "
358 "%s(%d) while connecting.",
359 strerror(err), err);
360 if (rpc->connect_cb != NULL) {
361 rpc->connect_cb(rpc, RPC_STATUS_ERROR,
362 NULL, rpc->connect_data);
363 }
364 return -1;
365 }
366
367 rpc->is_connected = 1;
368 if (rpc->connect_cb != NULL) {
369 rpc->connect_cb(rpc, RPC_STATUS_SUCCESS, NULL, rpc->connect_data);
370 }
371 return 0;
372 }
373
374 if (revents & POLLIN) {
375 if (rpc_read_from_socket(rpc) != 0) {
376 rpc_reconnect_requeue(rpc);
377 return 0;
378 }
379 }
380
381 if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
382 if (rpc_write_to_socket(rpc) != 0) {
383 rpc_set_error(rpc, "write to socket failed");
384 return -1;
385 }
386 }
387
388 return 0;
389 }
390
391 void rpc_set_autoreconnect(struct rpc_context *rpc)
392 {
393 assert(rpc->magic == RPC_CONTEXT_MAGIC);
394
395 rpc->auto_reconnect = 1;
396 }
397
398 void rpc_unset_autoreconnect(struct rpc_context *rpc)
399 {
400 assert(rpc->magic == RPC_CONTEXT_MAGIC);
401
402 rpc->auto_reconnect = 0;
403 }
404
405 void rpc_set_tcp_syncnt(struct rpc_context *rpc, int v)
406 {
407 assert(rpc->magic == RPC_CONTEXT_MAGIC);
408
409 rpc->tcp_syncnt = v;
410 }
411
/* Fallback definition for builds where no included header provides
 * TCP_SYNCNT.
 * NOTE(review): 7 presumably mirrors the Linux option number - confirm
 * before relying on it on other platforms.
 */
#ifndef TCP_SYNCNT
#define TCP_SYNCNT 7
#endif
415
416 static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s)
417 {
418 int socksize;
419
420 assert(rpc->magic == RPC_CONTEXT_MAGIC);
421
422 switch (s->ss_family) {
423 case AF_INET:
424 socksize = sizeof(struct sockaddr_in);
425 rpc->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
426 #ifdef HAVE_NETINET_TCP_H
427 if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
428 set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
429 }
430 #endif
431 break;
432 case AF_INET6:
433 socksize = sizeof(struct sockaddr_in6);
434 rpc->fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
435 #ifdef HAVE_NETINET_TCP_H
436 if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
437 set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
438 }
439 #endif
440 break;
441 default:
442 rpc_set_error(rpc, "Can not handle AF_FAMILY:%d", s->ss_family);
443 return -1;
444 }
445
446 if (rpc->fd == -1) {
447 rpc_set_error(rpc, "Failed to open socket");
448 return -1;
449 }
450
451 /* Some systems allow you to set capabilities on an executable
452 * to allow the file to be executed with privilege to bind to
453 * privileged system ports, even if the user is not root.
454 *
455 * Opportunistically try to bind the socket to a low numbered
456 * system port in the hope that the user is either root or the
457 * executable has the CAP_NET_BIND_SERVICE.
458 *
459 * As soon as we fail the bind() with EACCES we know we will never
460 * be able to bind to a system port so we terminate the loop.
461 *
462 * On linux, use
463 * sudo setcap 'cap_net_bind_service=+ep' /path/executable
464 * to make the executable able to bind to a system port.
465 *
466 * On Windows, there is no concept of privileged ports. Thus
467 * binding will usually succeed.
468 */
469 {
470 struct sockaddr_storage ss;
471 static int portOfs = 0;
472 const int firstPort = 512; /* >= 512 according to Sun docs */
473 const int portCount = IPPORT_RESERVED - firstPort;
474 int startOfs, port, rc;
475
476 if (portOfs == 0) {
477 portOfs = time(NULL) % 400;
478 }
479 startOfs = portOfs;
480 do {
481 rc = -1;
482 port = htons(firstPort + portOfs);
483 portOfs = (portOfs + 1) % portCount;
484
485 /* skip well-known ports */
486 if (!getservbyport(port, "tcp")) {
487 memset(&ss, 0, sizeof(ss));
488
489 switch (s->ss_family) {
490 case AF_INET:
491 ((struct sockaddr_in *)&ss)->sin_port = port;
492 ((struct sockaddr_in *)&ss)->sin_family = AF_INET;
493 #ifdef HAVE_SOCKADDR_LEN
494 ((struct sockaddr_in *)&ss)->sin_len = sizeof(struct sockaddr_in);
495 #endif
496 break;
497 case AF_INET6:
498 ((struct sockaddr_in6 *)&ss)->sin6_port = port;
499 ((struct sockaddr_in6 *)&ss)->sin6_family = AF_INET6;
500 #ifdef HAVE_SOCKADDR_LEN
501 ((struct sockaddr_in6 *)&ss)->sin6_len = sizeof(struct sockaddr_in);
502 #endif
503 break;
504 }
505
506 rc = bind(rpc->fd, (struct sockaddr *)&ss, socksize);
507 #if !defined(WIN32)
508 /* we got EACCES, so don't try again */
509 if (rc != 0 && errno == EACCES)
510 break;
511 #endif
512 }
513 } while (rc != 0 && portOfs != startOfs);
514 }
515
516 set_nonblocking(rpc->fd);
517
518 if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS) {
519 rpc_set_error(rpc, "connect() to server failed. %s(%d)", strerror(errno), errno);
520 return -1;
521 }
522
523 return 0;
524 }
525
526 int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
527 {
528 struct addrinfo *ai = NULL;
529
530 assert(rpc->magic == RPC_CONTEXT_MAGIC);
531
532 if (rpc->fd != -1) {
533 rpc_set_error(rpc, "Trying to connect while already connected");
534 return -1;
535 }
536
537 if (rpc->is_udp != 0) {
538 rpc_set_error(rpc, "Trying to connect on UDP socket");
539 return -1;
540 }
541
542 rpc->auto_reconnect = 0;
543
544 if (getaddrinfo(server, NULL, NULL, &ai) != 0) {
545 rpc_set_error(rpc, "Invalid address:%s. "
546 "Can not resolv into IPv4/v6 structure.", server);
547 return -1;
548 }
549
550 switch (ai->ai_family) {
551 case AF_INET:
552 ((struct sockaddr_in *)&rpc->s)->sin_family = ai->ai_family;
553 ((struct sockaddr_in *)&rpc->s)->sin_port = htons(port);
554 #ifdef HAVE_SOCKADDR_LEN
555 ((struct sockaddr_in *)&rpc->s)->sin_len = sizeof(struct sockaddr_in);
556 #endif
557 break;
558 case AF_INET6:
559 ((struct sockaddr_in6 *)&rpc->s)->sin6_family = ai->ai_family;
560 ((struct sockaddr_in6 *)&rpc->s)->sin6_port = htons(port);
561 #ifdef HAVE_SOCKADDR_LEN
562 ((struct sockaddr_in6 *)&rpc->s)->sin6_len = sizeof(struct sockaddr_in6);
563 #endif
564 break;
565 }
566
567 rpc->connect_cb = cb;
568 rpc->connect_data = private_data;
569
570 freeaddrinfo(ai);
571
572 if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
573 return -1;
574 }
575
576 return 0;
577 }
578
579 int rpc_disconnect(struct rpc_context *rpc, char *error)
580 {
581 assert(rpc->magic == RPC_CONTEXT_MAGIC);
582
583 rpc_unset_autoreconnect(rpc);
584
585 if (rpc->fd != -1) {
586 close(rpc->fd);
587 }
588 rpc->fd = -1;
589
590 rpc->is_connected = 0;
591
592 rpc_error_all_pdus(rpc, error);
593
594 return 0;
595 }
596
597 static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, void *private_data)
598 {
599 assert(rpc->magic == RPC_CONTEXT_MAGIC);
600
601 if (status != RPC_STATUS_SUCCESS) {
602 rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
603 return;
604 }
605
606 rpc->is_connected = 1;
607 rpc->connect_cb = NULL;
608 }
609
610 /* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue and reconnect */
611 static int rpc_reconnect_requeue(struct rpc_context *rpc)
612 {
613 struct rpc_pdu *pdu;
614 unsigned int i;
615
616 assert(rpc->magic == RPC_CONTEXT_MAGIC);
617
618 if (rpc->fd != -1) {
619 close(rpc->fd);
620 }
621 rpc->fd = -1;
622
623 rpc->is_connected = 0;
624
625 /* socket is closed so we will not get any replies to any commands
626 * in flight. Move them all over from the waitpdu queue back to the out queue
627 */
628 for (i = 0; i < HASHES; i++) {
629 struct rpc_queue *q = &rpc->waitpdu[i];
630
631 for (pdu=q->head; pdu; pdu=pdu->next) {
632 rpc_return_to_queue(&rpc->outqueue, pdu);
633 /* we have to re-send the whole pdu again */
634 pdu->written = 0;
635 }
636 rpc_reset_queue(q);
637 }
638
639 if (rpc->auto_reconnect != 0) {
640 rpc->connect_cb = reconnect_cb;
641
642 if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
643 rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
644 return -1;
645 }
646 }
647
648 return 0;
649 }
650
651
/*
 * Create the UDP socket for a UDP context and bind it to addr:port.
 * Only AF_INET addresses are handled.
 *
 * Returns 0 on success and -1 on error, with the context error set.  When
 * bind() fails the freshly created socket is closed again and rpc->fd is
 * reset to -1.
 */
int rpc_bind_udp(struct rpc_context *rpc, char *addr, int port)
{
	struct addrinfo *ai = NULL;
	/* large enough for any int, so an out-of-range port cannot overflow */
	char service[16];

	assert(rpc->magic == RPC_CONTEXT_MAGIC);

	if (rpc->is_udp == 0) {
		rpc_set_error(rpc, "Cant not bind UDP. Not UDP context");
		return -1;
	}

	snprintf(service, sizeof(service), "%d", port);
	if (getaddrinfo(addr, service, NULL, &ai) != 0) {
		rpc_set_error(rpc, "Invalid address:%s. "
			      "Can not resolv into IPv4/v6 structure.", addr);
		return -1;
	}

	switch(ai->ai_family) {
	case AF_INET:
		rpc->fd = socket(ai->ai_family, SOCK_DGRAM, 0);
		if (rpc->fd == -1) {
			rpc_set_error(rpc, "Failed to create UDP socket: %s", strerror(errno));
			freeaddrinfo(ai);
			return -1;
		}

		if (bind(rpc->fd, (struct sockaddr *)ai->ai_addr, sizeof(struct sockaddr_in)) != 0) {
			rpc_set_error(rpc, "Failed to bind to UDP socket: %s",strerror(errno));
			/* do not leave a half set up socket behind */
			close(rpc->fd);
			rpc->fd = -1;
			freeaddrinfo(ai);
			return -1;
		}
		break;
	default:
		rpc_set_error(rpc, "Can not handle UPD sockets of family %d yet", ai->ai_family);
		freeaddrinfo(ai);
		return -1;
	}

	freeaddrinfo(ai);

	return 0;
}
696
/*
 * Resolve addr:port and store a copy of the result as the destination of
 * a UDP context, replacing any previous destination.  is_broadcast is
 * recorded in the context and applied to the socket as SO_BROADCAST
 * (best effort, the setsockopt() result is not checked).
 *
 * Returns 0 on success and -1 on error, with the context error set.
 */
int rpc_set_udp_destination(struct rpc_context *rpc, char *addr, int port, int is_broadcast)
{
	struct addrinfo *ai = NULL;
	/* large enough for any int, so an out-of-range port cannot overflow */
	char service[16];

	assert(rpc->magic == RPC_CONTEXT_MAGIC);

	if (rpc->is_udp == 0) {
		rpc_set_error(rpc, "Can not set destination sockaddr. Not UDP context");
		return -1;
	}

	snprintf(service, sizeof(service), "%d", port);
	if (getaddrinfo(addr, service, NULL, &ai) != 0) {
		rpc_set_error(rpc, "Invalid address:%s. "
			      "Can not resolv into IPv4/v6 structure.", addr);
		return -1;
	}

	/* free(NULL) is a no-op, no guard needed */
	free(rpc->udp_dest);
	rpc->udp_dest = malloc(ai->ai_addrlen);
	if (rpc->udp_dest == NULL) {
		rpc_set_error(rpc, "Out of memory. Failed to allocate sockaddr structure");
		freeaddrinfo(ai);
		return -1;
	}
	memcpy(rpc->udp_dest, ai->ai_addr, ai->ai_addrlen);
	freeaddrinfo(ai);

	rpc->is_broadcast = is_broadcast;
	setsockopt(rpc->fd, SOL_SOCKET, SO_BROADCAST, (char *)&is_broadcast, sizeof(is_broadcast));

	return 0;
}
734
735 struct sockaddr *rpc_get_recv_sockaddr(struct rpc_context *rpc)
736 {
737 assert(rpc->magic == RPC_CONTEXT_MAGIC);
738
739 return (struct sockaddr *)&rpc->udp_src;
740 }
741
742 int rpc_queue_length(struct rpc_context *rpc)
743 {
744 int i=0;
745 struct rpc_pdu *pdu;
746 unsigned int n;
747
748 assert(rpc->magic == RPC_CONTEXT_MAGIC);
749
750 for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
751 i++;
752 }
753
754 for (n = 0; n < HASHES; n++) {
755 struct rpc_queue *q = &rpc->waitpdu[n];
756
757 for(pdu = q->head; pdu; pdu = pdu->next)
758 i++;
759 }
760 return i;
761 }
762
763 void rpc_set_fd(struct rpc_context *rpc, int fd)
764 {
765 assert(rpc->magic == RPC_CONTEXT_MAGIC);
766
767 rpc->fd = fd;
768 }