Large-Writes: just like for large-reads, chop large writes up into the smallest
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Sun, 11 Sep 2011 12:13:28 +0000 (22:13 +1000)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Sun, 11 Sep 2011 12:13:28 +0000 (22:13 +1000)
of 32kb and what the server responded as the max write size and send them out in parallell to the server.

32kb is a common limitation in XDR libraries so even if the server says IT can handle writes larget than that, our XDR library might not.

lib/libnfs.c

index 8c408b2cd3394a63f6c5af13664f36f3fbd3f55f..ba7b64893f4ede24fc3b6be4f4eab5774dac09fc 100644 (file)
@@ -1042,6 +1042,65 @@ static void nfs_pwrite_cb(struct rpc_context *rpc _U_, int status, void *command
        free_nfs_cb_data(data);
 }
 
+static void nfs_pwrite_mcb(struct rpc_context *rpc _U_, int status, void *command_data, void *private_data)
+{
+       struct nfs_mcb_data *mdata = private_data;
+       struct nfs_cb_data *data = mdata->data;
+       struct nfs_context *nfs = data->nfs;
+       WRITE3res *res;
+
+       data->num_calls--;
+
+       if (status == RPC_STATUS_ERROR) {
+               /* flag the failure but do not invoke callback until we have received all responses */
+               data->error = 1;
+       }
+       if (status == RPC_STATUS_CANCEL) {
+               /* flag the cancellation but do not invoke callback until we have received all responses */
+               data->cancel = 1;
+       }
+
+       if (status == RPC_STATUS_SUCCESS) {
+               res = command_data;
+               if (res->status != NFS3_OK) {
+                       rpc_set_error(nfs->rpc, "NFS: Write failed with %s(%d)", nfsstat3_to_str(res->status), nfsstat3_to_errno(res->status));
+                       data->error = 1;
+               } else  {
+                       if (res->WRITE3res_u.resok.count > 0) {
+                               if ((unsigned)data->max_offset < mdata->offset + res->WRITE3res_u.resok.count) {
+                                       data->max_offset = mdata->offset + res->WRITE3res_u.resok.count;
+                               }
+                       }
+               }
+       }
+
+       if (data->num_calls > 0) {
+               /* still waiting for more replies */
+               free(mdata);
+               return;
+       }
+
+       if (data->error != 0) {
+               data->cb(-EFAULT, nfs, command_data, data->private_data);
+               free_nfs_cb_data(data);
+               free(mdata);
+               return;
+       }
+       if (data->cancel != 0) {
+               data->cb(-EINTR, nfs, "Command was cancelled", data->private_data);
+               free_nfs_cb_data(data);
+               free(mdata);
+               return;
+       }
+
+       data->nfsfh->offset = data->max_offset;
+       data->cb(data->max_offset - data->start_offset, nfs, NULL, data->private_data);
+
+       free_nfs_cb_data(data);
+       free(mdata);
+}
+
+
 int nfs_pwrite_async(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t offset, size_t count, char *buf, nfs_cb cb, void *private_data)
 {
        struct nfs_cb_data *data;
@@ -1058,12 +1117,54 @@ int nfs_pwrite_async(struct nfs_context *nfs, struct nfsfh *nfsfh, off_t offset,
        data->nfsfh        = nfsfh;
 
        nfsfh->offset = offset;
-       if (rpc_nfs_write_async(nfs->rpc, nfs_pwrite_cb, &nfsfh->fh, buf, offset, count, nfsfh->is_sync?FILE_SYNC:UNSTABLE, data) != 0) {
-               rpc_set_error(nfs->rpc, "RPC error: Failed to send WRITE call for %s", data->path);
-               data->cb(-ENOMEM, nfs, rpc_get_error(nfs->rpc), data->private_data);
-               free_nfs_cb_data(data);
-               return -1;
+
+       if (count <= nfs_get_writemax(nfs)) {
+               if (rpc_nfs_write_async(nfs->rpc, nfs_pwrite_cb, &nfsfh->fh, buf, offset, count, nfsfh->is_sync?FILE_SYNC:UNSTABLE, data) != 0) {
+                       rpc_set_error(nfs->rpc, "RPC error: Failed to send WRITE call for %s", data->path);
+                       data->cb(-ENOMEM, nfs, rpc_get_error(nfs->rpc), data->private_data);
+                       free_nfs_cb_data(data);
+                       return -1;
+               }
+               return 0;
        }
+
+       /* trying to write more than maximum server write size, we has to chop it up into smaller
+        * chunks.
+        * we send all writes in parallell so that performance is still good.
+        */
+       data->max_offset = offset;
+       data->start_offset = offset;
+
+       while (count > 0) {
+               size_t writecount = count;
+               struct nfs_mcb_data *mdata;
+
+               if (writecount > nfs_get_writemax(nfs)) {
+                       writecount = nfs_get_writemax(nfs);
+               }
+
+               mdata = malloc(sizeof(struct nfs_mcb_data));
+               if (mdata == NULL) {
+                       rpc_set_error(nfs->rpc, "out of memory: failed to allocate nfs_mcb_data structure");
+                       return -1;
+               }
+               memset(mdata, 0, sizeof(struct nfs_mcb_data));
+               mdata->data   = data;
+               mdata->offset = offset;
+               mdata->count  = writecount;
+
+               if (rpc_nfs_write_async(nfs->rpc, nfs_pwrite_mcb, &nfsfh->fh, &buf[offset - data->start_offset], offset, writecount, nfsfh->is_sync?FILE_SYNC:UNSTABLE, mdata) != 0) {
+                       rpc_set_error(nfs->rpc, "RPC error: Failed to send WRITE call for %s", data->path);
+                       data->cb(-ENOMEM, nfs, rpc_get_error(nfs->rpc), data->private_data);
+                       free(mdata);
+                       return -1;
+               }
+
+               count               -= writecount;
+               offset              += writecount;
+               data->num_calls++;
+       }
+
        return 0;
 }
 
@@ -2910,7 +3011,11 @@ size_t nfs_get_readmax(struct nfs_context *nfs)
  */
 size_t nfs_get_writemax(struct nfs_context *nfs)
 {
-       return nfs->writemax;
+       /* Some XDR libraries can not marshall PDUs bigger than this */
+        if (nfs->writemax < 32768) {
+               return nfs->writemax;
+       }
+       return 32768;
 }
 
 void nfs_set_error(struct nfs_context *nfs, char *error_string, ...)