Code to handle messages from peers

This commit is contained in:
Kovid Goyal 2017-11-17 11:02:43 +05:30
parent c5d17934e4
commit 9e5882c9fb
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 100 additions and 21 deletions

View File

@ -265,7 +265,7 @@ class Boss:
def destroy(self):
self.shutting_down = True
self.child_monitor.shutdown()
self.child_monitor.shutdown_monitor()
wakeup()
self.child_monitor.join()
del self.child_monitor

View File

@ -24,12 +24,32 @@ extern int pthread_setname_np(const char *name);
#include <sys/ioctl.h>
#include <sys/wait.h>
#include <signal.h>
#include <sys/socket.h>
extern PyTypeObject Screen_Type;
#define EXTRA_FDS 2
static void (*parse_func)(Screen*, PyObject*);
typedef struct {
char *data;
size_t sz;
} Message;
typedef struct {
PyObject_HEAD
PyObject *dump_callback, *update_screen, *death_notify;
unsigned int count;
bool shutting_down;
pthread_t io_thread, talk_thread;
int talk_fd;
Message *messages;
size_t messages_capacity, messages_count;
} ChildMonitor;
typedef struct {
Screen *screen;
bool needs_removal;
@ -114,14 +134,16 @@ self_pipe(int fds[2]) {
return true;
}
static PyObject *
new(PyTypeObject *type, PyObject *args, PyObject UNUSED *kwds) {
ChildMonitor *self;
PyObject *dump_callback, *death_notify;
int talk_fd = -1;
int ret;
if (the_monitor) { PyErr_SetString(PyExc_RuntimeError, "Can have only a single ChildMonitor instance"); return NULL; }
if (!PyArg_ParseTuple(args, "OO", &death_notify, &dump_callback)) return NULL;
if (!PyArg_ParseTuple(args, "OO|i", &death_notify, &dump_callback, &talk_fd)) return NULL;
if ((ret = pthread_mutex_init(&children_lock, NULL)) != 0) {
PyErr_Format(PyExc_RuntimeError, "Failed to create children_lock mutex: %s", strerror(ret));
return NULL;
@ -134,6 +156,7 @@ new(PyTypeObject *type, PyObject *args, PyObject UNUSED *kwds) {
if (siginterrupt(SIGINT, false) != 0) return PyErr_SetFromErrno(PyExc_OSError);
if (siginterrupt(SIGTERM, false) != 0) return PyErr_SetFromErrno(PyExc_OSError);
self = (ChildMonitor *)type->tp_alloc(type, 0);
self->talk_fd = talk_fd;
if (self == NULL) return PyErr_NoMemory();
self->death_notify = death_notify; Py_INCREF(death_notify);
if (dump_callback != Py_None) {
@ -181,12 +204,17 @@ wakeup_io_loop() {
}
static void* io_loop(void *data);
static void* talk_loop(void *data);
static PyObject *
start(ChildMonitor *self) {
#define start_doc "start() -> Start the I/O thread"
if (self->talk_fd > -1) {
if (pthread_create(&self->talk_thread, NULL, talk_loop, self) != 0) return PyErr_SetFromErrno(PyExc_OSError);
}
int ret = pthread_create(&self->io_thread, NULL, io_loop, self);
if (ret != 0) return PyErr_SetFromErrno(PyExc_OSError);
Py_RETURN_NONE;
}
@ -195,6 +223,9 @@ join(ChildMonitor *self) {
#define join_doc "join() -> Wait for the I/O thread to finish"
int ret = pthread_join(self->io_thread, NULL);
if (ret != 0) return PyErr_SetFromErrno(PyExc_OSError);
if (self->talk_fd > -1) {
if (pthread_join(self->talk_thread, NULL) != 0) return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
@ -273,8 +304,8 @@ needs_write(ChildMonitor UNUSED *self, PyObject *args) {
}
static PyObject *
shutdown(ChildMonitor *self) {
#define shutdown_doc "shutdown() -> Shutdown the monitor loop."
shutdown_monitor(ChildMonitor *self) {
#define shutdown_monitor_doc "shutdown_monitor() -> Shutdown the monitor loop."
signal(SIGINT, SIG_DFL);
signal(SIGTERM, SIG_DFL);
self->shutting_down = true;
@ -309,6 +340,14 @@ parse_input(ChildMonitor *self) {
FREE_CHILD(remove_queue[remove_queue_count]);
}
if (UNLIKELY(self->messages_count)) {
while(self->messages_count--) {
Message *m = self->messages + self->messages_count;
call_boss(peer_msg_received, "y#", m->data, m->sz);
free(m->data);
}
}
if (UNLIKELY(signal_received)) {
global_state.close_all_windows = true;
} else {
@ -885,6 +924,62 @@ io_loop(void *data) {
}
// }}}
// {{{ Talk thread functions
static inline void
handle_peer(ChildMonitor *self, int s) {
size_t bufsz = 1024;
char *buf = NULL;
size_t buf_used = 0;
if (buf == NULL) return;
while(true) {
if (buf_used >= bufsz) {
bufsz *= 2;
if (bufsz > 1024 * 1024) return;
buf = realloc(buf, bufsz);
if (buf == NULL) return;
}
ssize_t n = recv(s, buf + buf_used, bufsz - buf_used, 0);
if (n == 0) break;
if (n < 0) {
if (errno == EINTR) continue;
perror("Error reading from talk peer");
break;
}
buf_used += n;
}
if (buf_used) {
children_mutex(lock);
ensure_space_for(self, messages, Message, self->messages_count + 1, messages_capacity, 16, false);
Message *m = self->messages + self->messages_count++;
m->data = buf; m->sz = buf_used;
children_mutex(unlock);
} else free(buf);
}
static void*
talk_loop(void *data) {
// The I/O thread loop
ChildMonitor *self = (ChildMonitor*)data;
set_thread_name("KittyTalkMon");
while (LIKELY(!self->shutting_down)) {
int peer = accept(self->talk_fd, NULL, NULL);
if (peer == -1) {
if (errno == EINTR) continue;
perror("accept() on talk socket failed!");
break;
}
handle_peer(self, peer);
shutdown(peer, SHUT_RDWR);
close(peer);
}
return 0;
}
// }}}
// Boilerplate {{{
static PyMethodDef methods[] = {
METHOD(add_child, METH_VARARGS)
@ -892,7 +987,7 @@ static PyMethodDef methods[] = {
METHOD(start, METH_NOARGS)
METHOD(join, METH_NOARGS)
METHOD(wakeup, METH_NOARGS)
METHOD(shutdown, METH_NOARGS)
METHOD(shutdown_monitor, METH_NOARGS)
METHOD(main_loop, METH_NOARGS)
METHOD(mark_for_close, METH_VARARGS)
METHOD(resize_pty, METH_VARARGS)

View File

@ -217,22 +217,6 @@ typedef struct {
#define PARSER_BUF_SZ (8 * 1024)
#define READ_BUF_SZ (1024*1024)
typedef struct {
double at;
PyObject *callback;
PyObject *args;
} TimerEvent;
typedef struct {
PyObject_HEAD
PyObject *dump_callback, *update_screen, *death_notify;
unsigned int count;
bool shutting_down;
pthread_t io_thread;
} ChildMonitor;
#define clear_sprite_position(cell) (cell).sprite_x = 0; (cell).sprite_y = 0; (cell).sprite_z = 0;
#define left_shift_line(line, at, num) \