Don't do a DNS lookup on every connection attempt, fix TCP disconnect detection
This commit is contained in:
152
src/services.c
152
src/services.c
@@ -15,67 +15,61 @@ Vector status_timeline; // of StatusPeroid*
|
||||
|
||||
void* run_endpoint(void* endpoint) {
|
||||
struct Endpoint* self = (struct Endpoint*)endpoint;
|
||||
struct timespec start, end;
|
||||
struct addrinfo* result = NULL;
|
||||
while (self->enabled) {
|
||||
int success = 0;
|
||||
if (strcmp(self->scheme, "tcp") == 0 || strcmp(self->scheme, "udp") == 0) {
|
||||
if (self->sockfd < 0) {
|
||||
struct addrinfo hints = {0}, *result, *rp;
|
||||
hints.ai_family = AF_UNSPEC;
|
||||
if (strcmp(self->scheme, "udp") == 0) {
|
||||
hints.ai_socktype = SOCK_DGRAM;
|
||||
hints.ai_protocol = IPPROTO_UDP;
|
||||
} else {
|
||||
hints.ai_socktype = SOCK_STREAM;
|
||||
hints.ai_protocol = IPPROTO_TCP;
|
||||
}
|
||||
struct addrinfo hints = {0};
|
||||
if (!result) {
|
||||
hints.ai_family = AF_UNSPEC;
|
||||
if (strcmp(self->scheme, "udp") == 0) {
|
||||
hints.ai_socktype = SOCK_DGRAM;
|
||||
hints.ai_protocol = IPPROTO_UDP;
|
||||
} else {
|
||||
hints.ai_socktype = SOCK_STREAM;
|
||||
hints.ai_protocol = IPPROTO_TCP;
|
||||
}
|
||||
|
||||
char port_str[6];
|
||||
snprintf(port_str, sizeof(port_str), "%d", self->port);
|
||||
char port_str[6] = {0};
|
||||
snprintf(port_str, sizeof(port_str), "%d", self->port);
|
||||
|
||||
int ret = getaddrinfo(self->target, port_str, &hints, &result);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "getaddrinfo failed: %s\n", gai_strerror(ret));
|
||||
queue_report((struct EndpointStatus){ self, DOWN, -1.0 });
|
||||
sleep(self->backoff);
|
||||
continue;
|
||||
int ret = getaddrinfo(self->target, port_str, &hints, &result);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "getaddrinfo failed: %s\n", gai_strerror(ret));
|
||||
queue_report((struct EndpointStatus){ self, DOWN, -1.0 });
|
||||
sleep(self->backoff);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
self->sockfd = -1;
|
||||
for (rp = result; rp != NULL; rp = rp->ai_next) {
|
||||
for (struct addrinfo* rp = result; rp != NULL; rp = rp->ai_next) {
|
||||
int fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
|
||||
if (fd == -1) continue;
|
||||
|
||||
int flags = fcntl(fd, F_GETFL, 0);
|
||||
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
||||
|
||||
if (strcmp(self->scheme, "tcp") == 0) {
|
||||
int opt = 1;
|
||||
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &opt, sizeof(opt));
|
||||
|
||||
int keepidle = 0;
|
||||
setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepidle, sizeof(keepidle));
|
||||
|
||||
int keepintvl = self->backoff;
|
||||
setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepintvl, sizeof(keepintvl));
|
||||
|
||||
int keepcnt = 1;
|
||||
setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &keepcnt, sizeof(keepcnt));
|
||||
}
|
||||
|
||||
struct timespec connect_start;
|
||||
clock_gettime(CLOCK_REALTIME_ALARM, &connect_start);
|
||||
|
||||
ret = connect(fd, rp->ai_addr, rp->ai_addrlen);
|
||||
int ret = connect(fd, rp->ai_addr, rp->ai_addrlen);
|
||||
int error = 0;
|
||||
if (strcmp(self->scheme, "tcp") == 0) {
|
||||
if (errno == EINPROGRESS) {
|
||||
struct pollfd pfd = { .fd = fd, .events = POLLOUT };
|
||||
int pollret = poll(&pfd, 1, 5000);
|
||||
|
||||
int opt = 1;
|
||||
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &opt, sizeof(opt));
|
||||
|
||||
if (ret == 0) {
|
||||
|
||||
} else if (errno == EINPROGRESS) {
|
||||
struct pollfd pfd = { .fd = fd, .events = POLLOUT };
|
||||
|
||||
int pollret = poll(&pfd, 1, 5000);
|
||||
if (pollret > 0 && (pfd.revents & POLLOUT)) {
|
||||
int error = 0;
|
||||
socklen_t len = sizeof(error);
|
||||
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 && error != 0) {
|
||||
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 || error != 0) {
|
||||
close(fd);
|
||||
continue;
|
||||
}
|
||||
@@ -99,31 +93,33 @@ void* run_endpoint(void* endpoint) {
|
||||
}
|
||||
|
||||
struct pollfd pfd = { .fd = fd, .events = POLLIN | POLLERR };
|
||||
if (poll(&pfd, 1, (self->backoff > 0 ? self->backoff * 1000 : 10000)) > 0 && (pfd.revents & (POLLERR | POLLIN))) {
|
||||
if (poll(&pfd, 1, (self->backoff)) > 0 && (pfd.revents & (POLLERR | POLLIN))) {
|
||||
if (recv(fd, NULL, sizeof(NULL), MSG_DONTWAIT) < 0) {
|
||||
self->last_errno = errno;
|
||||
if (self->last_errno == ECONNREFUSED || self->last_errno == EHOSTUNREACH || self->last_errno == ENETUNREACH) {
|
||||
close(fd);
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
struct timespec now;
|
||||
clock_gettime(CLOCK_REALTIME_ALARM, &now);
|
||||
if (fd > 0) {
|
||||
struct timespec now;
|
||||
clock_gettime(CLOCK_REALTIME_ALARM, &now);
|
||||
|
||||
double ping = (now.tv_sec - connect_start.tv_sec) * 1000.0 + (now.tv_nsec - connect_start.tv_nsec) / 1000000.0;
|
||||
double ping = (now.tv_sec - connect_start.tv_sec) * 1000.0 + (now.tv_nsec - connect_start.tv_nsec) / 1000000.0;
|
||||
|
||||
self->sockfd = fd;
|
||||
self->last_errno = 0;
|
||||
queue_report((struct EndpointStatus){ self, UP, ping });
|
||||
self->sockfd = fd;
|
||||
self->last_errno = 0;
|
||||
queue_report((struct EndpointStatus){ self, UP, ping });
|
||||
break;
|
||||
}
|
||||
}
|
||||
freeaddrinfo(result);
|
||||
|
||||
if (self->sockfd < 0) {
|
||||
if (self->last_errno != errno) {
|
||||
self->last_errno = errno;
|
||||
fprintf(stderr, "%s://%s:%u connect failed: %i\n", self->scheme, self->target, self->port, errno);
|
||||
fprintf(stderr, "[%lu] %s://%s:%u connect failed: %i\n", time(NULL), self->scheme, self->target, self->port, errno);
|
||||
}
|
||||
queue_report((struct EndpointStatus){ self, DOWN, -1.0 });
|
||||
sleep(self->backoff);
|
||||
@@ -131,7 +127,7 @@ void* run_endpoint(void* endpoint) {
|
||||
}
|
||||
}
|
||||
|
||||
struct pollfd pfd = { .fd = self->sockfd, .events = POLLERR | POLLHUP | POLLRDHUP };
|
||||
struct pollfd pfd = { .fd = self->sockfd, .events = POLLIN | POLLERR | POLLHUP | POLLRDHUP };
|
||||
int pollret = poll(&pfd, 1, (self->backoff > 0 ? self->backoff * 1000 : 10000));
|
||||
|
||||
if (pollret < 0) {
|
||||
@@ -140,21 +136,26 @@ void* run_endpoint(void* endpoint) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pfd.revents & (POLLERR | POLLHUP | POLLRDHUP)) {
|
||||
close(self->sockfd);
|
||||
self->sockfd = -1;
|
||||
//queue_report((struct EndpointStatus){ self, DOWN, -1.0 });
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(self->scheme, "udp") == 0) {
|
||||
if (send(self->sockfd, NULL, sizeof(NULL), 0) < 0) {
|
||||
if (pfd.revents & POLLIN) {
|
||||
if (recv(self->sockfd, NULL, sizeof(NULL), MSG_DONTWAIT | MSG_PEEK) < 0) {
|
||||
close(self->sockfd);
|
||||
self->sockfd = -1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (pfd.revents & (POLLERR | POLLHUP | POLLRDHUP)) {
|
||||
close(self->sockfd);
|
||||
self->sockfd = -1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(self->scheme, "udp") == 0 && send(self->sockfd, NULL, 0, 0) < 0) {
|
||||
close(self->sockfd);
|
||||
self->sockfd = -1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pollret == 0) {
|
||||
time_t now = time(NULL);
|
||||
if (now - self->last_check >= self->backoff) {
|
||||
@@ -172,6 +173,8 @@ void* run_endpoint(void* endpoint) {
|
||||
close(self->sockfd);
|
||||
self->sockfd = -1;
|
||||
}
|
||||
freeaddrinfo(result);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -199,7 +202,7 @@ void load_service(struct Service** self) {
|
||||
|
||||
uint8_t add_endpoint(struct Service* self, const char* uri) {
|
||||
if (!uri || !*uri) return 255;
|
||||
struct Endpoint* endpoint = (struct Endpoint*)malloc(2048);
|
||||
struct Endpoint* endpoint = malloc(sizeof(struct Endpoint));
|
||||
endpoint->service = self;
|
||||
endpoint->enabled = true;
|
||||
endpoint->sockfd = -1;
|
||||
@@ -260,6 +263,8 @@ uint8_t add_endpoint(struct Service* self, const char* uri) {
|
||||
endpoint->port = (int)strtol(p, NULL, 10);
|
||||
}
|
||||
|
||||
endpoint->last_check = 0;
|
||||
endpoint->last_state = NONE;
|
||||
printf("[%s] Added endpoint to service: %s, %s, %i\n", self->id, endpoint->scheme, endpoint->target, endpoint->port);
|
||||
vector_push_back(&self->endpoints, &endpoint);
|
||||
return 0;
|
||||
@@ -326,12 +331,15 @@ struct ServerStatus* update_server_status(struct Service** self) {
|
||||
|
||||
size_t pings_len = index;
|
||||
double ping_sum = 0;
|
||||
for (int i = 0; i < sizeof(pings)/sizeof(double); i++)
|
||||
if (pings[i] >= 0) {
|
||||
ping_sum+=pings[i];
|
||||
pings_len--;
|
||||
}
|
||||
double avg_ping = ping_sum / pings_len;
|
||||
double avg_ping;
|
||||
if (pings_len > 0) {
|
||||
for (int i = 0; i < sizeof(pings)/sizeof(double); i++)
|
||||
if (pings[i] >= 0) {
|
||||
ping_sum+=pings[i];
|
||||
pings_len--;
|
||||
}
|
||||
avg_ping = ping_sum / pings_len;
|
||||
} else avg_ping = -1.0;
|
||||
|
||||
server_status->state = service_state;
|
||||
server_status->ping = avg_ping;
|
||||
@@ -339,9 +347,11 @@ struct ServerStatus* update_server_status(struct Service** self) {
|
||||
}
|
||||
|
||||
const char* state_text(enum State state) {
|
||||
if (state == UP)
|
||||
return "UP";
|
||||
else if (state == PARTIAL)
|
||||
return "PARTIAL";
|
||||
else return "DOWN";
|
||||
switch (state) {
|
||||
case NONE: return "NONE";
|
||||
case DOWN: return "DOWN";
|
||||
case PARTIAL: return "PARTIAL";
|
||||
case UP: return "UP";
|
||||
}
|
||||
return "UNKNOWN";
|
||||
}
|
||||
@@ -16,7 +16,7 @@
|
||||
#include "../deps/vector/vector.h"
|
||||
#include "util.h"
|
||||
|
||||
enum State { DOWN, PARTIAL, UP };
|
||||
enum State { NONE, DOWN, PARTIAL, UP };
|
||||
struct StatusPeroid;
|
||||
struct ServerStatus;
|
||||
struct EndpointStatus;
|
||||
@@ -63,6 +63,7 @@ struct Endpoint {
|
||||
double last_ping;
|
||||
Vector status_history;
|
||||
pthread_t thread;
|
||||
struct addrinfo* addr;
|
||||
int sockfd;
|
||||
int last_errno;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user