Add support in read/pread to be aware of the Maximum Read size that the server
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Sat, 18 Jun 2011 22:43:28 +0000 (08:43 +1000)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Sat, 18 Jun 2011 22:43:28 +0000 (08:43 +1000)
suggested.

When a read/pread operation that is 'oversize' is issued, the read/pread command will internally chop this up into smaller chunsk and send then concurrently to the server.
As replies come back from the server we add the data to a reassembly buffer
and as soon as all replies have been received we invoke the callback and pass the reassembly buffer back to the application.

examples/nfsclient-async.c
lib/libnfs.c

index 4c7624c7d0c6450655e2b832e84fd039213a3d9b..b630f4e794c7ce0305baa332dfbec805470a52aa 100644 (file)
@@ -19,7 +19,7 @@
  */
 
 #define SERVER "10.1.1.27"
-#define EXPORT "/shared"
+#define EXPORT "/VIRTUAL"
 #define NFSFILE "/BOOKS/Classics/Dracula.djvu"
 #define NFSDIR "/BOOKS/Classics/"
 
@@ -138,8 +138,8 @@ void nfs_open_cb(int status, struct nfs_context *nfs, void *data, void *private_
        nfsfh         = data;
        client->nfsfh = nfsfh;
        printf("Got reply from server for open(%s). Handle:%p\n", NFSFILE, data);
-       printf("Read first 16 bytes\n");
-       if (nfs_pread_async(nfs, nfsfh, 0, 16, nfs_read_cb, client) != 0) {
+       printf("Read first 64 bytes\n");
+       if (nfs_pread_async(nfs, nfsfh, 0, 64, nfs_read_cb, client) != 0) {
                printf("Failed to start async nfs open\n");
                exit(10);
        }
index 06eea35a343c9a3d63a0bfe72f02b3761bd85131..6687180a66de7abeab7fb1a76e1a7049893f4fe2 100644 (file)
@@ -84,6 +84,19 @@ struct nfs_cb_data {
        int continue_int;
 
        struct nfs_fh3 fh;
+
+       /* for multi-read/write calls. */
+       int error;
+       int cancel;
+       int num_calls;
+       off_t start_offset, max_offset;
+       char *buffer;
+};
+
+struct nfs_mcb_data {
+       struct nfs_cb_data *data;
+       off_t offset;
+       size_t count;
 };
 
 static int nfs_lookup_path_async_internal(struct nfs_context *nfs, struct nfs_cb_data *data, struct nfs_fh3 *fh);
@@ -173,6 +186,11 @@ void free_nfs_cb_data(struct nfs_cb_data *data)
                data->fh.data.data_val = NULL;
        }
 
+       if (data->buffer != NULL) {
+               free(data->buffer);
+               data->buffer = NULL;
+       }
+
        free(data);
 }
 
