nfs_pread_async: handle short reads
authorPeter Lieven <pl@kamp.de>
Sun, 16 Mar 2014 19:15:48 +0000 (20:15 +0100)
committerPeter Lieven <pl@kamp.de>
Sun, 16 Mar 2014 19:15:48 +0000 (20:15 +0100)
the RFC allows the server to read less bytes than requested even
if not at the EOF.
this patch implements a reissue logic for the reminder of the
read request(s).

Signed-off-by: Peter Lieven <pl@kamp.de>
lib/libnfs.c

index 2da52e4368abc2c95e27df905c9b03f62dd1f6ed..9fdbd0d549e393a4353fe3ac975494ccec9e70be 100644 (file)
@@ -130,9 +130,11 @@ struct nfs_cb_data {
        /* for multi-read/write calls. */
        int error;
        int cancel;
+       int oom;
        int num_calls;
        uint64_t start_offset, max_offset;
        char *buffer;
+       size_t request_size;
        char *usrbuf;
 };
 
@@ -1529,38 +1531,6 @@ int nfs_chdir_async(struct nfs_context *nfs, const char *path, nfs_cb cb, void *
 /*
  * Async pread()
  */
-static void nfs_pread_cb(struct rpc_context *rpc, int status, void *command_data, void *private_data)
-{
-       struct nfs_cb_data *data = private_data;
-       struct nfs_context *nfs = data->nfs;
-       READ3res *res;
-
-       assert(rpc->magic == RPC_CONTEXT_MAGIC);
-
-       if (status == RPC_STATUS_ERROR) {
-               data->cb(-EFAULT, nfs, command_data, data->private_data);
-               free_nfs_cb_data(data);
-               return;
-       }
-       if (status == RPC_STATUS_CANCEL) {
-               data->cb(-EINTR, nfs, "Command was cancelled", data->private_data);
-               free_nfs_cb_data(data);
-               return;
-       }
-
-       res = command_data;
-       if (res->status != NFS3_OK) {
-               rpc_set_error(nfs->rpc, "NFS: Read failed with %s(%d)", nfsstat3_to_str(res->status), nfsstat3_to_errno(res->status));
-               data->cb(nfsstat3_to_errno(res->status), nfs, rpc_get_error(nfs->rpc), data->private_data);
-               free_nfs_cb_data(data);
-               return;
-       }
-
-       data->nfsfh->offset += res->READ3res_u.resok.count;
-       data->cb(res->READ3res_u.resok.count, nfs, res->READ3res_u.resok.data.data_val, data->private_data);
-       free_nfs_cb_data(data);
-}
-
 static void nfs_fill_READ3args(READ3args *args, struct nfsfh *fh, uint64_t offset, uint64_t count)
 {
        memset(args, 0, sizeof(READ3args));
@@ -1589,16 +1559,28 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat
                data->cancel = 1;
        }
 
-       /* reassemble the data into the buffer */
        if (status == RPC_STATUS_SUCCESS) {
                res = command_data;
                if (res->status != NFS3_OK) {
                        rpc_set_error(nfs->rpc, "NFS: Read failed with %s(%d)", nfsstat3_to_str(res->status), nfsstat3_to_errno(res->status));
                        data->error = 1;
                } else  {
+                       /* if we have more than one call or we have received a short read we need a reassembly buffer */
+                       if (data->num_calls || (res->READ3res_u.resok.count < mdata->count && !res->READ3res_u.resok.eof)) {
+                               if (data->buffer == NULL) {
+                                       data->buffer =  malloc(data->request_size);
+                                       if (data->buffer == NULL) {
+                                               rpc_set_error(nfs->rpc, "Out-Of-Memory: Failed to allocate reassembly buffer for %d bytes", (int)data->request_size);
+                                               data->oom = 1;
+                                       }
+                               }
+                       }
                        if (res->READ3res_u.resok.count > 0) {
                                if (res->READ3res_u.resok.count <= mdata->count) {
-                                       memcpy(&data->buffer[mdata->offset - data->start_offset], res->READ3res_u.resok.data.data_val, res->READ3res_u.resok.count);
+                                       /* copy data into reassembly buffer if we have one */
+                                       if (data->buffer != NULL) {
+                                               memcpy(&data->buffer[mdata->offset - data->start_offset], res->READ3res_u.resok.data.data_val, res->READ3res_u.resok.count);
+                                       }
                                        if (data->max_offset < mdata->offset + res->READ3res_u.resok.count) {
                                                data->max_offset = mdata->offset + res->READ3res_u.resok.count;
                                        }
@@ -1607,33 +1589,59 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat
                                        data->error = 1;
                                }
                        }
+                       /* check if we have received a short read */
+                       if (res->READ3res_u.resok.count < mdata->count && !res->READ3res_u.resok.eof) {
+                               if (res->READ3res_u.resok.count == 0) {
+                                       rpc_set_error(nfs->rpc, "NFS: Read failed. No bytes read and not at EOF!");
+                                       data->error = 1;
+                               } else {
+                                       /* reissue reminder of this read request */
+                                       READ3args args;
+                                       mdata->offset += res->READ3res_u.resok.count;
+                                       mdata->count -= res->READ3res_u.resok.count;
+                                       nfs_fill_READ3args(&args, data->nfsfh, mdata->offset, mdata->count);
+                                       if (rpc_nfs3_read_async(nfs->rpc, nfs_pread_mcb, &args, mdata) == 0) {
+                                               data->num_calls++;
+                                               return;
+                                       } else {
+                                               rpc_set_error(nfs->rpc, "RPC error: Failed to send READ call for %s", data->path);
+                                               data->error = 1;
+                                       }
+                               }
+                       }
                }
        }
 
+       free(mdata);
+
        if (data->num_calls > 0) {
                /* still waiting for more replies */
-               free(mdata);
                return;
        }
-
+       if (data->oom != 0) {
+               data->cb(-ENOMEM, nfs, command_data, data->private_data);
+               free_nfs_cb_data(data);
+               return;
+       }
        if (data->error != 0) {
                data->cb(-EFAULT, nfs, command_data, data->private_data);
                free_nfs_cb_data(data);
-               free(mdata);
                return;
        }
        if (data->cancel != 0) {
                data->cb(-EINTR, nfs, "Command was cancelled", data->private_data);
                free_nfs_cb_data(data);
-               free(mdata);
                return;
        }
 
        data->nfsfh->offset = data->max_offset;
-       data->cb(data->max_offset - data->start_offset, nfs, data->buffer, data->private_data);
+       if (data->buffer) {
+               data->cb(data->max_offset - data->start_offset, nfs, data->buffer, data->private_data);
+       } else {
+               data->cb(res->READ3res_u.resok.count, nfs, res->READ3res_u.resok.data.data_val, data->private_data);
+       }
 
        free_nfs_cb_data(data);
-       free(mdata);
 }
 
 int nfs_pread_async(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t offset, uint64_t count, nfs_cb cb, void *private_data)
