When making many concurrent requests (as is likely in any performance
criticial application), the use of SLIST_REMOVE and SLIST_ADD_END are
a severe bottleneck because of their linear search.
I considered using a double-linked list but it was unnecessary to
allocate the additional memory for each list entry.
Instead, continue to use a single-linked list but retain:
* a pointer to the end of the list; and
* a pointer to the previous entry during a linear search.
The former would makes append operations O(1) time, and the latter
does the same for removal. We can do this because removal only happens
within the linear search, and there is no random access to the queue.
#define RPC_CONTEXT_MAGIC 0xc6e46435
#define RPC_PARAM_UNDEFINED -1
#define RPC_CONTEXT_MAGIC 0xc6e46435
#define RPC_PARAM_UNDEFINED -1
+/*
+ * Queue is singly-linked but we hold on to the tail
+ */
+struct rpc_queue {
+ struct rpc_pdu *head, *tail;
+};
+
struct rpc_context {
uint32_t magic;
int fd;
struct rpc_context {
uint32_t magic;
int fd;
char *encodebuf;
int encodebuflen;
char *encodebuf;
int encodebuflen;
- struct rpc_pdu *outqueue;
+ struct rpc_queue outqueue;
struct sockaddr_storage udp_src;
struct sockaddr_storage udp_src;
- struct rpc_pdu *waitpdu;
+ struct rpc_queue waitpdu;
uint32_t inpos;
uint32_t insize;
uint32_t inpos;
uint32_t insize;
uint32_t zdr_decode_bufsize;
};
uint32_t zdr_decode_bufsize;
};
+void rpc_reset_queue(struct rpc_queue *q);
+void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu);
+void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu);
+
struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_bufsize);
void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_bufsize);
void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
rpc->uid = getuid();
rpc->gid = getgid();
#endif
rpc->uid = getuid();
rpc->gid = getgid();
#endif
+ rpc_reset_queue(&rpc->outqueue);
+ rpc_reset_queue(&rpc->waitpdu);
void rpc_error_all_pdus(struct rpc_context *rpc, char *error)
{
void rpc_error_all_pdus(struct rpc_context *rpc, char *error)
{
+ struct rpc_pdu *pdu, *next;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- while((pdu = rpc->outqueue) != NULL) {
+ while ((pdu = rpc->outqueue.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
- SLIST_REMOVE(&rpc->outqueue, pdu);
+ rpc->outqueue.head = pdu->next;
rpc_free_pdu(rpc, pdu);
}
rpc_free_pdu(rpc, pdu);
}
- while((pdu = rpc->waitpdu) != NULL) {
+ rpc->outqueue.tail = NULL;
+
+ while((pdu = rpc->waitpdu.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
- SLIST_REMOVE(&rpc->waitpdu, pdu);
+ rpc->waitpdu.head = pdu->next;
rpc_free_pdu(rpc, pdu);
}
rpc_free_pdu(rpc, pdu);
}
+ rpc->waitpdu.tail = NULL;
}
static void rpc_free_fragment(struct rpc_fragment *fragment)
}
static void rpc_free_fragment(struct rpc_fragment *fragment)
while (rpc->fragments != NULL) {
struct rpc_fragment *fragment = rpc->fragments;
while (rpc->fragments != NULL) {
struct rpc_fragment *fragment = rpc->fragments;
- SLIST_REMOVE(&rpc->fragments, fragment);
+ rpc->fragments = fragment->next;
rpc_free_fragment(fragment);
}
}
rpc_free_fragment(fragment);
}
}
assert(rpc->magic == RPC_CONTEXT_MAGIC);
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- while((pdu = rpc->outqueue) != NULL) {
+ while((pdu = rpc->outqueue.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
- SLIST_REMOVE(&rpc->outqueue, pdu);
+ rpc->outqueue.head = pdu->next;
rpc_free_pdu(rpc, pdu);
}
rpc_free_pdu(rpc, pdu);
}
- while((pdu = rpc->waitpdu) != NULL) {
+ while((pdu = rpc->waitpdu.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
- SLIST_REMOVE(&rpc->waitpdu, pdu);
+ rpc->outqueue.head = pdu->next;
rpc_free_pdu(rpc, pdu);
}
rpc_free_pdu(rpc, pdu);
}
#include "libnfs-raw.h"
#include "libnfs-private.h"
#include "libnfs-raw.h"
#include "libnfs-private.h"
+void rpc_reset_queue(struct rpc_queue *q)
+{
+ q->head = NULL;
+ q->tail = NULL;
+}
+
+/*
+ * Push to the tail end of the queue
+ */
+void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+ if (q->head == NULL)
+ q->head = pdu;
+ else
+ q->tail->next = pdu;
+ q->tail = pdu;
+}
+
+/*
+ * Push to the front/head of the queue
+ */
+void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+ pdu->next = q->head;
+ q->head = pdu;
+ if (q->tail == NULL)
+ q->tail = pdu;
+}
+
struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize)
{
struct rpc_pdu *pdu;
struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize)
{
struct rpc_pdu *pdu;
rpc_free_pdu(rpc, pdu);
return -1;
}
rpc_free_pdu(rpc, pdu);
return -1;
}
- SLIST_ADD_END(&rpc->waitpdu, pdu);
+ rpc_enqueue(&rpc->waitpdu, pdu);
}
memcpy(pdu->outdata.data, rpc->encodebuf, pdu->outdata.size);
}
memcpy(pdu->outdata.data, rpc->encodebuf, pdu->outdata.size);
- SLIST_ADD_END(&rpc->outqueue, pdu);
+ rpc_enqueue(&rpc->outqueue, pdu);
int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
{
int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
{
+ struct rpc_pdu *pdu, *prev_pdu;
ZDR zdr;
int pos, recordmarker = 0;
uint32_t xid;
ZDR zdr;
int pos, recordmarker = 0;
uint32_t xid;
- for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
+ /* Linear traverse singly-linked list, but track previous
+ * entry for optimised removal */
+ prev_pdu = NULL;
+ for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
continue;
}
if (rpc->is_udp == 0 || rpc->is_broadcast == 0) {
continue;
}
if (rpc->is_udp == 0 || rpc->is_broadcast == 0) {
- SLIST_REMOVE(&rpc->waitpdu, pdu);
+ /* Singly-linked but we track head and tail */
+ if (pdu == rpc->waitpdu.head)
+ rpc->waitpdu.head = pdu->next;
+ if (pdu == rpc->waitpdu.tail)
+ rpc->waitpdu.tail = prev_pdu;
+ if (prev_pdu != NULL)
+ prev_pdu->next = pdu->next;
}
if (rpc_process_reply(rpc, pdu, &zdr) != 0) {
rpc_set_error(rpc, "rpc_procdess_reply failed");
}
if (rpc_process_reply(rpc, pdu, &zdr) != 0) {
rpc_set_error(rpc, "rpc_procdess_reply failed");
+static int rpc_has_queue(struct rpc_queue *q)
+{
+ return q->head != NULL;
+}
+
int rpc_which_events(struct rpc_context *rpc)
{
int events;
int rpc_which_events(struct rpc_context *rpc)
{
int events;
+ if (rpc_has_queue(&rpc->outqueue)) {
events |= POLLOUT;
}
return events;
events |= POLLOUT;
}
return events;
static int rpc_write_to_socket(struct rpc_context *rpc)
{
int32_t count;
static int rpc_write_to_socket(struct rpc_context *rpc)
{
int32_t count;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- while (rpc->outqueue != NULL) {
+ while ((pdu = rpc->outqueue.head) != NULL) {
- total = rpc->outqueue->outdata.size;
+ total = pdu->outdata.size;
- count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
+ count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
if (count == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
if (count == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
- rpc->outqueue->written += count;
- if (rpc->outqueue->written == total) {
- struct rpc_pdu *pdu = rpc->outqueue;
-
- SLIST_REMOVE(&rpc->outqueue, pdu);
- SLIST_ADD_END(&rpc->waitpdu, pdu);
+ pdu->written += count;
+ if (pdu->written == total) {
+ rpc->outqueue.head = pdu->next;
+ if (pdu->next == NULL)
+ rpc->outqueue.tail = NULL;
+ rpc_enqueue(&rpc->waitpdu, pdu);
- if (revents & POLLOUT && rpc->outqueue != NULL) {
+ if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
if (rpc_write_to_socket(rpc) != 0) {
rpc_set_error(rpc, "write to socket failed");
return -1;
if (rpc_write_to_socket(rpc) != 0) {
rpc_set_error(rpc, "write to socket failed");
return -1;
/* socket is closed so we will not get any replies to any commands
* in flight. Move them all over from the waitpdu queue back to the out queue
*/
/* socket is closed so we will not get any replies to any commands
* in flight. Move them all over from the waitpdu queue back to the out queue
*/
- for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
- SLIST_REMOVE(&rpc->waitpdu, pdu);
- SLIST_ADD(&rpc->outqueue, pdu);
+ for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
+ rpc_return_to_queue(&rpc->outqueue, pdu);
/* we have to re-send the whole pdu again */
pdu->written = 0;
}
/* we have to re-send the whole pdu again */
pdu->written = 0;
}
+ rpc_reset_queue(&rpc->waitpdu);
if (rpc->auto_reconnect != 0) {
rpc->connect_cb = reconnect_cb;
if (rpc->auto_reconnect != 0) {
rpc->connect_cb = reconnect_cb;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
+ for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
- for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
+ for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) {