From b077fdeb65aca9d9bc15fcd311372816d87dc213 Mon Sep 17 00:00:00 2001 From: Ronnie Sahlberg Date: Sun, 31 Jul 2011 10:46:34 +1000 Subject: [PATCH] Add automatic reconnect after TCP session failure for the sync interface. Once a NFS TCP connection has been idle for a very long time, say 10-15 minutes, it is common that NFS servers will tear down the TCP connection. So add code to re-connect to the NFS server and re-issue any i/o that might have been in flight (unlikely unless the server has hung) While it is sufficient to just reconnect to tcp port 2049, it was much simplet to just utilize the nfs_mount() function. This means that instead of just reconnecting TCP port 2049, we do a full blown nfs mount via portmapper and mountd protocols. --- include/libnfs-private.h | 2 + lib/libnfs-sync.c | 101 ++++++++++++++++++++++++++------------- lib/libnfs.c | 25 +++++++++- lib/socket.c | 39 ++++++++++++--- 4 files changed, 125 insertions(+), 42 deletions(-) diff --git a/include/libnfs-private.h b/include/libnfs-private.h index 3131a20..c6a8820 100644 --- a/include/libnfs-private.h +++ b/include/libnfs-private.h @@ -75,6 +75,8 @@ void rpc_set_error(struct rpc_context *rpc, char *error_string, ...); void nfs_set_error(struct nfs_context *nfs, char *error_string, ...); struct rpc_context *nfs_get_rpc_context(struct nfs_context *nfs); +const char *nfs_get_server(struct nfs_context *nfs); +const char *nfs_get_export(struct nfs_context *nfs); /* we dont want to expose UDP to normal applications/users this is private to libnfs to use exclusively for broadcast RPC */ int rpc_bind_udp(struct rpc_context *rpc, char *addr, int port); diff --git a/lib/libnfs-sync.c b/lib/libnfs-sync.c index e964c14..e1f95ff 100644 --- a/lib/libnfs-sync.c +++ b/lib/libnfs-sync.c @@ -53,13 +53,10 @@ static void wait_for_reply(struct rpc_context *rpc, struct sync_cb_data *cb_data { struct pollfd pfd; - for (;;) { - if (cb_data->is_finished) { - break; - } + while (!cb_data->is_finished) { + pfd.fd = rpc_get_fd(rpc); pfd.events = rpc_which_events(rpc); - if (poll(&pfd, 1, -1) < 0) { rpc_set_error(rpc, "Poll failed"); cb_data->status = -EIO; @@ -70,6 +67,44 @@ static void wait_for_reply(struct rpc_context *rpc, struct sync_cb_data *cb_data cb_data->status = -EIO; break; } + if (rpc_get_fd(rpc) == -1) { + rpc_set_error(rpc, "Socket closed\n"); + break; + } + } +} + +static void wait_for_nfs_reply(struct nfs_context *nfs, struct sync_cb_data *cb_data) +{ + struct pollfd pfd; + + while (!cb_data->is_finished) { + + pfd.fd = nfs_get_fd(nfs); + pfd.events = nfs_which_events(nfs); + if (poll(&pfd, 1, -1) < 0) { + nfs_set_error(nfs, "Poll failed"); + cb_data->status = -EIO; + break; + } + if (nfs_service(nfs, pfd.revents) < 0) { + nfs_set_error(nfs, "nfs_service failed"); + cb_data->status = -EIO; + break; + } + if (nfs_get_fd(nfs) == -1) { + char *server = strdup(nfs_get_server(nfs)); + char *export = strdup(nfs_get_export(nfs)); + + if (nfs_mount(nfs, server, export) != 0) { + nfs_set_error(nfs, "Failed to reconnect to nfs server %s", nfs_get_error(nfs)); + free(server); + free(export); + break; + } + free(server); + free(export); + } } } @@ -105,7 +140,7 @@ int nfs_mount(struct nfs_context *nfs, const char *server, const char *export) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -141,7 +176,7 @@ int nfs_stat(struct nfs_context *nfs, const char *path, struct stat *st) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -182,7 +217,7 @@ int nfs_open(struct nfs_context *nfs, const char *path, int mode, struct nfsfh * return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -221,7 +256,7 @@ int nfs_pread(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t offset, size_t return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -260,7 +295,7 @@ int nfs_close(struct nfs_context *nfs, struct nfsfh *nfsfh) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -283,7 +318,7 @@ int nfs_fstat(struct nfs_context *nfs, struct nfsfh *nfsfh, struct stat *st) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -315,7 +350,7 @@ int nfs_pwrite(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t offset, size_ return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -355,7 +390,7 @@ int nfs_fsync(struct nfs_context *nfs, struct nfsfh *nfsfh) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -389,7 +424,7 @@ int nfs_ftruncate(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t length) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -422,7 +457,7 @@ int nfs_truncate(struct nfs_context *nfs, const char *path, off_t length) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -457,7 +492,7 @@ int nfs_mkdir(struct nfs_context *nfs, const char *path) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -492,7 +527,7 @@ int nfs_rmdir(struct nfs_context *nfs, const char *path) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -532,7 +567,7 @@ int nfs_creat(struct nfs_context *nfs, const char *path, int mode, struct nfsfh return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -567,7 +602,7 @@ int nfs_unlink(struct nfs_context *nfs, const char *path) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -607,7 +642,7 @@ int nfs_opendir(struct nfs_context *nfs, const char *path, struct nfsdir **nfsdi return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -645,7 +680,7 @@ int nfs_lseek(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t offset, int wh return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -682,7 +717,7 @@ int nfs_statvfs(struct nfs_context *nfs, const char *path, struct statvfs *svfs) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -728,7 +763,7 @@ int nfs_readlink(struct nfs_context *nfs, const char *path, char *buf, int bufsi return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -762,7 +797,7 @@ int nfs_chmod(struct nfs_context *nfs, const char *path, int mode) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -797,7 +832,7 @@ int nfs_fchmod(struct nfs_context *nfs, struct nfsfh *nfsfh, int mode) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -832,7 +867,7 @@ int nfs_chown(struct nfs_context *nfs, const char *path, int uid, int gid) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -864,7 +899,7 @@ int nfs_fchown(struct nfs_context *nfs, struct nfsfh *nfsfh, int uid, int gid) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -898,7 +933,7 @@ int nfs_utimes(struct nfs_context *nfs, const char *path, struct timeval *times) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -932,7 +967,7 @@ int nfs_utime(struct nfs_context *nfs, const char *path, struct utimbuf *times) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -967,7 +1002,7 @@ int nfs_access(struct nfs_context *nfs, const char *path, int mode) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -1001,7 +1036,7 @@ int nfs_symlink(struct nfs_context *nfs, const char *oldpath, const char *newpat return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -1035,7 +1070,7 @@ int nfs_rename(struct nfs_context *nfs, const char *oldpath, const char *newpath return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } @@ -1069,7 +1104,7 @@ int nfs_link(struct nfs_context *nfs, const char *oldpath, const char *newpath) return -1; } - wait_for_reply(nfs_get_rpc_context(nfs), &cb_data); + wait_for_nfs_reply(nfs, &cb_data); return cb_data.status; } diff --git a/lib/libnfs.c b/lib/libnfs.c index 8d27a81..c3904bf 100644 --- a/lib/libnfs.c +++ b/lib/libnfs.c @@ -144,6 +144,9 @@ struct nfs_context *nfs_init_context(void) return NULL; } + nfs->server = NULL; + nfs->export = NULL; + return nfs; } @@ -470,6 +473,7 @@ static void nfs_mount_1_cb(struct rpc_context *rpc, int status, void *command_da int nfs_mount_async(struct nfs_context *nfs, const char *server, const char *export, nfs_cb cb, void *private_data) { struct nfs_cb_data *data; + char *new_server, *new_export; data = malloc(sizeof(struct nfs_cb_data)); if (data == NULL) { @@ -477,8 +481,18 @@ int nfs_mount_async(struct nfs_context *nfs, const char *server, const char *exp return -1; } bzero(data, sizeof(struct nfs_cb_data)); - nfs->server = strdup(server); - nfs->export = strdup(export); + new_server = strdup(server); + new_export = strdup(export); + if (nfs->server != NULL) { + free(nfs->server); + nfs->server = NULL; + } + nfs->server = new_server; + if (nfs->export != NULL) { + free(nfs->export); + nfs->export = NULL; + } + nfs->export = new_export; data->nfs = nfs; data->cb = cb; data->private_data = private_data; @@ -3045,3 +3059,10 @@ struct rpc_context *nfs_get_rpc_context(struct nfs_context *nfs) return nfs->rpc; } +const char *nfs_get_server(struct nfs_context *nfs) { + return nfs->server; +} + +const char *nfs_get_export(struct nfs_context *nfs) { + return nfs->export; +} diff --git a/lib/socket.c b/lib/socket.c index e4de894..503935c 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -40,6 +40,8 @@ #include "libnfs-private.h" #include "slist.h" +static int rpc_disconnect_requeue(struct rpc_context *rpc); + static void set_nonblocking(int fd) { unsigned v; @@ -267,16 +269,16 @@ int rpc_service(struct rpc_context *rpc, int revents) return 0; } - if (revents & POLLOUT && rpc->outqueue != NULL) { - if (rpc_write_to_socket(rpc) != 0) { - rpc_set_error(rpc, "write to socket failed"); - return -1; + if (revents & POLLIN) { + if (rpc_read_from_socket(rpc) != 0) { + rpc_disconnect_requeue(rpc); + return 0; } } - if (revents & POLLIN) { - if (rpc_read_from_socket(rpc) != 0) { - rpc_disconnect(rpc, rpc_get_error(rpc)); + if (revents & POLLOUT && rpc->outqueue != NULL) { + if (rpc_write_to_socket(rpc) != 0) { + rpc_set_error(rpc, "write to socket failed"); return -1; } } @@ -350,6 +352,29 @@ int rpc_disconnect(struct rpc_context *rpc, char *error) return 0; } +/* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue */ +static int rpc_disconnect_requeue(struct rpc_context *rpc) +{ + struct rpc_pdu *pdu; + + if (rpc->fd != -1) { + close(rpc->fd); + } + rpc->fd = -1; + + rpc->is_connected = 0; + + /* socket is closed so we will not get any replies to any commands + * in flight. Move them all over from the waitpdu queue back to the out queue + */ + for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) { + SLIST_REMOVE(&rpc->waitpdu, pdu); + SLIST_ADD(&rpc->outqueue, pdu); + } + + return 0; +} + int rpc_bind_udp(struct rpc_context *rpc, char *addr, int port) { -- 2.34.1