7a291346b7bb4d4f0f39f8cc728899441407c488
[deb_libnfs.git] / lib / socket.c
1 /*
2 Copyright (C) 2010 by Ronnie Sahlberg <ronniesahlberg@gmail.com>
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 2.1 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with this program; if not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <poll.h>
23 #include <string.h>
24 #include <errno.h>
25 #include <rpc/rpc.h>
26 #include <rpc/xdr.h>
27 #include <arpa/inet.h>
28 #include <sys/ioctl.h>
29 #include "libnfs.h"
30 #include "libnfs-raw.h"
31 #include "libnfs-private.h"
32 #include "slist.h"
33
34 static void set_nonblocking(int fd)
35 {
36 unsigned v;
37 v = fcntl(fd, F_GETFL, 0);
38 fcntl(fd, F_SETFL, v | O_NONBLOCK);
39 }
40
41 int rpc_get_fd(struct rpc_context *rpc)
42 {
43 return rpc->fd;
44 }
45
46 int rpc_which_events(struct rpc_context *rpc)
47 {
48 int events = rpc->is_connected ? POLLIN : POLLOUT;
49
50 if (rpc->outqueue) {
51 events |= POLLOUT;
52 }
53 return events;
54 }
55
56 static int rpc_write_to_socket(struct rpc_context *rpc)
57 {
58 ssize_t count;
59
60 if (rpc == NULL) {
61 return -1;
62 }
63 if (rpc->fd == -1) {
64 rpc_set_error(rpc, "trying to write but not connected");
65 return -1;
66 }
67
68 while (rpc->outqueue != NULL) {
69 ssize_t total;
70
71 total = rpc->outqueue->outdata.size;
72
73 count = write(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written);
74 if (count == -1) {
75 if (errno == EAGAIN || errno == EWOULDBLOCK) {
76 return 0;
77 }
78 rpc_set_error(rpc, "Error when writing to socket :%s(%d)", strerror(errno), errno);
79 return -1;
80 }
81
82 rpc->outqueue->written += count;
83 if (rpc->outqueue->written == total) {
84 struct rpc_pdu *pdu = rpc->outqueue;
85
86 SLIST_REMOVE(&rpc->outqueue, pdu);
87 SLIST_ADD_END(&rpc->waitpdu, pdu);
88 }
89 }
90 return 0;
91 }
92
93 static int rpc_read_from_socket(struct rpc_context *rpc)
94 {
95 int available;
96 int size;
97 int pdu_size;
98 ssize_t count;
99
100 if (ioctl(rpc->fd, FIONREAD, &available) != 0) {
101 rpc_set_error(rpc, "Ioctl FIONREAD returned error : %d. Closing socket.", errno);
102 return -1;
103 }
104 if (available == 0) {
105 rpc_set_error(rpc, "Socket has been closed");
106 return -1;
107 }
108
109 /* read record marker, 4 bytes at the beginning of every pdu */
110 if (rpc->inbuf == NULL) {
111 rpc->insize = 4;
112 rpc->inbuf = malloc(rpc->insize);
113 if (rpc->inbuf == NULL) {
114 rpc_set_error(rpc, "Failed to allocate buffer for record marker, errno:%d. Closing socket.", errno);
115 return -1;
116 }
117 }
118 if (rpc->inpos < 4) {
119 size = 4 - rpc->inpos;
120
121 count = read(rpc->fd, rpc->inbuf + rpc->inpos, size);
122 if (count == -1) {
123 if (errno == EINTR) {
124 return 0;
125 }
126 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
127 return -1;
128 }
129 available -= count;
130 rpc->inpos += count;
131 }
132
133 if (available == 0) {
134 return 0;
135 }
136
137 pdu_size = rpc_get_pdu_size(rpc->inbuf);
138 if (rpc->insize < pdu_size) {
139 unsigned char *buf;
140
141 buf = malloc(pdu_size);
142 if (buf == NULL) {
143 rpc_set_error(rpc, "Failed to allocate buffer of %d bytes for pdu, errno:%d. Closing socket.", pdu_size, errno);
144 return -1;
145 }
146 memcpy(buf, rpc->inbuf, rpc->insize);
147 free(rpc->inbuf);
148 rpc->inbuf = buf;
149 rpc->insize = rpc_get_pdu_size(rpc->inbuf);
150 }
151
152 size = available;
153 if (size > rpc->insize - rpc->inpos) {
154 size = rpc->insize - rpc->inpos;
155 }
156
157 count = read(rpc->fd, rpc->inbuf + rpc->inpos, size);
158 if (count == -1) {
159 if (errno == EINTR) {
160 return 0;
161 }
162 rpc_set_error(rpc, "Read from socket failed, errno:%d. Closing socket.", errno);
163 return -1;
164 }
165 available -= count;
166 rpc->inpos += count;
167
168 if (rpc->inpos == rpc->insize) {
169 if (rpc_process_pdu(rpc, rpc->inbuf, pdu_size) != 0) {
170 rpc_set_error(rpc, "Invalid/garbage pdu received from server. Closing socket");
171 return -1;
172 }
173 free(rpc->inbuf);
174 rpc->inbuf = NULL;
175 rpc->insize = 0;
176 rpc->inpos = 0;
177 }
178
179 return 0;
180 }
181
182
183
184 int rpc_service(struct rpc_context *rpc, int revents)
185 {
186 if (revents & POLLERR) {
187 int err = 0;
188 socklen_t err_size = sizeof(err);
189
190 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
191 &err, &err_size) != 0 || err != 0) {
192 if (err == 0) {
193 err = errno;
194 }
195 rpc_set_error(rpc, "rpc_service: socket error "
196 "%s(%d).",
197 strerror(err), err);
198 } else {
199 rpc_set_error(rpc, "rpc_service: POLLERR, "
200 "Unknown socket error.");
201 }
202 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
203 return -1;
204 }
205 if (revents & POLLHUP) {
206 rpc_set_error(rpc, "Socket failed with POLLHUP");
207 rpc->connect_cb(rpc, RPC_STATUS_ERROR, rpc->error_string, rpc->connect_data);
208 return -1;
209 }
210
211 if (rpc->is_connected == 0 && rpc->fd != -1 && revents&POLLOUT) {
212 int err = 0;
213 socklen_t err_size = sizeof(err);
214
215 if (getsockopt(rpc->fd, SOL_SOCKET, SO_ERROR,
216 &err, &err_size) != 0 || err != 0) {
217 if (err == 0) {
218 err = errno;
219 }
220 rpc_set_error(rpc, "rpc_service: socket error "
221 "%s(%d) while connecting.",
222 strerror(err), err);
223 rpc->connect_cb(rpc, RPC_STATUS_ERROR,
224 NULL, rpc->connect_data);
225 return -1;
226 }
227
228 rpc->is_connected = 1;
229 rpc->connect_cb(rpc, RPC_STATUS_SUCCESS, NULL, rpc->connect_data);
230 return 0;
231 }
232
233 if (revents & POLLOUT && rpc->outqueue != NULL) {
234 if (rpc_write_to_socket(rpc) != 0) {
235 rpc_set_error(rpc, "write to socket failed");
236 return -1;
237 }
238 }
239
240 if (revents & POLLIN) {
241 if (rpc_read_from_socket(rpc) != 0) {
242 rpc_disconnect(rpc, rpc_get_error(rpc));
243 return -1;
244 }
245 }
246
247 return 0;
248 }
249
250
251 int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
252 {
253 struct sockaddr_storage s;
254 struct sockaddr_in *sin = (struct sockaddr_in *)&s;
255 int socksize;
256
257 if (rpc->fd != -1) {
258 rpc_set_error(rpc, "Trying to connect while already connected");
259 return -1;
260 }
261
262 sin->sin_family = AF_INET;
263 sin->sin_port = htons(port);
264 if (inet_pton(AF_INET, server, &sin->sin_addr) != 1) {
265 rpc_set_error(rpc, "Not a valid server ip address");
266 return -1;
267 }
268
269 switch (s.ss_family) {
270 case AF_INET:
271 socksize = sizeof(struct sockaddr_in);
272 #ifdef HAVE_SOCK_SIN_LEN
273 sin->sin_len = socksize;
274 #endif
275 rpc->fd = socket(AF_INET, SOCK_STREAM, 0);
276 break;
277 }
278
279 if (rpc->fd == -1) {
280 rpc_set_error(rpc, "Failed to open socket");
281 return -1;
282 }
283
284 rpc->connect_cb = cb;
285 rpc->connect_data = private_data;
286
287 set_nonblocking(rpc->fd);
288
289 if (connect(rpc->fd, (struct sockaddr *)&s, socksize) != 0 && errno != EINPROGRESS) {
290 rpc_set_error(rpc, "connect() to server failed");
291 return -1;
292 }
293
294 return 0;
295 }
296
297 int rpc_disconnect(struct rpc_context *rpc, char *error)
298 {
299 if (rpc->fd != -1) {
300 close(rpc->fd);
301 }
302 rpc->fd = -1;
303
304 rpc->is_connected = 0;
305
306 rpc_error_all_pdus(rpc, error);
307
308 return 0;
309 }