Implement listening on the --listen-on socket

This commit is contained in:
Kovid Goyal 2018-03-02 10:26:19 +05:30
parent 52d2b7b09e
commit 5bfc89e13f
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -55,6 +55,9 @@ static const Child EMPTY_CHILD = {0};
pthread_mutex_##op(&screen->which##_buf_lock); pthread_mutex_##op(&screen->which##_buf_lock);
#define children_mutex(op) \ #define children_mutex(op) \
pthread_mutex_##op(&children_lock); pthread_mutex_##op(&children_lock);
#define peer_mutex(op) \
pthread_mutex_##op(&talk_data.peer_lock);
static Child children[MAX_CHILDREN] = {{0}}; static Child children[MAX_CHILDREN] = {{0}};
@ -964,38 +967,111 @@ io_loop(void *data) {
// {{{ Talk thread functions // {{{ Talk thread functions
static inline bool #define MAX_PEERS 256
handle_peer(ChildMonitor *self, int s) { #define MAX_LISTENERS 2
size_t bufsz = 0;
char *buf = NULL;
size_t buf_used = 0;
while(true) { typedef struct {
if (buf_used >= bufsz) { char *data;
bufsz = MAX(1024, bufsz) * 2; size_t capacity, used;
if (bufsz > 16 * 1024 * 1024) return false; int fd;
buf = realloc(buf, bufsz); bool finished, close_socket;
if (buf == NULL) return false; } PeerReadData;
static PeerReadData empty_prd = {.fd = -1, 0};
typedef struct {
size_t num_listen_fds, num_talk_fds, num_reads;
struct pollfd fds[MAX_PEERS + MAX_LISTENERS + 1];
PeerReadData reads[MAX_LISTENERS];
int wakeup_fds[2];
pthread_mutex_t peer_lock;
} TalkData;
static TalkData talk_data = {0};
#define nuke_socket(s) { shutdown(s, SHUT_RDWR); close(s); }
static inline bool
accept_peer(int listen_fd, bool shutting_down) {
int peer = accept(listen_fd, NULL, NULL);
if (UNLIKELY(peer == -1)) {
if (errno == EINTR) return true;
if (!shutting_down) perror("accept() on talk socket failed!");
return false;
} }
ssize_t n = recv(s, buf + buf_used, bufsz - buf_used, 0); peer_mutex(lock);
if (n == 0) break; size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds;
if (n < 0) { if (fd_idx < arraysz(talk_data.fds) && talk_data.num_reads < arraysz(talk_data.reads)) {
if (errno == EINTR) continue; talk_data.fds[fd_idx].fd = peer; talk_data.fds[fd_idx].events = POLLIN;
perror("Error reading from talk peer"); talk_data.reads[talk_data.num_reads] = empty_prd; talk_data.reads[talk_data.num_reads++].fd = peer;
break; talk_data.num_talk_fds++;
} else nuke_socket(peer);
peer_mutex(unlock);
return true;
}
static inline bool
read_from_peer(ChildMonitor *self, int s) {
bool read_finished = false;
for (size_t i = 0; i < talk_data.num_reads; i++) {
PeerReadData *rd = talk_data.reads + i;
#define failed(msg) { read_finished = true; fprintf(stderr, "%s\n", msg); rd->finished = true; rd->close_socket = true; break; }
if (rd->fd == s) {
if (rd->used >= rd->capacity) {
if (rd->capacity >= 1024 * 1024) failed("Ignoring too large message from peer");
rd->capacity = MAX(8192, rd->capacity * 2);
rd->data = realloc(rd->data, rd->capacity);
if (!rd->data) failed("Out of memory");
} }
buf_used += n; ssize_t n = recv(s, rd->data + rd->used, rd->capacity - rd->used, 0);
} if (n == 0) {
if (buf_used) { read_finished = true; rd->finished = true;
children_mutex(lock); children_mutex(lock);
ensure_space_for(self, messages, Message, self->messages_count + 1, messages_capacity, 16, true); ensure_space_for(self, messages, Message, self->messages_count + 1, messages_capacity, 16, true);
Message *m = self->messages + self->messages_count++; Message *m = self->messages + self->messages_count++;
m->data = buf; m->sz = buf_used; m->fd = s; m->data = rd->data; rd->data = NULL; m->sz = rd->used; m->fd = s;
children_mutex(unlock); children_mutex(unlock);
wakeup_main_loop(); wakeup_main_loop();
return true; } else if (n < 0) {
} else free(buf); if (errno != EINTR) {
return false; perror("Error reading from talk peer");
failed("");
}
} else rd->used += n;
break;
}
}
#undef failed
return read_finished;
}
static inline void
remove_poll_fd(int fd) {
size_t count = talk_data.num_talk_fds + talk_data.num_listen_fds;
for (size_t i = talk_data.num_listen_fds; i < count; i++) {
struct pollfd *pfd = talk_data.fds + i;
if (pfd->fd == fd) {
size_t num_to_right = count - 1 - i;
if (num_to_right) memmove(talk_data.fds + i, talk_data.fds + i + 1, num_to_right * sizeof(struct pollfd));
talk_data.num_talk_fds--;
break;
}
}
}
static inline void
prune_finished_reads() {
if (!talk_data.num_reads) return;
for (ssize_t i = talk_data.num_reads - 1; i >= 0; i--) {
PeerReadData *rd = talk_data.reads + i;
if (rd->finished) {
remove_poll_fd(rd->fd);
if (rd->close_socket) { nuke_socket(rd->fd); }
free(rd->data);
ssize_t num_to_right = talk_data.num_reads - 1 - i;
if (num_to_right > 0) memmove(talk_data.reads + i, talk_data.reads + i + 1, num_to_right * sizeof(PeerReadData));
else talk_data.reads[i] = empty_prd;
talk_data.num_reads--;
}
}
} }
static void* static void*
@ -1003,18 +1079,34 @@ talk_loop(void *data) {
// The talk thread loop // The talk thread loop
ChildMonitor *self = (ChildMonitor*)data; ChildMonitor *self = (ChildMonitor*)data;
set_thread_name("KittyTalkMon"); set_thread_name("KittyPeerMon");
if ((pthread_mutex_init(&talk_data.peer_lock, NULL)) != 0) { perror("Failed to create peer mutex"); return 0; }
if (!self_pipe(talk_data.wakeup_fds)) { perror("Failed to create wakeup fds for talk thread"); return 0; }
#define add_listener(which) \
if (self->which > -1) { \
talk_data.fds[talk_data.num_listen_fds].fd = self->which; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN; \
}
add_listener(talk_fd); add_listener(listen_fd);
#undef add_listener
talk_data.fds[talk_data.num_listen_fds].fd = talk_data.wakeup_fds[0]; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN;
while (LIKELY(!self->shutting_down)) { while (LIKELY(!self->shutting_down)) {
int peer = accept(self->talk_fd, NULL, NULL); for (size_t i = 0; i < talk_data.num_listen_fds + talk_data.num_talk_fds; i++) { talk_data.fds[i].revents = 0; }
if (peer == -1) { int ret = poll(talk_data.fds, talk_data.num_listen_fds + talk_data.num_talk_fds, -1);
if (errno == EINTR) continue; if (ret > 0) {
if (!self->shutting_down) perror("accept() on talk socket failed!"); bool has_finished_reads = false;
break; for (size_t i = 0; i < talk_data.num_listen_fds - 1; i++) {
if (talk_data.fds[i].revents & POLLIN) {if (!accept_peer(talk_data.fds[i].fd, self->shutting_down)) goto end; }
} }
if (handle_peer(self, peer)) shutdown(peer, SHUT_RD); if (talk_data.fds[talk_data.num_listen_fds - 1].revents & POLLIN) drain_fd(talk_data.fds[talk_data.num_listen_fds - 1].fd); // wakeup
else { shutdown(peer, SHUT_RDWR); close(peer); } for (size_t i = talk_data.num_listen_fds; i < talk_data.num_talk_fds + talk_data.num_listen_fds; i++) {
if (talk_data.fds[i].revents & (POLLIN | POLLHUP)) { if (read_from_peer(self, talk_data.fds[i].fd)) has_finished_reads = true; }
} }
if (has_finished_reads) prune_finished_reads();
} else if (ret < 0) { if (errno != EAGAIN && errno != EINTR) perror("poll() on talk fds failed"); }
}
end:
close(talk_data.wakeup_fds[0]); close(talk_data.wakeup_fds[1]);
return 0; return 0;
} }