From 5bfc89e13f5cb5caa271adfcfea37632929aaa6a Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Fri, 2 Mar 2018 10:26:19 +0530 Subject: [PATCH] Implement listening on the --listen-on socket --- kitty/child-monitor.c | 166 ++++++++++++++++++++++++++++++++---------- 1 file changed, 129 insertions(+), 37 deletions(-) diff --git a/kitty/child-monitor.c b/kitty/child-monitor.c index fd6bfffd4..fe11f9a02 100644 --- a/kitty/child-monitor.c +++ b/kitty/child-monitor.c @@ -55,6 +55,9 @@ static const Child EMPTY_CHILD = {0}; pthread_mutex_##op(&screen->which##_buf_lock); #define children_mutex(op) \ pthread_mutex_##op(&children_lock); +#define peer_mutex(op) \ + pthread_mutex_##op(&talk_data.peer_lock); + static Child children[MAX_CHILDREN] = {{0}}; @@ -964,38 +967,111 @@ io_loop(void *data) { // {{{ Talk thread functions -static inline bool -handle_peer(ChildMonitor *self, int s) { - size_t bufsz = 0; - char *buf = NULL; - size_t buf_used = 0; +#define MAX_PEERS 256 +#define MAX_LISTENERS 2 - while(true) { - if (buf_used >= bufsz) { - bufsz = MAX(1024, bufsz) * 2; - if (bufsz > 16 * 1024 * 1024) return false; - buf = realloc(buf, bufsz); - if (buf == NULL) return false; - } - ssize_t n = recv(s, buf + buf_used, bufsz - buf_used, 0); - if (n == 0) break; - if (n < 0) { - if (errno == EINTR) continue; - perror("Error reading from talk peer"); +typedef struct { + char *data; + size_t capacity, used; + int fd; + bool finished, close_socket; +} PeerReadData; +static PeerReadData empty_prd = {.fd = -1, 0}; + +typedef struct { + size_t num_listen_fds, num_talk_fds, num_reads; + struct pollfd fds[MAX_PEERS + MAX_LISTENERS + 1]; + PeerReadData reads[MAX_LISTENERS]; + int wakeup_fds[2]; + pthread_mutex_t peer_lock; +} TalkData; + +static TalkData talk_data = {0}; +#define nuke_socket(s) { shutdown(s, SHUT_RDWR); close(s); } + +static inline bool +accept_peer(int listen_fd, bool shutting_down) { + int peer = accept(listen_fd, NULL, NULL); + if (UNLIKELY(peer == -1)) { + if (errno == EINTR) return true; + if (!shutting_down) perror("accept() on talk socket failed!"); + return false; + } + peer_mutex(lock); + size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds; + if (fd_idx < arraysz(talk_data.fds) && talk_data.num_reads < arraysz(talk_data.reads)) { + talk_data.fds[fd_idx].fd = peer; talk_data.fds[fd_idx].events = POLLIN; + talk_data.reads[talk_data.num_reads] = empty_prd; talk_data.reads[talk_data.num_reads++].fd = peer; + talk_data.num_talk_fds++; + } else nuke_socket(peer); + peer_mutex(unlock); + return true; +} + +static inline bool +read_from_peer(ChildMonitor *self, int s) { + bool read_finished = false; + for (size_t i = 0; i < talk_data.num_reads; i++) { + PeerReadData *rd = talk_data.reads + i; +#define failed(msg) { read_finished = true; fprintf(stderr, "%s\n", msg); rd->finished = true; rd->close_socket = true; break; } + if (rd->fd == s) { + if (rd->used >= rd->capacity) { + if (rd->capacity >= 1024 * 1024) failed("Ignoring too large message from peer"); + rd->capacity = MAX(8192, rd->capacity * 2); + rd->data = realloc(rd->data, rd->capacity); + if (!rd->data) failed("Out of memory"); + } + ssize_t n = recv(s, rd->data + rd->used, rd->capacity - rd->used, 0); + if (n == 0) { + read_finished = true; rd->finished = true; + children_mutex(lock); + ensure_space_for(self, messages, Message, self->messages_count + 1, messages_capacity, 16, true); + Message *m = self->messages + self->messages_count++; + m->data = rd->data; rd->data = NULL; m->sz = rd->used; m->fd = s; + children_mutex(unlock); + wakeup_main_loop(); + } else if (n < 0) { + if (errno != EINTR) { + perror("Error reading from talk peer"); + failed(""); + } + } else rd->used += n; break; } - buf_used += n; } - if (buf_used) { - children_mutex(lock); - ensure_space_for(self, messages, Message, self->messages_count + 1, messages_capacity, 16, true); - Message *m = self->messages + self->messages_count++; - m->data = buf; m->sz = buf_used; m->fd = s; - children_mutex(unlock); - wakeup_main_loop(); - return true; - } else free(buf); - return false; +#undef failed + return read_finished; +} + +static inline void +remove_poll_fd(int fd) { + size_t count = talk_data.num_talk_fds + talk_data.num_listen_fds; + for (size_t i = talk_data.num_listen_fds; i < count; i++) { + struct pollfd *pfd = talk_data.fds + i; + if (pfd->fd == fd) { + size_t num_to_right = count - 1 - i; + if (num_to_right) memmove(talk_data.fds + i, talk_data.fds + i + 1, num_to_right * sizeof(struct pollfd)); + talk_data.num_talk_fds--; + break; + } + } +} + +static inline void +prune_finished_reads() { + if (!talk_data.num_reads) return; + for (ssize_t i = talk_data.num_reads - 1; i >= 0; i--) { + PeerReadData *rd = talk_data.reads + i; + if (rd->finished) { + remove_poll_fd(rd->fd); + if (rd->close_socket) { nuke_socket(rd->fd); } + free(rd->data); + ssize_t num_to_right = talk_data.num_reads - 1 - i; + if (num_to_right > 0) memmove(talk_data.reads + i, talk_data.reads + i + 1, num_to_right * sizeof(PeerReadData)); + else talk_data.reads[i] = empty_prd; + talk_data.num_reads--; + } + } } static void* @@ -1003,18 +1079,34 @@ talk_loop(void *data) { // The talk thread loop ChildMonitor *self = (ChildMonitor*)data; - set_thread_name("KittyTalkMon"); + set_thread_name("KittyPeerMon"); + if ((pthread_mutex_init(&talk_data.peer_lock, NULL)) != 0) { perror("Failed to create peer mutex"); return 0; } + if (!self_pipe(talk_data.wakeup_fds)) { perror("Failed to create wakeup fds for talk thread"); return 0; } +#define add_listener(which) \ + if (self->which > -1) { \ + talk_data.fds[talk_data.num_listen_fds].fd = self->which; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN; \ + } + add_listener(talk_fd); add_listener(listen_fd); +#undef add_listener + talk_data.fds[talk_data.num_listen_fds].fd = talk_data.wakeup_fds[0]; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN; while (LIKELY(!self->shutting_down)) { - int peer = accept(self->talk_fd, NULL, NULL); - if (peer == -1) { - if (errno == EINTR) continue; - if (!self->shutting_down) perror("accept() on talk socket failed!"); - break; - } - if (handle_peer(self, peer)) shutdown(peer, SHUT_RD); - else { shutdown(peer, SHUT_RDWR); close(peer); } + for (size_t i = 0; i < talk_data.num_listen_fds + talk_data.num_talk_fds; i++) { talk_data.fds[i].revents = 0; } + int ret = poll(talk_data.fds, talk_data.num_listen_fds + talk_data.num_talk_fds, -1); + if (ret > 0) { + bool has_finished_reads = false; + for (size_t i = 0; i < talk_data.num_listen_fds - 1; i++) { + if (talk_data.fds[i].revents & POLLIN) {if (!accept_peer(talk_data.fds[i].fd, self->shutting_down)) goto end; } + } + if (talk_data.fds[talk_data.num_listen_fds - 1].revents & POLLIN) drain_fd(talk_data.fds[talk_data.num_listen_fds - 1].fd); // wakeup + for (size_t i = talk_data.num_listen_fds; i < talk_data.num_talk_fds + talk_data.num_listen_fds; i++) { + if (talk_data.fds[i].revents & (POLLIN | POLLHUP)) { if (read_from_peer(self, talk_data.fds[i].fd)) has_finished_reads = true; } + } + if (has_finished_reads) prune_finished_reads(); + } else if (ret < 0) { if (errno != EAGAIN && errno != EINTR) perror("poll() on talk fds failed"); } } +end: + close(talk_data.wakeup_fds[0]); close(talk_data.wakeup_fds[1]); return 0; }