return rpc->fd;
}
+static int rpc_has_queue(struct rpc_queue *q)
+{
+ return q->head != NULL;
+}
+
int rpc_which_events(struct rpc_context *rpc)
{
int events;
return POLLIN;
}
- if (rpc->outqueue) {
+ if (rpc_has_queue(&rpc->outqueue)) {
events |= POLLOUT;
}
return events;
static int rpc_write_to_socket(struct rpc_context *rpc)
{
int32_t count;
+ struct rpc_pdu *pdu;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
return -1;
}
- while (rpc->outqueue != NULL) {
+ while ((pdu = rpc->outqueue.head) != NULL) {
int64_t total;
- total = rpc->outqueue->outdata.size;
+ total = pdu->outdata.size;
- count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
+ count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
if (count == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
return -1;
}
- rpc->outqueue->written += count;
- if (rpc->outqueue->written == total) {
- struct rpc_pdu *pdu = rpc->outqueue;
+ pdu->written += count;
+ if (pdu->written == total) {
+ unsigned int hash;
+
+ rpc->outqueue.head = pdu->next;
+ if (pdu->next == NULL)
+ rpc->outqueue.tail = NULL;
- SLIST_REMOVE(&rpc->outqueue, pdu);
- SLIST_ADD_END(&rpc->waitpdu, pdu);
+ hash = rpc_hash_xid(pdu->xid);
+ rpc_enqueue(&rpc->waitpdu[hash], pdu);
}
}
return 0;
}
}
- if (revents & POLLOUT && rpc->outqueue != NULL) {
+ if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
if (rpc_write_to_socket(rpc) != 0) {
rpc_set_error(rpc, "write to socket failed");
return -1;
((struct sockaddr_in6 *)&ss)->sin6_port = port;
((struct sockaddr_in6 *)&ss)->sin6_family = AF_INET6;
#ifdef HAVE_SOCKADDR_LEN
- ((struct sockaddr_in6 *)&ss)->sin6_len = sizeof(struct sockaddr_in);
+ ((struct sockaddr_in6 *)&ss)->sin6_len = sizeof(struct sockaddr6_in);
#endif
break;
}
case AF_INET:
((struct sockaddr_in *)&rpc->s)->sin_family = ai->ai_family;
((struct sockaddr_in *)&rpc->s)->sin_port = htons(port);
+ ((struct sockaddr_in *)&rpc->s)->sin_addr = ((struct sockaddr_in *)(ai->ai_addr))->sin_addr;
#ifdef HAVE_SOCKADDR_LEN
((struct sockaddr_in *)&rpc->s)->sin_len = sizeof(struct sockaddr_in);
#endif
case AF_INET6:
((struct sockaddr_in6 *)&rpc->s)->sin6_family = ai->ai_family;
((struct sockaddr_in6 *)&rpc->s)->sin6_port = htons(port);
+ ((struct sockaddr_in6 *)&rpc->s)->sin6_addr = ((struct sockaddr_in6 *)(ai->ai_addr))->sin6_addr;
#ifdef HAVE_SOCKADDR_LEN
((struct sockaddr_in6 *)&rpc->s)->sin6_len = sizeof(struct sockaddr_in6);
#endif
static int rpc_reconnect_requeue(struct rpc_context *rpc)
{
struct rpc_pdu *pdu;
+ unsigned int i;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
/* socket is closed so we will not get any replies to any commands
* in flight. Move them all over from the waitpdu queue back to the out queue
*/
- for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
- SLIST_REMOVE(&rpc->waitpdu, pdu);
- SLIST_ADD(&rpc->outqueue, pdu);
- /* we have to re-send the whole pdu again */
- pdu->written = 0;
+ for (i = 0; i < HASHES; i++) {
+ struct rpc_queue *q = &rpc->waitpdu[i];
+
+ for (pdu=q->head; pdu; pdu=pdu->next) {
+ rpc_return_to_queue(&rpc->outqueue, pdu);
+ /* we have to re-send the whole pdu again */
+ pdu->written = 0;
+ }
+ rpc_reset_queue(q);
}
if (rpc->auto_reconnect != 0) {
{
int i=0;
struct rpc_pdu *pdu;
+ unsigned int n;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
+ for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
i++;
}
- for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
- i++;
+
+ for (n = 0; n < HASHES; n++) {
+ struct rpc_queue *q = &rpc->waitpdu[n];
+
+ for(pdu = q->head; pdu; pdu = pdu->next)
+ i++;
}
return i;
}