diff --git a/core-tests/src/server/server.cpp b/core-tests/src/server/server.cpp index 0b2f5574787..0648801c164 100644 --- a/core-tests/src/server/server.cpp +++ b/core-tests/src/server/server.cpp @@ -163,6 +163,50 @@ TEST(server, process) { delete lock; } +TEST(server, thread) { + Server serv(Server::MODE_THREAD); + serv.worker_num = 2; + + sw_logger()->set_level(SW_LOG_WARNING); + + ListenPort *port = serv.add_port(SW_SOCK_TCP, TEST_HOST, 0); + ASSERT_TRUE(port); + + mutex lock; + lock.lock(); + + ASSERT_EQ(serv.create(), SW_OK); + + std::thread t1([&]() { + swoole_signal_block_all(); + + lock.lock(); + + network::SyncClient c(SW_SOCK_TCP); + c.connect(TEST_HOST, port->port); + c.send(packet, strlen(packet)); + char buf[1024]; + c.recv(buf, sizeof(buf)); + c.close(); + + serv.shutdown(); + }); + + serv.onWorkerStart = [&lock](Server *serv, Worker *worker) { lock.unlock(); }; + + serv.onReceive = [](Server *serv, RecvData *req) -> int { + EXPECT_EQ(string(req->data, req->info.len), string(packet)); + + string resp = string("Server: ") + string(packet); + serv->send(req->info.fd, resp.c_str(), resp.length()); + + return SW_OK; + }; + + serv.start(); + t1.join(); +} + TEST(server, reload_all_workers) { Server serv(Server::MODE_PROCESS); serv.worker_num = 2; diff --git a/ext-src/swoole_process.cc b/ext-src/swoole_process.cc index 969dfee9945..3cce2a1dd55 100644 --- a/ext-src/swoole_process.cc +++ b/ext-src/swoole_process.cc @@ -33,7 +33,8 @@ using namespace swoole; zend_class_entry *swoole_process_ce; static zend_object_handlers swoole_process_handlers; -static uint32_t php_swoole_worker_round_id = 0; +static uint32_t round_process_id = 0; +static thread_local uint32_t server_user_worker_id = 0; static zend_fcall_info_cache *signal_fci_caches[SW_SIGNO_MAX] = {}; struct ProcessObject { @@ -45,6 +46,10 @@ static sw_inline ProcessObject *php_swoole_process_fetch_object(zend_object *obj return (ProcessObject *) ((char *) obj - swoole_process_handlers.offset); } +static sw_inline ProcessObject *php_swoole_process_fetch_object(zval *zobj) { + return php_swoole_process_fetch_object(Z_OBJ_P(zobj)); +} + Worker *php_swoole_process_get_worker(zval *zobject) { return php_swoole_process_fetch_object(Z_OBJ_P(zobject))->worker; } @@ -67,19 +72,17 @@ static void php_swoole_process_free_object(zend_object *object) { if (worker) { UnixSocket *_pipe = worker->pipe_object; - if (_pipe) { + if (_pipe && !worker->shared) { delete _pipe; } - if (worker->queue) { delete worker->queue; } - zend::Process *proc = (zend::Process *) worker->ptr2; if (proc) { delete proc; } - efree(worker); + delete worker; } zend_object_std_dtor(object); @@ -237,9 +240,10 @@ void php_swoole_process_minit(int module_number) { } static PHP_METHOD(swoole_process, __construct) { - Worker *process = php_swoole_process_get_worker(ZEND_THIS); + auto po = php_swoole_process_fetch_object(ZEND_THIS); + Server *server = sw_server(); - if (process) { + if (po->worker) { zend_throw_error(NULL, "Constructor of %s can only be called once", SW_Z_OBJCE_NAME_VAL_P(ZEND_THIS)); RETURN_FALSE; } @@ -250,7 +254,7 @@ static PHP_METHOD(swoole_process, __construct) { RETURN_FALSE; } - if (sw_server() && sw_server()->is_started() && sw_server()->is_master()) { + if (server && server->is_started() && server->is_master()) { zend_throw_error(NULL, "%s can't be used in master process", SW_Z_OBJCE_NAME_VAL_P(ZEND_THIS)); RETURN_FALSE; } @@ -265,6 +269,9 @@ static PHP_METHOD(swoole_process, __construct) { zend_long pipe_type = zend::PIPE_TYPE_DGRAM; zend_bool enable_coroutine = false; + po->worker = new Worker(); + Worker *process = po->worker; + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 4) Z_PARAM_FUNC(func.fci, func.fci_cache); Z_PARAM_OPTIONAL @@ -273,45 +280,60 @@ static PHP_METHOD(swoole_process, __construct) { Z_PARAM_BOOL(enable_coroutine) ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); - process = (Worker *) ecalloc(1, sizeof(Worker)); - - uint32_t base = 1; - if (sw_server() && sw_server()->is_started()) { - base = sw_server()->worker_num + sw_server()->task_worker_num + sw_server()->get_user_worker_num(); - } - if (php_swoole_worker_round_id == 0) { - php_swoole_worker_round_id = base; - } - process->id = php_swoole_worker_round_id++; - - if (redirect_stdin_and_stdout) { - process->redirect_stdin = true; - process->redirect_stdout = true; - process->redirect_stderr = true; - /** - * Forced to use stream pipe - */ - pipe_type = zend::PIPE_TYPE_STREAM; - } + if (server && server->is_worker_thread()) { + Worker *shared_worker; + if (server->is_user_worker()) { + shared_worker = server->get_worker(swoole_get_process_id()); + } else { + shared_worker = server->get_worker((server_user_worker_id++) + server->get_core_worker_num()); + } + *process = *shared_worker; + process->shared = true; + if (server->is_user_worker()) { + process->pipe_current = process->pipe_worker; + } else { + process->pipe_current = process->pipe_master; + } + } else { + if (redirect_stdin_and_stdout) { + process->redirect_stdin = true; + process->redirect_stdout = true; + process->redirect_stderr = true; + /** + * Forced to use stream pipe + */ + pipe_type = zend::PIPE_TYPE_STREAM; + } - if (pipe_type > 0) { - int socket_type = pipe_type == zend::PIPE_TYPE_STREAM ? SOCK_STREAM : SOCK_DGRAM; - UnixSocket *_pipe = new UnixSocket(true, socket_type); - if (!_pipe->ready()) { - zend_throw_exception(swoole_exception_ce, "failed to create unix soccket", errno); - delete _pipe; - efree(process); - RETURN_FALSE; + uint32_t base = 1; + if (server && server->is_started()) { + base = server->get_all_worker_num(); + } + if (round_process_id == 0) { + round_process_id = base; } + process->id = round_process_id++; + process->shared = false; + + if (pipe_type > 0) { + int socket_type = pipe_type == zend::PIPE_TYPE_STREAM ? SOCK_STREAM : SOCK_DGRAM; + UnixSocket *_pipe = new UnixSocket(true, socket_type); + if (!_pipe->ready()) { + zend_throw_exception(swoole_exception_ce, "failed to create unix soccket", errno); + delete _pipe; + efree(process); + RETURN_FALSE; + } - process->pipe_master = _pipe->get_socket(true); - process->pipe_worker = _pipe->get_socket(false); + process->pipe_master = _pipe->get_socket(true); + process->pipe_worker = _pipe->get_socket(false); - process->pipe_object = _pipe; - process->pipe_current = process->pipe_master; + process->pipe_object = _pipe; + process->pipe_current = process->pipe_master; - zend_update_property_long( - swoole_process_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("pipe"), process->pipe_master->fd); + zend_update_property_long( + swoole_process_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("pipe"), process->pipe_master->fd); + } } zend::Process *proc = new zend::Process((enum zend::PipeType) pipe_type, enable_coroutine); diff --git a/ext-src/swoole_process_pool.cc b/ext-src/swoole_process_pool.cc index 3a79859c534..ec7f4902994 100644 --- a/ext-src/swoole_process_pool.cc +++ b/ext-src/swoole_process_pool.cc @@ -502,10 +502,10 @@ static PHP_METHOD(swoole_process_pool, start) { if (pp->onWorkerStart == nullptr && pp->onMessage == nullptr) { if (pool->async) { - php_swoole_fatal_error(E_ERROR, "require onWorkerStart callback"); + php_swoole_fatal_error(E_ERROR, "require 'onWorkerStart' callback"); RETURN_FALSE; } else if (pool->ipc_mode != SW_IPC_NONE && pp->onMessage == nullptr) { - php_swoole_fatal_error(E_ERROR, "require onMessage callback"); + php_swoole_fatal_error(E_ERROR, "require 'onMessage' callback"); RETURN_FALSE; } } diff --git a/ext-src/swoole_server.cc b/ext-src/swoole_server.cc index f9a34cae8a2..1097a367090 100644 --- a/ext-src/swoole_server.cc +++ b/ext-src/swoole_server.cc @@ -100,6 +100,7 @@ void php_swoole_server_rshutdown() { serv->drain_worker_pipe(); if (serv->is_started() && serv->worker_is_running() && !serv->is_user_worker()) { + serv->abort_worker(sw_worker()); if (php_swoole_is_fatal_error()) { swoole_error_log(SW_LOG_ERROR, SW_ERROR_PHP_FATAL_ERROR, @@ -1579,10 +1580,19 @@ static void php_swoole_server_onWorkerExit(Server *serv, Worker *worker) { } static void php_swoole_server_onUserWorkerStart(Server *serv, Worker *worker) { - zval *object = (zval *) worker->ptr; - zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(object), ZEND_STRL("id"), worker->id); - + zval *object; zval *zserv = php_swoole_server_zval_ptr(serv); + + if (serv->is_thread_mode()) { + ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv)); + int index = worker->id - serv->worker_num - serv->task_worker_num; + object = server_object->property->user_processes[index]; + serv->get_worker_message_bus()->set_allocator(sw_zend_string_allocator()); + } else { + object = (zval *) worker->ptr; + } + + zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(object), ZEND_STRL("id"), worker->id); zend_update_property_long(swoole_server_ce, SW_Z8_OBJ_P(zserv), ZEND_STRL("master_pid"), serv->gs->master_pid); zend_update_property_long(swoole_server_ce, SW_Z8_OBJ_P(zserv), ZEND_STRL("manager_pid"), serv->gs->manager_pid); @@ -2559,6 +2569,7 @@ static PHP_METHOD(swoole_server, addProcess) { } worker_id = swoole_get_process_id(); worker = serv->get_worker(worker_id); + worker->redirect_stdin = worker->redirect_stdout = worker->redirect_stderr = 0; worker_id -= (serv->worker_num + serv->task_worker_num); } else { worker = php_swoole_process_get_and_check_worker(process); @@ -2567,9 +2578,8 @@ static PHP_METHOD(swoole_server, addProcess) { php_swoole_fatal_error(E_WARNING, "failed to add worker"); RETURN_FALSE; } + worker->ptr = process; } - - worker->ptr = process; zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(process), ZEND_STRL("id"), worker_id); RETURN_LONG(worker_id); } diff --git a/include/swoole.h b/include/swoole.h index eb9f4269703..3ae29b22107 100644 --- a/include/swoole.h +++ b/include/swoole.h @@ -691,9 +691,7 @@ struct ThreadGlobal { String *buffer_stack; Reactor *reactor; Timer *timer; -#ifdef SW_THREAD MessageBus *message_bus; -#endif AsyncThreads *async_threads; #ifdef SW_USE_IOURING AsyncIouring *async_iouring; diff --git a/include/swoole_atomic.h b/include/swoole_atomic.h index 6fe231b4eb3..abf49880182 100644 --- a/include/swoole_atomic.h +++ b/include/swoole_atomic.h @@ -27,6 +27,7 @@ typedef sw_atomic_uint64_t sw_atomic_ulong_t; typedef sw_atomic_uint32_t sw_atomic_t; #define sw_atomic_cmp_set(lock, old, set) __sync_bool_compare_and_swap(lock, old, set) +#define sw_atomic_value_cmp_set(value, expected, set) __sync_val_compare_and_swap(value, expected, set) #define sw_atomic_fetch_add(value, add) __sync_fetch_and_add(value, add) #define sw_atomic_fetch_sub(value, sub) __sync_fetch_and_sub(value, sub) #define sw_atomic_memory_barrier() __sync_synchronize() diff --git a/include/swoole_message_bus.h b/include/swoole_message_bus.h index 9ceceab6bd8..faae0d1d69d 100644 --- a/include/swoole_message_bus.h +++ b/include/swoole_message_bus.h @@ -62,6 +62,7 @@ class MessageBus { private: const Allocator *allocator_; std::unordered_map> packet_pool_; + std::vector pipe_sockets_; std::function id_generator_; size_t buffer_size_; PipeBuffer *buffer_ = nullptr; @@ -76,9 +77,7 @@ class MessageBus { buffer_size_ = SW_BUFFER_SIZE_STD; } - ~MessageBus() { - - } + ~MessageBus(); bool empty() { return packet_pool_.empty(); @@ -131,6 +130,7 @@ class MessageBus { */ void free_buffer() { allocator_->free(buffer_); + buffer_ = nullptr; } void pass(SendData *task) { @@ -146,6 +146,7 @@ class MessageBus { /** * Send data to socket. If the data sent is larger than Server::ipc_max_size, then it is sent in chunks. * Otherwise send it directly. + * When sending data in multi-thread environment, must use get_pipe_socket() to separate socket memory. * @return: send success returns true, send failure returns false. */ bool write(network::Socket *sock, SendData *packet); @@ -190,5 +191,13 @@ class MessageBus { packet_pool_.erase(buffer_->info.msg_id); } } + /** + * It is possible to operate the same pipe in multiple threads. + * Each thread must have a unique buffer and the socket memory must be separated. + */ + network::Socket *get_pipe_socket(network::Socket *sock) { + return pipe_sockets_[sock->get_fd()]; + } + void init_pipe_socket(network::Socket *sock); }; } // namespace swoole diff --git a/include/swoole_process_pool.h b/include/swoole_process_pool.h index cda0d47900d..f95b61c9b61 100644 --- a/include/swoole_process_pool.h +++ b/include/swoole_process_pool.h @@ -117,6 +117,7 @@ struct Worker { WorkerId id; ProcessPool *pool; MsgQueue *queue; + bool shared; bool redirect_stdout; bool redirect_stdin; diff --git a/include/swoole_server.h b/include/swoole_server.h index 91ff5e1beb4..9251aa04768 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -152,9 +152,7 @@ struct ReactorThread { int id; std::thread thread; network::Socket *notify_pipe = nullptr; - uint32_t pipe_num = 0; uint64_t dispatch_count = 0; - network::Socket *pipe_sockets = nullptr; network::Socket *pipe_command = nullptr; MessageBus message_bus; @@ -434,6 +432,7 @@ class BaseFactory : public Factory { bool finish(SendData *) override; bool notify(DataHead *) override; bool end(SessionId sesion_id, int flags) override; + bool forward_message(Session *session, SendData *data); }; class ProcessFactory : public Factory { @@ -458,6 +457,8 @@ class ThreadFactory : public BaseFactory { template void create_thread(int i, _Callable fn); void at_thread_exit(Worker *worker); + void create_message_bus(); + void destroy_message_bus(); public: ThreadFactory(Server *server); ~ThreadFactory(); @@ -533,6 +534,7 @@ class Server { }; enum ThreadType { + THREAD_NORMAL = 0, THREAD_MASTER = 1, THREAD_REACTOR = 2, THREAD_HEARTBEAT = 3, @@ -801,17 +803,18 @@ class Server { return connection_list[fd].socket; } - /** - * [ReactorThread] - */ - network::Socket *get_worker_pipe_socket(Worker *worker) { - return &get_thread(SwooleTG.id)->pipe_sockets[worker->pipe_master->fd]; - } - network::Socket *get_command_reply_socket() { return is_base_mode() ? get_worker(0)->pipe_master : pipe_command->get_socket(false); } + network::Socket *get_worker_pipe_master(WorkerId id) { + return get_worker(id)->pipe_master; + } + + network::Socket *get_worker_pipe_worker(WorkerId id) { + return get_worker(id)->pipe_worker; + } + /** * [Worker|Master] */ @@ -982,6 +985,8 @@ class Server { bool add_command(const std::string &command, int accepted_process_types, const Command::Handler &func); Connection *add_connection(ListenPort *ls, network::Socket *_socket, int server_fd); void abort_connection(Reactor *reactor, ListenPort *ls, network::Socket *_socket); + void abort_worker(Worker *worker); + void reset_worker_counter(Worker *worker); int connection_incoming(Reactor *reactor, Connection *conn); int get_idle_worker_num(); @@ -1138,13 +1143,17 @@ class Server { } size_t get_all_worker_num() { - return worker_num + task_worker_num + get_user_worker_num(); + return get_core_worker_num() + get_user_worker_num(); } size_t get_user_worker_num() { return user_worker_list.size(); } + size_t get_core_worker_num() { + return worker_num + task_worker_num; + } + ReactorThread *get_thread(int reactor_id) { return &reactor_threads[reactor_id]; } @@ -1366,6 +1375,7 @@ class Server { void init_port_protocol(ListenPort *port); void init_signal_handler(); void init_ipc_max_size(); + void init_pipe_sockets(MessageBus *mb); void set_max_connection(uint32_t _max_connection); @@ -1460,6 +1470,7 @@ class Server { static void reactor_thread_main_loop(Server *serv, int reactor_id); static bool task_pack(EventData *task, const void *data, size_t data_len); static bool task_unpack(EventData *task, String *buffer, PacketPtr *packet); + static void master_signal_handler(int signo); int start_master_thread(Reactor *reactor); int start_event_worker(Worker *worker); diff --git a/src/protocol/message_bus.cc b/src/protocol/message_bus.cc index 87a3bdd299a..0f6676daf09 100644 --- a/src/protocol/message_bus.cc +++ b/src/protocol/message_bus.cc @@ -229,7 +229,7 @@ bool MessageBus::write(Socket *sock, SendData *resp) { iov[1].iov_base = (void *) payload; iov[1].iov_len = l_payload; - if (send_fn(sock, iov, 2) == (ssize_t)(sizeof(resp->info) + l_payload)) { + if (send_fn(sock, iov, 2) == (ssize_t) (sizeof(resp->info) + l_payload)) { return true; } if (sock->catch_write_pipe_error(errno) == SW_REDUCE_SIZE && max_length > SW_BUFFER_SIZE_STD) { @@ -287,4 +287,26 @@ size_t MessageBus::get_memory_size() { return size; } +void MessageBus::init_pipe_socket(network::Socket *sock) { + int pipe_fd = sock->get_fd(); + if ((size_t) pipe_fd >= pipe_sockets_.size()) { + pipe_sockets_.resize(pipe_fd + 1); + } + auto _socket = make_socket(pipe_fd, SW_FD_PIPE); + _socket->buffer_size = UINT_MAX; + if (!_socket->nonblock) { + _socket->set_nonblock(); + } + pipe_sockets_[pipe_fd] = _socket; +} + +MessageBus::~MessageBus() { + for (auto _socket : pipe_sockets_) { + if (_socket) { + _socket->fd = -1; + _socket->free(); + } + } +} + } // namespace swoole diff --git a/src/server/base.cc b/src/server/base.cc index 9ce1964e15b..8e8a465287a 100644 --- a/src/server/base.cc +++ b/src/server/base.cc @@ -143,12 +143,7 @@ bool BaseFactory::end(SessionId session_id, int flags) { session_id, session->fd, session->reactor_id); - Worker *worker = server_->get_worker(session->reactor_id); - if (worker->pipe_master->send_async((const char *) &_send.info, sizeof(_send.info)) < 0) { - swoole_sys_warning("failed to send %lu bytes to pipe_master", sizeof(_send.info)); - return false; - } - return true; + return forward_message(session, &_send); } Connection *conn = server_->get_connection_verify_no_ssl(session_id); @@ -215,29 +210,34 @@ bool BaseFactory::finish(SendData *data) { session_id, session->fd, session->reactor_id); - Worker *worker = server_->gs->event_workers.get_worker(session->reactor_id); - EventData proxy_msg{}; - - if (data->info.type == SW_SERVER_EVENT_SEND_DATA) { - if (!server_->get_worker_message_bus()->write(worker->pipe_master, data)) { - swoole_sys_warning("failed to send %u bytes to pipe_master", data->info.len); - return false; - } - swoole_trace( - "proxy message, fd=%d, len=%ld", worker->pipe_master->fd, sizeof(proxy_msg.info) + proxy_msg.info.len); - } else if (data->info.type == SW_SERVER_EVENT_SEND_FILE) { - memcpy(&proxy_msg.info, &data->info, sizeof(proxy_msg.info)); - memcpy(proxy_msg.data, data->data, data->info.len); - size_t __len = sizeof(proxy_msg.info) + proxy_msg.info.len; - return worker->pipe_master->send_async((const char *) &proxy_msg, __len); + + if (data->info.type == SW_SERVER_EVENT_SEND_DATA || data->info.type == SW_SERVER_EVENT_SEND_FILE) { + return forward_message(session, data); } else { swoole_warning("unknown event type[%d]", data->info.type); return false; } - return true; } else { return server_->send_to_connection(data) == SW_OK; } } +bool BaseFactory::forward_message(Session *session, SendData *data) { + Worker *worker = server_->gs->event_workers.get_worker(session->reactor_id); + swoole_trace_log(SW_TRACE_SERVER, + "fd=%d, worker_id=%d, type=%d, len=%ld", + worker->pipe_master->get_fd(), + session->reactor_id, + data->info.type, + data->info.len); + + auto mb = server_->get_worker_message_bus(); + auto sock = server_->is_thread_mode() ? mb->get_pipe_socket(worker->pipe_master) : worker->pipe_master; + if (!mb->write(sock, data)) { + swoole_sys_warning("failed to send %u bytes to pipe_master", data->info.len); + return false; + } + return true; +} + } // namespace swoole diff --git a/src/server/master.cc b/src/server/master.cc index ed93a6c4847..a6d99b1e61a 100644 --- a/src/server/master.cc +++ b/src/server/master.cc @@ -29,8 +29,6 @@ swoole::Server *g_server_instance = nullptr; namespace swoole { -static void Server_signal_handler(int sig); - TimerCallback Server::get_timeout_callback(ListenPort *port, Reactor *reactor, Connection *conn) { return [this, port, conn, reactor](Timer *, TimerNode *) { if (conn->protect) { @@ -949,6 +947,9 @@ void Server::stop_master_thread() { if (is_thread_mode()) { stop_worker_threads(); } + if (is_process_mode() && single_thread) { + get_thread(0)->shutdown(reactor); + } } bool Server::signal_handler_shutdown() { @@ -1610,18 +1611,18 @@ void Server::init_signal_handler() { swoole_signal_set(SIGPIPE, nullptr); swoole_signal_set(SIGHUP, nullptr); if (is_process_mode()) { - swoole_signal_set(SIGCHLD, Server_signal_handler); + swoole_signal_set(SIGCHLD, master_signal_handler); } else { - swoole_signal_set(SIGIO, Server_signal_handler); + swoole_signal_set(SIGIO, master_signal_handler); } - swoole_signal_set(SIGUSR1, Server_signal_handler); - swoole_signal_set(SIGUSR2, Server_signal_handler); - swoole_signal_set(SIGTERM, Server_signal_handler); + swoole_signal_set(SIGUSR1, master_signal_handler); + swoole_signal_set(SIGUSR2, master_signal_handler); + swoole_signal_set(SIGTERM, master_signal_handler); #ifdef SIGRTMIN - swoole_signal_set(SIGRTMIN, Server_signal_handler); + swoole_signal_set(SIGRTMIN, master_signal_handler); #endif // for test - swoole_signal_set(SIGVTALRM, Server_signal_handler); + swoole_signal_set(SIGVTALRM, master_signal_handler); if (SwooleG.signal_fd > 0) { set_minfd(SwooleG.signal_fd); @@ -1838,7 +1839,7 @@ ListenPort *Server::add_port(SocketType type, const char *host, int port) { return ls; } -static void Server_signal_handler(int sig) { +void Server::master_signal_handler(int signo) { swoole_trace_log(SW_TRACE_SERVER, "signal[%d] %s triggered in %d", sig, swoole_signal_to_str(sig), getpid()); Server *serv = sw_server(); @@ -1846,7 +1847,7 @@ static void Server_signal_handler(int sig) { return; } - switch (sig) { + switch (signo) { case SIGTERM: serv->signal_handler_shutdown(); break; @@ -1858,14 +1859,14 @@ static void Server_signal_handler(int sig) { break; case SIGUSR1: case SIGUSR2: - serv->signal_handler_reload(sig == SIGUSR1); + serv->signal_handler_reload(signo == SIGUSR1); break; case SIGIO: serv->signal_handler_read_message(); break; default: #ifdef SIGRTMIN - if (sig == SIGRTMIN) { + if (signo == SIGRTMIN) { serv->signal_handler_reopen_logger(); } #endif @@ -1892,6 +1893,28 @@ void Server::abort_connection(Reactor *reactor, ListenPort *ls, Socket *_socket) } } +void Server::reset_worker_counter(Worker *worker) { + auto value = worker->concurrency; + if (value > 0 && sw_atomic_value_cmp_set(&worker->concurrency, value, 0) == value) { + sw_atomic_sub_fetch(&gs->concurrency, worker->concurrency); + } + worker->request_count = 0; + worker->response_count = 0; + worker->dispatch_count = 0; +} + +void Server::abort_worker(Worker *worker) { + reset_worker_counter(worker); + if (!is_process_mode()) { + SW_LOOP_N(SW_SESSION_LIST_SIZE) { + Session *session = get_session(i); + if (session->reactor_id == worker->id) { + session->fd = 0; + } + } + } +} + /** * new connection */ @@ -2004,6 +2027,17 @@ void Server::init_ipc_max_size() { #endif } +void Server::init_pipe_sockets(MessageBus *mb) { + assert(is_started()); + size_t n = get_core_worker_num(); + + SW_LOOP_N(n) { + Worker *worker = get_worker(i); + mb->init_pipe_socket(worker->pipe_master); + mb->init_pipe_socket(worker->pipe_worker); + } +} + /** * allocate memory for Server::pipe_buffers */ diff --git a/src/server/process.cc b/src/server/process.cc index 624bb037c80..1d8704122b8 100644 --- a/src/server/process.cc +++ b/src/server/process.cc @@ -117,15 +117,8 @@ pid_t Factory::spawn_event_worker(Worker *worker) { } // see https://github.com/swoole/swoole-src/issues/5407 - if (worker->concurrency > 0 && server_->worker_num > 1) { - sw_atomic_sub_fetch(&server_->gs->concurrency, worker->concurrency); - worker->concurrency = 0; - } - // see https://github.com/swoole/swoole-src/issues/5432 - worker->request_count = 0; - worker->response_count = 0; - worker->dispatch_count = 0; + server_->reset_worker_counter(worker); if (server_->is_base_mode()) { server_->gs->connection_nums[worker->id] = 0; @@ -286,10 +279,18 @@ bool ProcessFactory::dispatch(SendData *task) { SendData _task; memcpy(&_task, task, sizeof(SendData)); + network::Socket *sock; + MessageBus *mb; + + if (server_->is_reactor_thread()) { + mb = &server_->get_thread(swoole_get_thread_id())->message_bus; + sock = mb->get_pipe_socket(worker->pipe_master); + } else { + mb = &server_->message_bus; + sock = worker->pipe_master; + } - network::Socket *pipe_socket = - server_->is_reactor_thread() ? server_->get_worker_pipe_socket(worker) : worker->pipe_master; - return server_->message_bus.write(pipe_socket, &_task); + return mb->write(sock, &_task); } static bool inline process_is_supported_send_yield(Server *serv, Connection *conn) { diff --git a/src/server/reactor_thread.cc b/src/server/reactor_thread.cc index e3eb8342cc1..c677732e50c 100644 --- a/src/server/reactor_thread.cc +++ b/src/server/reactor_thread.cc @@ -316,7 +316,16 @@ void ReactorThread::shutdown(Reactor *reactor) { } if (serv->is_thread_mode()) { - reactor->del(serv->get_worker(reactor->id)->pipe_worker); + Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_worker(reactor->id)); + reactor->del(socket); + } + + SW_LOOP_N(serv->worker_num) { + if (i % serv->reactor_num != reactor->id) { + continue; + } + Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master(i)); + reactor->del(socket); } serv->foreach_connection([serv, reactor](Connection *conn) { @@ -328,6 +337,10 @@ void ReactorThread::shutdown(Reactor *reactor) { } }); + if (serv->is_thread_mode()) { + serv->stop_async_worker(serv->get_worker(reactor->id)); + } + reactor->set_wait_exit(true); } @@ -389,11 +402,7 @@ static int ReactorThread_onPipeRead(Reactor *reactor, Event *ev) { auto packet = thread->message_bus.get_packet(); serv->call_command_callback(resp->info.fd, std::string(packet.data, packet.length)); } else if (resp->info.type == SW_SERVER_EVENT_SHUTDOWN) { - if (serv->is_thread_mode()) { - serv->stop_async_worker(serv->get_worker(reactor->id)); - } else { - thread->shutdown(reactor); - } + thread->shutdown(reactor); } else if (resp->info.type == SW_SERVER_EVENT_FINISH) { serv->onFinish(serv, (EventData *) resp); } else if (resp->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) { @@ -710,11 +719,6 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) { reactor->wait_exit = 0; reactor->max_socket = serv->get_max_connection(); reactor->close = Server::close_connection; - - reactor->set_exit_condition(Reactor::EXIT_CONDITION_DEFAULT, [this](Reactor *reactor, size_t &event_num) -> bool { - return event_num == (size_t) pipe_num; - }); - reactor->default_error_handler = ReactorThread_onClose; reactor->set_handler(SW_FD_PIPE | SW_EVENT_READ, ReactorThread_onPipeRead); @@ -747,23 +751,17 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) { } serv->init_reactor(reactor); + serv->init_pipe_sockets(&message_bus); + if (serv->is_thread_mode()) { Worker *worker = serv->get_worker(reactor_id); serv->init_worker(worker); - worker->pipe_worker->set_nonblock(); - worker->pipe_worker->buffer_size = UINT_MAX; - reactor->add(worker->pipe_worker, SW_EVENT_READ); - } - - int max_pipe_fd = serv->get_worker(serv->worker_num - 1)->pipe_master->fd + 2; - pipe_sockets = (Socket *) sw_calloc(max_pipe_fd, sizeof(Socket)); - if (!pipe_sockets) { - swoole_sys_error("calloc(%d, %ld) failed", max_pipe_fd, sizeof(Socket)); - return SW_ERR; + auto pipe_worker = message_bus.get_pipe_socket(worker->pipe_worker); + reactor->add(pipe_worker, SW_EVENT_READ); } if (serv->pipe_command) { - pipe_command = make_socket(serv->pipe_command->get_socket(false)->get_fd(), SW_FD_PIPE); + pipe_command = message_bus.get_pipe_socket(serv->pipe_command->get_socket(false)); pipe_command->buffer_size = UINT_MAX; } @@ -775,38 +773,26 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) { } SW_LOOP_N(serv->worker_num) { - int pipe_fd = serv->workers[i].pipe_master->fd; - Socket *socket = &pipe_sockets[pipe_fd]; - - socket->fd = pipe_fd; - socket->fd_type = SW_FD_PIPE; - socket->buffer_size = UINT_MAX; - if (i % serv->reactor_num != reactor_id) { continue; } - - socket->set_nonblock(); - + Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master(i)); if (reactor->add(socket, SW_EVENT_READ) < 0) { return SW_ERR; } + /** + * It will only send data to the notify pipeline synchronously, + * which is thread-safe and does not require separate memory + */ if (notify_pipe == nullptr) { notify_pipe = serv->workers[i].pipe_worker; } - pipe_num++; } return SW_OK; } void ReactorThread::clean() { - sw_free(pipe_sockets); - if (pipe_command) { - pipe_command->fd = -1; - delete pipe_command; - } - pipe_num = 0; message_bus.free_buffer(); } @@ -816,13 +802,13 @@ void Server::reactor_thread_main_loop(Server *serv, int reactor_id) { ReactorThread *thread = serv->get_thread(reactor_id); thread->id = reactor_id; + SwooleTG.message_bus = &thread->message_bus; if (swoole_event_init(0) < 0) { return; } if (serv->is_thread_mode()) { - SwooleTG.message_bus = &thread->message_bus; serv->call_worker_start_callback(serv->get_worker(reactor_id)); } diff --git a/src/server/task_worker.cc b/src/server/task_worker.cc index 57a17dbe3fc..0c692c3d9ad 100644 --- a/src/server/task_worker.cc +++ b/src/server/task_worker.cc @@ -194,16 +194,6 @@ static void TaskWorker_onStart(ProcessPool *pool, Worker *worker) { SwooleTG.reactor = nullptr; } - if (serv->is_thread_mode()) { - SwooleTG.message_bus = new MessageBus(); - SwooleTG.message_bus->set_id_generator([serv]() { return sw_atomic_fetch_add(&serv->gs->pipe_packet_msg_id, 1); }); - SwooleTG.message_bus->set_buffer_size(serv->ipc_max_size); - SwooleTG.message_bus->set_always_chunked_transfer(); - if (!SwooleTG.message_bus->alloc_buffer()) { - throw std::bad_alloc(); - } - } - TaskWorker_signal_init(pool); serv->worker_start_callback(worker); @@ -225,12 +215,6 @@ static void TaskWorker_onStop(ProcessPool *pool, Worker *worker) { swoole_event_free(); Server *serv = (Server *) pool->ptr; serv->worker_stop_callback(worker); - - if (serv->is_thread_mode() ) { - SwooleTG.message_bus->clear(); - delete SwooleTG.message_bus; - SwooleTG.message_bus = nullptr; - } } /** diff --git a/src/server/thread.cc b/src/server/thread.cc index d117ac19cdb..72e0014b754 100644 --- a/src/server/thread.cc +++ b/src/server/thread.cc @@ -29,6 +29,7 @@ Factory *Server::create_thread_factory() { } reactor_threads = new ReactorThread[reactor_num](); reactor_pipe_num = 1; + worker_thread_start = [](const WorkerFn &fn) { fn(); }; return new ThreadFactory(this); } @@ -75,6 +76,25 @@ void ThreadFactory::at_thread_exit(Worker *worker) { cv_.notify_one(); } +void ThreadFactory::create_message_bus() { + auto mb = new MessageBus(); + auto server = server_; + mb->set_id_generator([server]() { return sw_atomic_fetch_add(&server->gs->pipe_packet_msg_id, 1); }); + mb->set_buffer_size(server->ipc_max_size); + mb->set_always_chunked_transfer(); + if (!mb->alloc_buffer()) { + throw std::bad_alloc(); + } + server_->init_pipe_sockets(mb); + SwooleTG.message_bus = mb; +} + +void ThreadFactory::destroy_message_bus() { + SwooleTG.message_bus->clear(); + delete SwooleTG.message_bus; + SwooleTG.message_bus = nullptr; +} + template void ThreadFactory::create_thread(int i, _Callable fn) { if (threads_[i].joinable()) { @@ -103,6 +123,7 @@ void ThreadFactory::spawn_task_worker(WorkerId i) { swoole_set_thread_type(Server::THREAD_WORKER); swoole_set_process_id(i); swoole_set_thread_id(i); + create_message_bus(); Worker *worker = server_->get_worker(i); worker->type = SW_PROCESS_TASKWORKER; worker->status = SW_WORKER_IDLE; @@ -117,6 +138,7 @@ void ThreadFactory::spawn_task_worker(WorkerId i) { pool->onWorkerStop(pool, worker); } }); + destroy_message_bus(); at_thread_exit(worker); }); } @@ -128,9 +150,11 @@ void ThreadFactory::spawn_user_worker(WorkerId i) { swoole_set_thread_type(Server::THREAD_WORKER); swoole_set_process_id(i); swoole_set_thread_id(i); + create_message_bus(); worker->type = SW_PROCESS_USERWORKER; SwooleWG.worker = worker; server_->worker_thread_start([=]() { server_->onUserWorkerStart(server_, worker); }); + destroy_message_bus(); at_thread_exit(worker); }); } diff --git a/tests/include/functions.php b/tests/include/functions.php index 891e638d8b0..3701b3dee9a 100644 --- a/tests/include/functions.php +++ b/tests/include/functions.php @@ -519,8 +519,11 @@ function pstree() } $y = function ($pid, $path = []) use (&$y, $pinfo) { if (isset($pinfo[$pid])) { - list($ppid,) = $pinfo[$pid]; - $ppid = $ppid; + if (isset($pinfo[$pid][0])) { + list($ppid,) = $pinfo[$pid]; + } else { + $ppid = null; + } $path[] = $pid; return $y($ppid, $path); } else { diff --git a/tests/swoole_thread/server/send_in_user_process.phpt b/tests/swoole_thread/server/send_in_user_process.phpt new file mode 100644 index 00000000000..5cd2890512b --- /dev/null +++ b/tests/swoole_thread/server/send_in_user_process.phpt @@ -0,0 +1,73 @@ +--TEST-- +swoole_thread/server: send in user process +--SKIPIF-- + +--FILE-- +pop(-1); + $reqUid = uniqid(); + Assert::eq(file_get_contents('http://127.0.0.1:' . $port . '/?uid=' . $reqUid), $reqUid); + echo "done\n"; + $serv->shutdown(); +}); +$serv->addProcess($proc); + +$proc2 = new Swoole\Process(function ($process) use ($serv) { + $json = $process->read(); + $data = json_decode($json, true); + $response = Swoole\Http\Response::create($data['fd']); + $response->end($data['uid']); + $response->close(); +}); +$serv->addProcess($proc2); + +$serv->set(array( + 'worker_num' => 1, + 'log_level' => SWOOLE_LOG_ERROR, + 'init_arguments' => function () { + global $queue, $atomic; + $queue = new Swoole\Thread\Queue(); + $atomic = new Swoole\Thread\Atomic(0); + return [$queue, $atomic]; + } +)); +$serv->on('WorkerStart', function (Swoole\Server $serv, $workerId) use ($port) { + [$queue, $atomic] = Thread::getArguments(); + $atomic->add(); + $queue->push("begin\n", Thread\Queue::NOTIFY_ALL); +}); +$serv->on('WorkerStop', function (Swoole\Server $serv, $workerId) { + [$queue, $atomic] = Thread::getArguments(); + $atomic->add(); +}); +$serv->on('Request', function ($req, $resp) use ($serv, $proc2) { + $resp->detach(); + $proc2->write(json_encode(['fd' => $resp->fd, 'uid' => $req->get['uid']])); +}); +$serv->on('shutdown', function () { + global $queue, $atomic; + Assert::eq($atomic->get(), 2); + echo "shutdown\n"; +}); + +$serv->start(); +?> +--EXPECT-- +begin +done +shutdown