Optimisations to the pdu queues
[deb_libnfs.git] / lib / socket.c
1 /*
2 Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 2.1 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with this program; if not, see <http://www.gnu.org/licenses/>.
16 */
17 #ifdef HAVE_CONFIG_H
18 #include "config.h"
19 #endif
20
21 #ifdef AROS
22 #include "aros_compat.h"
23 #endif
24
25 #ifdef WIN32
26 #include "win32_compat.h"
27 #endif
28
29 #ifdef HAVE_ARPA_INET_H
30 #include <arpa/inet.h>
31 #endif
32
33 #ifdef HAVE_POLL_H
34 #include <poll.h>
35 #endif
36
37 #ifdef HAVE_UNISTD_H
38 #include <unistd.h>
39 #endif
40
41 #ifdef HAVE_SYS_IOCTL_H
42 #include <sys/ioctl.h>
43 #endif
44
45 #ifdef HAVE_SYS_SOCKET_H
46 #include <sys/socket.h>
47 #endif
48
49 #ifdef HAVE_NETINET_TCP_H
50 #include <netinet/tcp.h>
51 #endif
52
53 #ifdef HAVE_NETDB_H
54 #include <netdb.h>
55 #endif
56
57 #ifdef HAVE_SYS_FILIO_H
58 #include <sys/filio.h>
59 #endif
60
61 #ifdef HAVE_SYS_SOCKIO_H
62 #include <sys/sockio.h>
63 #endif
64
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/types.h>
72 #include "libnfs-zdr.h"
73 #include "libnfs.h"
74 #include "libnfs-raw.h"
75 #include "libnfs-private.h"
76 #include "slist.h"
77
78 #ifdef WIN32
79 //has to be included after stdlib!!
80 #include "win32_errnowrapper.h"
81 #endif
82
83 static int rpc_reconnect_requeue(struct rpc_context *rpc);
84 static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s);
85
/*
 * Put the descriptor `fd` into non-blocking mode.
 *
 * On WIN32 this issues ioctl(FIONBIO); on every other platform it adds
 * O_NONBLOCK to the descriptor's current file status flags.
 *
 * The function returns void, so failures are not reported to the caller.
 */
static void set_nonblocking(int fd)
{
#if defined(WIN32)
	long nonblocking = 1;

	(void)ioctl(fd, FIONBIO, &nonblocking);
#else
	int flags = fcntl(fd, F_GETFL, 0);

	/* F_GETFL yields -1 on failure. OR-ing O_NONBLOCK into -1 would ask
	 * F_SETFL to switch on every flag bit, so start from an empty set. */
	if (flags == -1) {
		flags = 0;
	}
	(void)fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#endif
}
97
98 #ifdef HAVE_NETINET_TCP_H
/*
 * Set an integer-valued TCP-level option on a socket.
 *
 * sockfd:  socket descriptor to configure
 * optname: TCP option name (for example TCP_SYNCNT)
 * value:   integer value for the option
 *
 * Returns the result of setsockopt(): 0 on success, -1 on failure.
 */
