From 3ca2aac9a4aceac602684847f756428439bf814d Mon Sep 17 00:00:00 2001 From: Peter Lieven Date: Mon, 23 Jun 2014 16:31:32 +0200 Subject: [PATCH] add readahead support This patch adds support for an internal readahead mechanism. The maximum readahead size can be specified via URL parameter readahead. This should significantly speed up small sequential reads. Signed-off-by: Peter Lieven --- include/libnfs-private.h | 3 + include/nfsc/libnfs.h | 4 ++ lib/init.c | 6 ++ lib/libnfs-win32.def | 1 + lib/libnfs.c | 149 ++++++++++++++++++++++++++++++++------- 5 files changed, 139 insertions(+), 24 deletions(-) diff --git a/include/libnfs-private.h b/include/libnfs-private.h index 2e4ccc0..ef3b9de 100644 --- a/include/libnfs-private.h +++ b/include/libnfs-private.h @@ -73,6 +73,7 @@ struct rpc_queue { }; #define HASHES 1024 +#define NFS_RA_TIMEOUT 5 struct rpc_context { uint32_t magic; @@ -115,6 +116,7 @@ struct rpc_context { int tcp_syncnt; int uid; int gid; + uint32_t readahead; }; struct rpc_pdu { @@ -174,6 +176,7 @@ void rpc_unset_autoreconnect(struct rpc_context *rpc); void rpc_set_tcp_syncnt(struct rpc_context *rpc, int v); void rpc_set_uid(struct rpc_context *rpc, int uid); void rpc_set_gid(struct rpc_context *rpc, int gid); +void rpc_set_readahead(struct rpc_context *rpc, uint32_t v); int rpc_add_fragment(struct rpc_context *rpc, char *data, uint64_t size); void rpc_free_all_fragments(struct rpc_context *rpc); diff --git a/include/nfsc/libnfs.h b/include/nfsc/libnfs.h index d960f7f..f2d8cb0 100644 --- a/include/nfsc/libnfs.h +++ b/include/nfsc/libnfs.h @@ -36,6 +36,9 @@ extern "C" { #endif +#define LIBNFS_FEATURE_READAHEAD +#define NFS_BLKSIZE 4096 + struct nfs_context; struct rpc_context; @@ -184,6 +187,7 @@ EXTERN uint64_t nfs_get_writemax(struct nfs_context *nfs); EXTERN void nfs_set_tcp_syncnt(struct nfs_context *nfs, int v); EXTERN void nfs_set_uid(struct nfs_context *nfs, int uid); EXTERN void nfs_set_gid(struct nfs_context *nfs, int gid); +EXTERN void 
nfs_set_readahead(struct nfs_context *nfs, uint32_t v); /* * MOUNT THE EXPORT diff --git a/lib/init.c b/lib/init.c index 782a7ea..99ec159 100644 --- a/lib/init.c +++ b/lib/init.c @@ -91,6 +91,12 @@ struct rpc_context *rpc_init_context(void) return rpc; } +void rpc_set_readahead(struct rpc_context *rpc, uint32_t v) +{ + assert(rpc->magic == RPC_CONTEXT_MAGIC); + + rpc->readahead = v; +} struct rpc_context *rpc_init_udp_context(void) { diff --git a/lib/libnfs-win32.def b/lib/libnfs-win32.def index 72d6be6..5a0df03 100644 --- a/lib/libnfs-win32.def +++ b/lib/libnfs-win32.def @@ -70,6 +70,7 @@ nfs_set_auth nfs_set_gid nfs_set_tcp_syncnt nfs_set_uid +nfs_set_readahead nfs_stat nfs_stat_async nfs_stat64 diff --git a/lib/libnfs.c b/lib/libnfs.c index 59b30fd..d54a542 100644 --- a/lib/libnfs.c +++ b/lib/libnfs.c @@ -81,11 +81,22 @@ struct nfsdir { struct nfsdirent *current; }; +struct nfs_readahead { + uint64_t fh_offset; + uint64_t last_offset; + uint64_t buf_offset; + uint64_t buf_count; + time_t buf_ts; + void *buf; + uint32_t cur_ra; +}; + struct nfsfh { struct nfs_fh3 fh; int is_sync; int is_append; uint64_t offset; + struct nfs_readahead ra; }; struct nfs_context { @@ -134,9 +145,8 @@ struct nfs_cb_data { int cancel; int oom; int num_calls; - uint64_t start_offset, max_offset, count; + uint64_t offset, count, max_offset, org_offset, org_count; char *buffer; - size_t request_size; char *usrbuf; }; @@ -181,12 +191,14 @@ char *nfs_get_error(struct nfs_context *nfs) static int nfs_set_context_args(struct nfs_context *nfs, char *arg, char *val) { - if (!strncmp(arg, "tcp-syncnt", 10)) { + if (!strcmp(arg, "tcp-syncnt")) { rpc_set_tcp_syncnt(nfs_get_rpc_context(nfs), atoi(val)); - } else if (!strncmp(arg, "uid", 3)) { + } else if (!strcmp(arg, "uid")) { rpc_set_uid(nfs_get_rpc_context(nfs), atoi(val)); - } else if (!strncmp(arg, "gid", 3)) { + } else if (!strcmp(arg, "gid")) { rpc_set_gid(nfs_get_rpc_context(nfs), atoi(val)); + } else if (!strcmp(arg, "readahead")) { + 
rpc_set_readahead(nfs_get_rpc_context(nfs), atoi(val)); } return 0; } @@ -1148,8 +1160,8 @@ static void nfs_stat_1_cb(struct rpc_context *rpc, int status, void *command_dat st.st_rdev = 0; st.st_size = res->GETATTR3res_u.resok.obj_attributes.size; #ifndef WIN32 - st.st_blksize = 4096; - st.st_blocks = res->GETATTR3res_u.resok.obj_attributes.size / 4096; + st.st_blksize = NFS_BLKSIZE; + st.st_blocks = res->GETATTR3res_u.resok.obj_attributes.size / NFS_BLKSIZE; #endif//WIN32 st.st_atime = res->GETATTR3res_u.resok.obj_attributes.atime.seconds; st.st_mtime = res->GETATTR3res_u.resok.obj_attributes.mtime.seconds; @@ -1532,9 +1544,9 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat /* if we have more than one call or we have received a short read we need a reassembly buffer */ if (data->num_calls || (count < mdata->count && !res->READ3res_u.resok.eof)) { if (data->buffer == NULL) { - data->buffer = malloc(data->request_size); + data->buffer = malloc(data->count); if (data->buffer == NULL) { - rpc_set_error(nfs->rpc, "Out-Of-Memory: Failed to allocate reassembly buffer for %d bytes", (int)data->request_size); + rpc_set_error(nfs->rpc, "Out-Of-Memory: Failed to allocate reassembly buffer for %d bytes", (int)data->count); data->oom = 1; } } @@ -1543,7 +1555,7 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat if (count <= mdata->count) { /* copy data into reassembly buffer if we have one */ if (data->buffer != NULL) { - memcpy(&data->buffer[mdata->offset - data->start_offset], res->READ3res_u.resok.data.data_val, count); + memcpy(&data->buffer[mdata->offset - data->offset], res->READ3res_u.resok.data.data_val, count); } if (data->max_offset < mdata->offset + count) { data->max_offset = mdata->offset + count; @@ -1599,14 +1611,35 @@ static void nfs_pread_mcb(struct rpc_context *rpc, int status, void *command_dat } if (data->buffer) { - data->cb(data->max_offset - data->start_offset, nfs, data->buffer, 
data->private_data); + if (data->max_offset > data->org_offset + data->org_count) { + data->max_offset = data->org_offset + data->org_count; + } + data->cb(data->max_offset - data->org_offset, nfs, data->buffer + (data->org_offset - data->offset), data->private_data); } else { data->cb(res->READ3res_u.resok.count, nfs, res->READ3res_u.resok.data.data_val, data->private_data); } + data->nfsfh->ra.fh_offset = data->max_offset; + if (data->nfsfh->ra.cur_ra) { + free(data->nfsfh->ra.buf); + data->nfsfh->ra.buf = data->buffer; + data->nfsfh->ra.buf_offset = data->offset; + data->nfsfh->ra.buf_count = data->count; + data->nfsfh->ra.buf_ts = time(NULL); + data->buffer = NULL; + } free_nfs_cb_data(data); } +static void nfs_ra_invalidate(struct nfsfh *nfsfh) { + free(nfsfh->ra.buf); + nfsfh->ra.buf = NULL; + nfsfh->ra.buf_offset = 0; + nfsfh->ra.buf_count = 0; + nfsfh->ra.buf_ts = time(NULL); + nfsfh->ra.cur_ra = NFS_BLKSIZE; +} + static int nfs_pread_async_internal(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t offset, uint64_t count, nfs_cb cb, void *private_data, int update_pos) { struct nfs_cb_data *data; @@ -1621,16 +1654,78 @@ static int nfs_pread_async_internal(struct nfs_context *nfs, struct nfsfh *nfsfh data->cb = cb; data->private_data = private_data; data->nfsfh = nfsfh; - data->request_size = count; + data->org_offset = offset; + data->org_count = count; assert(data->num_calls == 0); + if (nfs->rpc->readahead && time(NULL) - nfsfh->ra.buf_ts > NFS_RA_TIMEOUT) { + /* readahead cache timeout */ + nfs_ra_invalidate(nfsfh); + } + + if (nfs->rpc->readahead) { + if (offset >= nfsfh->ra.last_offset && + offset - NFS_BLKSIZE <= nfsfh->ra.fh_offset + nfsfh->ra.cur_ra) { + if (nfs->rpc->readahead > nfsfh->ra.cur_ra) { + nfsfh->ra.cur_ra <<= 1; + } + } else { + nfsfh->ra.cur_ra = NFS_BLKSIZE; + } + + nfsfh->ra.last_offset = offset; + + if (nfsfh->ra.buf_offset <= offset && + nfsfh->ra.buf_offset + nfsfh->ra.buf_count >= offset + count) { + /* serve request 
completely from cache */ + data->buffer = malloc(count); + if (data->buffer == NULL) { + free_nfs_cb_data(data); + return -ENOMEM; + } + memcpy(data->buffer, nfsfh->ra.buf + (offset - nfsfh->ra.buf_offset), count); + data->cb(count, nfs, data->buffer, data->private_data); + nfsfh->ra.fh_offset = offset + count; + free_nfs_cb_data(data); + return 0; + } + + /* align start offset to blocksize */ + count += offset & (NFS_BLKSIZE - 1); + offset &= ~(NFS_BLKSIZE - 1); + + /* align end offset to blocksize and add readahead */ + count += nfsfh->ra.cur_ra - 1; + count &= ~(NFS_BLKSIZE - 1); + + data->buffer = malloc(count); + if (data->buffer == NULL) { + free_nfs_cb_data(data); + return -ENOMEM; + } + data->offset = offset; + data->count = count; + + if (nfsfh->ra.buf_count && nfsfh->ra.buf_offset <= offset && + nfsfh->ra.buf_offset + nfsfh->ra.buf_count >= offset) { + /* serve request partially from cache */ + size_t overlap = (nfsfh->ra.buf_offset + nfsfh->ra.buf_count) - offset; + if (overlap > count) overlap = count; + memcpy(data->buffer, nfsfh->ra.buf + (offset - nfsfh->ra.buf_offset), overlap); + offset += overlap; + count -= overlap; + } + } else { + data->offset = offset; + data->count = count; + } + + data->max_offset = offset; + /* chop requests into chunks of at most READMAX bytes if necessary. * we send all reads in parallel so that performance is still good. 
*/ - data->max_offset = offset; - data->start_offset = offset; - do { uint64_t readcount = count; struct nfs_mcb_data *mdata; @@ -1749,7 +1844,7 @@ static void nfs_pwrite_mcb(struct rpc_context *rpc, int status, void *command_da mdata->count -= count; nfs_fill_WRITE3args(&args, data->nfsfh, mdata->offset, mdata->count, - &data->usrbuf[mdata->offset - data->start_offset]); + &data->usrbuf[mdata->offset - data->offset]); if (rpc_nfs3_write_async(nfs->rpc, nfs_pwrite_mcb, &args, mdata) == 0) { data->num_calls++; return; @@ -1789,7 +1884,7 @@ static void nfs_pwrite_mcb(struct rpc_context *rpc, int status, void *command_da return; } - data->cb(data->max_offset - data->start_offset, nfs, NULL, data->private_data); + data->cb(data->max_offset - data->offset, nfs, NULL, data->private_data); free_nfs_cb_data(data); } @@ -1818,7 +1913,7 @@ static int nfs_pwrite_async_internal(struct nfs_context *nfs, struct nfsfh *nfsf * we send all writes in parallel so that performance is still good. */ data->max_offset = offset; - data->start_offset = offset; + data->offset = offset; do { uint64_t writecount = count; @@ -1845,7 +1940,7 @@ static int nfs_pwrite_async_internal(struct nfs_context *nfs, struct nfsfh *nfsf mdata->count = writecount; mdata->update_pos = update_pos; - nfs_fill_WRITE3args(&args, nfsfh, offset, writecount, &buf[offset - data->start_offset]); + nfs_fill_WRITE3args(&args, nfsfh, offset, writecount, &buf[offset - data->offset]); if (rpc_nfs3_write_async(nfs->rpc, nfs_pwrite_mcb, &args, mdata) != 0) { rpc_set_error(nfs->rpc, "RPC error: Failed to send WRITE call for %s", data->path); @@ -1911,6 +2006,7 @@ static void nfs_write_append_cb(struct rpc_context *rpc, int status, void *comma int nfs_write_async(struct nfs_context *nfs, struct nfsfh *nfsfh, uint64_t count, char *buf, nfs_cb cb, void *private_data) { + nfs_ra_invalidate(nfsfh); if (nfsfh->is_append) { struct GETATTR3args args; struct nfs_cb_data *data; @@ -1954,6 +2050,7 @@ int nfs_close_async(struct 
nfs_context *nfs, struct nfsfh *nfsfh, nfs_cb cb, voi free(nfsfh->fh.data.data_val); nfsfh->fh.data.data_val = NULL; } + free(nfsfh->ra.buf); free(nfsfh); cb(0, nfs, NULL, private_data); @@ -3249,11 +3346,11 @@ static void nfs_statvfs_1_cb(struct rpc_context *rpc, int status, void *command_ return; } - svfs.f_bsize = 4096; - svfs.f_frsize = 4096; - svfs.f_blocks = res->FSSTAT3res_u.resok.tbytes/4096; - svfs.f_bfree = res->FSSTAT3res_u.resok.fbytes/4096; - svfs.f_bavail = res->FSSTAT3res_u.resok.abytes/4096; + svfs.f_bsize = NFS_BLKSIZE; + svfs.f_frsize = NFS_BLKSIZE; + svfs.f_blocks = res->FSSTAT3res_u.resok.tbytes/NFS_BLKSIZE; + svfs.f_bfree = res->FSSTAT3res_u.resok.fbytes/NFS_BLKSIZE; + svfs.f_bavail = res->FSSTAT3res_u.resok.abytes/NFS_BLKSIZE; svfs.f_files = res->FSSTAT3res_u.resok.tfiles; svfs.f_ffree = res->FSSTAT3res_u.resok.ffiles; #if !defined(ANDROID) @@ -4291,6 +4388,10 @@ void nfs_set_gid(struct nfs_context *nfs, int gid) { rpc_set_gid(nfs->rpc, gid); } +void nfs_set_readahead(struct nfs_context *nfs, uint32_t v) { + rpc_set_readahead(nfs->rpc, v); +} + void nfs_set_error(struct nfs_context *nfs, char *error_string, ...) { va_list ap; -- 2.34.1