@@ -568,7 +586,6 @@ static int nfs_lookuppath_async(struct nfs_context *nfs, const char *path, nfs_c
        if (data == NULL) {
                rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_cb_data structure");
                printf("failed to allocate memory for nfs cb data\n");
-               free_nfs_cb_data(data);
                return -2;
        }
        bzero(data, sizeof(struct nfs_cb_data));
@@ -818,6 +835,64 @@ static void nfs_pread_cb(struct rpc_context *rpc _U_, int status, void *command_
        free_nfs_cb_data(data);
 }
 
+static void nfs_pread_mcb(struct rpc_context *rpc _U_, int status, void *command_data, void *private_data)
+{
+       struct nfs_mcb_data *mdata = private_data;
+       struct nfs_cb_data *data = mdata->data;
+       struct nfs_context *nfs = data->nfs;
+       READ3res *res;
+
+       data->num_calls--;
+
+       if (status == RPC_STATUS_ERROR) {
+               /* flag the failure but do not invoke callback until we have received all responses */
+               data->error = 1;
+       }
+       if (status == RPC_STATUS_CANCEL) {
+               /* flag the cancellation but do not invoke callback until we have received all responses */
+               data->cancel = 1;
+       }
+
+       /* reassemble the data into the buffer */
+       if (status == RPC_STATUS_SUCCESS) {
+               res = command_data;
+               if (res->status != NFS3_OK) {
+                       rpc_set_error(nfs->rpc, "NFS: Read failed with %s(%d)", nfsstat3_to_str(res->status), nfsstat3_to_errno(res->status));
+                       data->error = 1;
+               } else {
+                       memcpy(&data->buffer[mdata->offset], res->READ3res_u.resok.data.data_val, res->READ3res_u.resok.count);
+                       if (data->max_offset < mdata->offset + res->READ3res_u.resok.count) {
+                               data->max_offset = mdata->offset + res->READ3res_u.resok.count;
+                       }
+               }
+       }
+
+       if (data->num_calls > 0) {
+               /* still waiting for more replies */
+               free(mdata);
+               return;
+       }
+
+
+       if (data->error != 0) {
+               data->cb(-EFAULT, nfs, command_data, data->private_data);
+               free_nfs_cb_data(data);
+               free(mdata);
+               return;
+       }
+       if (data->cancel != 0) {
+               data->cb(-EINTR, nfs, "Command was cancelled", data->private_data);
+               free_nfs_cb_data(data);
+               free(mdata);
+               return;
+       }
+
+       data->nfsfh->offset = data->max_offset;
+       data->cb(data->max_offset - data->start_offset, nfs, data->buffer, data->private_data);
+       free_nfs_cb_data(data);
+       free(mdata);
+}
+
 int nfs_pread_async(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t offset, size_t count, nfs_cb cb, void *private_data)
 {
        struct nfs_cb_data *data;
@@ -826,7 +901,6 @@ int nfs_pread_async(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t offset,
        if (data == NULL) {
                rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_cb_data structure");
                printf("failed to allocate memory for nfs cb data\n");
-               free_nfs_cb_data(data);
                return -1;
        }
        bzero(data, sizeof(struct nfs_cb_data));
@@ -836,13 +910,63 @@ int nfs_pread_async(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t offset,
        data->nfsfh        = nfsfh;
 
        nfsfh->offset = offset;
-       if (rpc_nfs_read_async(nfs->rpc, nfs_pread_cb, &nfsfh->fh, offset, count, data) != 0) {
-               rpc_set_error(nfs->rpc, "RPC error: Failed to send READ call for %s", data->path);
+
+       if (count <= nfs_get_readmax(nfs)) {
+               if (rpc_nfs_read_async(nfs->rpc, nfs_pread_cb, &nfsfh->fh, offset, count, data) != 0) {
+                       rpc_set_error(nfs->rpc, "RPC error: Failed to send READ call for %s", data->path);
+                       data->cb(-ENOMEM, nfs, rpc_get_error(nfs->rpc), data->private_data);
+                       free_nfs_cb_data(data);
+                       return -1;
+               }
+               return 0;
+       }
+
+       /* trying to read more than maximum server read size, we has to chop it up into smaller
+        * reads and collect into a reassembly buffer.
+        * we send all reads in parallell so that performance is still good.
+        */
+       data->start_offset = offset;
+
+       data->buffer =  malloc(count);
+       if (data->buffer == NULL) {
+               rpc_set_error(nfs->rpc, "Out-Of-Memory: Failed to allocate reassembly buffer for %d bytes", (int)count);
                data->cb(-ENOMEM, nfs, rpc_get_error(nfs->rpc), data->private_data);
                free_nfs_cb_data(data);
                return -1;
        }
-       return 0;
+
+       while (count > 0) {
+               size_t readcount = count;
+               struct nfs_mcb_data *mdata;
+
+               if (readcount > nfs_get_readmax(nfs)) {
+                       readcount = nfs_get_readmax(nfs);
+               }
+
+               mdata = malloc(sizeof(struct nfs_mcb_data));
+               if (mdata == NULL) {
+                       rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_mcb_data structure");
+                       printf("failed to allocate memory for nfs mcb data\n");
+                       return -1;
+               }
+               bzero(mdata, sizeof(struct nfs_mcb_data));
+               mdata->data   = data;
+               mdata->offset = offset;
+               mdata->count  = readcount;
+
+               if (rpc_nfs_read_async(nfs->rpc, nfs_pread_mcb, &nfsfh->fh, offset, readcount, mdata) != 0) {
+                       rpc_set_error(nfs->rpc, "RPC error: Failed to send READ call for %s", data->path);
+                       data->cb(-ENOMEM, nfs, rpc_get_error(nfs->rpc), data->private_data);
+                       free(mdata);
+                       return -1;
+               }
+
+               count               -= readcount;
+               offset              += readcount;
+               data->num_calls++;
+        }
+
+        return 0;
 }
 
 /*
@@ -896,7 +1020,6 @@ int nfs_pwrite_async(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t offset,
        if (data == NULL) {
                rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_cb_data structure");
                printf("failed to allocate memory for nfs cb data\n");
-               free_nfs_cb_data(data);
                return -1;
        }
        bzero(data, sizeof(struct nfs_cb_data));
@@ -957,7 +1080,6 @@ int nfs_fstat_async(struct nfs_context *nfs, struct nfsfh *nfsfh, nfs_cb cb, voi
        if (data == NULL) {
                rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_cb_data structure");
                printf("failed to allocate memory for nfs cb data\n");
-               free_nfs_cb_data(data);
                return -1;
        }
        bzero(data, sizeof(struct nfs_cb_data));
@@ -1016,7 +1138,6 @@ int nfs_fsync_async(struct nfs_context *nfs, struct nfsfh *nfsfh, nfs_cb cb, voi
        if (data == NULL) {
                rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_cb_data structure");
                printf("failed to allocate memory for nfs cb data\n");
-               free_nfs_cb_data(data);
                return -1;
        }
        bzero(data, sizeof(struct nfs_cb_data));
@@ -1077,7 +1198,6 @@ int nfs_ftruncate_async(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t leng
        if (data == NULL) {
                rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_cb_data structure");
                printf("failed to allocate memory for nfs cb data\n");
-               free_nfs_cb_data(data);
                return -1;
        }
        bzero(data, sizeof(struct nfs_cb_data));