return rpc->fd;
}
+static int rpc_has_queue(struct rpc_queue *q)
+{
+ return q->head != NULL;
+}
+
int rpc_which_events(struct rpc_context *rpc)
{
int events;
return POLLIN;
}
- if (rpc->outqueue) {
+ if (rpc_has_queue(&rpc->outqueue)) {
events |= POLLOUT;
}
return events;
static int rpc_write_to_socket(struct rpc_context *rpc)
{
int32_t count;
+ struct rpc_pdu *pdu;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
return -1;
}
- while (rpc->outqueue != NULL) {
+ while ((pdu = rpc->outqueue.head) != NULL) {
int64_t total;
- total = rpc->outqueue->outdata.size;
+ total = pdu->outdata.size;
- count = send(rpc->fd, rpc->outqueue->outdata.data + rpc->outqueue->written, total - rpc->outqueue->written, 0);
+ count = send(rpc->fd, pdu->outdata.data + pdu->written, total - pdu->written, 0);
if (count == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
return -1;
}
- rpc->outqueue->written += count;
- if (rpc->outqueue->written == total) {
- struct rpc_pdu *pdu = rpc->outqueue;
-
- SLIST_REMOVE(&rpc->outqueue, pdu);
- SLIST_ADD_END(&rpc->waitpdu, pdu);
+ pdu->written += count;
+ if (pdu->written == total) {
+ rpc->outqueue.head = pdu->next;
+ if (pdu->next == NULL)
+ rpc->outqueue.tail = NULL;
+ rpc_enqueue(&rpc->waitpdu, pdu);
}
}
return 0;
}
}
- if (revents & POLLOUT && rpc->outqueue != NULL) {
+ if (revents & POLLOUT && rpc_has_queue(&rpc->outqueue)) {
if (rpc_write_to_socket(rpc) != 0) {
rpc_set_error(rpc, "write to socket failed");
return -1;
/* socket is closed so we will not get any replies to any commands
* in flight. Move them all over from the waitpdu queue back to the out queue
*/
- for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
- SLIST_REMOVE(&rpc->waitpdu, pdu);
- SLIST_ADD(&rpc->outqueue, pdu);
+ for (pdu=rpc->waitpdu.head; pdu; pdu=pdu->next) {
+ rpc_return_to_queue(&rpc->outqueue, pdu);
/* we have to re-send the whole pdu again */
pdu->written = 0;
}
+ rpc_reset_queue(&rpc->waitpdu);
if (rpc->auto_reconnect != 0) {
rpc->connect_cb = reconnect_cb;
assert(rpc->magic == RPC_CONTEXT_MAGIC);
- for(pdu = rpc->outqueue; pdu; pdu = pdu->next) {
+ for(pdu = rpc->outqueue.head; pdu; pdu = pdu->next) {
i++;
}
- for(pdu = rpc->waitpdu; pdu; pdu = pdu->next) {
+ for(pdu = rpc->waitpdu.head; pdu; pdu = pdu->next) {
i++;
}
return i;