int set_tcp_sockopt(int sockfd, int optname, int value)
{
	/* IPPROTO_TCP is the portable option level for TCP options. Using it
	 * avoids the Linux-specific SOL_TCP macro and the getprotobyname()
	 * lookup the other platforms needed. */
	return setsockopt(sockfd, IPPROTO_TCP, optname, (char *)&value, sizeof(value));
}
116 #endif
117
118 int rpc_get_fd(struct rpc_context *rpc)
119 {
120 assert(rpc->magic == RPC_CONTEXT_MAGIC);
121
122 return rpc->fd;
123 }
124
125 static int rpc_has_queue(struct rpc_queue *q)
126 {
127 return q->head != NULL;
128 }
129
130 int rpc_which_events(struct rpc_context *rpc)
131 {
132 int events;
133
134 assert(rpc->magic == RPC_CONTEXT_MAGIC);
135
136 events = rpc->is_connected ? POLLIN : POLLOUT;
137
138 if (rpc->is_udp != 0) {
139 /* for udp sockets we only wait for pollin */
140 return POLLIN;
141 }
142
143 if (rpc_has_queue(&rpc->outqueue)) {
144 events |= POLLOUT;
145 }
146 return events;
147 }
148
149 static int rpc_write_to_socket(struct rpc_context *rpc)
150 {
151 int32_t count;
152 struct rpc_pdu *pdu;
153
154 assert(rpc->magic == RPC_CONTEXT_MAGIC);
155
156 if (rpc->fd == -1) {
157 rpc_set_error(rpc, "trying to write but not connected");
158 return -1;
159 }
160
161 while ((pdu = rpc->outqueue.head) != NULL) {
162 int64_t total;
163
164 total = pdu->outdata.size;
165
166 count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
167 if (count == -1) {
168 if (errno == EAGAIN || errno == EWOULDBLOCK) {
169 return 0;
170 }
171 rpc_set_error(rpc, "Error when writing to socket :%s(%d)", strerror(errno), errno);
172 return -1;
173 }
174
175 pdu->written += count;
176 if (pdu->written == total) {
177 rpc->outqueue.head = pdu->next;
178 if (pdu->next == NULL)
179 rpc->outqueue.tail = NULL;
180 rpc_enqueue(&rpc->waitpdu, pdu);
181 }
182 }
183 return 0;
184 }
185
186 static int rpc_read_from_socket(struct rpc_context *rpc)
187 {
188 int available;
189 int size;
190 int pdu_size;
191 int32_t count;
192
193 assert(rpc->magic == RPC_CONTEXT_MAGIC);
194
195 if (ioctl(rpc->fd, FIONREAD, &available) != 0) {
196 rpc_set_error(rpc, "Ioctl FIONREAD returned error : %d. Closing socket.", errno);
197 return -1;
198 }
199
200 if (available == 0) {
201 rpc_set_error(rpc, "Socket has been closed");
202 return -1;
203 }
204
205 if (rpc->is_udp) {
206 char *buf;
207 socklen_t socklen = sizeof(rpc->udp_src);
208
209 buf = malloc(available);
210 if (buf == NULL) {
211 rpc_set_error(rpc, "Failed to malloc buffer for recvfrom");
212 return -1;
213 }
214 count = recvfrom(rpc->fd, buf, available, MSG_DONTWAIT, (struct sockaddr *)&rpc->udp_src, &socklen);
215 if (count < 0) {
216 rpc_set_error(rpc, "Failed recvfrom: %s", strerror(errno));
217 free(buf);
218 return -1;
219 }
220 if (rpc_process_pdu(rpc, buf, count) != 0) {
221 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Ignoring PDU");
222 free(buf);
223 return -1;
224 }
225 free(buf);
226 return 0;
227 }
228
229 /* read record marker, 4 bytes at the beginning of every pdu */
230 if (rpc->inbuf == NULL) {
231 rpc->insize = 4;
232 rpc->inbuf = malloc(rpc->insize);
233 if (rpc->inbuf == NULL) {
234 rpc_set_error(rpc, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno);
235 return -1;
236 }
237 }
238 if (rpc->inpos < 4) {
239 size = 4 - rpc->inpos;
240
241 count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, 0);
242 if (count == -1) {
243 if (errno == EINTR) {
244 return 0;
245 }
246 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
247 return -1;
248 }
249 available -= count;
250 rpc->inpos += count;
251 }
252
253 if (available == 0) {
254 return 0;
255 }
256
257 pdu_size = rpc_get_pdu_size(rpc->inbuf);
258 if (rpc->insize < pdu_size) {
259 unsigned char *buf;
260
261 buf = malloc(pdu_size);
262 if (buf == NULL) {
263 rpc_set_error(rpc, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size, errno);
264 return -1;
265 }
266 memcpy(buf, rpc->inbuf, rpc->insize);
267 free(rpc->inbuf);
268 rpc->inbuf = buf;
269 rpc->insize = rpc_get_pdu_size(rpc->inbuf);
270 }
271
272 size = available;
273 if (size > rpc->insize - rpc->inpos) {
274 size = rpc->insize - rpc->inpos;
275 }
276
277 count = recv(rpc->fd, rpc->inbuf + rpc->inpos, size, 0);
278 if (count == -1) {
279 if (errno == EINTR) {
280 return 0;
281 }
282 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
283 return -1;
284 }
285 available -= count;
286 rpc->inpos += count;
287
288 if (rpc->inpos == rpc->insize) {
289 char *buf = rpc->inbuf;
290
291 rpc->inbuf = NULL;
292 rpc->insize = 0;
293 rpc->inpos = 0;
294
295 if (rpc_process_pdu(rpc, buf, pdu_size) != 0) {
296 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
297 return -1;
298 }
299 free(buf);
300 }
301
302 return 0;
303 }
304
305
306
307 int rpc_service(struct rpc_context *rpc, int revents)
308 {
309 assert(rpc->magic == RPC_CONTEXT_MAGIC);
310
311 if (revents & POLLERR) {
312 #ifdef WIN32
313 char err = 0;
314 #else
315 int err = 0;
316 #endif
317 socklen_t err_size = sizeof(err);
318
319 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
320 (char *)&err, &err_size) != 0 || err != 0) {
321 if (err == 0) {
322 err = errno;
323 }
324 rpc_set_error(rpc, "rpc_service: socket error "
325 "%s(%d).",
326 strerror(err), err);
327 } else {
328 rpc_set_error(rpc, "rpc_service: POLLERR, "
329 "Unknown socket error.");
330 }
331 if (rpc->connect_cb != NULL) {
332 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
333 }
334 return -1;
335 }
336 if (revents & POLLHUP) {
337 rpc_set_error(rpc, "Socket failed with POLLHUP");
338 if (rpc->connect_cb != NULL) {
339 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
340 }
341 return -1;
342 }
343
344 if (rpc->is_connected == 0 && rpc->fd != -1 && revents&POLLOUT) {
345 int err = 0;
346 socklen_t err_size = sizeof(err);
347
348 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
349 (char *)&err, &err_size) != 0 || err != 0) {
350 if (err == 0) {
351 err = errno;
352 }
353 rpc_set_error(rpc, "rpc_service: socket error "
354 "%s(%d) while connecting.",
355 strerror(err), err);
356 if (rpc->connect_cb != NULL) {
357 rpc->connect_cb(rpc, RPC_STATUS_ERROR,
358 NULL, rpc->connect_data);
359 }
360 return -1;
361 }
362
363 rpc->is_connected = 1;
364 if (rpc->connect_cb != NULL) {
365 rpc->connect_cb(rpc, RPC_STATUS_SUCCESS, NULL, rpc->connect_data);
366 }
367 return 0;
368 }
369
370 if (revents & POLLIN) {
371 if (rpc_read_from_socket(rpc) != 0) {
372 rpc_reconnect_requeue(rpc);
373 return 0;
374 }
375 }
376
377 if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
378 if (rpc_write_to_socket(rpc) != 0) {
379 rpc_set_error(rpc, "write to socket failed");
380 return -1;
381 }
382 }
383
384 return 0;
385 }
386
387 void rpc_set_autoreconnect(struct rpc_context *rpc)
388 {
389 assert(rpc->magic == RPC_CONTEXT_MAGIC);
390
391 rpc->auto_reconnect = 1;
392 }
393
394 void rpc_unset_autoreconnect(struct rpc_context *rpc)
395 {
396 assert(rpc->magic == RPC_CONTEXT_MAGIC);
397
398 rpc->auto_reconnect = 0;
399 }
400
401 void rpc_set_tcp_syncnt(struct rpc_context *rpc, int v)
402 {
403 assert(rpc->magic == RPC_CONTEXT_MAGIC);
404
405 rpc->tcp_syncnt = v;
406 }
407
408 #ifndef TCP_SYNCNT
409 #define TCP_SYNCNT 7
410 #endif
411
412 static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s)
413 {
414 int socksize;
415
416 assert(rpc->magic == RPC_CONTEXT_MAGIC);
417
418 switch (s->ss_family) {
419 case AF_INET:
420 socksize = sizeof(struct sockaddr_in);
421 rpc->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
422 #ifdef HAVE_NETINET_TCP_H
423 if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
424 set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
425 }
426 #endif
427 break;
428 case AF_INET6:
429 socksize = sizeof(struct sockaddr_in6);
430 rpc->fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
431 #ifdef HAVE_NETINET_TCP_H
432 if (rpc->tcp_syncnt != RPC_PARAM_UNDEFINED) {
433 set_tcp_sockopt(rpc->fd, TCP_SYNCNT, rpc->tcp_syncnt);
434 }
435 #endif
436 break;
437 default:
438 rpc_set_error(rpc, "Can not handle AF_FAMILY:%d", s->ss_family);
439 return -1;
440 }
441
442 if (rpc->fd == -1) {
443 rpc_set_error(rpc, "Failed to open socket");
444 return -1;
445 }
446
447 /* Some systems allow you to set capabilities on an executable
448 * to allow the file to be executed with privilege to bind to
449 * privileged system ports, even if the user is not root.
450 *
451 * Opportunistically try to bind the socket to a low numbered
452 * system port in the hope that the user is either root or the
453 * executable has the CAP_NET_BIND_SERVICE.
454 *
455 * As soon as we fail the bind() with EACCES we know we will never
456 * be able to bind to a system port so we terminate the loop.
457 *
458 * On linux, use
459 * sudo setcap 'cap_net_bind_service=+ep' /path/executable
460 * to make the executable able to bind to a system port.
461 *
462 * On Windows, there is no concept of privileged ports. Thus
463 * binding will usually succeed.
464 */
465 {
466 struct sockaddr_storage ss;
467 static int portOfs = 0;
468 const int firstPort = 512; /* >= 512 according to Sun docs */
469 const int portCount = IPPORT_RESERVED - firstPort;
470 int startOfs, port, rc;
471
472 if (portOfs == 0) {
473 portOfs = time(NULL) % 400;
474 }
475 startOfs = portOfs;
476 do {
477 rc = -1;
478 port = htons(firstPort + portOfs);
479 portOfs = (portOfs + 1) % portCount;
480
481 /* skip well-known ports */
482 if (!getservbyport(port, "tcp")) {
483 memset(&ss, 0, sizeof(ss));
484
485 switch (s->ss_family) {
486 case AF_INET:
487 ((struct sockaddr_in *)&ss)->sin_port = port;
488 ((struct sockaddr_in *)&ss)->sin_family = AF_INET;
489 #ifdef HAVE_SOCKADDR_LEN
490 ((struct sockaddr_in *)&ss)->sin_len = sizeof(struct sockaddr_in);
491 #endif
492 break;
493 case AF_INET6:
494 ((struct sockaddr_in6 *)&ss)->sin6_port = port;
495 ((struct sockaddr_in6 *)&ss)->sin6_family = AF_INET6;
496 #ifdef HAVE_SOCKADDR_LEN
497 ((struct sockaddr_in6 *)&ss)->sin6_len = sizeof(struct sockaddr_in);
498 #endif
499 break;
500 }
501
502 rc = bind(rpc->fd, (struct sockaddr *)&ss, socksize);
503 #if !defined(WIN32)
504 /* we got EACCES, so don't try again */
505 if (rc != 0 && errno == EACCES)
506 break;
507 #endif
508 }
509 } while (rc != 0 && portOfs != startOfs);
510 }
511
512 set_nonblocking(rpc->fd);
513
514 if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS) {
515 rpc_set_error(rpc, "connect() to server failed. %s(%d)", strerror(errno), errno);
516 return -1;
517 }
518
519 return 0;
520 }
521
522 int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
523 {
524 struct addrinfo *ai = NULL;
525
526 assert(rpc->magic == RPC_CONTEXT_MAGIC);
527
528 if (rpc->fd != -1) {
529 rpc_set_error(rpc, "Trying to connect while already connected");
530 return -1;
531 }
532
533 if (rpc->is_udp != 0) {
534 rpc_set_error(rpc, "Trying to connect on UDP socket");
535 return -1;
536 }
537
538 rpc->auto_reconnect = 0;
539
540 if (getaddrinfo(server, NULL, NULL, &ai) != 0) {
541 rpc_set_error(rpc, "Invalid address:%s. "
542 "Can not resolv into IPv4/v6 structure.", server);
543 return -1;
544 }
545
546 switch (ai->ai_family) {
547 case AF_INET:
548 ((struct sockaddr_in *)&rpc->s)->sin_family = ai->ai_family;
549 ((struct sockaddr_in *)&rpc->s)->sin_port = htons(port);
550 #ifdef HAVE_SOCKADDR_LEN
551 ((struct sockaddr_in *)&rpc->s)->sin_len = sizeof(struct sockaddr_in);
552 #endif
553 break;
554 case AF_INET6:
555 ((struct sockaddr_in6 *)&rpc->s)->sin6_family = ai->ai_family;
556 ((struct sockaddr_in6 *)&rpc->s)->sin6_port = htons(port);
557 #ifdef HAVE_SOCKADDR_LEN
558 ((struct sockaddr_in6 *)&rpc->s)->sin6_len = sizeof(struct sockaddr_in6);
559 #endif
560 break;
561 }
562
563 rpc->connect_cb = cb;
564 rpc->connect_data = private_data;
565
566 freeaddrinfo(ai);
567
568 if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
569 return -1;
570 }
571
572 return 0;
573 }
574
575 int rpc_disconnect(struct rpc_context *rpc, char *error)
576 {
577 assert(rpc->magic == RPC_CONTEXT_MAGIC);
578
579 rpc_unset_autoreconnect(rpc);
580
581 if (rpc->fd != -1) {
582 close(rpc->fd);
583 }
584 rpc->fd = -1;
585
586 rpc->is_connected = 0;
587
588 rpc_error_all_pdus(rpc, error);
589
590 return 0;
591 }
592
593 static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, void *private_data)
594 {
595 assert(rpc->magic == RPC_CONTEXT_MAGIC);
596
597 if (status != RPC_STATUS_SUCCESS) {
598 rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
599 return;
600 }
601
602 rpc->is_connected = 1;
603 rpc->connect_cb = NULL;
604 }
605
606 /* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue and reconnect */
607 static int rpc_reconnect_requeue(struct rpc_context *rpc)
608 {
609 struct rpc_pdu *pdu;
610
611 assert(rpc->magic == RPC_CONTEXT_MAGIC);
612
613 if (rpc->fd != -1) {
614 close(rpc->fd);
615 }
616 rpc->fd = -1;
617
618 rpc->is_connected = 0;
619
620 /* socket is closed so we will not get any replies to any commands
621 * in flight. Move them all over from the waitpdu queue back to the out queue
622 */
623 for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
624 rpc_return_to_queue(&rpc->outqueue, pdu);
625 /* we have to re-send the whole pdu again */
626 pdu->written = 0;
627 }
628 rpc_reset_queue(&rpc->waitpdu);
629
630 if (rpc->auto_reconnect != 0) {
631 rpc->connect_cb = reconnect_cb;
632
633 if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
634 rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
635 return -1;
636 }
637 }
638
639 return 0;
640 }
641
642
/*
 * Create the UDP socket for a UDP context and bind it to addr:port.
 *
 * rpc:  rpc context; must be a UDP context
 * addr: local address to bind to, resolved with getaddrinfo()
 * port: local port number
 *
 * Only AF_INET results are supported. Returns 0 on success, -1 on error;
 * on a bind failure the newly created socket is closed again.
 */
