#endif
-static int rpc_disconnect_requeue(struct rpc_context *rpc);
+static int rpc_reconnect_requeue(struct rpc_context *rpc);
+static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s);
static void set_nonblocking(int fd)
{
if (revents & POLLIN) {
if (rpc_read_from_socket(rpc) != 0) {
- rpc_disconnect_requeue(rpc);
+ rpc_reconnect_requeue(rpc);
return 0;
}
}
return 0;
}
-int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
+void rpc_set_autoreconnect(struct rpc_context *rpc)
{
- struct sockaddr_storage s;
- struct sockaddr_in *sin = (struct sockaddr_in *)&s;
- int socksize;
-
- if (rpc->fd != -1) {
- rpc_set_error(rpc, "Trying to connect while already connected");
- return -1;
- }
+ rpc->auto_reconnect = 1;
+}
- if (rpc->is_udp != 0) {
- rpc_set_error(rpc, "Trying to connect on UDP socket");
- return -1;
- }
+void rpc_unset_autoreconnect(struct rpc_context *rpc)
+{
+ rpc->auto_reconnect = 0;
+}
- sin->sin_family = AF_INET;
- sin->sin_port = htons(port);
- if (inet_pton(AF_INET, server, &sin->sin_addr) != 1) {
- rpc_set_error(rpc, "Not a valid server ip address");
- return -1;
- }
+static int rpc_connect_sockaddr_async(struct rpc_context *rpc, struct sockaddr_storage *s)
+{
+ int socksize;
- switch (s.ss_family) {
+ switch (s->ss_family) {
case AF_INET:
socksize = sizeof(struct sockaddr_in);
-#ifdef HAVE_SOCKADDR_LEN
- sin->sin_len = socksize;
-#endif
rpc->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
break;
+ default:
+ rpc_set_error(rpc, "Can not handle AF_FAMILY:%d", s->ss_family);
+ return -1;
}
if (rpc->fd == -1) {
return -1;
}
- rpc->connect_cb = cb;
- rpc->connect_data = private_data;
-
#if !defined(WIN32)
/* Some systems allow you to set capabilities on an executable
set_nonblocking(rpc->fd);
#if defined(WIN32)
- if (connect(rpc->fd, (struct sockaddr *)&s, socksize) == 0 && errno != EINPROGRESS )
+ if (connect(rpc->fd, (struct sockaddr *)s, socksize) == 0 && errno != EINPROGRESS )
#else
- if (connect(rpc->fd, (struct sockaddr *)&s, socksize) != 0 && errno != EINPROGRESS)
+ if (connect(rpc->fd, (struct sockaddr *)s, socksize) != 0 && errno != EINPROGRESS)
#endif
{
rpc_set_error(rpc, "connect() to server failed");
return 0;
}
+int rpc_connect_async(struct rpc_context *rpc, const char *server, int port, rpc_cb cb, void *private_data)
+{
+ struct sockaddr_in *sin = (struct sockaddr_in *)&rpc->s;
+
+ if (rpc->fd != -1) {
+ rpc_set_error(rpc, "Trying to connect while already connected");
+ return -1;
+ }
+
+ if (rpc->is_udp != 0) {
+ rpc_set_error(rpc, "Trying to connect on UDP socket");
+ return -1;
+ }
+
+ rpc->auto_reconnect = 0;
+
+ sin->sin_family = AF_INET;
+ sin->sin_port = htons(port);
+ if (inet_pton(AF_INET, server, &sin->sin_addr) != 1) {
+ rpc_set_error(rpc, "Not a valid server ip address");
+ return -1;
+ }
+
+
+ switch (rpc->s.ss_family) {
+ case AF_INET:
+#ifdef HAVE_SOCKADDR_LEN
+ sin->sin_len = sizeof(struct sockaddr_in);
+#endif
+ break;
+ }
+
+ rpc->connect_cb = cb;
+ rpc->connect_data = private_data;
+
+ if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
int rpc_disconnect(struct rpc_context *rpc, char *error)
{
+ rpc_unset_autoreconnect(rpc);
+
if (rpc->fd != -1) {
#if defined(WIN32)
closesocket(rpc->fd);
return 0;
}
-/* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue */
-static int rpc_disconnect_requeue(struct rpc_context *rpc)
+static void reconnect_cb(struct rpc_context *rpc, int status, void *data _U_, void *private_data)
+{
+ if (status != RPC_STATUS_SUCCESS) {
+ rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
+ return;
+ }
+
+ rpc->is_connected = 1;
+}
+
+/* disconnect but do not error all PDUs, just move pdus in-flight back to the outqueue and reconnect */
+static int rpc_reconnect_requeue(struct rpc_context *rpc)
{
struct rpc_pdu *pdu;
for (pdu=rpc->waitpdu; pdu; pdu=pdu->next) {
SLIST_REMOVE(&rpc->waitpdu, pdu);
SLIST_ADD(&rpc->outqueue, pdu);
+ /* we have to re-send the whole pdu again */
+ pdu->written = 0;
+ }
+
+ if (rpc->auto_reconnect != 0) {
+ rpc->connect_cb = reconnect_cb;
+
+ if (rpc_connect_sockaddr_async(rpc, &rpc->s) != 0) {
+ rpc_error_all_pdus(rpc, "RPC ERROR: Failed to reconnect async");
+ return -1;
+ }
}
return 0;