Allocate talk data on the heap

Most users will talk to 0, 1, 2 peers simultaneously at the most,
so allocate memory for them dynamically.
This commit is contained in:
Kovid Goyal 2018-07-13 17:13:59 +05:30
parent 19a672caf2
commit fbb4db8a83
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -1065,8 +1065,6 @@ io_loop(void *data) {
// {{{ Talk thread functions // {{{ Talk thread functions
#define MAX_PEERS 256
typedef struct { typedef struct {
char *data; char *data;
size_t capacity, used; size_t capacity, used;
@ -1085,15 +1083,18 @@ static PeerWriteData empty_pwd = {.fd = -1, 0};
typedef struct { typedef struct {
size_t num_listen_fds, num_talk_fds, num_reads, num_writes, num_queued_writes; size_t num_listen_fds, num_talk_fds, num_reads, num_writes, num_queued_writes;
struct pollfd fds[MAX_PEERS + 1]; size_t fds_capacity, reads_capacity, writes_capacity, queued_writes_capacity;
PeerReadData reads[MAX_PEERS + 1]; struct pollfd *fds;
PeerWriteData writes[MAX_PEERS + 1]; PeerReadData *reads;
PeerWriteData queued_writes[MAX_PEERS + 1]; PeerWriteData *writes;
PeerWriteData *queued_writes;
int wakeup_fds[2]; int wakeup_fds[2];
pthread_mutex_t peer_lock; pthread_mutex_t peer_lock;
} TalkData; } TalkData;
static TalkData talk_data = {0}; static TalkData talk_data = {0};
typedef struct pollfd PollFD;
#define PEER_LIMIT 256
#define nuke_socket(s) { shutdown(s, SHUT_RDWR); close(s); } #define nuke_socket(s) { shutdown(s, SHUT_RDWR); close(s); }
static inline bool static inline bool
@ -1105,12 +1106,14 @@ accept_peer(int listen_fd, bool shutting_down) {
return false; return false;
} }
size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds; size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds;
if (fd_idx < arraysz(talk_data.fds) && talk_data.num_reads < arraysz(talk_data.reads)) { if (fd_idx < PEER_LIMIT && talk_data.reads_capacity < PEER_LIMIT) {
ensure_space_for(&talk_data, fds, PollFD, fd_idx + 1, fds_capacity, 8, false);
talk_data.fds[fd_idx].fd = peer; talk_data.fds[fd_idx].events = POLLIN; talk_data.fds[fd_idx].fd = peer; talk_data.fds[fd_idx].events = POLLIN;
ensure_space_for(&talk_data, reads, PeerReadData, talk_data.num_reads + 1, reads_capacity, 8, false);
talk_data.reads[talk_data.num_reads] = empty_prd; talk_data.reads[talk_data.num_reads++].fd = peer; talk_data.reads[talk_data.num_reads] = empty_prd; talk_data.reads[talk_data.num_reads++].fd = peer;
talk_data.num_talk_fds++; talk_data.num_talk_fds++;
} else { } else {
log_error("Too many peers want to talk, already connected to %lu ignoring one.", talk_data.num_talk_fds); log_error("Too many peers want to talk, ignoring one.");
nuke_socket(peer); nuke_socket(peer);
} }
return true; return true;
@ -1241,8 +1244,10 @@ move_queued_writes() {
while (talk_data.num_queued_writes) { while (talk_data.num_queued_writes) {
PeerWriteData *src = talk_data.queued_writes + --talk_data.num_queued_writes; PeerWriteData *src = talk_data.queued_writes + --talk_data.num_queued_writes;
size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds; size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds;
if (fd_idx < arraysz(talk_data.fds) && talk_data.num_writes < arraysz(talk_data.writes)) { if (fd_idx < PEER_LIMIT && talk_data.num_writes < PEER_LIMIT) {
ensure_space_for(&talk_data, fds, PollFD, fd_idx + 1, fds_capacity, 8, false);
talk_data.fds[fd_idx].fd = src->fd; talk_data.fds[fd_idx].events = POLLOUT; talk_data.fds[fd_idx].fd = src->fd; talk_data.fds[fd_idx].events = POLLOUT;
ensure_space_for(&talk_data, writes, PeerWriteData, talk_data.num_writes + 1, writes_capacity, 8, false);
talk_data.writes[talk_data.num_writes++] = *src; talk_data.writes[talk_data.num_writes++] = *src;
talk_data.num_talk_fds++; talk_data.num_talk_fds++;
} else { } else {
@ -1261,6 +1266,7 @@ talk_loop(void *data) {
set_thread_name("KittyPeerMon"); set_thread_name("KittyPeerMon");
if ((pthread_mutex_init(&talk_data.peer_lock, NULL)) != 0) { perror("Failed to create peer mutex"); return 0; } if ((pthread_mutex_init(&talk_data.peer_lock, NULL)) != 0) { perror("Failed to create peer mutex"); return 0; }
if (!self_pipe(talk_data.wakeup_fds)) { perror("Failed to create wakeup fds for talk thread"); return 0; } if (!self_pipe(talk_data.wakeup_fds)) { perror("Failed to create wakeup fds for talk thread"); return 0; }
ensure_space_for(&talk_data, fds, PollFD, 8, fds_capacity, 8, false);
#define add_listener(which) \ #define add_listener(which) \
if (self->which > -1) { \ if (self->which > -1) { \
talk_data.fds[talk_data.num_listen_fds].fd = self->which; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN; \ talk_data.fds[talk_data.num_listen_fds].fd = self->which; talk_data.fds[talk_data.num_listen_fds++].events = POLLIN; \
@ -1291,6 +1297,7 @@ talk_loop(void *data) {
} }
end: end:
close(talk_data.wakeup_fds[0]); close(talk_data.wakeup_fds[1]); close(talk_data.wakeup_fds[0]); close(talk_data.wakeup_fds[1]);
free(talk_data.fds); free(talk_data.reads); free(talk_data.writes); free(talk_data.queued_writes);
return 0; return 0;
} }
@ -1298,7 +1305,8 @@ static inline bool
add_peer_writer(int fd, const char* msg, size_t msg_sz) { add_peer_writer(int fd, const char* msg, size_t msg_sz) {
bool ok = false; bool ok = false;
peer_mutex(lock); peer_mutex(lock);
if (talk_data.num_queued_writes < arraysz(talk_data.queued_writes)) { if (talk_data.num_queued_writes < PEER_LIMIT) {
ensure_space_for(&talk_data, queued_writes, PeerWriteData, talk_data.num_queued_writes + 1, queued_writes_capacity, 8, false);
talk_data.queued_writes[talk_data.num_queued_writes] = empty_pwd; talk_data.queued_writes[talk_data.num_queued_writes] = empty_pwd;
talk_data.queued_writes[talk_data.num_queued_writes].data = malloc(msg_sz); talk_data.queued_writes[talk_data.num_queued_writes].data = malloc(msg_sz);
if (talk_data.queued_writes[talk_data.num_queued_writes].data) { if (talk_data.queued_writes[talk_data.num_queued_writes].data) {