#define RPC_CONTEXT_MAGIC 0xc6e46435
#define RPC_PARAM_UNDEFINED -1
+/*
+ * Queue is singly-linked but we hold on to the tail
+ */
+struct rpc_queue {
+ struct rpc_pdu *head, *tail;
+};
+
struct rpc_context {
uint32_t magic;
int fd;
char *encodebuf;
int encodebuflen;
- struct rpc_pdu *outqueue;
+ struct rpc_queue outqueue;
struct sockaddr_storage udp_src;
- struct rpc_pdu *waitpdu;
+ struct rpc_queue waitpdu;
uint32_t inpos;
uint32_t insize;
uint32_t zdr_decode_bufsize;
};
+void rpc_reset_queue(struct rpc_queue *q);
+void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu);
+void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu);
+
struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_bufsize);
void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
int rpc_queue_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
rpc->uid = getuid();
rpc->gid = getgid();
#endif
+ rpc_reset_queue(&rpc->outqueue);
+ rpc_reset_queue(&rpc->waitpdu);
return rpc;
}
void rpc_error_all_pdus(struct rpc_context *rpc, char *error)
{
- struct rpc_pdu *pdu;
+ struct rpc_pdu *pdu, *next;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- while((pdu = rpc->outqueue) != NULL) {
+ while ((pdu = rpc->outqueue.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
- SLIST_REMOVE(&rpc->outqueue, pdu);
+ rpc->outqueue.head = pdu->next;
rpc_free_pdu(rpc, pdu);
}
- while((pdu = rpc->waitpdu) != NULL) {
+ rpc->outqueue.tail = NULL;
+
+ while((pdu = rpc->waitpdu.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
- SLIST_REMOVE(&rpc->waitpdu, pdu);
+ rpc->waitpdu.head = pdu->next;
rpc_free_pdu(rpc, pdu);
}
+ rpc->waitpdu.tail = NULL;
}
static void rpc_free_fragment(struct rpc_fragment *fragment)
while (rpc->fragments != NULL) {
struct rpc_fragment *fragment = rpc->fragments;
- SLIST_REMOVE(&rpc->fragments, fragment);
+ rpc->fragments = fragment->next;
rpc_free_fragment(fragment);
}
}
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- while((pdu = rpc->outqueue) != NULL) {
+ while((pdu = rpc->outqueue.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
- SLIST_REMOVE(&rpc->outqueue, pdu);
+ rpc->outqueue.head = pdu->next;
rpc_free_pdu(rpc, pdu);
}
- while((pdu = rpc->waitpdu) != NULL) {
+ while((pdu = rpc->waitpdu.head) != NULL) {
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
- SLIST_REMOVE(&rpc->waitpdu, pdu);
+ rpc->outqueue.head = pdu->next;
rpc_free_pdu(rpc, pdu);
}
#include "libnfs-raw.h"
#include "libnfs-private.h"
+void rpc_reset_queue(struct rpc_queue *q)
+{
+ q->head = NULL;
+ q->tail = NULL;
+}
+
+/*
+ * Push to the tail end of the queue
+ */
+void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+ if (q->head == NULL)
+ q->head = pdu;
+ else
+ q->tail->next = pdu;
+ q->tail = pdu;
+}
+
+/*
+ * Push to the front/head of the queue
+ */
+void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu)
+{
+ pdu->next = q->head;
+ q->head = pdu;
+ if (q->tail == NULL)
+ q->tail = pdu;
+}
+
struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize)
{
struct rpc_pdu *pdu;
rpc_free_pdu(rpc, pdu);
return -1;
}
- SLIST_ADD_END(&rpc->waitpdu, pdu);
+ rpc_enqueue(&rpc->waitpdu, pdu);
return 0;
}
}
memcpy(pdu->outdata.data, rpc->encodebuf, pdu->outdata.size);
- SLIST_ADD_END(&rpc->outqueue, pdu);
+ rpc_enqueue(&rpc->outqueue, pdu);
return 0;
}
int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
{
- struct rpc_pdu *pdu;
+ struct rpc_pdu *pdu, *prev_pdu;
ZDR zdr;
int pos, recordmarker = 0;
uint32_t xid;
}
zdr_setpos(&zdr, pos);
- for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
+ /* Linear traverse singly-linked list, but track previous
+ * entry for optimised removal */
+ prev_pdu = NULL;
+ for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
if (pdu->xid != xid) {
+ prev_pdu = pdu;
continue;
}
if (rpc->is_udp == 0 || rpc->is_broadcast == 0) {
- SLIST_REMOVE(&rpc->waitpdu, pdu);
+ /* Singly-linked but we track head and tail */
+ if (pdu == rpc->waitpdu.head)
+ rpc->waitpdu.head = pdu->next;
+ if (pdu == rpc->waitpdu.tail)
+ rpc->waitpdu.tail = prev_pdu;
+ if (prev_pdu != NULL)
+ prev_pdu->next = pdu->next;
}
if (rpc_process_reply(rpc, pdu, &zdr) != 0) {
rpc_set_error(rpc, "rpc_procdess_reply failed");
return rpc->fd;
}
+static int rpc_has_queue(struct rpc_queue *q)
+{
+ return q->head != NULL;
+}
+
int rpc_which_events(struct rpc_context *rpc)
{
int events;
return POLLIN;
}
- if (rpc->outqueue) {
+ if (rpc_has_queue(&rpc->outqueue)) {
events |= POLLOUT;
}
return events;
static int rpc_write_to_socket(struct rpc_context *rpc)
{
int32_t count;
+ struct rpc_pdu *pdu;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
return -1;
}
- while (rpc->outqueue != NULL) {
+ while ((pdu = rpc->outqueue.head) != NULL) {
int64_t total;
- total = rpc->outqueue->outdata.size;
+ total = pdu->outdata.size;
- count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
+ count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
if (count == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
return -1;
}
- rpc->outqueue->written += count;
- if (rpc->outqueue->written == total) {
- struct rpc_pdu *pdu = rpc->outqueue;
-
- SLIST_REMOVE(&rpc->outqueue, pdu);
- SLIST_ADD_END(&rpc->waitpdu, pdu);
+ pdu->written += count;
+ if (pdu->written == total) {
+ rpc->outqueue.head = pdu->next;
+ if (pdu->next == NULL)
+ rpc->outqueue.tail = NULL;
+ rpc_enqueue(&rpc->waitpdu, pdu);
}
}
return 0;
}
}
- if (revents & POLLOUT && rpc->outqueue != NULL) {
+ if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
if (rpc_write_to_socket(rpc) != 0) {
rpc_set_error(rpc, "write to socket failed");
return -1;
/* socket is closed so we will not get any replies to any commands
* in flight. Move them all over from the waitpdu queue back to the out queue
*/
- for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
- SLIST_REMOVE(&rpc->waitpdu, pdu);
- SLIST_ADD(&rpc->outqueue, pdu);
+ for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
+ rpc_return_to_queue(&rpc->outqueue, pdu);
/* we have to re-send the whole pdu again */
pdu->written = 0;
}
+ rpc_reset_queue(&rpc->waitpdu);
if (rpc->auto_reconnect != 0) {
rpc->connect_cb = reconnect_cb;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
+ for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
i++;
}
- for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
+ for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) {
i++;
}
return i;