int rpc_bind_udp(struct rpc_context *rpc, char *addr, int port)
{
	struct addrinfo *ai = NULL;
	char service[16];	/* large enough for any int plus the NUL */

	assert(rpc->magic == RPC_CONTEXT_MAGIC);

	if (rpc->is_udp == 0) {
		rpc_set_error(rpc, "Cant not bind UDP. Not UDP context");
		return -1;
	}

	/* bounded formatting: an out-of-range port must not overrun the
	 * buffer */
	snprintf(service, sizeof(service), "%d", port);
	if (getaddrinfo(addr, service, NULL, &ai) != 0) {
		rpc_set_error(rpc, "Invalid address:%s. "
			"Can not resolv into IPv4/v6 structure.", addr);
		return -1;
	}

	switch(ai->ai_family) {
	case AF_INET:
		rpc->fd = socket(ai->ai_family, SOCK_DGRAM, 0);
		if (rpc->fd == -1) {
			rpc_set_error(rpc, "Failed to create UDP socket: %s", strerror(errno));
			freeaddrinfo(ai);
			return -1;
		}

		if (bind(rpc->fd, (struct sockaddr *)ai->ai_addr, sizeof(struct sockaddr_in)) != 0) {
			rpc_set_error(rpc, "Failed to bind to UDP socket: %s",strerror(errno));
			/* do not leak the socket that could not be bound */
			close(rpc->fd);
			rpc->fd = -1;
			freeaddrinfo(ai);
			return -1;
		}
		break;
	default:
		rpc_set_error(rpc, "Can not handle UPD sockets of family %d yet", ai->ai_family);
		freeaddrinfo(ai);
		return -1;
	}

	freeaddrinfo(ai);

	return 0;
}
687
/*
 * Set the destination address used by a UDP context.
 *
 * rpc:          rpc context; must be a UDP context
 * addr:         destination address, resolved with getaddrinfo()
 * port:         destination port number
 * is_broadcast: stored in the context and passed as the SO_BROADCAST value
 *
 * Any previously stored destination is released and replaced by a copy of
 * the first resolved address. Returns 0 on success, -1 on error.
 */
