struct rpc_pdu *head, *tail;
};
+#define HASHES 1024
+
struct rpc_context {
uint32_t magic;
int fd;
struct rpc_queue outqueue;
struct sockaddr_storage udp_src;
- struct rpc_queue waitpdu;
+ struct rpc_queue waitpdu[HASHES];
uint32_t inpos;
uint32_t insize;
void rpc_reset_queue(struct rpc_queue *q);
void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu);
void rpc_return_to_queue(struct rpc_queue *q, struct rpc_pdu *pdu);
+unsigned int rpc_hash_xid(uint32_t xid);
struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_bufsize);
void rpc_free_pdu(struct rpc_context *rpc, struct rpc_pdu *pdu);
{
struct rpc_context *rpc;
static uint32_t salt = 0;
+ unsigned int i;
rpc = malloc(sizeof(struct rpc_context));
if (rpc == NULL) {
rpc->gid = getgid();
#endif
rpc_reset_queue(&rpc->outqueue);
- rpc_reset_queue(&rpc->waitpdu);
+ for (i = 0; i < HASHES; i++)
+ rpc_reset_queue(&rpc->waitpdu[i]);
return rpc;
}
void rpc_error_all_pdus(struct rpc_context *rpc, char *error)
{
struct rpc_pdu *pdu, *next;
+ unsigned int i;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
}
rpc->outqueue.tail = NULL;
- while((pdu = rpc->waitpdu.head) != NULL) {
- pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
- rpc->waitpdu.head = pdu->next;
- rpc_free_pdu(rpc, pdu);
+ for (i = 0; i < HASHES; i++) {
+ struct rpc_queue *q = &rpc->waitpdu[i];
+
+ while((pdu = q->head) != NULL) {
+ pdu->cb(rpc, RPC_STATUS_ERROR, error, pdu->private_data);
+ q->head = pdu->next;
+ rpc_free_pdu(rpc, pdu);
+ }
+ q->tail = NULL;
}
- rpc->waitpdu.tail = NULL;
}
static void rpc_free_fragment(struct rpc_fragment *fragment)
void rpc_destroy_context(struct rpc_context *rpc)
{
struct rpc_pdu *pdu;
+ unsigned int i;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
rpc->outqueue.head = pdu->next;
rpc_free_pdu(rpc, pdu);
}
- while((pdu = rpc->waitpdu.head) != NULL) {
- pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
- rpc->outqueue.head = pdu->next;
- rpc_free_pdu(rpc, pdu);
+
+ for (i = 0; i < HASHES; i++) {
+ struct rpc_queue *q = &rpc->waitpdu[i];
+
+ while((pdu = q->head) != NULL) {
+ pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
+ rpc->outqueue.head = pdu->next;
+ rpc_free_pdu(rpc, pdu);
+ }
}
rpc_free_all_fragments(rpc);
q->tail = pdu;
}
+unsigned int rpc_hash_xid(uint32_t xid)
+{
+ return (xid * 7919) % HASHES;
+}
+
struct rpc_pdu *rpc_allocate_pdu(struct rpc_context *rpc, int program, int version, int procedure, rpc_cb cb, void *private_data, zdrproc_t zdr_decode_fn, int zdr_decode_bufsize)
{
struct rpc_pdu *pdu;
/* for udp we dont queue, we just send it straight away */
if (rpc->is_udp != 0) {
+ unsigned int hash;
+
// XXX add a rpc->udp_dest_sock_size and get rid of sys/socket.h and netinet/in.h
if (sendto(rpc->fd, rpc->encodebuf, size, MSG_DONTWAIT, rpc->udp_dest, sizeof(struct sockaddr_in)) < 0) {
rpc_set_error(rpc, "Sendto failed with errno %s", strerror(errno));
rpc_free_pdu(rpc, pdu);
return -1;
}
- rpc_enqueue(&rpc->waitpdu, pdu);
+
+ hash = rpc_hash_xid(pdu->xid);
+ rpc_enqueue(&rpc->waitpdu[hash], pdu);
return 0;
}
int rpc_process_pdu(struct rpc_context *rpc, char *buf, int size)
{
struct rpc_pdu *pdu, *prev_pdu;
+ struct rpc_queue *q;
ZDR zdr;
int pos, recordmarker = 0;
+ unsigned int hash;
uint32_t xid;
char *reasbuf = NULL;
}
zdr_setpos(&zdr, pos);
- /* Linear traverse singly-linked list, but track previous
- * entry for optimised removal */
+ /* Look up the transaction in a hash table of our requests */
+ hash = rpc_hash_xid(xid);
+ q = &rpc->waitpdu[hash];
+
+ /* Follow the hash chain. Linear traverse singly-linked list,
+ * but track previous entry for optimised removal */
prev_pdu = NULL;
- for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
+ for (pdu=q->head; pdu; pdu=pdu->next) {
if (pdu->xid != xid) {
prev_pdu = pdu;
continue;
}
if (rpc->is_udp == 0 || rpc->is_broadcast == 0) {
/* Singly-linked but we track head and tail */
- if (pdu == rpc->waitpdu.head)
- rpc->waitpdu.head = pdu->next;
- if (pdu == rpc->waitpdu.tail)
- rpc->waitpdu.tail = prev_pdu;
+ if (pdu == q->head)
+ q->head = pdu->next;
+ if (pdu == q->tail)
+ q->tail = prev_pdu;
if (prev_pdu != NULL)
prev_pdu->next = pdu->next;
}
pdu->written += count;
if (pdu->written == total) {
+ unsigned int hash;
+
rpc->outqueue.head = pdu->next;
if (pdu->next == NULL)
rpc->outqueue.tail = NULL;
- rpc_enqueue(&rpc->waitpdu, pdu);
+
+ hash = rpc_hash_xid(pdu->xid);
+ rpc_enqueue(&rpc->waitpdu[hash], pdu);
}
}
return 0;
static int rpc_reconnect_requeue(struct rpc_context *rpc)
{
struct rpc_pdu *pdu;
+ unsigned int i;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
/* socket is closed so we will not get any replies to any commands
* in flight. Move them all over from the waitpdu queue back to the out queue
*/
- for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
- rpc_return_to_queue(&rpc->outqueue, pdu);
- /* we have to re-send the whole pdu again */
- pdu->written = 0;
+ for (i = 0; i < HASHES; i++) {
+ struct rpc_queue *q = &rpc->waitpdu[i];
+
+ for (pdu=q->head; pdu; pdu=pdu->next) {
+ rpc_return_to_queue(&rpc->outqueue, pdu);
+ /* we have to re-send the whole pdu again */
+ pdu->written = 0;
+ }
+ rpc_reset_queue(q);
}
- rpc_reset_queue(&rpc->waitpdu);
if (rpc->auto_reconnect != 0) {
rpc->connect_cb = reconnect_cb;
{
int i=0;
struct rpc_pdu *pdu;
+ unsigned int n;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
i++;
}
- for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) {
- i++;
+
+ for (n = 0; n < HASHES; n++) {
+ struct rpc_queue *q = &rpc->waitpdu[n];
+
+ for(pdu = q->head; pdu; pdu = pdu->next)
+ i++;
}
return i;
}