initial libnfs checkin
[deb_libnfs.git] / lib / socket.c
1 /*
2 Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 2.1 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with this program; if not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #include <stdio.h>
19 #include <unistd.h>
20 #include <fcntl.h>
21 #include <poll.h>
22 #include <string.h>
23 #include <errno.h>
24 #include <rpc/xdr.h>
25 #include <arpa/inet.h>
26 #include <sys/ioctl.h>
27 #include "libnfs.h"
28 #include "libnfs-raw.h"
29 #include "libnfs-private.h"
30 #include "slist.h"
31
32 static void set_nonblocking(int fd)
33 {
34 unsigned v;
35 v = fcntl(fd, F_GETFL, 0);
36 fcntl(fd, F_SETFL, v | O_NONBLOCK);
37 }
38
39 int rpc_get_fd(struct rpc_context *rpc)
40 {
41 return rpc->fd;
42 }
43
44 int rpc_which_events(struct rpc_context *rpc)
45 {
46 int events = POLLIN;
47
48 if (rpc->is_connected == 0) {
49 events |= POLLOUT;
50 }
51
52 if (rpc->outqueue) {
53 events |= POLLOUT;
54 }
55 return events;
56 }
57
58 static int rpc_write_to_socket(struct rpc_context *rpc)
59 {
60 ssize_t count;
61
62 if (rpc == NULL) {
63 printf("trying to write to socket for NULL context\n");
64 return -1;
65 }
66 if (rpc->fd == -1) {
67 printf("trying to write but not connected\n");
68 return -2;
69 }
70
71 while (rpc->outqueue != NULL) {
72 ssize_t total;
73
74 total = rpc->outqueue->outdata.size;
75
76 count = write(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written);
77 if (count == -1) {
78 if (errno == EAGAIN || errno == EWOULDBLOCK) {
79 printf("socket would block, return from write to socket\n");
80 return 0;
81 }
82 printf("Error when writing to socket :%s(%d)\n", strerror(errno), errno);
83 return -3;
84 }
85
86 rpc->outqueue->written += count;
87 if (rpc->outqueue->written == total) {
88 struct rpc_pdu *pdu = rpc->outqueue;
89
90 SLIST_REMOVE(&rpc->outqueue, pdu);
91 SLIST_ADD_END(&rpc->waitpdu, pdu);
92 }
93 }
94 return 0;
95 }
96
97 static int rpc_read_from_socket(struct rpc_context *rpc)
98 {
99 int available;
100 int size;
101 unsigned char *buf;
102 ssize_t count;
103
104 if (ioctl(rpc->fd, FIONREAD, &available) != 0) {
105 rpc_set_error(rpc, "Ioctl FIONREAD returned error : %d. Closing socket.", errno);
106 return -1;
107 }
108 if (available == 0) {
109 rpc_set_error(rpc, "Socket has been closed");
110 return -2;
111 }
112 size = rpc->insize - rpc->inpos + available;
113 buf = malloc(size);
114 if (buf == NULL) {
115 rpc_set_error(rpc, "Out of memory: failed to allocate %d bytes for input buffer. Closing socket.", size);
116 return -3;
117 }
118 if (rpc->insize > rpc->inpos) {
119 memcpy(buf, rpc->inbuf + rpc->inpos, rpc->insize - rpc->inpos);
120 rpc->insize -= rpc->inpos;
121 rpc->inpos = 0;
122 }
123
124 count = read(rpc->fd, buf + rpc->insize, available);
125 if (count == -1) {
126 if (errno == EINTR) {
127 free(buf);
128 buf = NULL;
129 return 0;
130 }
131 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
132 free(buf);
133 buf = NULL;
134 return -4;
135 }
136
137 if (rpc->inbuf != NULL) {
138 free(rpc->inbuf);
139 }
140 rpc->inbuf = (char *)buf;
141 rpc->insize += count;
142
143 while (1) {
144 if (rpc->insize - rpc->inpos < 4) {
145 return 0;
146 }
147 count = rpc_get_pdu_size(rpc->inbuf + rpc->inpos);
148 if (rpc->insize + rpc->inpos < count) {
149 return 0;
150 }
151 if (rpc_process_pdu(rpc, rpc->inbuf + rpc->inpos, count) != 0) {
152 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
153 return -5;
154 }
155 rpc->inpos += count;
156 if (rpc->inpos == rpc->insize) {
157 free(rpc->inbuf);
158 rpc->inbuf = NULL;
159 rpc->insize = 0;
160 rpc->inpos = 0;
161 }
162 }
163 return 0;
164 }
165
166
167
168 int rpc_service(struct rpc_context *rpc, int revents)
169 {
170 if (revents & POLLERR) {
171 printf("rpc_service: POLLERR, socket error\n");
172 if (rpc->is_connected == 0) {
173 rpc_set_error(rpc, "Failed to connect to server socket.");
174 } else {
175 rpc_set_error(rpc, "Socket closed with POLLERR");
176 }
177 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
178 return -1;
179 }
180 if (revents & POLLHUP) {
181 printf("rpc_service: POLLHUP, socket error\n");
182 rpc_set_error(rpc, "Socket failed with POLLHUP");
183 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
184 return -2;
185 }
186
187 if (rpc->is_connected == 0 && rpc->fd != -1 && revents&POLLOUT) {
188 rpc->is_connected = 1;
189 rpc->connect_cb(rpc, RPC_STATUS_SUCCESS, NULL, rpc->connect_data);
190 return 0;
191 }
192
193 if (revents & POLLOUT && rpc->outqueue != NULL) {
194 if (rpc_write_to_socket(rpc) != 0) {
195 printf("write to socket failed\n");
196 return -3;
197 }
198 }
199
200 if (revents & POLLIN) {
201 if (rpc_read_from_socket(rpc) != 0) {
202 rpc_disconnect(rpc, rpc_get_error(rpc));
203 return -4;
204 }
205 }
206
207 return 0;
208 }
209
210
211 int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
212 {
213 struct sockaddr_storage s;
214 struct sockaddr_in *sin = (struct sockaddr_in *)&s;
215 int socksize;
216
217 if (rpc->fd != -1) {
218 rpc_set_error(rpc, "Trying to connect while already connected");
219 printf("%s\n", rpc->error_string);
220 return -1;
221 }
222
223 sin->sin_family = AF_INET;
224 sin->sin_port = htons(port);
225 if (inet_pton(AF_INET, server, &sin->sin_addr) != 1) {
226 rpc_set_error(rpc, "Not a valid server ip address");
227 printf("%s\n", rpc->error_string);
228 return -2;
229 }
230
231 switch (s.ss_family) {
232 case AF_INET:
233 rpc->fd = socket(AF_INET, SOCK_STREAM, 0);
234 socksize = sizeof(struct sockaddr_in);
235 break;
236 }
237
238 if (rpc->fd == -1) {
239 rpc_set_error(rpc, "Failed to open socket");
240 printf("%s\n", rpc->error_string);
241 return -3;
242 }
243
244 rpc->connect_cb = cb;
245 rpc->connect_data = private_data;
246
247 set_nonblocking(rpc->fd);
248
249 if (connect(rpc->fd, (struct sockaddr *)&s, socksize) != 0 && errno != EINPROGRESS) {
250 rpc_set_error(rpc, "connect() to server failed");
251 printf("%s\n", rpc->error_string);
252 return -4;
253 }
254
255 return 0;
256 }
257
258 int rpc_disconnect(struct rpc_context *rpc, char *error)
259 {
260 if (rpc->fd != -1) {
261 close(rpc->fd);
262 }
263 rpc->fd = -1;
264
265 rpc->is_connected = 0;
266
267 rpc_error_all_pdus(rpc, error);
268
269 return 0;
270 }