Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize thread server #5446

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions core-tests/src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,50 @@ TEST(server, process) {
delete lock;
}

TEST(server, thread) {
Server serv(Server::MODE_THREAD);
serv.worker_num = 2;

sw_logger()->set_level(SW_LOG_WARNING);

ListenPort *port = serv.add_port(SW_SOCK_TCP, TEST_HOST, 0);
ASSERT_TRUE(port);

mutex lock;
lock.lock();

ASSERT_EQ(serv.create(), SW_OK);

std::thread t1([&]() {
swoole_signal_block_all();

lock.lock();

network::SyncClient c(SW_SOCK_TCP);
c.connect(TEST_HOST, port->port);
c.send(packet, strlen(packet));
char buf[1024];
c.recv(buf, sizeof(buf));
c.close();

serv.shutdown();
});

serv.onWorkerStart = [&lock](Server *serv, Worker *worker) { lock.unlock(); };

serv.onReceive = [](Server *serv, RecvData *req) -> int {
EXPECT_EQ(string(req->data, req->info.len), string(packet));

string resp = string("Server: ") + string(packet);
serv->send(req->info.fd, resp.c_str(), resp.length());

return SW_OK;
};

serv.start();
t1.join();
}

TEST(server, reload_all_workers) {
Server serv(Server::MODE_PROCESS);
serv.worker_num = 2;
Expand Down
106 changes: 64 additions & 42 deletions ext-src/swoole_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ using namespace swoole;
zend_class_entry *swoole_process_ce;
static zend_object_handlers swoole_process_handlers;

static uint32_t php_swoole_worker_round_id = 0;
static uint32_t round_process_id = 0;
static thread_local uint32_t server_user_worker_id = 0;
static zend_fcall_info_cache *signal_fci_caches[SW_SIGNO_MAX] = {};

struct ProcessObject {
Expand All @@ -45,6 +46,10 @@ static sw_inline ProcessObject *php_swoole_process_fetch_object(zend_object *obj
return (ProcessObject *) ((char *) obj - swoole_process_handlers.offset);
}

static sw_inline ProcessObject *php_swoole_process_fetch_object(zval *zobj) {
return php_swoole_process_fetch_object(Z_OBJ_P(zobj));
}

Worker *php_swoole_process_get_worker(zval *zobject) {
return php_swoole_process_fetch_object(Z_OBJ_P(zobject))->worker;
}
Expand All @@ -67,19 +72,17 @@ static void php_swoole_process_free_object(zend_object *object) {

if (worker) {
UnixSocket *_pipe = worker->pipe_object;
if (_pipe) {
if (_pipe && !worker->shared) {
delete _pipe;
}

if (worker->queue) {
delete worker->queue;
}

zend::Process *proc = (zend::Process *) worker->ptr2;
if (proc) {
delete proc;
}
efree(worker);
delete worker;
}

zend_object_std_dtor(object);
Expand Down Expand Up @@ -237,9 +240,10 @@ void php_swoole_process_minit(int module_number) {
}

static PHP_METHOD(swoole_process, __construct) {
Worker *process = php_swoole_process_get_worker(ZEND_THIS);
auto po = php_swoole_process_fetch_object(ZEND_THIS);
Server *server = sw_server();

if (process) {
if (po->worker) {
zend_throw_error(NULL, "Constructor of %s can only be called once", SW_Z_OBJCE_NAME_VAL_P(ZEND_THIS));
RETURN_FALSE;
}
Expand All @@ -250,7 +254,7 @@ static PHP_METHOD(swoole_process, __construct) {
RETURN_FALSE;
}

if (sw_server() && sw_server()->is_started() && sw_server()->is_master()) {
if (server && server->is_started() && server->is_master()) {
zend_throw_error(NULL, "%s can't be used in master process", SW_Z_OBJCE_NAME_VAL_P(ZEND_THIS));
RETURN_FALSE;
}
Expand All @@ -265,6 +269,9 @@ static PHP_METHOD(swoole_process, __construct) {
zend_long pipe_type = zend::PIPE_TYPE_DGRAM;
zend_bool enable_coroutine = false;

po->worker = new Worker();
Worker *process = po->worker;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 4)
Z_PARAM_FUNC(func.fci, func.fci_cache);
Z_PARAM_OPTIONAL
Expand All @@ -273,45 +280,60 @@ static PHP_METHOD(swoole_process, __construct) {
Z_PARAM_BOOL(enable_coroutine)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

process = (Worker *) ecalloc(1, sizeof(Worker));

uint32_t base = 1;
if (sw_server() && sw_server()->is_started()) {
base = sw_server()->worker_num + sw_server()->task_worker_num + sw_server()->get_user_worker_num();
}
if (php_swoole_worker_round_id == 0) {
php_swoole_worker_round_id = base;
}
process->id = php_swoole_worker_round_id++;

if (redirect_stdin_and_stdout) {
process->redirect_stdin = true;
process->redirect_stdout = true;
process->redirect_stderr = true;
/**
* Forced to use stream pipe
*/
pipe_type = zend::PIPE_TYPE_STREAM;
}
if (server && server->is_worker_thread()) {
Worker *shared_worker;
if (server->is_user_worker()) {
shared_worker = server->get_worker(swoole_get_process_id());
} else {
shared_worker = server->get_worker((server_user_worker_id++) + server->get_core_worker_num());
}
*process = *shared_worker;
process->shared = true;
if (server->is_user_worker()) {
process->pipe_current = process->pipe_worker;
} else {
process->pipe_current = process->pipe_master;
}
} else {
if (redirect_stdin_and_stdout) {
process->redirect_stdin = true;
process->redirect_stdout = true;
process->redirect_stderr = true;
/**
* Forced to use stream pipe
*/
pipe_type = zend::PIPE_TYPE_STREAM;
}

if (pipe_type > 0) {
int socket_type = pipe_type == zend::PIPE_TYPE_STREAM ? SOCK_STREAM : SOCK_DGRAM;
UnixSocket *_pipe = new UnixSocket(true, socket_type);
if (!_pipe->ready()) {
zend_throw_exception(swoole_exception_ce, "failed to create unix soccket", errno);
delete _pipe;
efree(process);
RETURN_FALSE;
uint32_t base = 1;
if (server && server->is_started()) {
base = server->get_all_worker_num();
}
if (round_process_id == 0) {
round_process_id = base;
}
process->id = round_process_id++;
process->shared = false;

if (pipe_type > 0) {
int socket_type = pipe_type == zend::PIPE_TYPE_STREAM ? SOCK_STREAM : SOCK_DGRAM;
UnixSocket *_pipe = new UnixSocket(true, socket_type);
if (!_pipe->ready()) {
zend_throw_exception(swoole_exception_ce, "failed to create unix soccket", errno);
delete _pipe;
efree(process);
RETURN_FALSE;
}

process->pipe_master = _pipe->get_socket(true);
process->pipe_worker = _pipe->get_socket(false);
process->pipe_master = _pipe->get_socket(true);
process->pipe_worker = _pipe->get_socket(false);

process->pipe_object = _pipe;
process->pipe_current = process->pipe_master;
process->pipe_object = _pipe;
process->pipe_current = process->pipe_master;

zend_update_property_long(
swoole_process_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("pipe"), process->pipe_master->fd);
zend_update_property_long(
swoole_process_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("pipe"), process->pipe_master->fd);
}
}

zend::Process *proc = new zend::Process((enum zend::PipeType) pipe_type, enable_coroutine);
Expand Down
4 changes: 2 additions & 2 deletions ext-src/swoole_process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,10 @@ static PHP_METHOD(swoole_process_pool, start) {

if (pp->onWorkerStart == nullptr && pp->onMessage == nullptr) {
if (pool->async) {
php_swoole_fatal_error(E_ERROR, "require onWorkerStart callback");
php_swoole_fatal_error(E_ERROR, "require 'onWorkerStart' callback");
RETURN_FALSE;
} else if (pool->ipc_mode != SW_IPC_NONE && pp->onMessage == nullptr) {
php_swoole_fatal_error(E_ERROR, "require onMessage callback");
php_swoole_fatal_error(E_ERROR, "require 'onMessage' callback");
RETURN_FALSE;
}
}
Expand Down
20 changes: 15 additions & 5 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ void php_swoole_server_rshutdown() {
serv->drain_worker_pipe();

if (serv->is_started() && serv->worker_is_running() && !serv->is_user_worker()) {
serv->abort_worker(sw_worker());
if (php_swoole_is_fatal_error()) {
swoole_error_log(SW_LOG_ERROR,
SW_ERROR_PHP_FATAL_ERROR,
Expand Down Expand Up @@ -1579,10 +1580,19 @@ static void php_swoole_server_onWorkerExit(Server *serv, Worker *worker) {
}

static void php_swoole_server_onUserWorkerStart(Server *serv, Worker *worker) {
zval *object = (zval *) worker->ptr;
zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(object), ZEND_STRL("id"), worker->id);

zval *object;
zval *zserv = php_swoole_server_zval_ptr(serv);

if (serv->is_thread_mode()) {
ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv));
int index = worker->id - serv->worker_num - serv->task_worker_num;
object = server_object->property->user_processes[index];
serv->get_worker_message_bus()->set_allocator(sw_zend_string_allocator());
} else {
object = (zval *) worker->ptr;
}

zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(object), ZEND_STRL("id"), worker->id);
zend_update_property_long(swoole_server_ce, SW_Z8_OBJ_P(zserv), ZEND_STRL("master_pid"), serv->gs->master_pid);
zend_update_property_long(swoole_server_ce, SW_Z8_OBJ_P(zserv), ZEND_STRL("manager_pid"), serv->gs->manager_pid);

Expand Down Expand Up @@ -2559,6 +2569,7 @@ static PHP_METHOD(swoole_server, addProcess) {
}
worker_id = swoole_get_process_id();
worker = serv->get_worker(worker_id);
worker->redirect_stdin = worker->redirect_stdout = worker->redirect_stderr = 0;
worker_id -= (serv->worker_num + serv->task_worker_num);
} else {
worker = php_swoole_process_get_and_check_worker(process);
Expand All @@ -2567,9 +2578,8 @@ static PHP_METHOD(swoole_server, addProcess) {
php_swoole_fatal_error(E_WARNING, "failed to add worker");
RETURN_FALSE;
}
worker->ptr = process;
}

worker->ptr = process;
zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(process), ZEND_STRL("id"), worker_id);
RETURN_LONG(worker_id);
}
Expand Down
2 changes: 0 additions & 2 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,7 @@ struct ThreadGlobal {
String *buffer_stack;
Reactor *reactor;
Timer *timer;
#ifdef SW_THREAD
MessageBus *message_bus;
#endif
AsyncThreads *async_threads;
#ifdef SW_USE_IOURING
AsyncIouring *async_iouring;
Expand Down
1 change: 1 addition & 0 deletions include/swoole_atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ typedef sw_atomic_uint64_t sw_atomic_ulong_t;
typedef sw_atomic_uint32_t sw_atomic_t;

#define sw_atomic_cmp_set(lock, old, set) __sync_bool_compare_and_swap(lock, old, set)
#define sw_atomic_value_cmp_set(value, expected, set) __sync_val_compare_and_swap(value, expected, set)
#define sw_atomic_fetch_add(value, add) __sync_fetch_and_add(value, add)
#define sw_atomic_fetch_sub(value, sub) __sync_fetch_and_sub(value, sub)
#define sw_atomic_memory_barrier() __sync_synchronize()
Expand Down
15 changes: 12 additions & 3 deletions include/swoole_message_bus.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class MessageBus {
private:
const Allocator *allocator_;
std::unordered_map<uint64_t, std::shared_ptr<String>> packet_pool_;
std::vector<network::Socket *> pipe_sockets_;
std::function<uint64_t(void)> id_generator_;
size_t buffer_size_;
PipeBuffer *buffer_ = nullptr;
Expand All @@ -76,9 +77,7 @@ class MessageBus {
buffer_size_ = SW_BUFFER_SIZE_STD;
}

~MessageBus() {

}
~MessageBus();

bool empty() {
return packet_pool_.empty();
Expand Down Expand Up @@ -131,6 +130,7 @@ class MessageBus {
*/
void free_buffer() {
allocator_->free(buffer_);
buffer_ = nullptr;
}

void pass(SendData *task) {
Expand All @@ -146,6 +146,7 @@ class MessageBus {
/**
* Send data to socket. If the data sent is larger than Server::ipc_max_size, then it is sent in chunks.
* Otherwise send it directly.
* When sending data in multi-thread environment, must use get_pipe_socket() to separate socket memory.
* @return: send success returns true, send failure returns false.
*/
bool write(network::Socket *sock, SendData *packet);
Expand Down Expand Up @@ -190,5 +191,13 @@ class MessageBus {
packet_pool_.erase(buffer_->info.msg_id);
}
}
/**
* It is possible to operate the same pipe in multiple threads.
* Each thread must have a unique buffer and the socket memory must be separated.
*/
network::Socket *get_pipe_socket(network::Socket *sock) {
return pipe_sockets_[sock->get_fd()];
}
void init_pipe_socket(network::Socket *sock);
};
} // namespace swoole
1 change: 1 addition & 0 deletions include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ struct Worker {
WorkerId id;
ProcessPool *pool;
MsgQueue *queue;
bool shared;

bool redirect_stdout;
bool redirect_stdin;
Expand Down
Loading
Loading