diff --git a/docs/changelog.rst b/docs/changelog.rst index 35d584d5c..288100234 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -15,6 +15,9 @@ To update |kitty|, :doc:`follow the instructions `. - Improve anti-aliasing of triangular box drawing characters, noticeable on low-resolution screens (:iss:`2844`) +- Fix ``kitty @ send-text`` not working reliably when using a docket for remote + control (:iss:`2852`) + - Implement support for box drawing rounded-corners characters (:iss:`2240`) - When a character from the Unicode Dingbat block is followed by a space, use diff --git a/kitty/child-monitor.c b/kitty/child-monitor.c index 94bd703df..b2cf19b83 100644 --- a/kitty/child-monitor.c +++ b/kitty/child-monitor.c @@ -40,7 +40,7 @@ static void (*parse_func)(Screen*, PyObject*, monotonic_t); typedef struct { char *data; size_t sz; - int fd; + id_type peer_id; } Message; typedef struct { @@ -71,9 +71,8 @@ static const Child EMPTY_CHILD = {0}; pthread_mutex_##op(&screen->which##_buf_lock); #define children_mutex(op) \ pthread_mutex_##op(&children_lock); -#define peer_mutex(op) \ - pthread_mutex_##op(&talk_data.peer_lock); - +#define talk_mutex(op) \ + pthread_mutex_##op(&talk_lock); static Child children[MAX_CHILDREN] = {{0}}; @@ -82,7 +81,7 @@ static Child add_queue[MAX_CHILDREN] = {{0}}, remove_queue[MAX_CHILDREN] = {{0}} static unsigned long remove_notify[MAX_CHILDREN] = {0}; static size_t add_queue_count = 0, remove_queue_count = 0; static struct pollfd fds[MAX_CHILDREN + EXTRA_FDS] = {{0}}; -static pthread_mutex_t children_lock; +static pthread_mutex_t children_lock, talk_lock; static bool kill_signal_received = false; static ChildMonitor *the_monitor = NULL; static uint8_t drain_buf[1024]; @@ -131,6 +130,10 @@ new(PyTypeObject *type, PyObject *args, PyObject UNUSED *kwds) { PyErr_Format(PyExc_RuntimeError, "Failed to create children_lock mutex: %s", strerror(ret)); return NULL; } + if ((ret = pthread_mutex_init(&talk_lock, NULL)) != 0) { + PyErr_Format(PyExc_RuntimeError, "Failed to create talk_lock mutex: %s", strerror(ret)); + return NULL; + } self = (ChildMonitor *)type->tp_alloc(type, 0); if (!init_loop_data(&self->io_loop_data)) return PyErr_SetFromErrno(PyExc_OSError); if (!install_signal_handlers(&self->io_loop_data)) return PyErr_SetFromErrno(PyExc_OSError); @@ -153,6 +156,7 @@ new(PyTypeObject *type, PyObject *args, PyObject UNUSED *kwds) { static void dealloc(ChildMonitor* self) { pthread_mutex_destroy(&children_lock); + pthread_mutex_destroy(&talk_lock); Py_CLEAR(self->dump_callback); Py_CLEAR(self->death_notify); Py_TYPE(self)->tp_free((PyObject*)self); @@ -174,7 +178,7 @@ wakeup_io_loop(ChildMonitor *self, bool in_signal_handler) { static void* io_loop(void *data); static void* talk_loop(void *data); -static void send_response(int fd, const char *msg, size_t msg_sz); +static void send_response(id_type peer_id, const char *msg, size_t msg_sz); static void wakeup_talk_loop(bool); static bool talk_thread_started = false; @@ -324,7 +328,6 @@ parse_input(ChildMonitor *self) { size_t count = 0, remove_count = 0; bool input_read = false; monotonic_t now = monotonic(); - PyObject *msg = NULL; children_mutex(lock); while (remove_queue_count) { remove_queue_count--; @@ -333,18 +336,6 @@ parse_input(ChildMonitor *self) { FREE_CHILD(remove_queue[remove_queue_count]); } - if (UNLIKELY(self->messages_count)) { - msg = PyTuple_New(self->messages_count); - if (msg) { - for (size_t i = 0; i < self->messages_count; i++) { - Message *m = self->messages + i; - PyTuple_SET_ITEM(msg, i, Py_BuildValue("y#i", m->data, (int)m->sz, m->fd)); - free(m->data); m->data = NULL; m->sz = 0; - } - self->messages_count = 0; - } else fatal("Out of memory"); - } - if (UNLIKELY(kill_signal_received)) { global_state.quit_request = IMPERATIVE_CLOSE_REQUESTED; global_state.has_pending_closes = true; @@ -358,15 +349,34 @@ parse_input(ChildMonitor *self) { } } children_mutex(unlock); - if (msg) { - for (Py_ssize_t i = 0; i < PyTuple_GET_SIZE(msg); i++) { - PyObject *resp = PyObject_CallMethod(global_state.boss, "peer_message_received", "O", PyTuple_GET_ITEM(PyTuple_GET_ITEM(msg, i), 0)); - int peer_fd = (int)PyLong_AsLong(PyTuple_GET_ITEM(PyTuple_GET_ITEM(msg, i), 1)); - if (resp && PyBytes_Check(resp)) send_response(peer_fd, PyBytes_AS_STRING(resp), PyBytes_GET_SIZE(resp)); - else { send_response(peer_fd, NULL, 0); if (!resp) PyErr_Print(); } + + Message *msgs = NULL; + size_t msgs_count = 0; + talk_mutex(lock); + if (UNLIKELY(self->messages_count)) { + msgs = malloc(sizeof(Message) * self->messages_count); + if (msgs) { + memcpy(msgs, self->messages, sizeof(Message) * self->messages_count); + msgs_count = self->messages_count; + } + self->messages_count = 0; + } + talk_mutex(unlock); + + if (msgs_count) { + for (size_t i = 0; i < msgs_count; i++) { + Message *msg = msgs + i; + PyObject *resp = NULL; + if (msg->data) { + resp = PyObject_CallMethod(global_state.boss, "peer_message_received", "y#", msg->data, (int)msg->sz); + free(msg->data); + if (!resp) PyErr_Print(); + } + if (resp && PyBytes_Check(resp)) send_response(msg->peer_id, PyBytes_AS_STRING(resp), PyBytes_GET_SIZE(resp)); + else send_response(msg->peer_id, NULL, 0); Py_CLEAR(resp); } - Py_CLEAR(msg); + free(msgs); msgs = NULL; } while(remove_count) { @@ -1279,33 +1289,30 @@ io_loop(void *data) { // {{{ Talk thread functions typedef struct { - char *data; - size_t capacity, used; + id_type id; + size_t num_of_unresponded_messages_sent_to_main_thread; + bool finished_reading; int fd; - bool finished, close_socket, is_peer_command; -} PeerReadData; -static PeerReadData empty_prd = {.fd = -1, 0}; + struct { + char *data; + size_t capacity, used, command_end; + bool finished; + } read; + struct { + char *data; + size_t capacity, used; + bool failed; + } write; +} Peer; +static id_type peer_id_counter = 0; typedef struct { - char *data; - size_t sz, pos; - int fd; - bool finished; -} PeerWriteData; -static PeerWriteData empty_pwd = {.fd = -1, 0}; - -typedef struct { - size_t num_listen_fds, num_talk_fds, num_reads, num_writes, num_queued_writes; - size_t fds_capacity, reads_capacity, writes_capacity, queued_writes_capacity; - struct pollfd *fds; - PeerReadData *reads; - PeerWriteData *writes; - PeerWriteData *queued_writes; + size_t num_peers, peers_capacity; + Peer *peers; LoopData loop_data; - pthread_mutex_t peer_lock; } TalkData; - static TalkData talk_data = {0}; + typedef struct pollfd PollFD; #define PEER_LIMIT 256 #define nuke_socket(s) { shutdown(s, SHUT_RDWR); safe_close(s, __FILE__, __LINE__); } @@ -1318,13 +1325,12 @@ accept_peer(int listen_fd, bool shutting_down) { if (!shutting_down) perror("accept() on talk socket failed!"); return false; } - size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds; - if (fd_idx < PEER_LIMIT && talk_data.reads_capacity < PEER_LIMIT) { - ensure_space_for(&talk_data, fds, PollFD, fd_idx + 1, fds_capacity, 8, false); - talk_data.fds[fd_idx].fd = peer; talk_data.fds[fd_idx].events = POLLIN; - ensure_space_for(&talk_data, reads, PeerReadData, talk_data.num_reads + 1, reads_capacity, 8, false); - talk_data.reads[talk_data.num_reads] = empty_prd; talk_data.reads[talk_data.num_reads++].fd = peer; - talk_data.num_talk_fds++; + if (talk_data.num_peers < PEER_LIMIT) { + ensure_space_for(&talk_data, peers, Peer, talk_data.num_peers + 8, peers_capacity, 8, false); + Peer *p = talk_data.peers + talk_data.num_peers++; + memset(p, 0, sizeof(Peer)); + p->fd = peer; p->id = ++peer_id_counter; + if (!p->id) p->id = ++peer_id_counter; } else { log_error("Too many peers want to talk, ignoring one."); nuke_socket(peer); @@ -1332,168 +1338,102 @@ accept_peer(int listen_fd, bool shutting_down) { return true; } +static inline void +free_peer(Peer *peer) { + free(peer->read.data); peer->read.data = NULL; + free(peer->write.data); peer->write.data = NULL; + if (peer->fd > -1) { nuke_socket(peer->fd); peer->fd = -1; } +} + #define KITTY_CMD_PREFIX "\x1bP@kitty-cmd{" static inline void -queue_peer_message(ChildMonitor *self, char *buf, size_t sz, int fd) { - children_mutex(lock); - ensure_space_for(self, messages, Message, self->messages_count + 1, messages_capacity, 16, true); +queue_peer_message(ChildMonitor *self, Peer *peer) { + talk_mutex(lock); + ensure_space_for(self, messages, Message, self->messages_count + 16, messages_capacity, 16, true); Message *m = self->messages + self->messages_count++; - m->data = buf; m->sz = sz; m->fd = fd; - children_mutex(unlock); + memset(m, 0, sizeof(Message)); + if (peer->read.used) { + m->data = malloc(peer->read.used); + if (m->data) { + memcpy(m->data, peer->read.data, peer->read.used); + m->sz = peer->read.used; + } + } + m->peer_id = peer->id; + peer->num_of_unresponded_messages_sent_to_main_thread++; + talk_mutex(unlock); wakeup_main_loop(); } -static inline void -dispatch_peer_command(ChildMonitor *self, PeerReadData *rd, int fd) { - size_t end = 0; - for (size_t i = 0; i < rd->used - 1; i++) { - if (rd->data[i] == 0x1b && rd->data[i+1] == '\\') { - end = i + 2; - break; +static inline bool +has_complete_peer_command(Peer *peer) { + peer->read.command_end = 0; + if (peer->read.used > sizeof(KITTY_CMD_PREFIX) && memcmp(peer->read.data, KITTY_CMD_PREFIX, sizeof(KITTY_CMD_PREFIX)-1) == 0) { + for (size_t i = sizeof(KITTY_CMD_PREFIX)-1; i < peer->read.used - 1; i++) { + if (peer->read.data[i] == 0x1b && peer->read.data[i+1] == '\\') { + peer->read.command_end = i + 2; + break; + } } } - if (!end) return; - char *buf = malloc(end + 8); - if (buf) { - memcpy(buf, rd->data, end); - queue_peer_message(self, buf, end, fd); - } - rd->is_peer_command = false; - if (rd->used > end) { - rd->used -= end; - memmove(rd->data, rd->data + end, rd->used); - if (rd->used >= sizeof(KITTY_CMD_PREFIX) - 1 && memcmp(rd->data, KITTY_CMD_PREFIX, sizeof(KITTY_CMD_PREFIX)-1) == 0) rd->is_peer_command = true; - } else rd->used = 0; + return peer->read.command_end ? true : false; } -static inline bool -read_from_peer(ChildMonitor *self, int s) { - bool read_finished = false; - for (size_t i = 0; i < talk_data.num_reads; i++) { - PeerReadData *rd = talk_data.reads + i; -#define failed(msg) { read_finished = true; log_error("%s", msg); rd->finished = true; rd->close_socket = true; break; } - if (rd->fd == s) { - if (rd->used >= rd->capacity) { - if (rd->capacity >= 64 * 1024) failed("Ignoring too large message from peer"); - rd->capacity = MAX(8192u, rd->capacity * 2); - rd->data = realloc(rd->data, rd->capacity); - if (!rd->data) failed("Out of memory"); - } - ssize_t n = recv(s, rd->data + rd->used, rd->capacity - rd->used, 0); - if (n == 0) { - while (rd->is_peer_command) dispatch_peer_command(self, rd, s); - read_finished = true; rd->finished = true; - if (rd->used) queue_peer_message(self, rd->data, rd->used, s); - else free(rd->data); - rd->data = NULL; - } else if (n < 0) { - if (errno != EINTR) { - perror("Error reading from talk peer"); - failed("Closing connection to peer"); - } - } else { - if (!rd->used && memcmp(rd->data, KITTY_CMD_PREFIX, sizeof(KITTY_CMD_PREFIX)-1) == 0) rd->is_peer_command = true; - rd->used += n; - while (rd->is_peer_command) dispatch_peer_command(self, rd, s); - } - break; - } + +static inline void +dispatch_peer_command(ChildMonitor *self, Peer *peer) { + if (peer->read.command_end) { + size_t used = peer->read.used; + peer->read.used = peer->read.command_end; + queue_peer_message(self, peer); + peer->read.used = used; + if (peer->read.used > peer->read.command_end) { + peer->read.used -= peer->read.command_end; + memmove(peer->read.data, peer->read.data + peer->read.command_end, peer->read.used); + } else peer->read.used = 0; + peer->read.command_end = 0; + } +} + +static inline void +read_from_peer(ChildMonitor *self, Peer *peer) { +#define failed(msg) { log_error("Reading from peer failed: %s", msg); shutdown(peer->fd, SHUT_RD); peer->read.finished = true; return; } + if (peer->read.used >= peer->read.capacity) { + if (peer->read.capacity >= 64 * 1024) failed("Ignoring too large message from peer"); + peer->read.capacity = MAX(8192u, peer->read.capacity * 2); + peer->read.data = realloc(peer->read.data, peer->read.capacity); + if (!peer->read.data) failed("Out of memory"); + } + ssize_t n = recv(peer->fd, peer->read.data + peer->read.used, peer->read.capacity - peer->read.used, 0); + if (n == 0) { + peer->read.finished = true; + shutdown(peer->fd, SHUT_RD); + while (has_complete_peer_command(peer)) dispatch_peer_command(self, peer); + queue_peer_message(self, peer); + free(peer->read.data); peer->read.data = NULL; + peer->read.used = 0; peer->read.capacity = 0; + } else if (n < 0) { + if (errno != EINTR) failed(strerror(errno)); + } else { + peer->read.used += n; + while (has_complete_peer_command(peer)) dispatch_peer_command(self, peer); } #undef failed - return read_finished; -} - -static inline bool -write_to_peer(int fd) { - bool write_finished = false; - for (size_t i = 0; i < talk_data.num_writes; i++) { - PeerWriteData *wd = talk_data.writes + i; -#define failed(msg) { write_finished = true; log_error("%s", msg); wd->finished = true; break; } - if (wd->fd == fd) { - ssize_t n = send(fd, wd->data + wd->pos, wd->sz - wd->pos, MSG_NOSIGNAL); - if (n == 0) { failed("send() to peer failed to send any data"); } - else if (n < 0) { - if (errno != EINTR) { perror("write() to peer socket failed with error"); failed(""); } - } else { - wd->pos += n; - if (wd->pos >= wd->sz) { write_finished = true; wd->finished = true; } - } - break; - } - - } -#undef failed - return write_finished; } static inline void -remove_poll_fd(int fd) { - size_t count = talk_data.num_talk_fds + talk_data.num_listen_fds; - for (size_t i = talk_data.num_listen_fds; i < count; i++) { - struct pollfd *pfd = talk_data.fds + i; - if (pfd->fd == fd) { - size_t num_to_right = count - 1 - i; - if (num_to_right) memmove(talk_data.fds + i, talk_data.fds + i + 1, num_to_right * sizeof(struct pollfd)); - talk_data.num_talk_fds--; - break; - } - } -} - -static inline void -prune_finished_reads(void) { - if (!talk_data.num_reads) return; - for (ssize_t i = talk_data.num_reads - 1; i >= 0; i--) { - PeerReadData *rd = talk_data.reads + i; - if (rd->finished) { - remove_poll_fd(rd->fd); - if (rd->close_socket) { nuke_socket(rd->fd); } - else shutdown(rd->fd, SHUT_RD); - free(rd->data); - ssize_t num_to_right = talk_data.num_reads - 1 - i; - if (num_to_right > 0) memmove(talk_data.reads + i, talk_data.reads + i + 1, num_to_right * sizeof(PeerReadData)); - else talk_data.reads[i] = empty_prd; - talk_data.num_reads--; - } - } -} - -static inline void -prune_finished_writes(void) { - if (!talk_data.num_writes) return; - for (ssize_t i = talk_data.num_writes - 1; i >= 0; i--) { - PeerWriteData *wd = talk_data.writes + i; - if (wd->finished) { - remove_poll_fd(wd->fd); - shutdown(wd->fd, SHUT_WR); safe_close(wd->fd, __FILE__, __LINE__); - free(wd->data); - ssize_t num_to_right = talk_data.num_writes - 1 - i; - if (num_to_right > 0) memmove(talk_data.writes + i, talk_data.writes + i + 1, num_to_right * sizeof(PeerWriteData)); - else talk_data.writes[i] = empty_pwd; - talk_data.num_writes--; - } - } -} - -static inline void -prune_invalid_fd(int fd) { - log_error("Pruning invalid peer fd: %d", fd); - for (ssize_t i = talk_data.num_reads - 1; i >= 0; i--) { - PeerReadData *rd = talk_data.reads + i; - if (rd->fd == fd) { - rd->finished = true; - rd->close_socket = true; - return; - } - } - for (ssize_t i = talk_data.num_writes - 1; i >= 0; i--) { - PeerWriteData *wd = talk_data.writes + i; - if (wd->fd == fd) { - wd->finished = true; - return; - } +write_to_peer(Peer *peer) { + talk_mutex(lock); + ssize_t n = send(peer->fd, peer->write.data, peer->write.used, MSG_NOSIGNAL); + if (n == 0) { log_error("send() to peer failed to send any data"); peer->write.used = 0; peer->write.failed = true; } + else if (n < 0) { + if (errno != EINTR) { log_error("write() to peer socket failed with error: %s", strerror(errno)); peer->write.used = 0; peer->write.failed = true; } + } else { + if ((size_t)n > peer->write.used) memmove(peer->write.data, peer->write.data + n, peer->write.used - n); + peer->write.used -= n; } + talk_mutex(unlock); } static void @@ -1501,95 +1441,105 @@ wakeup_talk_loop(bool in_signal_handler) { if (talk_thread_started) wakeup_loop(&talk_data.loop_data, in_signal_handler, "talk_loop"); } + static inline void -move_queued_writes(void) { - while (talk_data.num_queued_writes) { - PeerWriteData *src = talk_data.queued_writes + --talk_data.num_queued_writes; - size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds; - if (fd_idx < PEER_LIMIT && talk_data.num_writes < PEER_LIMIT) { - ensure_space_for(&talk_data, fds, PollFD, fd_idx + 1, fds_capacity, 8, false); - talk_data.fds[fd_idx].fd = src->fd; talk_data.fds[fd_idx].events = POLLOUT; - ensure_space_for(&talk_data, writes, PeerWriteData, talk_data.num_writes + 1, writes_capacity, 8, false); - talk_data.writes[talk_data.num_writes++] = *src; - talk_data.num_talk_fds++; - } else { - log_error("Cannot send response to peer, too many peers"); - free(src->data); nuke_socket(src->fd); +prune_peers(void) { + for (size_t i = 0; i < talk_data.num_peers; i++) { + size_t idx = talk_data.num_peers - 1 - i; + Peer *p = talk_data.peers + idx; + if (p->read.finished && !p->num_of_unresponded_messages_sent_to_main_thread && !p->write.used) { + free_peer(p); + remove_i_from_array(talk_data.peers, idx, talk_data.num_peers); } - *src = empty_pwd; } } static void* talk_loop(void *data) { // The talk thread loop - ChildMonitor *self = (ChildMonitor*)data; set_thread_name("KittyPeerMon"); - if ((pthread_mutex_init(&talk_data.peer_lock, NULL)) != 0) { perror("Failed to create peer mutex"); return 0; } if (!init_loop_data(&talk_data.loop_data)) { log_error("Failed to create wakeup fd for talk thread with error: %s", strerror(errno)); } - ensure_space_for(&talk_data, fds, PollFD, 8, fds_capacity, 8, false); + PollFD fds[PEER_LIMIT + 8] = {0}; + size_t num_listen_fds = 0, num_peer_fds = 0; #define add_listener(which) \ if (self->which > -1) { \ - talk_data.fds[talk_data.num_listen_fds].fd = self->which; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN; \ + fds[num_listen_fds].fd = self->which; fds[num_listen_fds++].events = POLLIN; \ } add_listener(talk_fd); add_listener(listen_fd); #undef add_listener - talk_data.fds[talk_data.num_listen_fds].fd = talk_data.loop_data.wakeup_read_fd; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN; + fds[num_listen_fds].fd = talk_data.loop_data.wakeup_read_fd; fds[num_listen_fds++].events = POLLIN; while (LIKELY(!self->shutting_down)) { - for (size_t i = 0; i < talk_data.num_listen_fds + talk_data.num_talk_fds; i++) { talk_data.fds[i].revents = 0; } - int ret = poll(talk_data.fds, talk_data.num_listen_fds + talk_data.num_talk_fds, -1); + num_peer_fds = 0; + if (talk_data.num_peers > 0) { + talk_mutex(unlock); + prune_peers(); + for (size_t i = 0; i < talk_data.num_peers; i++) { + Peer *p = talk_data.peers + i; + if (!p->read.finished || p->write.used) { + fds[num_listen_fds + num_peer_fds].fd = p->fd; + fds[num_listen_fds + num_peer_fds].revents = 0; + int flags = 0; + if (!p->read.finished) flags |= POLLIN; + if (p->write.used) flags |= POLLOUT; + fds[num_listen_fds + num_peer_fds++].events = flags; + } + } + talk_mutex(unlock); + } + for (size_t i = 0; i < num_listen_fds; i++) fds[i].revents = 0; + int ret = poll(fds, num_listen_fds + num_peer_fds, -1); if (ret > 0) { - bool has_finished_reads = false, has_finished_writes = false; - int invalid_fd = -1; - for (size_t i = 0; i < talk_data.num_listen_fds - 1; i++) { - if (talk_data.fds[i].revents & POLLIN) {if (!accept_peer(talk_data.fds[i].fd, self->shutting_down)) goto end; } + for (size_t i = 0; i < num_listen_fds - 1; i++) { + if (fds[i].revents & POLLIN) { + if (!accept_peer(fds[i].fd, self->shutting_down)) goto end; + } } - if (talk_data.fds[talk_data.num_listen_fds - 1].revents & POLLIN) drain_fd(talk_data.fds[talk_data.num_listen_fds - 1].fd); // wakeup - for (size_t i = talk_data.num_listen_fds; i < talk_data.num_talk_fds + talk_data.num_listen_fds; i++) { - if (talk_data.fds[i].revents & (POLLIN | POLLHUP)) { if (read_from_peer(self, talk_data.fds[i].fd)) has_finished_reads = true; } - if (talk_data.fds[i].revents & POLLOUT) { if (write_to_peer(talk_data.fds[i].fd)) has_finished_writes = true; } - if (talk_data.fds[i].revents & POLLNVAL) invalid_fd = talk_data.fds[i].fd; + if (fds[num_listen_fds - 1].revents & POLLIN) { + drain_fd(fds[num_listen_fds - 1].fd); // wakeup + } + for (size_t i = num_listen_fds, k = 0; i < num_peer_fds + num_listen_fds; i++, k++) { + Peer *p = talk_data.peers + k; + if (fds[i].revents & (POLLIN | POLLHUP)) read_from_peer(self, p); + if (fds[i].revents & POLLOUT) write_to_peer(p); + if (fds[i].revents & POLLNVAL) { + p->read.finished = true; + p->write.failed = true; p->write.used = 0; + } } - if (invalid_fd > -1) { prune_invalid_fd(invalid_fd); has_finished_reads = true; has_finished_writes = true; } - if (has_finished_reads) prune_finished_reads(); - if (has_finished_writes) prune_finished_writes(); - peer_mutex(lock); - if (talk_data.num_queued_writes) move_queued_writes(); - peer_mutex(unlock); } else if (ret < 0) { if (errno != EAGAIN && errno != EINTR) perror("poll() on talk fds failed"); } } end: free_loop_data(&talk_data.loop_data); - free(talk_data.fds); free(talk_data.reads); free(talk_data.writes); free(talk_data.queued_writes); + for (size_t i = 0; i < talk_data.num_peers; i++) free_peer(talk_data.peers + i); + free(talk_data.peers); return 0; } -static inline bool -add_peer_writer(int fd, const char* msg, size_t msg_sz) { - bool ok = false; - peer_mutex(lock); - if (talk_data.num_queued_writes < PEER_LIMIT) { - ensure_space_for(&talk_data, queued_writes, PeerWriteData, talk_data.num_queued_writes + 1, queued_writes_capacity, 8, false); - talk_data.queued_writes[talk_data.num_queued_writes] = empty_pwd; - talk_data.queued_writes[talk_data.num_queued_writes].data = malloc(msg_sz); - if (talk_data.queued_writes[talk_data.num_queued_writes].data) { - memcpy(talk_data.queued_writes[talk_data.num_queued_writes].data, msg, msg_sz); - talk_data.queued_writes[talk_data.num_queued_writes].sz = msg_sz; - talk_data.queued_writes[talk_data.num_queued_writes++].fd = fd; - ok = true; - } - } else log_error("Cannot send response to peer, too many peers"); - peer_mutex(unlock); - return ok; -} - static void -send_response(int fd, const char *msg, size_t msg_sz) { - if (msg == NULL) { shutdown(fd, SHUT_WR); safe_close(fd, __FILE__, __LINE__); return; } - if (!add_peer_writer(fd, msg, msg_sz)) { shutdown(fd, SHUT_WR); safe_close(fd, __FILE__, __LINE__); } - else wakeup_talk_loop(false); +send_response(id_type peer_id, const char *msg, size_t msg_sz) { + talk_mutex(lock); + for (size_t i = 0; i < talk_data.num_peers; i++) { + Peer *peer = talk_data.peers + i; + if (peer->id == peer_id) { + if (peer->num_of_unresponded_messages_sent_to_main_thread) peer->num_of_unresponded_messages_sent_to_main_thread--; + if (!peer->write.failed) { + if (peer->write.capacity - peer->write.used < msg_sz) { + void *data = realloc(peer->write.data, peer->write.capacity + msg_sz); + if (data) { + peer->write.data = data; + peer->write.capacity += msg_sz; + } else fatal("Out of memory"); + } + memcpy(peer->write.data + peer->write.used, msg, msg_sz); + peer->write.used += msg_sz; + } + wakeup_talk_loop(false); + break; + } + } + talk_mutex(unlock); } // }}}