int rpc_set_udp_destination(struct rpc_context *rpc, char *addr, int port, int is_broadcast)
{
	struct addrinfo *ai = NULL;
	char service[16];	/* large enough for any int plus the NUL */

	assert(rpc->magic == RPC_CONTEXT_MAGIC);

	if (rpc->is_udp == 0) {
		rpc_set_error(rpc, "Can not set destination sockaddr. Not UDP context");
		return -1;
	}

	/* bounded formatting: an out-of-range port must not overrun the
	 * buffer */
	snprintf(service, sizeof(service), "%d", port);
	if (getaddrinfo(addr, service, NULL, &ai) != 0) {
		rpc_set_error(rpc, "Invalid address:%s. "
			"Can not resolv into IPv4/v6 structure.", addr);
		return -1;
	}

	/* free(NULL) is a no-op, so no guard is needed */
	free(rpc->udp_dest);
	rpc->udp_dest = malloc(ai->ai_addrlen);
	if (rpc->udp_dest == NULL) {
		rpc_set_error(rpc, "Out of memory. Failed to allocate sockaddr structure");
		freeaddrinfo(ai);
		return -1;
	}
	memcpy(rpc->udp_dest, ai->ai_addr, ai->ai_addrlen);
	freeaddrinfo(ai);

	rpc->is_broadcast = is_broadcast;
	/* best effort: the result of setsockopt is intentionally not checked */
	setsockopt(rpc->fd, SOL_SOCKET, SO_BROADCAST, (char *)&is_broadcast, sizeof(is_broadcast));

	return 0;
}
725
726 struct sockaddr *rpc_get_recv_sockaddr(struct rpc_context *rpc)
727 {
728 assert(rpc->magic == RPC_CONTEXT_MAGIC);
729
730 return (struct sockaddr *)&rpc->udp_src;
731 }
732
733 int rpc_queue_length(struct rpc_context *rpc)
734 {
735 int i=0;
736 struct rpc_pdu *pdu;
737
738 assert(rpc->magic == RPC_CONTEXT_MAGIC);
739
740 for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
741 i++;
742 }
743 for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) {
744 i++;
745 }
746 return i;
747 }
748
749 void rpc_set_fd(struct rpc_context *rpc, int fd)
750 {
751 assert(rpc->magic == RPC_CONTEXT_MAGIC);
752
753 rpc->fd = fd;
754 }