Removed SocketReactor. Switched SocketSelector implementation to EPOLL
Should result in better performance, although a Windows implementation is not yet ready.
This commit is contained in:
parent
f5d8dd201b
commit
322f53841b
78
README.md
78
README.md
@ -186,84 +186,6 @@ Here we create an fr::TcpSocket object, connect to a domain (don't include the '
|
||||
|
||||
You can both set and get GET/POST data through the fr::(HttpRequest/HttpResponse)::(get/post) functions. And access/set headers though the [] operator. Once we've sent a request, we wait for a response. Once received, we print out the body of the response and exit.
|
||||
|
||||
# Blocking on multiple sockets simultaneously:
|
||||
|
||||
```c++
|
||||
#include <SocketSelector.h>
|
||||
|
||||
//Bind to port
|
||||
fr::TcpListener listener;
|
||||
if(listener.listen("8080") != fr::Socket::Success)
|
||||
{
|
||||
//Error
|
||||
}
|
||||
|
||||
//Create socket selector and add listener
|
||||
fr::SocketSelector selector;
|
||||
selector.add(listener);
|
||||
|
||||
//Create vector to store open connections
|
||||
std::vector<std::unique_ptr<fr::Socket>> connections;
|
||||
|
||||
//Infinitely loop. No timeout is specified so it will not return false.
|
||||
while(selector.wait())
|
||||
{
|
||||
//Check if it was the selector who sent data
|
||||
if(selector.is_ready(listener))
|
||||
{
|
||||
std::unique_ptr<fr::TcpSocket> socket(new fr::TcpSocket);
|
||||
if(listener.accept(*socket) == fr::Socket::Success)
|
||||
{
|
||||
selector.add(*socket);
|
||||
connections.emplace_back(std::move(socket));
|
||||
}
|
||||
}
|
||||
|
||||
//Else it must have been one of the clients
|
||||
else
|
||||
{
|
||||
//Find which client send the data
|
||||
for(auto iter = connections.begin(); iter != connections.end();)
|
||||
{
|
||||
//Eww
|
||||
fr::TcpSocket &client = (fr::TcpSocket &)**iter;
|
||||
|
||||
//Check if it's this client
|
||||
if(selector.is_ready(client))
|
||||
{
|
||||
//It is, so receive their HTTP request
|
||||
fr::HttpRequest request;
|
||||
if(client.receive(request) == fr::Socket::Success)
|
||||
{
|
||||
//Send back a HTTP response containing 'Hello, World!'
|
||||
fr::HttpResponse response;
|
||||
response.set_body("<h1>Hello, World!</h1>");
|
||||
client.send(response);
|
||||
|
||||
//Remove them from the selector and close the connection
|
||||
selector.remove(client);
|
||||
client.close_socket();
|
||||
iter = connections.erase(iter);
|
||||
}
|
||||
else
|
||||
{
|
||||
iter++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
iter++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
fr::SocketSelector can be used to monitor lots of blocking sockets at once, without polling, to see when data is being received or a connection has closed. To add a socket, just call fr::SocketSelector::add, and to remove a socket, which must be done before the socket object is destroyed, call fr::SocketSelector::remove. You can add as many fr::Socket's as you want.It is also important to add your fr::TcpListener to the selector, otherwise you wont be able to accept new connections whilst blocking.
|
||||
|
||||
Once added, you can call fr::SocketSelector::wait() to wait for socket events. You can also specify a timeout, forcing it to return even if there was no activity. If any of the sockets added to the selector send data or disconnect, then it will return true. If the specified timeout has expired, then it will return false.
|
||||
|
||||
To find which socket actually did something, we need to call fr::SocketSelector::is_ready(), passing it the socket to check. It'll return true if it was this socket that sent data, false otherwise. First we check our selector, and accept a new connection if it was that. Otherwise, we check through all of the connected clients. If the fr::Socket::receive() call failed on the socket, then it must have disconnected, so we remove it from the client list, and remove it from the selector.
|
||||
|
||||
# Sending custom objects:
|
||||
```c++
|
||||
class MyClass : public fr::Packetable
|
||||
|
||||
@ -1,2 +1,3 @@
|
||||
add_subdirectory(simple_http_server_and_client)
|
||||
add_subdirectory(simple_websocket_server_and_client)
|
||||
add_subdirectory(concurrent_http_server)
|
||||
|
||||
4
examples/concurrent_http_server/CMakeLists.txt
Normal file
4
examples/concurrent_http_server/CMakeLists.txt
Normal file
@ -0,0 +1,4 @@
|
||||
add_executable(concurrent_http_server ConcurrentHTTPServer.cpp)
|
||||
target_link_libraries(concurrent_http_server frnetlib)
|
||||
|
||||
install(TARGETS concurrent_http_server DESTINATION "bin")
|
||||
97
examples/concurrent_http_server/ConcurrentHTTPServer.cpp
Normal file
97
examples/concurrent_http_server/ConcurrentHTTPServer.cpp
Normal file
@ -0,0 +1,97 @@
|
||||
//
|
||||
// Created by fred.nicolson on 01/10/18.
|
||||
//
|
||||
|
||||
#include <frnetlib/TcpListener.h>
|
||||
#include <iostream>
|
||||
#include <frnetlib/SocketSelector.h>
|
||||
#include <frnetlib/Http.h>
|
||||
#include <frnetlib/HttpRequest.h>
|
||||
#include <frnetlib/HttpResponse.h>
|
||||
#include <thread>
|
||||
|
||||
int main()
|
||||
{
|
||||
//Bind to port
|
||||
auto listener = std::make_shared<fr::TcpListener>();
|
||||
if(listener->listen("8080") != fr::Socket::Success)
|
||||
{
|
||||
std::cerr << "Failed to bind to port" << std::endl;
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
for(size_t a = 0; a < std::thread::hardware_concurrency(); ++a)
|
||||
{
|
||||
if(fork() <= 0)
|
||||
break;
|
||||
}
|
||||
|
||||
//Setup our socket selector and add the listener to it
|
||||
fr::SocketSelector selector;
|
||||
selector.add(listener, nullptr);
|
||||
|
||||
//Enter main server loop
|
||||
volatile bool running = true;
|
||||
while(running)
|
||||
{
|
||||
//Wait for a socket to be ready with no timeout
|
||||
auto ready_sockets = selector.wait();
|
||||
|
||||
//Process each socket
|
||||
for(auto &ready_socket : ready_sockets)
|
||||
{
|
||||
//If this is the listener, then accept a new connection
|
||||
if(ready_socket.first->get_socket_descriptor() == listener->get_socket_descriptor())
|
||||
{
|
||||
auto client = std::make_shared<fr::TcpSocket>();
|
||||
if(listener->accept(*client) == fr::Socket::Success)
|
||||
{
|
||||
//We've accepted the connection, so add it to the selector, associating a new fr::HttpRequest
|
||||
//Object which will slowly be filled with the client's request. You could also pass a struct
|
||||
//associated with the connection for easier state management.
|
||||
selector.add(client, new fr::HttpRequest());
|
||||
client->set_blocking(false);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
//Else, a socket is sending data/has disconnected, accept it
|
||||
auto client = std::static_pointer_cast<fr::Socket>(ready_socket.first); //fr::TcpSocket -> fr::Socket -> fr::SocketDescriptor
|
||||
auto partial_request = static_cast<fr::HttpRequest*>(ready_socket.second);
|
||||
|
||||
char data[0x1000];
|
||||
size_t received = 0;
|
||||
auto recv_status = client->receive_raw(data, sizeof(data), received);
|
||||
if(recv_status == fr::Socket::Success)
|
||||
{
|
||||
//We receied data, so parse it using the partial HTTP request associated with this connection
|
||||
auto parse_status = partial_request->parse(data, received);
|
||||
if(parse_status == fr::Socket::Success)
|
||||
{
|
||||
//The client has sent a full request, send them something back
|
||||
//This could alternatively pass this connection/request off to a handler in another
|
||||
//thread for further processing.
|
||||
fr::HttpResponse response;
|
||||
response.set_body("<h1>Hello World!</h1>");
|
||||
client->send(response);
|
||||
}
|
||||
else if(parse_status == fr::Socket::NotEnoughData)
|
||||
{
|
||||
//More data needed, wait for it
|
||||
}
|
||||
else
|
||||
{
|
||||
//HTTP error, disconnect the client
|
||||
delete partial_request;
|
||||
selector.remove(client);
|
||||
client->disconnect();
|
||||
}
|
||||
}
|
||||
else if(recv_status == fr::Socket::Disconnected)
|
||||
{
|
||||
//Free the allocated fr::HttpRequest. The socket is auto-removed from the selector as it notified us.
|
||||
delete partial_request;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -28,7 +28,10 @@ namespace fr
|
||||
*
|
||||
* @param data The HTTP response to parse
|
||||
* @param datasz The length of data in bytes
|
||||
* @return True if more data is needed, false if finished.
|
||||
* @return Status of the parse:
|
||||
* 'NotEnoughData' if parse needs to be re-called with more data.
|
||||
* 'Success' if the whole request has been parsed
|
||||
* Anything else - on error
|
||||
*/
|
||||
fr::Socket::Status parse(const char *data, size_t datasz) override;
|
||||
|
||||
|
||||
@ -5,10 +5,10 @@
|
||||
#ifndef FRNETLIB_LISTENER_H
|
||||
#define FRNETLIB_LISTENER_H
|
||||
#include "Socket.h"
|
||||
|
||||
#include "SocketDescriptor.h"
|
||||
namespace fr
|
||||
{
|
||||
class Listener
|
||||
class Listener : public SocketDescriptor
|
||||
{
|
||||
public:
|
||||
Listener()
|
||||
@ -52,13 +52,6 @@ namespace fr
|
||||
*/
|
||||
virtual void close_socket()=0;
|
||||
|
||||
/*!
|
||||
* Gets the socket descriptor.
|
||||
*
|
||||
* @return The listen socket descriptor
|
||||
*/
|
||||
virtual int32_t get_socket_descriptor() const=0;
|
||||
|
||||
/*!
|
||||
* Sets the socket descriptor.
|
||||
*
|
||||
|
||||
@ -60,13 +60,6 @@ namespace fr
|
||||
*/
|
||||
void shutdown() override;
|
||||
|
||||
/*!
|
||||
* Gets the socket descriptor.
|
||||
*
|
||||
* @return The listen socket descriptor
|
||||
*/
|
||||
int32_t get_socket_descriptor() const override;
|
||||
|
||||
/*!
|
||||
* Sets the socket descriptor.
|
||||
*
|
||||
@ -74,6 +67,20 @@ namespace fr
|
||||
*/
|
||||
void set_socket_descriptor(int32_t descriptor) override;
|
||||
|
||||
/*!
|
||||
* Checks to see if the socket is connected or not
|
||||
*
|
||||
* @return True if connected, false otherwise
|
||||
*/
|
||||
bool connected() const noexcept override;
|
||||
|
||||
/*!
|
||||
* Gets the underlying socket descriptor.
|
||||
*
|
||||
* @return The socket descriptor.
|
||||
*/
|
||||
int32_t get_socket_descriptor() const noexcept override;
|
||||
|
||||
private:
|
||||
mbedtls_net_context listen_fd;
|
||||
mbedtls_ssl_config conf;
|
||||
|
||||
@ -97,7 +97,7 @@ namespace fr
|
||||
*
|
||||
* @return The socket's descriptor. -1 indicates no connection.
|
||||
*/
|
||||
inline int32_t get_socket_descriptor() const override
|
||||
inline int32_t get_socket_descriptor() const noexcept override
|
||||
{
|
||||
if(!ssl_socket_descriptor)
|
||||
return -1;
|
||||
@ -120,7 +120,7 @@ namespace fr
|
||||
*
|
||||
* @return True if it's connected. False otherwise.
|
||||
*/
|
||||
inline bool connected() const final
|
||||
inline bool connected() const noexcept final
|
||||
{
|
||||
return ssl_socket_descriptor && ssl_socket_descriptor->fd > -1;
|
||||
}
|
||||
|
||||
@ -7,13 +7,14 @@
|
||||
|
||||
#include <mutex>
|
||||
#include "NetworkEncoding.h"
|
||||
#include "SocketDescriptor.h"
|
||||
|
||||
#define RECV_CHUNK_SIZE 4096 //How much data to try and recv at once
|
||||
namespace fr
|
||||
{
|
||||
class Packet;
|
||||
class Sendable;
|
||||
class Socket
|
||||
class Socket : public SocketDescriptor
|
||||
{
|
||||
public:
|
||||
enum Status
|
||||
@ -94,20 +95,6 @@ namespace fr
|
||||
*/
|
||||
virtual Status receive_raw(void *data, size_t data_size, size_t &received) = 0;
|
||||
|
||||
/*!
|
||||
* Gets the underlying socket descriptor.
|
||||
*
|
||||
* @return The socket descriptor.
|
||||
*/
|
||||
virtual int32_t get_socket_descriptor() const = 0;
|
||||
|
||||
/*!
|
||||
* Checks to see if we're connected to a socket or not
|
||||
*
|
||||
* @return True if it's connected. False otherwise.
|
||||
*/
|
||||
virtual bool connected() const =0;
|
||||
|
||||
/*!
|
||||
* Sets the socket file descriptor. Internally used.
|
||||
*
|
||||
|
||||
32
include/frnetlib/SocketDescriptor.h
Normal file
32
include/frnetlib/SocketDescriptor.h
Normal file
@ -0,0 +1,32 @@
|
||||
//
|
||||
// Created by fred.nicolson on 01/10/18.
|
||||
//
|
||||
|
||||
#ifndef FRNETLIB_SOCKETDESCRIPTOR_H
|
||||
#define FRNETLIB_SOCKETDESCRIPTOR_H
|
||||
|
||||
#include <stdint.h>
|
||||
namespace fr
|
||||
{
|
||||
class SocketDescriptor
|
||||
{
|
||||
public:
|
||||
|
||||
/*!
|
||||
* Checks to see if the socket is connected or not
|
||||
*
|
||||
* @return True if connected, false otherwise
|
||||
*/
|
||||
virtual bool connected() const noexcept = 0;
|
||||
|
||||
/*!
|
||||
* Gets the underlying socket descriptor.
|
||||
*
|
||||
* @return The socket descriptor.
|
||||
*/
|
||||
virtual int32_t get_socket_descriptor() const noexcept = 0;
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
#endif //FRNETLIB_SOCKETDESCRIPTOR_H
|
||||
@ -1,68 +0,0 @@
|
||||
//
|
||||
// Created by fred on 20/12/16.
|
||||
//
|
||||
|
||||
#ifndef FRNETLIB_SOCKETREACTOR_H
|
||||
#define FRNETLIB_SOCKETREACTOR_H
|
||||
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
#include <algorithm>
|
||||
#include <vector>
|
||||
#include "Socket.h"
|
||||
#include "SocketSelector.h"
|
||||
|
||||
namespace fr
|
||||
{
|
||||
class SocketReactor
|
||||
{
|
||||
public:
|
||||
SocketReactor(const SocketReactor&)=delete;
|
||||
SocketReactor(SocketReactor&&) noexcept = default;
|
||||
|
||||
/*!
|
||||
* Adds a socket to the selector. Note that SocketSelector
|
||||
* does not keep a copy of the object, just a handle, it's
|
||||
* up to you to store your fr::Sockets.
|
||||
*
|
||||
* @param socket The socket to add.
|
||||
* @param callback A function to call when the socket shows activity. Remember to remove it from the reactor if the client has disconnected.
|
||||
*/
|
||||
template<typename T>
|
||||
inline void add(const T &socket, std::function<void()> callback)
|
||||
{
|
||||
socket_selector.add(socket);
|
||||
socket_callbacks.emplace_back(std::make_pair(socket.get_remote_address(), callback));
|
||||
}
|
||||
|
||||
/*!
|
||||
* Removes a socket from the socket selector.
|
||||
*
|
||||
* @param socket The socket to remove.
|
||||
*/
|
||||
template<typename T>
|
||||
inline void remove(const T &socket)
|
||||
{
|
||||
socket_selector.remove(socket);
|
||||
socket_callbacks.erase(std::find_if(socket_callbacks.begin(), socket_callbacks.end(), [&](const std::pair<const int32_t, std::function<void()>> &check) {
|
||||
return check.first == socket.get_socket_descriptor();
|
||||
}));
|
||||
}
|
||||
|
||||
/*!
|
||||
* Waits for a socket to become ready.
|
||||
*
|
||||
* @param timeout The amount of time wait should block for before timing out.
|
||||
* @return True if a socket is ready. False if it timed out.
|
||||
*/
|
||||
bool wait(std::chrono::milliseconds timeout = std::chrono::milliseconds(0));
|
||||
|
||||
private:
|
||||
std::vector<std::pair<const int32_t, std::function<void()>>> socket_callbacks; //<descriptor, callback>
|
||||
fr::SocketSelector socket_selector;
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
#endif //FRNETLIB_SOCKETREACTOR_H
|
||||
@ -6,6 +6,10 @@
|
||||
#define FRNETLIB_SOCKETSELECTOR_H
|
||||
|
||||
#include <chrono>
|
||||
#include <vector>
|
||||
#include <iostream>
|
||||
#include <unordered_map>
|
||||
#include <sys/epoll.h>
|
||||
#include "NetworkEncoding.h"
|
||||
#include "Socket.h"
|
||||
#include "TcpListener.h"
|
||||
@ -15,81 +19,55 @@ namespace fr
|
||||
class SocketSelector
|
||||
{
|
||||
public:
|
||||
SocketSelector() noexcept;
|
||||
SocketSelector();
|
||||
~SocketSelector();
|
||||
|
||||
/*!
|
||||
* Waits for a socket to become ready.
|
||||
* Adds a socket to the selector along with some opaque state
|
||||
*
|
||||
* @throws An std::exception if the socket encountered an error
|
||||
* @param timeout The amount of time wait should block for before timing out.
|
||||
* @return True if a socket is ready. False if it timed out.
|
||||
* @throws An std::exception on failure
|
||||
* @param socket The socket to add, can be a Listener/Socket.
|
||||
* @param opaque Opaque data which is passed back by wait() when the socket
|
||||
* has activity. Can be used for state management.
|
||||
*/
|
||||
bool wait(std::chrono::milliseconds timeout = std::chrono::milliseconds(0));
|
||||
void add(const std::shared_ptr<fr::SocketDescriptor> &socket, void *opaque);
|
||||
|
||||
/*!
|
||||
* Adds a socket to the selector. Note that SocketSelector
|
||||
* does not keep a copy of the object, just a handle, it's
|
||||
* up to you to store your fr::Sockets.
|
||||
* Waits for activity on one of the added sockets. If a socket disconnects,
|
||||
* then it will automatically be removed from the selector, and so remove()
|
||||
* should not be called.
|
||||
*
|
||||
* @param socket The socket to add.
|
||||
* @throws An std::exception on failure
|
||||
* @param timeout The maximum time in milliseconds to wait for. Default/-1 for no timeout.
|
||||
* @return A list of sockets which either are ready, or have disconnected.
|
||||
*/
|
||||
template<typename T>
|
||||
inline void add(const T &socket)
|
||||
{
|
||||
add(socket.get_socket_descriptor());
|
||||
}
|
||||
std::vector<std::pair<std::shared_ptr<fr::SocketDescriptor>, void*>> wait(std::chrono::milliseconds timeout = std::chrono::milliseconds(-1));
|
||||
|
||||
/*!
|
||||
* Adds a socket to the selector. Note that SocketSelector
|
||||
* does not keep a copy of the object, just a handle, it's
|
||||
* up to you to store your fr::Sockets.
|
||||
* Removes a socket from the selector.
|
||||
*
|
||||
* @param socket The socket descriptor to add.
|
||||
* @throws An std::exception on failure
|
||||
* @param socket The socket to remove. Must not be disconnected.
|
||||
*/
|
||||
void add(int32_t socket_descriptor)
|
||||
{
|
||||
//Add it to the set
|
||||
FD_SET(socket_descriptor, &listen_set);
|
||||
|
||||
if(socket_descriptor > max_descriptor)
|
||||
max_descriptor = socket_descriptor;
|
||||
}
|
||||
|
||||
/*!
|
||||
* Checks to see if a socket inside of the selector is ready.
|
||||
* This should be called after 'wait' returns true, on
|
||||
* each of the added sockets to see which one has received data.
|
||||
*
|
||||
* @param socket The socket to check if it's ready
|
||||
* @return True if this socket is ready, false otherwise.
|
||||
*/
|
||||
template<typename T>
|
||||
inline bool is_ready(const T &socket)
|
||||
{
|
||||
return (FD_ISSET(socket.get_socket_descriptor(), &listen_read));
|
||||
}
|
||||
|
||||
inline bool is_ready(int32_t socket)
|
||||
{
|
||||
return (FD_ISSET(socket, &listen_read));
|
||||
}
|
||||
|
||||
/*!
|
||||
* Removes a socket from the socket selector.
|
||||
*
|
||||
* @param socket The socket to remove.
|
||||
*/
|
||||
template<typename T>
|
||||
inline void remove(const T &socket)
|
||||
{
|
||||
FD_CLR(socket.get_socket_descriptor(), &listen_set);
|
||||
}
|
||||
void remove(const std::shared_ptr<fr::SocketDescriptor> &socket);
|
||||
private:
|
||||
|
||||
fd_set listen_set;
|
||||
fd_set listen_read;
|
||||
int32_t max_descriptor;
|
||||
#ifndef _WIN32
|
||||
struct Opaque
|
||||
{
|
||||
Opaque(int descriptor_, std::shared_ptr<fr::SocketDescriptor> socket_, void *opaque_)
|
||||
: descriptor(descriptor_),
|
||||
socket(std::move(socket_)),
|
||||
opaque(opaque_)
|
||||
{}
|
||||
|
||||
int descriptor;
|
||||
std::shared_ptr<fr::SocketDescriptor> socket;
|
||||
void *opaque;
|
||||
};
|
||||
int epoll_fd;
|
||||
std::unordered_map<int, Opaque> added_sockets;
|
||||
#endif
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@ -47,13 +47,6 @@ public:
|
||||
*/
|
||||
void shutdown() override;
|
||||
|
||||
/*!
|
||||
* Gets the socket descriptor.
|
||||
*
|
||||
* @return The listen socket descriptor
|
||||
*/
|
||||
int32_t get_socket_descriptor() const override;
|
||||
|
||||
/*!
|
||||
* Sets the socket descriptor.
|
||||
*
|
||||
@ -66,6 +59,20 @@ public:
|
||||
*/
|
||||
void close_socket() override;
|
||||
|
||||
/*!
|
||||
* Checks to see if the socket is connected or not
|
||||
*
|
||||
* @return True if connected, false otherwise
|
||||
*/
|
||||
bool connected() const noexcept override;
|
||||
|
||||
/*!
|
||||
* Gets the underlying socket descriptor.
|
||||
*
|
||||
* @return The socket descriptor.
|
||||
*/
|
||||
int32_t get_socket_descriptor() const noexcept override;
|
||||
|
||||
|
||||
private:
|
||||
int32_t socket_descriptor;
|
||||
|
||||
@ -76,13 +76,6 @@ public:
|
||||
*/
|
||||
void set_descriptor(void *descriptor_data) override;
|
||||
|
||||
/*!
|
||||
* Gets the underlying socket descriptor
|
||||
*
|
||||
* @return The socket descriptor. -1 typically indicates no connection.
|
||||
*/
|
||||
int32_t get_socket_descriptor() const override;
|
||||
|
||||
/*!
|
||||
* Applies requested socket options to the socket.
|
||||
* Should be called when a new socket is created.
|
||||
@ -90,14 +83,18 @@ public:
|
||||
void reconfigure_socket() override;
|
||||
|
||||
/*!
|
||||
* Checks to see if we're connected to a socket or not
|
||||
* Checks to see if the socket is connected or not
|
||||
*
|
||||
* @return True if it's connected. False otherwise.
|
||||
* @return True if connected, false otherwise
|
||||
*/
|
||||
inline bool connected() const final
|
||||
{
|
||||
return socket_descriptor > -1;
|
||||
}
|
||||
bool connected() const noexcept override;
|
||||
|
||||
/*!
|
||||
* Gets the underlying socket descriptor.
|
||||
*
|
||||
* @return The socket descriptor.
|
||||
*/
|
||||
int32_t get_socket_descriptor() const noexcept override;
|
||||
|
||||
protected:
|
||||
|
||||
|
||||
@ -10,6 +10,8 @@
|
||||
|
||||
#include <mbedtls/net_sockets.h>
|
||||
#include <iostream>
|
||||
#include <frnetlib/SSLListener.h>
|
||||
|
||||
|
||||
namespace fr
|
||||
{
|
||||
@ -149,7 +151,7 @@ namespace fr
|
||||
::shutdown(listen_fd.fd, 0);
|
||||
}
|
||||
|
||||
int32_t SSLListener::get_socket_descriptor() const
|
||||
int32_t SSLListener::get_socket_descriptor() const noexcept
|
||||
{
|
||||
return listen_fd.fd;
|
||||
}
|
||||
@ -168,4 +170,9 @@ namespace fr
|
||||
}
|
||||
}
|
||||
|
||||
bool SSLListener::connected() const noexcept
|
||||
{
|
||||
return listen_fd.fd > -1;
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,28 +0,0 @@
|
||||
//
|
||||
// Created by fred on 20/12/16.
|
||||
//
|
||||
|
||||
#include <algorithm>
|
||||
#include "frnetlib/SocketReactor.h"
|
||||
|
||||
namespace fr
|
||||
{
|
||||
bool SocketReactor::wait(std::chrono::milliseconds timeout)
|
||||
{
|
||||
bool found = false;
|
||||
if(socket_selector.wait(timeout))
|
||||
{
|
||||
//Find which socket sent the activity
|
||||
for(auto &callback : socket_callbacks)
|
||||
{
|
||||
if(socket_selector.is_ready(callback.first))
|
||||
{
|
||||
//Call the socket's callback
|
||||
callback.second();
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return found;
|
||||
}
|
||||
}
|
||||
@ -6,49 +6,97 @@
|
||||
#include <mutex>
|
||||
#include "frnetlib/SocketSelector.h"
|
||||
|
||||
//Linux EPOLL implementation
|
||||
namespace fr
|
||||
{
|
||||
|
||||
SocketSelector::SocketSelector() noexcept
|
||||
#ifndef _WIN32
|
||||
SocketSelector::SocketSelector()
|
||||
: epoll_fd(-1)
|
||||
{
|
||||
//Zero out sets
|
||||
FD_ZERO(&listen_set);
|
||||
FD_ZERO(&listen_read);
|
||||
|
||||
max_descriptor = 0;
|
||||
epoll_fd = epoll_create1(O_CLOEXEC);
|
||||
if(epoll_fd < 0)
|
||||
{
|
||||
throw std::runtime_error("Failed to create EPOLL descriptor: " + std::to_string(errno));
|
||||
}
|
||||
}
|
||||
|
||||
SocketSelector::~SocketSelector()
|
||||
{
|
||||
FD_ZERO(&listen_set);
|
||||
FD_ZERO(&listen_read);
|
||||
close(epoll_fd);
|
||||
}
|
||||
|
||||
bool SocketSelector::wait(std::chrono::milliseconds timeout)
|
||||
void SocketSelector::add(const std::shared_ptr<fr::SocketDescriptor> &socket, void *opaque)
|
||||
{
|
||||
//Windows will crash if we pass an empty set. Do a check.
|
||||
#ifdef _WIN32
|
||||
if(listen_set.fd_count == 0)
|
||||
if(!socket->connected())
|
||||
{
|
||||
//It's empty. Emulate UNIX behaviour by sleeping for timeout.
|
||||
std::this_thread::sleep_for(timeout);
|
||||
return false;
|
||||
throw std::logic_error("Can't add disconnected socket");
|
||||
}
|
||||
#endif
|
||||
|
||||
timeval wait_time{};
|
||||
wait_time.tv_sec = 0;
|
||||
wait_time.tv_usec = std::chrono::duration_cast<std::chrono::microseconds>(timeout).count();
|
||||
auto add_iter = added_sockets.emplace(socket->get_socket_descriptor(), Opaque(socket->get_socket_descriptor(), socket, opaque));
|
||||
if(!add_iter.second)
|
||||
{
|
||||
throw std::logic_error("Can't add duplicate socket: " + std::to_string(socket->get_socket_descriptor()));
|
||||
}
|
||||
|
||||
listen_read = listen_set;
|
||||
int select_result = select(max_descriptor + 1, &listen_read, nullptr, nullptr, timeout.count() == 0 ? nullptr : &wait_time);
|
||||
epoll_event event = {0};
|
||||
event.events = EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
|
||||
event.data.ptr = &add_iter.first->second;
|
||||
|
||||
if(select_result == 0) //If it's timed out
|
||||
return false;
|
||||
if(select_result == SOCKET_ERROR) //Else if error
|
||||
throw std::logic_error("select() returned -1. Errno: " + std::to_string(errno));
|
||||
|
||||
return true;
|
||||
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket->get_socket_descriptor(), &event) < 0)
|
||||
{
|
||||
added_sockets.erase(socket->get_socket_descriptor());
|
||||
throw std::runtime_error("Failed to add socket: " + std::to_string(errno));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
std::vector<std::pair<std::shared_ptr<fr::SocketDescriptor>, void *>>
|
||||
SocketSelector::wait(std::chrono::milliseconds timeout)
|
||||
{
|
||||
static thread_local epoll_event events[100];
|
||||
int event_count = epoll_wait(epoll_fd, events, 100, timeout.count());
|
||||
if(event_count < 0)
|
||||
{
|
||||
throw std::runtime_error("epoll_wait returned: " + std::to_string(errno));
|
||||
}
|
||||
|
||||
std::vector<std::pair<std::shared_ptr<fr::SocketDescriptor>, void*>> ret;
|
||||
for(int a = 0; a < event_count; ++a)
|
||||
{
|
||||
auto *opaque = static_cast<Opaque*>(events[a].data.ptr);
|
||||
ret.emplace_back(opaque->socket, opaque->opaque);
|
||||
if(events[a].events & EPOLLERR || events[a].events & EPOLLHUP || events[a].events & EPOLLRDHUP)
|
||||
{
|
||||
auto iter = added_sockets.find(opaque->descriptor);
|
||||
if(iter != added_sockets.end())
|
||||
{
|
||||
added_sockets.erase(opaque->descriptor);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void SocketSelector::remove(const std::shared_ptr<fr::SocketDescriptor> &socket)
|
||||
{
|
||||
auto descriptor = socket->get_socket_descriptor();
|
||||
if(!socket->connected())
|
||||
{
|
||||
throw std::runtime_error("Can't remove disconnected socket");
|
||||
}
|
||||
auto iter = added_sockets.find(descriptor);
|
||||
if(iter != added_sockets.end())
|
||||
{
|
||||
added_sockets.erase(iter);
|
||||
}
|
||||
|
||||
epoll_event event = {0};
|
||||
if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, descriptor, &event) < 0)
|
||||
{
|
||||
throw std::runtime_error("Failed to remove socket: " + std::to_string(errno));
|
||||
}
|
||||
delete static_cast<Opaque*>(event.data.ptr);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
//Windows implementation coming soon(tm)
|
||||
@ -2,6 +2,8 @@
|
||||
// Created by fred on 06/12/16.
|
||||
//
|
||||
|
||||
#include <frnetlib/TcpListener.h>
|
||||
|
||||
#include "frnetlib/TcpListener.h"
|
||||
|
||||
namespace fr
|
||||
@ -118,7 +120,7 @@ namespace fr
|
||||
::shutdown(socket_descriptor, 0);
|
||||
}
|
||||
|
||||
int32_t TcpListener::get_socket_descriptor() const
|
||||
int32_t TcpListener::get_socket_descriptor() const noexcept
|
||||
{
|
||||
return socket_descriptor;
|
||||
}
|
||||
@ -136,4 +138,9 @@ namespace fr
|
||||
socket_descriptor = -1;
|
||||
}
|
||||
}
|
||||
|
||||
bool TcpListener::connected() const noexcept
|
||||
{
|
||||
return socket_descriptor > -1;
|
||||
}
|
||||
}
|
||||
@ -183,7 +183,7 @@ namespace fr
|
||||
is_blocking = should_block;
|
||||
}
|
||||
|
||||
int32_t TcpSocket::get_socket_descriptor() const
|
||||
int32_t TcpSocket::get_socket_descriptor() const noexcept
|
||||
{
|
||||
return socket_descriptor;
|
||||
}
|
||||
@ -216,4 +216,10 @@ namespace fr
|
||||
#endif
|
||||
}
|
||||
|
||||
bool TcpSocket::connected() const noexcept
|
||||
{
|
||||
return socket_descriptor > -1;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user