SocketSelector fixes. Set SO_REUSEPORT.

This commit is contained in:
Fred Nicolson 2018-11-28 13:21:50 +00:00
parent 87f8ee21b1
commit 8ea8eafdd8
3 changed files with 30 additions and 44 deletions

View File

@ -46,9 +46,10 @@ namespace fr
/*! /*!
* Removes a socket from the selector. * Removes a socket from the selector.
* Does nothing if the socket isn't a member.
* *
* @throws An std::exception on failure * @throws An std::exception if an internal EPOLL error occurs
* @param socket The socket to remove. Must not be disconnected. * @param socket The socket to remove. May have been disconnected.
* @return The opaque data passed to add(). Or nullptr if the socket wasn't found. * @return The opaque data passed to add(). Or nullptr if the socket wasn't found.
*/ */
void *remove(const std::shared_ptr<fr::SocketDescriptor> &socket); void *remove(const std::shared_ptr<fr::SocketDescriptor> &socket);
@ -57,18 +58,18 @@ namespace fr
#ifndef _WIN32 #ifndef _WIN32
struct Opaque struct Opaque
{ {
Opaque(int descriptor_, std::shared_ptr<fr::SocketDescriptor> socket_, void *opaque_) Opaque(std::shared_ptr<fr::SocketDescriptor> socket_, void *opaque_, int32_t descriptor_)
: descriptor(descriptor_), : socket(std::move(socket_)),
socket(std::move(socket_)), opaque(opaque_),
opaque(opaque_) descriptor(descriptor_)
{} {}
int descriptor;
std::shared_ptr<fr::SocketDescriptor> socket; std::shared_ptr<fr::SocketDescriptor> socket;
void *opaque; void *opaque;
int32_t descriptor;
}; };
int epoll_fd; int epoll_fd;
std::unordered_map<int, Opaque> added_sockets; std::unordered_map<uintptr_t, Opaque> added_sockets;
#endif #endif
}; };
} }

View File

@ -28,24 +28,25 @@ namespace fr
void SocketSelector::add(const std::shared_ptr<fr::SocketDescriptor> &socket, void *opaque) void SocketSelector::add(const std::shared_ptr<fr::SocketDescriptor> &socket, void *opaque)
{ {
int32_t descriptor = socket->get_socket_descriptor();
if(!socket->connected()) if(!socket->connected())
{ {
throw std::logic_error("Can't add disconnected socket"); throw std::logic_error("Can't add disconnected socket");
} }
auto add_iter = added_sockets.emplace(socket->get_socket_descriptor(), Opaque(socket->get_socket_descriptor(), socket, opaque)); auto added_iter = added_sockets.emplace((uintptr_t)socket.get(), Opaque(socket, opaque, descriptor));
if(!add_iter.second) if(!added_iter.second)
{ {
throw std::logic_error("Can't add duplicate socket: " + std::to_string(socket->get_socket_descriptor())); throw std::logic_error("Can't add duplicate socket");
} }
epoll_event event = {0}; epoll_event event = {0};
event.events = EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLRDHUP; event.events = EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
event.data.ptr = &add_iter.first->second; event.data.ptr = &added_iter.first->second;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket->get_socket_descriptor(), &event) < 0) if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, descriptor, &event) < 0)
{ {
added_sockets.erase(socket->get_socket_descriptor()); delete (Opaque*)event.data.ptr;
throw std::runtime_error("Failed to add socket: " + std::to_string(errno)); throw std::runtime_error("Failed to add socket: " + std::to_string(errno));
} }
} }
@ -69,49 +70,25 @@ namespace fr
{ {
auto *opaque = static_cast<Opaque *>(events[a].data.ptr); auto *opaque = static_cast<Opaque *>(events[a].data.ptr);
ret.emplace_back(opaque->socket, opaque->opaque); ret.emplace_back(opaque->socket, opaque->opaque);
if(events[a].events & EPOLLERR || events[a].events & EPOLLHUP || events[a].events & EPOLLRDHUP)
{
auto iter = added_sockets.find(opaque->descriptor);
if(iter != added_sockets.end())
{
epoll_event event = {0};
auto remove_ret = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, opaque->descriptor, &event);
added_sockets.erase(opaque->descriptor);
if(remove_ret < 0)
{
throw std::runtime_error(
"Failed to remove socket: " + std::to_string(opaque->descriptor) + ". Errno: " +
std::to_string(errno));
}
}
}
} }
return ret; return ret;
} }
void *SocketSelector::remove(const std::shared_ptr<fr::SocketDescriptor> &socket) void *SocketSelector::remove(const std::shared_ptr<fr::SocketDescriptor> &socket)
{ {
auto descriptor = socket->get_socket_descriptor(); auto iter = added_sockets.find((uintptr_t)socket.get());
if(!socket->connected())
{
throw std::runtime_error("Can't remove disconnected socket");
}
auto iter = added_sockets.find(descriptor);
if(iter == added_sockets.end()) if(iter == added_sockets.end())
{ {
return nullptr; return nullptr;
} }
added_sockets.erase(iter); if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, iter->second.descriptor, nullptr) < 0)
epoll_event event = {0};
if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, descriptor, &event) < 0)
{ {
throw std::runtime_error("Failed to remove socket: " + std::to_string(descriptor) + ". Errno: " + std::to_string(errno)); throw std::runtime_error("Failed to remove socket: " + std::to_string(iter->second.descriptor) + ". Errno: " + std::to_string(errno));
} }
void *opaque = ((Opaque*)event.data.ptr)->opaque;
delete static_cast<Opaque *>(event.data.ptr); void *opaque = iter->second.opaque;
added_sockets.erase(iter);
return opaque; return opaque;
} }

View File

@ -53,6 +53,14 @@ namespace fr
continue; continue;
} }
//Set port re-use option
#ifndef _WIN32
if(setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEPORT, (char*)&yes, sizeof(int)) == SOCKET_ERROR)
{
continue;
}
#endif
//If it's an IPv6 interface, attempt to allow IPv4 connections //If it's an IPv6 interface, attempt to allow IPv4 connections
if(c->ai_family == AF_INET6) if(c->ai_family == AF_INET6)
{ {