Skip to content

Commit

Permalink
onPipeMessage/onTask/onFinish, fix message bus
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Apr 8, 2024
1 parent 993bcdb commit e824c86
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 44 deletions.
14 changes: 14 additions & 0 deletions examples/thread/argv.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

use Swoole\Thread;

$args = Thread::getArguments();

if (empty($args)) {
var_dump($GLOBALS['argv']);
$thread = Thread::exec(__FILE__, 'thread-1', $argc, $argv);
$thread->join();
} else {
var_dump($args[0], $args[1], $args[2]);
sleep(1);
}
31 changes: 25 additions & 6 deletions examples/thread/thread_server.php
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
<?php
$http = new Swoole\Http\Server("0.0.0.0", 9503);
$http->set([
'worker_num' => 4,
// 'task_worker_num' => 3,
'enable_coroutine' => false,
'worker_num' => 2,
'task_worker_num' => 3,
'enable_coroutine' => true,
'hook_flags' => SWOOLE_HOOK_ALL,
// 'trace_flags' => SWOOLE_TRACE_SERVER,
// 'log_level' => SWOOLE_LOG_TRACE,
'init_arguments' => function () use ($http) {
Expand All @@ -14,20 +15,38 @@

$http->on('Request', function ($req, $resp) use ($http) {
// $resp->end("tid=" . \Swoole\Thread::getId() . ', fd=' . $req->fd);
if ($req->server['request_uri'] == '/task') {
$http->task(['code' => uniqid()]);
} elseif ($req->server['request_uri'] == '/msg') {
$dstWorkerId = random_int(0, 4);
if ($dstWorkerId != $http->getWorkerId()) {
$http->sendMessage('hello ' . base64_encode(random_bytes(16)), $dstWorkerId);
echo "[worker#" . $http->getWorkerId() . "]\tsend pipe message to " . $dstWorkerId . "\n";
}
}
$resp->end('hello world');
});

$http->on('pipeMessage', function ($http, $srcWorkerId, $msg) {
echo "[worker#" . $http->getWorkerId() . "]\treceived pipe message[$msg] from " . $srcWorkerId . "\n";
});

//$http->addProcess(new \Swoole\Process(function () {
// echo "user process, id=" . \Swoole\Thread::getId();
// sleep(2000);
//}));

$http->on('Task', function () {
var_dump(func_get_args());
$http->on('Task', function ($server, $taskId, $srcWorkerId, $data) {
var_dump($taskId, $srcWorkerId, $data);
return ['result' => uniqid()];
});

$http->on('Finish', function ($server, $taskId, $data) {
var_dump($taskId, $data);
});

$http->on('WorkerStart', function ($serv, $wid) {
var_dump(\Swoole\Thread::getArguments());
var_dump(\Swoole\Thread::getArguments(), $wid);
});

$http->on('WorkerStop', function ($serv, $wid) {
Expand Down
6 changes: 4 additions & 2 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ void php_swoole_get_recv_data(Server *serv, zval *zdata, RecvData *req) {
} else {
if (req->info.flags & SW_EVENT_DATA_OBJ_PTR) {
zend::assign_zend_string_by_val(zdata, (char *) data, length);
serv->message_bus.move_packet();
serv->get_worker_message_bus()->move_packet();
} else if (req->info.flags & SW_EVENT_DATA_POP_PTR) {
String *recv_buffer = serv->get_recv_buffer(serv->get_connection_by_session_id(req->info.fd)->socket);
zend::assign_zend_string_by_val(zdata, recv_buffer->pop(serv->recv_buffer_size), length);
Expand Down Expand Up @@ -834,7 +834,7 @@ void ServerObject::on_before_start() {

serv->message_bus.set_allocator(sw_zend_string_allocator());

if (serv->is_base_mode()) {
if (serv->is_base_mode() || serv->is_thread_mode()) {
serv->recv_buffer_allocator = sw_zend_string_allocator();
}

Expand Down Expand Up @@ -1493,6 +1493,8 @@ static void php_swoole_server_onWorkerStart(Server *serv, Worker *worker) {
PHPCoroutine::disable_hook();
}

serv->get_worker_message_bus()->set_allocator(sw_zend_string_allocator());

zval args[2];
args[0] = *zserv;
ZVAL_LONG(&args[1], worker->id);
Expand Down
1 change: 1 addition & 0 deletions include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ struct ProcessPool {
int get_max_request();
bool detach();
int wait();
int start_check();
int start();
void shutdown();
bool reload();
Expand Down
10 changes: 9 additions & 1 deletion include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,14 @@ class Server {
return buffer;
}

MessageBus *get_worker_message_bus() {
#ifdef SW_THREAD
return sw_likely(is_thread_mode()) ? &get_thread(swoole_get_thread_id())->message_bus : &message_bus;
#else
return &message_bus;
#endif
}

uint32_t get_worker_buffer_num() {
return is_base_mode() ? 1 : reactor_num + dgram_port_num;
}
Expand Down Expand Up @@ -1408,11 +1416,11 @@ class Server {
void worker_start_callback(Worker *worker);
void worker_stop_callback(Worker *worker);
void worker_accept_event(DataHead *info);
void worker_signal_init(void);
std::function<void(const WorkerFn &fn)> worker_thread_start;

static int worker_main_loop(ProcessPool *pool, Worker *worker);
static void worker_signal_handler(int signo);
static void worker_signal_init(void);
static void reactor_thread_main_loop(Server *serv, int reactor_id);
static bool task_pack(EventData *task, const void *data, size_t data_len);
static bool task_unpack(EventData *task, String *buffer, PacketPtr *packet);
Expand Down
21 changes: 13 additions & 8 deletions src/os/process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,12 @@ void ProcessPool::set_protocol(enum ProtocolType _protocol_type) {
protocol_type_ = _protocol_type;
}

/**
* start workers
*/
int ProcessPool::start() {
int ProcessPool::start_check() {
if (ipc_mode == SW_IPC_SOCKET && (stream_info_ == nullptr || stream_info_->socket == 0)) {
swoole_warning("must first listen to an tcp port");
return SW_ERR;
}

uint32_t i;
running = started = true;
master_pid = getpid();
reload_workers = new Worker[worker_num]();
Expand All @@ -239,7 +235,7 @@ int ProcessPool::start() {
main_loop = ProcessPool_worker_loop_async;
}

for (i = 0; i < worker_num; i++) {
SW_LOOP_N(worker_num) {
workers[i].pool = this;
workers[i].id = start_id + i;
workers[i].type = type;
Expand All @@ -251,12 +247,21 @@ int ProcessPool::start() {
}
}

for (i = 0; i < worker_num; i++) {
return SW_OK;
}

/**
* start workers
*/
int ProcessPool::start() {
if (start_check() < 0) {
return SW_ERR;
}
SW_LOOP_N(worker_num) {
if (spawn(&(workers[i])) < 0) {
return SW_ERR;
}
}

return SW_OK;
}

Expand Down
9 changes: 2 additions & 7 deletions src/server/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,7 @@ bool BaseFactory::dispatch(SendData *task) {
}
}

#ifdef SW_THREAD
MessageBus *bus = sw_likely(server_->is_thread_mode()) ? &server_->get_thread(swoole_get_thread_id())->message_bus
: &server_->message_bus;
#else
MessageBus *bus = &server_->message_bus;
#endif
auto bus = server_->get_worker_message_bus();
bus->pass(task);
server_->worker_accept_event(&bus->get_buffer()->info);

Expand Down Expand Up @@ -225,7 +220,7 @@ bool BaseFactory::finish(SendData *data) {
EventData proxy_msg{};

if (data->info.type == SW_SERVER_EVENT_SEND_DATA) {
if (!server_->message_bus.write(worker->pipe_master, data)) {
if (!server_->get_worker_message_bus()->write(worker->pipe_master, data)) {
swoole_sys_warning("failed to send %u bytes to pipe_master", data->info.len);
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/reactor_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ int Server::worker_main_loop(ProcessPool *pool, Worker *worker) {
SwooleTG.timer->reinit(reactor);
}

worker_signal_init();
serv->worker_signal_init();

for (auto ls : serv->ports) {
#ifdef HAVE_REUSEPORT
Expand Down
10 changes: 9 additions & 1 deletion src/server/reactor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ static int ReactorThread_onPipeRead(Reactor *reactor, Event *ev) {
return SW_OK;
} else if (resp->info.type == SW_SERVER_EVENT_SHUTDOWN) {
ReactorThread_shutdown(reactor);
} else if (resp->info.type == SW_SERVER_EVENT_FINISH) {
serv->onFinish(serv, (EventData *) resp);
} else if (resp->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) {
serv->onPipeMessage(serv, (EventData *) resp);
} else if (resp->info.type == SW_SERVER_EVENT_CLOSE_FORCE) {
SessionId session_id = resp->info.fd;
Connection *conn = serv->get_connection_verify_no_ssl(session_id);
Expand Down Expand Up @@ -728,7 +732,11 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) {

serv->init_reactor(reactor);
if (serv->is_thread_mode()) {
serv->init_worker(serv->get_worker(reactor_id));
Worker *worker = serv->get_worker(reactor_id);
serv->init_worker(worker);
worker->pipe_worker->set_nonblock();
worker->pipe_worker->buffer_size = UINT_MAX;
reactor->add(worker->pipe_worker, SW_EVENT_READ);
}

int max_pipe_fd = serv->get_worker(serv->worker_num - 1)->pipe_master->fd + 2;
Expand Down
4 changes: 4 additions & 0 deletions src/server/task_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ bool Server::task_unpack(EventData *task, String *buffer, PacketPtr *packet) {
}

static void TaskWorker_signal_init(ProcessPool *pool) {
Server *serv = (Server *) pool->ptr;
if (serv->is_thread_mode()) {
return;
}
swoole_signal_set(SIGHUP, nullptr);
swoole_signal_set(SIGPIPE, nullptr);
swoole_signal_set(SIGUSR1, Server::worker_signal_handler);
Expand Down
30 changes: 24 additions & 6 deletions src/server/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,17 @@ ThreadFactory::ThreadFactory(Server *server) : BaseFactory(server) {
}

bool ThreadFactory::start() {
return server_->create_worker_pipes();
if (!server_->create_worker_pipes()) {
return false;
}
if (server_->task_worker_num > 0 &&
(server_->create_task_workers() < 0 || server_->gs->task_workers.start_check() < 0)) {
return false;
}
if (server_->get_user_worker_num() > 0 && server_->create_user_workers() < 0) {
return false;
}
return true;
}

bool ThreadFactory::shutdown() {
Expand Down Expand Up @@ -91,8 +101,17 @@ void ThreadFactory::spawn_task_worker(int i) {
swoole_set_thread_id(i);
Worker *worker = server_->get_worker(i);
worker->type = SW_PROCESS_TASKWORKER;
worker->status = SW_WORKER_IDLE;
auto pool = &server_->gs->task_workers;
server_->worker_thread_start([=]() { pool->main_loop(pool, worker); });
server_->worker_thread_start([=]() {
if (pool->onWorkerStart != nullptr) {
pool->onWorkerStart(pool, worker);
}
pool->main_loop(pool, worker);
if (pool->onWorkerStop != nullptr) {
pool->onWorkerStop(pool, worker);
}
});
at_thread_exit(worker);
});
}
Expand Down Expand Up @@ -127,7 +146,9 @@ void ThreadFactory::spawn_manager_thread(int i) {
server_->onManagerStop(server_);
}
});
at_thread_exit(&manager);
if (server_->running) {
swoole_warning("Fatal Error: manager thread exits abnormally");
}
});
}

Expand All @@ -147,9 +168,6 @@ void ThreadFactory::wait() {
case SW_PROCESS_USERWORKER:
spawn_user_worker(exited_worker->id);
break;
case SW_PROCESS_MANAGER:
swoole_warning("Fatal Error: manager process exit");
break;
default:
abort();
break;
Expand Down
Loading

0 comments on commit e824c86

Please sign in to comment.