Rewrite the talk loop for interaction with peers

The new code is simpler and hopefully more robust. Supports
multiple requests/responses. Fixes #2852
This commit is contained in:
Kovid Goyal 2020-07-15 14:31:45 +05:30
parent c0ff39c59d
commit cec4371b51
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 212 additions and 259 deletions

View File

@ -15,6 +15,9 @@ To update |kitty|, :doc:`follow the instructions <binary>`.
- Improve anti-aliasing of triangular box drawing characters, noticeable on
low-resolution screens (:iss:`2844`)
- Fix ``kitty @ send-text`` not working reliably when using a docket for remote
control (:iss:`2852`)
- Implement support for box drawing rounded-corners characters (:iss:`2240`)
- When a character from the Unicode Dingbat block is followed by a space, use

View File

@ -40,7 +40,7 @@ static void (*parse_func)(Screen*, PyObject*, monotonic_t);
typedef struct {
char *data;
size_t sz;
int fd;
id_type peer_id;
} Message;
typedef struct {
@ -71,9 +71,8 @@ static const Child EMPTY_CHILD = {0};
pthread_mutex_##op(&screen->which##_buf_lock);
#define children_mutex(op) \
pthread_mutex_##op(&children_lock);
#define peer_mutex(op) \
pthread_mutex_##op(&talk_data.peer_lock);
#define talk_mutex(op) \
pthread_mutex_##op(&talk_lock);
static Child children[MAX_CHILDREN] = {{0}};
@ -82,7 +81,7 @@ static Child add_queue[MAX_CHILDREN] = {{0}}, remove_queue[MAX_CHILDREN] = {{0}}
static unsigned long remove_notify[MAX_CHILDREN] = {0};
static size_t add_queue_count = 0, remove_queue_count = 0;
static struct pollfd fds[MAX_CHILDREN + EXTRA_FDS] = {{0}};
static pthread_mutex_t children_lock;
static pthread_mutex_t children_lock, talk_lock;
static bool kill_signal_received = false;
static ChildMonitor *the_monitor = NULL;
static uint8_t drain_buf[1024];
@ -131,6 +130,10 @@ new(PyTypeObject *type, PyObject *args, PyObject UNUSED *kwds) {
PyErr_Format(PyExc_RuntimeError, "Failed to create children_lock mutex: %s", strerror(ret));
return NULL;
}
if ((ret = pthread_mutex_init(&talk_lock, NULL)) != 0) {
PyErr_Format(PyExc_RuntimeError, "Failed to create talk_lock mutex: %s", strerror(ret));
return NULL;
}
self = (ChildMonitor *)type->tp_alloc(type, 0);
if (!init_loop_data(&self->io_loop_data)) return PyErr_SetFromErrno(PyExc_OSError);
if (!install_signal_handlers(&self->io_loop_data)) return PyErr_SetFromErrno(PyExc_OSError);
@ -153,6 +156,7 @@ new(PyTypeObject *type, PyObject *args, PyObject UNUSED *kwds) {
static void
dealloc(ChildMonitor* self) {
pthread_mutex_destroy(&children_lock);
pthread_mutex_destroy(&talk_lock);
Py_CLEAR(self->dump_callback);
Py_CLEAR(self->death_notify);
Py_TYPE(self)->tp_free((PyObject*)self);
@ -174,7 +178,7 @@ wakeup_io_loop(ChildMonitor *self, bool in_signal_handler) {
static void* io_loop(void *data);
static void* talk_loop(void *data);
static void send_response(int fd, const char *msg, size_t msg_sz);
static void send_response(id_type peer_id, const char *msg, size_t msg_sz);
static void wakeup_talk_loop(bool);
static bool talk_thread_started = false;
@ -324,7 +328,6 @@ parse_input(ChildMonitor *self) {
size_t count = 0, remove_count = 0;
bool input_read = false;
monotonic_t now = monotonic();
PyObject *msg = NULL;
children_mutex(lock);
while (remove_queue_count) {
remove_queue_count--;
@ -333,18 +336,6 @@ parse_input(ChildMonitor *self) {
FREE_CHILD(remove_queue[remove_queue_count]);
}
if (UNLIKELY(self->messages_count)) {
msg = PyTuple_New(self->messages_count);
if (msg) {
for (size_t i = 0; i < self->messages_count; i++) {
Message *m = self->messages + i;
PyTuple_SET_ITEM(msg, i, Py_BuildValue("y#i", m->data, (int)m->sz, m->fd));
free(m->data); m->data = NULL; m->sz = 0;
}
self->messages_count = 0;
} else fatal("Out of memory");
}
if (UNLIKELY(kill_signal_received)) {
global_state.quit_request = IMPERATIVE_CLOSE_REQUESTED;
global_state.has_pending_closes = true;
@ -358,15 +349,34 @@ parse_input(ChildMonitor *self) {
}
}
children_mutex(unlock);
if (msg) {
for (Py_ssize_t i = 0; i < PyTuple_GET_SIZE(msg); i++) {
PyObject *resp = PyObject_CallMethod(global_state.boss, "peer_message_received", "O", PyTuple_GET_ITEM(PyTuple_GET_ITEM(msg, i), 0));
int peer_fd = (int)PyLong_AsLong(PyTuple_GET_ITEM(PyTuple_GET_ITEM(msg, i), 1));
if (resp && PyBytes_Check(resp)) send_response(peer_fd, PyBytes_AS_STRING(resp), PyBytes_GET_SIZE(resp));
else { send_response(peer_fd, NULL, 0); if (!resp) PyErr_Print(); }
Message *msgs = NULL;
size_t msgs_count = 0;
talk_mutex(lock);
if (UNLIKELY(self->messages_count)) {
msgs = malloc(sizeof(Message) * self->messages_count);
if (msgs) {
memcpy(msgs, self->messages, sizeof(Message) * self->messages_count);
msgs_count = self->messages_count;
}
self->messages_count = 0;
}
talk_mutex(unlock);
if (msgs_count) {
for (size_t i = 0; i < msgs_count; i++) {
Message *msg = msgs + i;
PyObject *resp = NULL;
if (msg->data) {
resp = PyObject_CallMethod(global_state.boss, "peer_message_received", "y#", msg->data, (int)msg->sz);
free(msg->data);
if (!resp) PyErr_Print();
}
if (resp && PyBytes_Check(resp)) send_response(msg->peer_id, PyBytes_AS_STRING(resp), PyBytes_GET_SIZE(resp));
else send_response(msg->peer_id, NULL, 0);
Py_CLEAR(resp);
}
Py_CLEAR(msg);
free(msgs); msgs = NULL;
}
while(remove_count) {
@ -1279,33 +1289,30 @@ io_loop(void *data) {
// {{{ Talk thread functions
typedef struct {
char *data;
size_t capacity, used;
id_type id;
size_t num_of_unresponded_messages_sent_to_main_thread;
bool finished_reading;
int fd;
bool finished, close_socket, is_peer_command;
} PeerReadData;
static PeerReadData empty_prd = {.fd = -1, 0};
struct {
char *data;
size_t capacity, used, command_end;
bool finished;
} read;
struct {
char *data;
size_t capacity, used;
bool failed;
} write;
} Peer;
static id_type peer_id_counter = 0;
typedef struct {
char *data;
size_t sz, pos;
int fd;
bool finished;
} PeerWriteData;
static PeerWriteData empty_pwd = {.fd = -1, 0};
typedef struct {
size_t num_listen_fds, num_talk_fds, num_reads, num_writes, num_queued_writes;
size_t fds_capacity, reads_capacity, writes_capacity, queued_writes_capacity;
struct pollfd *fds;
PeerReadData *reads;
PeerWriteData *writes;
PeerWriteData *queued_writes;
size_t num_peers, peers_capacity;
Peer *peers;
LoopData loop_data;
pthread_mutex_t peer_lock;
} TalkData;
static TalkData talk_data = {0};
typedef struct pollfd PollFD;
#define PEER_LIMIT 256
#define nuke_socket(s) { shutdown(s, SHUT_RDWR); safe_close(s, __FILE__, __LINE__); }
@ -1318,13 +1325,12 @@ accept_peer(int listen_fd, bool shutting_down) {
if (!shutting_down) perror("accept() on talk socket failed!");
return false;
}
size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds;
if (fd_idx < PEER_LIMIT && talk_data.reads_capacity < PEER_LIMIT) {
ensure_space_for(&talk_data, fds, PollFD, fd_idx + 1, fds_capacity, 8, false);
talk_data.fds[fd_idx].fd = peer; talk_data.fds[fd_idx].events = POLLIN;
ensure_space_for(&talk_data, reads, PeerReadData, talk_data.num_reads + 1, reads_capacity, 8, false);
talk_data.reads[talk_data.num_reads] = empty_prd; talk_data.reads[talk_data.num_reads++].fd = peer;
talk_data.num_talk_fds++;
if (talk_data.num_peers < PEER_LIMIT) {
ensure_space_for(&talk_data, peers, Peer, talk_data.num_peers + 8, peers_capacity, 8, false);
Peer *p = talk_data.peers + talk_data.num_peers++;
memset(p, 0, sizeof(Peer));
p->fd = peer; p->id = ++peer_id_counter;
if (!p->id) p->id = ++peer_id_counter;
} else {
log_error("Too many peers want to talk, ignoring one.");
nuke_socket(peer);
@ -1332,168 +1338,102 @@ accept_peer(int listen_fd, bool shutting_down) {
return true;
}
static inline void
free_peer(Peer *peer) {
free(peer->read.data); peer->read.data = NULL;
free(peer->write.data); peer->write.data = NULL;
if (peer->fd > -1) { nuke_socket(peer->fd); peer->fd = -1; }
}
#define KITTY_CMD_PREFIX "\x1bP@kitty-cmd{"
static inline void
queue_peer_message(ChildMonitor *self, char *buf, size_t sz, int fd) {
children_mutex(lock);
ensure_space_for(self, messages, Message, self->messages_count + 1, messages_capacity, 16, true);
queue_peer_message(ChildMonitor *self, Peer *peer) {
talk_mutex(lock);
ensure_space_for(self, messages, Message, self->messages_count + 16, messages_capacity, 16, true);
Message *m = self->messages + self->messages_count++;
m->data = buf; m->sz = sz; m->fd = fd;
children_mutex(unlock);
memset(m, 0, sizeof(Message));
if (peer->read.used) {
m->data = malloc(peer->read.used);
if (m->data) {
memcpy(m->data, peer->read.data, peer->read.used);
m->sz = peer->read.used;
}
}
m->peer_id = peer->id;
peer->num_of_unresponded_messages_sent_to_main_thread++;
talk_mutex(unlock);
wakeup_main_loop();
}
static inline void
dispatch_peer_command(ChildMonitor *self, PeerReadData *rd, int fd) {
size_t end = 0;
for (size_t i = 0; i < rd->used - 1; i++) {
if (rd->data[i] == 0x1b && rd->data[i+1] == '\\') {
end = i + 2;
break;
static inline bool
has_complete_peer_command(Peer *peer) {
peer->read.command_end = 0;
if (peer->read.used > sizeof(KITTY_CMD_PREFIX) && memcmp(peer->read.data, KITTY_CMD_PREFIX, sizeof(KITTY_CMD_PREFIX)-1) == 0) {
for (size_t i = sizeof(KITTY_CMD_PREFIX)-1; i < peer->read.used - 1; i++) {
if (peer->read.data[i] == 0x1b && peer->read.data[i+1] == '\\') {
peer->read.command_end = i + 2;
break;
}
}
}
if (!end) return;
char *buf = malloc(end + 8);
if (buf) {
memcpy(buf, rd->data, end);
queue_peer_message(self, buf, end, fd);
}
rd->is_peer_command = false;
if (rd->used > end) {
rd->used -= end;
memmove(rd->data, rd->data + end, rd->used);
if (rd->used >= sizeof(KITTY_CMD_PREFIX) - 1 && memcmp(rd->data, KITTY_CMD_PREFIX, sizeof(KITTY_CMD_PREFIX)-1) == 0) rd->is_peer_command = true;
} else rd->used = 0;
return peer->read.command_end ? true : false;
}
static inline bool
read_from_peer(ChildMonitor *self, int s) {
bool read_finished = false;
for (size_t i = 0; i < talk_data.num_reads; i++) {
PeerReadData *rd = talk_data.reads + i;
#define failed(msg) { read_finished = true; log_error("%s", msg); rd->finished = true; rd->close_socket = true; break; }
if (rd->fd == s) {
if (rd->used >= rd->capacity) {
if (rd->capacity >= 64 * 1024) failed("Ignoring too large message from peer");
rd->capacity = MAX(8192u, rd->capacity * 2);
rd->data = realloc(rd->data, rd->capacity);
if (!rd->data) failed("Out of memory");
}
ssize_t n = recv(s, rd->data + rd->used, rd->capacity - rd->used, 0);
if (n == 0) {
while (rd->is_peer_command) dispatch_peer_command(self, rd, s);
read_finished = true; rd->finished = true;
if (rd->used) queue_peer_message(self, rd->data, rd->used, s);
else free(rd->data);
rd->data = NULL;
} else if (n < 0) {
if (errno != EINTR) {
perror("Error reading from talk peer");
failed("Closing connection to peer");
}
} else {
if (!rd->used && memcmp(rd->data, KITTY_CMD_PREFIX, sizeof(KITTY_CMD_PREFIX)-1) == 0) rd->is_peer_command = true;
rd->used += n;
while (rd->is_peer_command) dispatch_peer_command(self, rd, s);
}
break;
}
static inline void
dispatch_peer_command(ChildMonitor *self, Peer *peer) {
if (peer->read.command_end) {
size_t used = peer->read.used;
peer->read.used = peer->read.command_end;
queue_peer_message(self, peer);
peer->read.used = used;
if (peer->read.used > peer->read.command_end) {
peer->read.used -= peer->read.command_end;
memmove(peer->read.data, peer->read.data + peer->read.command_end, peer->read.used);
} else peer->read.used = 0;
peer->read.command_end = 0;
}
}
static inline void
read_from_peer(ChildMonitor *self, Peer *peer) {
#define failed(msg) { log_error("Reading from peer failed: %s", msg); shutdown(peer->fd, SHUT_RD); peer->read.finished = true; return; }
if (peer->read.used >= peer->read.capacity) {
if (peer->read.capacity >= 64 * 1024) failed("Ignoring too large message from peer");
peer->read.capacity = MAX(8192u, peer->read.capacity * 2);
peer->read.data = realloc(peer->read.data, peer->read.capacity);
if (!peer->read.data) failed("Out of memory");
}
ssize_t n = recv(peer->fd, peer->read.data + peer->read.used, peer->read.capacity - peer->read.used, 0);
if (n == 0) {
peer->read.finished = true;
shutdown(peer->fd, SHUT_RD);
while (has_complete_peer_command(peer)) dispatch_peer_command(self, peer);
queue_peer_message(self, peer);
free(peer->read.data); peer->read.data = NULL;
peer->read.used = 0; peer->read.capacity = 0;
} else if (n < 0) {
if (errno != EINTR) failed(strerror(errno));
} else {
peer->read.used += n;
while (has_complete_peer_command(peer)) dispatch_peer_command(self, peer);
}
#undef failed
return read_finished;
}
static inline bool
write_to_peer(int fd) {
bool write_finished = false;
for (size_t i = 0; i < talk_data.num_writes; i++) {
PeerWriteData *wd = talk_data.writes + i;
#define failed(msg) { write_finished = true; log_error("%s", msg); wd->finished = true; break; }
if (wd->fd == fd) {
ssize_t n = send(fd, wd->data + wd->pos, wd->sz - wd->pos, MSG_NOSIGNAL);
if (n == 0) { failed("send() to peer failed to send any data"); }
else if (n < 0) {
if (errno != EINTR) { perror("write() to peer socket failed with error"); failed(""); }
} else {
wd->pos += n;
if (wd->pos >= wd->sz) { write_finished = true; wd->finished = true; }
}
break;
}
}
#undef failed
return write_finished;
}
static inline void
remove_poll_fd(int fd) {
size_t count = talk_data.num_talk_fds + talk_data.num_listen_fds;
for (size_t i = talk_data.num_listen_fds; i < count; i++) {
struct pollfd *pfd = talk_data.fds + i;
if (pfd->fd == fd) {
size_t num_to_right = count - 1 - i;
if (num_to_right) memmove(talk_data.fds + i, talk_data.fds + i + 1, num_to_right * sizeof(struct pollfd));
talk_data.num_talk_fds--;
break;
}
}
}
static inline void
prune_finished_reads(void) {
if (!talk_data.num_reads) return;
for (ssize_t i = talk_data.num_reads - 1; i >= 0; i--) {
PeerReadData *rd = talk_data.reads + i;
if (rd->finished) {
remove_poll_fd(rd->fd);
if (rd->close_socket) { nuke_socket(rd->fd); }
else shutdown(rd->fd, SHUT_RD);
free(rd->data);
ssize_t num_to_right = talk_data.num_reads - 1 - i;
if (num_to_right > 0) memmove(talk_data.reads + i, talk_data.reads + i + 1, num_to_right * sizeof(PeerReadData));
else talk_data.reads[i] = empty_prd;
talk_data.num_reads--;
}
}
}
static inline void
prune_finished_writes(void) {
if (!talk_data.num_writes) return;
for (ssize_t i = talk_data.num_writes - 1; i >= 0; i--) {
PeerWriteData *wd = talk_data.writes + i;
if (wd->finished) {
remove_poll_fd(wd->fd);
shutdown(wd->fd, SHUT_WR); safe_close(wd->fd, __FILE__, __LINE__);
free(wd->data);
ssize_t num_to_right = talk_data.num_writes - 1 - i;
if (num_to_right > 0) memmove(talk_data.writes + i, talk_data.writes + i + 1, num_to_right * sizeof(PeerWriteData));
else talk_data.writes[i] = empty_pwd;
talk_data.num_writes--;
}
}
}
static inline void
prune_invalid_fd(int fd) {
log_error("Pruning invalid peer fd: %d", fd);
for (ssize_t i = talk_data.num_reads - 1; i >= 0; i--) {
PeerReadData *rd = talk_data.reads + i;
if (rd->fd == fd) {
rd->finished = true;
rd->close_socket = true;
return;
}
}
for (ssize_t i = talk_data.num_writes - 1; i >= 0; i--) {
PeerWriteData *wd = talk_data.writes + i;
if (wd->fd == fd) {
wd->finished = true;
return;
}
write_to_peer(Peer *peer) {
talk_mutex(lock);
ssize_t n = send(peer->fd, peer->write.data, peer->write.used, MSG_NOSIGNAL);
if (n == 0) { log_error("send() to peer failed to send any data"); peer->write.used = 0; peer->write.failed = true; }
else if (n < 0) {
if (errno != EINTR) { log_error("write() to peer socket failed with error: %s", strerror(errno)); peer->write.used = 0; peer->write.failed = true; }
} else {
if ((size_t)n > peer->write.used) memmove(peer->write.data, peer->write.data + n, peer->write.used - n);
peer->write.used -= n;
}
talk_mutex(unlock);
}
static void
@ -1501,95 +1441,105 @@ wakeup_talk_loop(bool in_signal_handler) {
if (talk_thread_started) wakeup_loop(&talk_data.loop_data, in_signal_handler, "talk_loop");
}
static inline void
move_queued_writes(void) {
while (talk_data.num_queued_writes) {
PeerWriteData *src = talk_data.queued_writes + --talk_data.num_queued_writes;
size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds;
if (fd_idx < PEER_LIMIT && talk_data.num_writes < PEER_LIMIT) {
ensure_space_for(&talk_data, fds, PollFD, fd_idx + 1, fds_capacity, 8, false);
talk_data.fds[fd_idx].fd = src->fd; talk_data.fds[fd_idx].events = POLLOUT;
ensure_space_for(&talk_data, writes, PeerWriteData, talk_data.num_writes + 1, writes_capacity, 8, false);
talk_data.writes[talk_data.num_writes++] = *src;
talk_data.num_talk_fds++;
} else {
log_error("Cannot send response to peer, too many peers");
free(src->data); nuke_socket(src->fd);
prune_peers(void) {
for (size_t i = 0; i < talk_data.num_peers; i++) {
size_t idx = talk_data.num_peers - 1 - i;
Peer *p = talk_data.peers + idx;
if (p->read.finished && !p->num_of_unresponded_messages_sent_to_main_thread && !p->write.used) {
free_peer(p);
remove_i_from_array(talk_data.peers, idx, talk_data.num_peers);
}
*src = empty_pwd;
}
}
static void*
talk_loop(void *data) {
// The talk thread loop
ChildMonitor *self = (ChildMonitor*)data;
set_thread_name("KittyPeerMon");
if ((pthread_mutex_init(&talk_data.peer_lock, NULL)) != 0) { perror("Failed to create peer mutex"); return 0; }
if (!init_loop_data(&talk_data.loop_data)) { log_error("Failed to create wakeup fd for talk thread with error: %s", strerror(errno)); }
ensure_space_for(&talk_data, fds, PollFD, 8, fds_capacity, 8, false);
PollFD fds[PEER_LIMIT + 8] = {0};
size_t num_listen_fds = 0, num_peer_fds = 0;
#define add_listener(which) \
if (self->which > -1) { \
talk_data.fds[talk_data.num_listen_fds].fd = self->which; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN; \
fds[num_listen_fds].fd = self->which; fds[num_listen_fds++].events = POLLIN; \
}
add_listener(talk_fd); add_listener(listen_fd);
#undef add_listener
talk_data.fds[talk_data.num_listen_fds].fd = talk_data.loop_data.wakeup_read_fd; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN;
fds[num_listen_fds].fd = talk_data.loop_data.wakeup_read_fd; fds[num_listen_fds++].events = POLLIN;
while (LIKELY(!self->shutting_down)) {
for (size_t i = 0; i < talk_data.num_listen_fds + talk_data.num_talk_fds; i++) { talk_data.fds[i].revents = 0; }
int ret = poll(talk_data.fds, talk_data.num_listen_fds + talk_data.num_talk_fds, -1);
num_peer_fds = 0;
if (talk_data.num_peers > 0) {
talk_mutex(unlock);
prune_peers();
for (size_t i = 0; i < talk_data.num_peers; i++) {
Peer *p = talk_data.peers + i;
if (!p->read.finished || p->write.used) {
fds[num_listen_fds + num_peer_fds].fd = p->fd;
fds[num_listen_fds + num_peer_fds].revents = 0;
int flags = 0;
if (!p->read.finished) flags |= POLLIN;
if (p->write.used) flags |= POLLOUT;
fds[num_listen_fds + num_peer_fds++].events = flags;
}
}
talk_mutex(unlock);
}
for (size_t i = 0; i < num_listen_fds; i++) fds[i].revents = 0;
int ret = poll(fds, num_listen_fds + num_peer_fds, -1);
if (ret > 0) {
bool has_finished_reads = false, has_finished_writes = false;
int invalid_fd = -1;
for (size_t i = 0; i < talk_data.num_listen_fds - 1; i++) {
if (talk_data.fds[i].revents & POLLIN) {if (!accept_peer(talk_data.fds[i].fd, self->shutting_down)) goto end; }
for (size_t i = 0; i < num_listen_fds - 1; i++) {
if (fds[i].revents & POLLIN) {
if (!accept_peer(fds[i].fd, self->shutting_down)) goto end;
}
}
if (talk_data.fds[talk_data.num_listen_fds - 1].revents & POLLIN) drain_fd(talk_data.fds[talk_data.num_listen_fds - 1].fd); // wakeup
for (size_t i = talk_data.num_listen_fds; i < talk_data.num_talk_fds + talk_data.num_listen_fds; i++) {
if (talk_data.fds[i].revents & (POLLIN | POLLHUP)) { if (read_from_peer(self, talk_data.fds[i].fd)) has_finished_reads = true; }
if (talk_data.fds[i].revents & POLLOUT) { if (write_to_peer(talk_data.fds[i].fd)) has_finished_writes = true; }
if (talk_data.fds[i].revents & POLLNVAL) invalid_fd = talk_data.fds[i].fd;
if (fds[num_listen_fds - 1].revents & POLLIN) {
drain_fd(fds[num_listen_fds - 1].fd); // wakeup
}
for (size_t i = num_listen_fds, k = 0; i < num_peer_fds + num_listen_fds; i++, k++) {
Peer *p = talk_data.peers + k;
if (fds[i].revents & (POLLIN | POLLHUP)) read_from_peer(self, p);
if (fds[i].revents & POLLOUT) write_to_peer(p);
if (fds[i].revents & POLLNVAL) {
p->read.finished = true;
p->write.failed = true; p->write.used = 0;
}
}
if (invalid_fd > -1) { prune_invalid_fd(invalid_fd); has_finished_reads = true; has_finished_writes = true; }
if (has_finished_reads) prune_finished_reads();
if (has_finished_writes) prune_finished_writes();
peer_mutex(lock);
if (talk_data.num_queued_writes) move_queued_writes();
peer_mutex(unlock);
} else if (ret < 0) { if (errno != EAGAIN && errno != EINTR) perror("poll() on talk fds failed"); }
}
end:
free_loop_data(&talk_data.loop_data);
free(talk_data.fds); free(talk_data.reads); free(talk_data.writes); free(talk_data.queued_writes);
for (size_t i = 0; i < talk_data.num_peers; i++) free_peer(talk_data.peers + i);
free(talk_data.peers);
return 0;
}
static inline bool
add_peer_writer(int fd, const char* msg, size_t msg_sz) {
bool ok = false;
peer_mutex(lock);
if (talk_data.num_queued_writes < PEER_LIMIT) {
ensure_space_for(&talk_data, queued_writes, PeerWriteData, talk_data.num_queued_writes + 1, queued_writes_capacity, 8, false);
talk_data.queued_writes[talk_data.num_queued_writes] = empty_pwd;
talk_data.queued_writes[talk_data.num_queued_writes].data = malloc(msg_sz);
if (talk_data.queued_writes[talk_data.num_queued_writes].data) {
memcpy(talk_data.queued_writes[talk_data.num_queued_writes].data, msg, msg_sz);
talk_data.queued_writes[talk_data.num_queued_writes].sz = msg_sz;
talk_data.queued_writes[talk_data.num_queued_writes++].fd = fd;
ok = true;
}
} else log_error("Cannot send response to peer, too many peers");
peer_mutex(unlock);
return ok;
}
static void
send_response(int fd, const char *msg, size_t msg_sz) {
if (msg == NULL) { shutdown(fd, SHUT_WR); safe_close(fd, __FILE__, __LINE__); return; }
if (!add_peer_writer(fd, msg, msg_sz)) { shutdown(fd, SHUT_WR); safe_close(fd, __FILE__, __LINE__); }
else wakeup_talk_loop(false);
send_response(id_type peer_id, const char *msg, size_t msg_sz) {
talk_mutex(lock);
for (size_t i = 0; i < talk_data.num_peers; i++) {
Peer *peer = talk_data.peers + i;
if (peer->id == peer_id) {
if (peer->num_of_unresponded_messages_sent_to_main_thread) peer->num_of_unresponded_messages_sent_to_main_thread--;
if (!peer->write.failed) {
if (peer->write.capacity - peer->write.used < msg_sz) {
void *data = realloc(peer->write.data, peer->write.capacity + msg_sz);
if (data) {
peer->write.data = data;
peer->write.capacity += msg_sz;
} else fatal("Out of memory");
}
memcpy(peer->write.data + peer->write.used, msg, msg_sz);
peer->write.used += msg_sz;
}
wakeup_talk_loop(false);
break;
}
}
talk_mutex(unlock);
}
// }}}