Skip to content

Commit

Permalink
enable_reuse_port
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFreeman committed Apr 6, 2024
1 parent 052e0fe commit 871dbb6
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 17 deletions.
70 changes: 64 additions & 6 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ static zend_class_entry *swoole_server_task_result_ce;
static zend_object_handlers swoole_server_task_result_handlers;

static SW_THREAD_LOCAL zval swoole_server_instance;
#ifdef SW_THREAD
static SW_THREAD_LOCAL WorkerFn worker_thread_fn;
static SW_THREAD_LOCAL std::vector<ServerPortProperty *> swoole_server_port_properties;
static SW_THREAD_LOCAL std::unordered_map<size_t, ServerPortProperty *> swoole_server_port_properties;
#endif

static sw_inline ServerObject *server_fetch_object(zend_object *obj) {
return (ServerObject *) ((char *) obj - swoole_server_handlers.offset);
Expand All @@ -164,18 +166,39 @@ zval *php_swoole_server_zval_ptr(Server *serv) {

ServerPortProperty *php_swoole_server_get_port_property(ListenPort *port) {
#ifdef SW_THREAD
return swoole_server_port_properties.at(port->socket->get_fd());
#if defined(__linux__) && defined(HAVE_REUSEPORT)
if (sw_server()->enable_reuse_port && sw_server()->is_worker_thread()) {
/**
* If the ListenPort of other threads is delivered here, we can return the callback function
* based on the number of the ListenPort. This is because ListenPorts with the same order will
* have the same callback function.
*/
return swoole_server_port_properties[port->number];
} else
#endif
{
return swoole_server_port_properties[port->socket->get_fd()];
}
#else
return (ServerPortProperty *) port->ptr;
#endif
}

void php_swoole_server_set_port_property(ListenPort *port, ServerPortProperty *property) {
#ifdef SW_THREAD
if (swoole_server_port_properties.size() < (size_t) port->socket->get_fd() + 1) {
swoole_server_port_properties.resize((size_t) port->socket->get_fd() + 1);
#if defined(__linux__) && defined(HAVE_REUSEPORT)
if (property->serv->enable_reuse_port && property->serv->is_worker_thread()) {
/**
* If the ListenPort of other threads is delivered here, we can return the callback function
* based on the number of the ListenPort. This is because ListenPorts with the same order will
* have the same callback function.
*/
swoole_server_port_properties[port->number] = property;
} else
#endif
{
swoole_server_port_properties[port->socket->get_fd()] = property;
}
swoole_server_port_properties[port->socket->get_fd()] = property;
#else
port->ptr = property;
#endif
Expand Down Expand Up @@ -916,6 +939,31 @@ void ServerObject::on_before_start() {
serv->onReceive = php_swoole_server_onReceive;
}

#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
/**
* The specific process is as follows: If `enable_reuse_port` is enabled, the main thread will create
* `worker_num` copies of all ListenPorts. When the child threads are generated, these copies
* will be distributed based on the `worker_id`. At this point, the child thread will add these ListenPorts
* to its own `ServerObject::property` in the `ports` section.
*/
if (serv->enable_reuse_port) {
size_t port_count, index;
port_count = index = serv->ports.size();
for (uint32_t worker_id = 1; worker_id <= serv->worker_num; worker_id++) {
for (size_t i = 0; i < port_count; i++) {
ListenPort *ls = serv->duplicate_port(serv->ports[i], worker_id, index++, i);
if (ls) {
/**
* Add it to `property->ports` so that all the properties of the main thread's `ListenPort`
* can be copied to the newly generated `ListenPort`.
*/
php_swoole_server_add_port(this, ls);
}
}
}
}
#endif

for (size_t i = 1; i < property->ports.size(); i++) {
zval *zport = property->ports.at(i);
zval *zport_setting =
Expand Down Expand Up @@ -1862,7 +1910,17 @@ static void server_ctor(zval *zserv, Server *serv) {

/* primary port */
for (auto ls : serv->ports) {
php_swoole_server_add_port(server_object, ls);
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
if (serv->enable_reuse_port && serv->is_worker_thread()) {
// The child thread only adds its own ListenPort.
if (ls->worker_id == sw_get_process_id()) {
php_swoole_server_add_port(server_object, ls);
}
} else
#endif
{
php_swoole_server_add_port(server_object, ls);
}
}

/* iterator */
Expand Down
22 changes: 21 additions & 1 deletion include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ struct ListenPort {
*/
int kernel_socket_recv_buffer_size = 0;
int kernel_socket_send_buffer_size = 0;
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
WorkerId worker_id = 0;
size_t number = 0;
#endif

#ifdef SW_USE_OPENSSL
SSLContext *ssl_context = nullptr;
Expand Down Expand Up @@ -324,7 +328,11 @@ struct ListenPort {
void close();
bool import(int sock);
const char *get_protocols();
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
int create_socket(swoole::Server *server, bool bind = true);
#else
int create_socket(swoole::Server *server);
#endif
void close_socket_fd();

#ifdef SW_USE_OPENSSL
Expand Down Expand Up @@ -735,7 +743,14 @@ class Server {
std::vector<ListenPort *> ports;

ListenPort *get_primary_port() {
return ports.front();
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
if (enable_reuse_port && is_worker_thread()) {
return ports.at((ports.size() / (worker_num + 1)) * (sw_get_process_id() + 1));
} else
#endif
{
return ports.front();
}
}

enum Mode get_mode() const {
Expand Down Expand Up @@ -938,7 +953,12 @@ class Server {
bool shutdown();

int add_worker(Worker *worker);
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
ListenPort *add_port(SocketType type, const char *host, int port, bool bind = true);
ListenPort *duplicate_port(ListenPort *ls, int worker_id, size_t index, size_t number);
#else
ListenPort *add_port(SocketType type, const char *host, int port);
#endif
int add_systemd_socket();
int add_hook(enum HookType type, const Callback &func, int push_back);
bool add_command(const std::string &command, int accepted_process_types, const Command::Handler &func);
Expand Down
54 changes: 52 additions & 2 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,19 @@ int Server::create() {
return SW_ERR;
}

port_gs_list = (ServerPortGS *) sw_shm_calloc(ports.size(), sizeof(ServerPortGS));
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
/**
* When enable_reuse_port is enabled, we pre-allocate an array called port_gs_list
* with a size of ports.size() * (worker_num + 1).
*/
if (enable_reuse_port) {
port_gs_list = (ServerPortGS *) sw_shm_calloc(ports.size() * (worker_num + 1), sizeof(ServerPortGS));
} else
#endif
{
port_gs_list = (ServerPortGS *) sw_shm_calloc(ports.size(), sizeof(ServerPortGS));
}

if (port_gs_list == nullptr) {
swoole_error("sw_shm_calloc() for port_connnection_num_array failed");
return SW_ERR;
Expand Down Expand Up @@ -1741,8 +1753,23 @@ int Server::add_systemd_socket() {
return count;
}

#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
/**
* When enable_reuse_port is enabled, we need to pre-copy some ListenPort generated during the main thread period
* to allocate them to the sub-threads, but without binding them, otherwise we will encounter an error of address
* already in use.
*
* `bind` is only set to false when `Server::duplicate_port` is executed.
*/
ListenPort *Server::add_port(SocketType type, const char *host, int port, bool bind) {
#else
ListenPort *Server::add_port(SocketType type, const char *host, int port) {
if (session_list) {
#endif
if (session_list
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
&& !enable_reuse_port && !is_worker_thread()
#endif
) {
swoole_error_log(SW_LOG_ERROR, SW_ERROR_WRONG_OPERATION, "must add port before server is created");
return nullptr;
}
Expand Down Expand Up @@ -1798,7 +1825,11 @@ ListenPort *Server::add_port(SocketType type, const char *host, int port) {
}
#endif

#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
if (ls->create_socket(this, bind) < 0) {
#else
if (ls->create_socket(this) < 0) {
#endif
return nullptr;
}

Expand All @@ -1808,6 +1839,25 @@ ListenPort *Server::add_port(SocketType type, const char *host, int port) {
return ls;
}

#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
/**
* When enable_reuse_port is enabled, we need to duplicate the ListenPort generated by the main thread.
*/
ListenPort *Server::duplicate_port(ListenPort *ls, int worker_id, size_t index, size_t number) {
ListenPort *port = add_port(ls->type, ls->host.c_str(), ls->port, false);
if (!port) {
swoole_warning("duplicate listen port failed");
return nullptr;
}

port->worker_id = worker_id;
port->number = number;
port->gs = &port_gs_list[index];
port->gs->connection_nums = (sw_atomic_t *) sw_shm_calloc(worker_num, sizeof(sw_atomic_t));
return port;
}
#endif

static void Server_signal_handler(int sig) {
swoole_trace_log(SW_TRACE_SERVER, "signal[%d] %s triggered in %d", sig, swoole_signal_to_str(sig), getpid());

Expand Down
30 changes: 26 additions & 4 deletions src/server/port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,22 @@ size_t ListenPort::get_connection_num() {
}
}

#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
/**
* When `bind` is set to false, it indicates that it is the process of duplicating `ListenPort`
* using `Server::duplicate_port`.
*/
int ListenPort::create_socket(Server *server, bool bind) {
/**
* When `worker_id` is greater than 0, the socket is created through `Server::duplicate_port`, and no additional
* operations are required. Simply execute the bind operation.
*/
if (worker_id > 0) {
goto start_bind;
}
#else
int ListenPort::create_socket(Server *server) {
#endif
if (socket) {
#if defined(__linux__) && defined(HAVE_REUSEPORT)
if (server->enable_reuse_port) {
Expand Down Expand Up @@ -830,11 +845,18 @@ int ListenPort::create_socket(Server *server) {
}
#endif

if (socket->bind(host, &port) < 0) {
swoole_set_last_error(errno);
socket->free();
return SW_ERR;
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
start_bind:
if (bind) {
#endif
if (socket->bind(host, &port) < 0) {
swoole_set_last_error(errno);
socket->free();
return SW_ERR;
}
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
}
#endif

socket->info.assign(type, host, port);
return SW_OK;
Expand Down
14 changes: 12 additions & 2 deletions src/server/reactor_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,13 @@ int Server::worker_main_loop(ProcessPool *pool, Worker *worker) {
}
SwooleWG.max_request = serv->max_request;
SwooleWG.worker = worker;
SwooleTG.id = 0;
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
if (!serv->enable_reuse_port) {
#endif
SwooleTG.id = 0;
#if defined(__linux__) && defined(HAVE_REUSEPORT) && defined(SW_THREAD)
}
#endif

serv->init_worker(worker);

Expand All @@ -200,11 +206,15 @@ int Server::worker_main_loop(ProcessPool *pool, Worker *worker) {
for (auto ls : serv->ports) {
#if defined(__linux__) && defined(HAVE_REUSEPORT)
if (ls->is_stream() && serv->enable_reuse_port) {
#ifdef SW_THREAD
if (ls->worker_id != worker->id + 1) {
continue;
}
#endif
if (ls->create_socket(serv) < 0) {
swoole_event_free();
return SW_ERR;
}

if (ls->listen() < 0) {
return SW_ERR;
}
Expand Down
2 changes: 0 additions & 2 deletions src/server/worker_threads.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ int Server::start_worker_threads() {
return SW_ERR;
}
#if defined(__linux__) && defined(HAVE_REUSEPORT)
} else {
ls->close_socket_fd();
}
#endif
}
Expand Down

0 comments on commit 871dbb6

Please sign in to comment.