@@ -1650,23 +1658,10 @@ int nfs_pread_async(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t offse
        data->cb           = cb;
        data->private_data = private_data;
        data->nfsfh        = nfsfh;
+       data->request_size = count;
 
        nfsfh->offset = offset;
 
-       if (count <= nfs_get_readmax(nfs)) {
-               READ3args args;
-               nfs_fill_READ3args(&args, nfsfh, offset, count);
-
-               if (rpc_nfs3_read_async(nfs->rpc, nfs_pread_cb, &args, data) != 0) {
-                       rpc_set_error(nfs->rpc, "RPC error: Failed to send READ call for %s", data->path);
-                       data->cb(-ENOMEM, nfs, rpc_get_error(nfs->rpc), data->private_data);
-                       free_nfs_cb_data(data);
-                       return -1;
-               }
-               return 0;
-       }
-
-       assert(count > 0);
        assert(data->num_calls == 0);
 
        /* trying to read more than maximum server read size, we has to chop it up into smaller
@@ -1676,15 +1671,7 @@ int nfs_pread_async(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t offse
        data->max_offset = offset;
        data->start_offset = offset;
 
-       data->buffer =  malloc(count);
-       if (data->buffer == NULL) {
-               rpc_set_error(nfs->rpc, "Out-Of-Memory: Failed to allocate reassembly buffer for %d bytes", (int)count);
-               data->cb(-ENOMEM, nfs, rpc_get_error(nfs->rpc), data->private_data);
-               free_nfs_cb_data(data);
-               return -1;
-       }
-
-       while (count > 0) {
+       do {
                uint64_t readcount = count;
                struct nfs_mcb_data *mdata;
                READ3args args;
@@ -1719,7 +1706,7 @@ int nfs_pread_async(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t offse
                count               -= readcount;
                offset              += readcount;
                data->num_calls++;
-        }
+        } while (count > 0);
 
         return 0;
 }