Skip to content

Commit

Permalink
Refactor 2
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Apr 8, 2024
1 parent aeeb79f commit b2fe971
Show file tree
Hide file tree
Showing 17 changed files with 99 additions and 77 deletions.
12 changes: 7 additions & 5 deletions examples/thread/thread_server.php
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
<?php
$http = new Swoole\Http\Server("0.0.0.0", 9503);
$http->set([
'worker_num' => 2,
'worker_num' => 4,
// 'task_worker_num' => 3,
'enable_coroutine' => false,
// 'trace_flags' => SWOOLE_TRACE_SERVER,
// 'log_level' => SWOOLE_LOG_TRACE,
'init_arguments' => function () use ($http) {
$map = new Swoole\Thread\Map;
return [$map];
Expand All @@ -15,10 +17,10 @@
$resp->end('hello world');
});

$http->addProcess(new \Swoole\Process(function () {
echo "user process, id=" . \Swoole\Thread::getId();
sleep(2000);
}));
//$http->addProcess(new \Swoole\Process(function () {
// echo "user process, id=" . \Swoole\Thread::getId();
// sleep(2000);
//}));

$http->on('Task', function () {
var_dump(func_get_args());
Expand Down
2 changes: 1 addition & 1 deletion ext-src/swoole_admin_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ static std::string handle_get_connections(Server *serv, const std::string &msg)
if (serv->is_process_mode() && conn->reactor_id != SwooleTG.id) {
return;
}
if (serv->is_base_mode() && SwooleWG.worker && conn->reactor_id != SwooleWG.worker->id) {
if (serv->is_base_mode() && sw_worker() && conn->reactor_id != sw_worker()->id) {
return;
}
list.push_back(get_connection_info(serv, conn));
Expand Down
8 changes: 4 additions & 4 deletions ext-src/swoole_http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,10 @@ bool swoole_http_server_onBeforeRequest(HttpContext *ctx) {
ctx->onBeforeRequest = nullptr;
ctx->onAfterResponse = swoole_http_server_onAfterResponse;
Server *serv = (Server *) ctx->private_data;
SwooleWG.worker->concurrency++;
sw_worker()->concurrency++;
sw_atomic_add_fetch(&serv->gs->concurrency, 1);
swoole_trace("serv->gs->concurrency=%u, max_concurrency=%u", serv->gs->concurrency, serv->gs->max_concurrency);
if (SwooleWG.worker->concurrency > serv->worker_max_concurrency) {
if (sw_worker()->concurrency > serv->worker_max_concurrency) {
swoole_trace_log(SW_TRACE_COROUTINE,
"exceed worker_max_concurrency[%u] limit, request[%p] queued",
serv->worker_max_concurrency,
Expand All @@ -412,13 +412,13 @@ bool swoole_http_server_onBeforeRequest(HttpContext *ctx) {
void swoole_http_server_onAfterResponse(HttpContext *ctx) {
ctx->onAfterResponse = nullptr;
Server *serv = (Server *) ctx->private_data;
SwooleWG.worker->concurrency--;
sw_worker()->concurrency--;
sw_atomic_sub_fetch(&serv->gs->concurrency, 1);
swoole_trace("serv->gs->concurrency=%u, max_concurrency=%u", serv->gs->concurrency, serv->gs->max_concurrency);
if (!queued_http_contexts.empty()) {
HttpContext *ctx = queued_http_contexts.front();
swoole_trace(
"[POP 1] concurrency=%u, ctx=%p, request=%p", SwooleWG.worker->concurrency, ctx, ctx->request.zobject);
"[POP 1] concurrency=%u, ctx=%p, request=%p", sw_worker()->concurrency, ctx, ctx->request.zobject);
queued_http_contexts.pop();
swoole_event_defer(
[](void *private_data) {
Expand Down
2 changes: 1 addition & 1 deletion ext-src/swoole_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ int php_swoole_process_start(Worker *process, zval *zobject) {

php_swoole_process_clean();
swoole_set_process_id(process->id);
SwooleWG.worker = process;
g_worker_instance = process;

zend_update_property_long(swoole_process_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("pid"), process->pid);
if (process->pipe_current) {
Expand Down
22 changes: 11 additions & 11 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1535,10 +1535,10 @@ static void php_swoole_server_onAfterReload(Server *serv) {
}

static void php_swoole_server_onWorkerStop(Server *serv, Worker *worker) {
if (SwooleWG.shutdown) {
if (worker->shutdown) {
return;
}
SwooleWG.shutdown = true;
worker->shutdown = true;

zval *zserv = php_swoole_server_zval_ptr(serv);
ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv));
Expand Down Expand Up @@ -2898,10 +2898,10 @@ static PHP_METHOD(swoole_server, stats) {
add_assoc_long_ex(return_value, ZEND_STRL("min_fd"), serv->gs->min_fd);
add_assoc_long_ex(return_value, ZEND_STRL("max_fd"), serv->gs->max_fd);

if (SwooleWG.worker) {
add_assoc_long_ex(return_value, ZEND_STRL("worker_request_count"), SwooleWG.worker->request_count);
add_assoc_long_ex(return_value, ZEND_STRL("worker_response_count"), SwooleWG.worker->response_count);
add_assoc_long_ex(return_value, ZEND_STRL("worker_dispatch_count"), SwooleWG.worker->dispatch_count);
if (sw_worker()) {
add_assoc_long_ex(return_value, ZEND_STRL("worker_request_count"), sw_worker()->request_count);
add_assoc_long_ex(return_value, ZEND_STRL("worker_response_count"), sw_worker()->response_count);
add_assoc_long_ex(return_value, ZEND_STRL("worker_dispatch_count"), sw_worker()->dispatch_count);
}

if (serv->task_ipc_mode > Server::TASK_IPC_UNIXSOCK && serv->gs->task_workers.queue) {
Expand Down Expand Up @@ -3802,7 +3802,7 @@ static PHP_METHOD(swoole_server, getWorkerId) {
if (!serv->is_worker() && !serv->is_task_worker()) {
RETURN_FALSE;
} else {
RETURN_LONG(SwooleWG.worker->id);
RETURN_LONG(sw_worker()->id);
}
}

Expand All @@ -3820,7 +3820,7 @@ static PHP_METHOD(swoole_server, getWorkerStatus) {

Worker *worker;
if (worker_id == -1) {
worker = SwooleWG.worker;
worker = sw_worker();
} else {
worker = serv->get_worker(worker_id);
}
Expand All @@ -3838,7 +3838,7 @@ static PHP_METHOD(swoole_server, getWorkerPid) {
if (zend_parse_parameters(ZEND_NUM_ARGS(), "|l", &worker_id) == FAILURE) {
RETURN_FALSE;
}
Worker *worker = worker_id < 0 ? SwooleWG.worker : serv->get_worker(worker_id);
Worker *worker = worker_id < 0 ? sw_worker() : serv->get_worker(worker_id);
if (!worker) {
RETURN_FALSE;
}
Expand Down Expand Up @@ -3885,15 +3885,15 @@ static PHP_METHOD(swoole_server, stop) {
}

zend_bool wait_reactor = 0;
zend_long worker_id = SwooleWG.worker->id;
zend_long worker_id = sw_worker()->id;

ZEND_PARSE_PARAMETERS_START(0, 2)
Z_PARAM_OPTIONAL
Z_PARAM_LONG(worker_id)
Z_PARAM_BOOL(wait_reactor)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

if (worker_id == SwooleWG.worker->id && wait_reactor == 0) {
if (worker_id == sw_worker()->id && wait_reactor == 0) {
if (SwooleTG.reactor != nullptr) {
SwooleTG.reactor->defer(
[](void *data) {
Expand Down
20 changes: 11 additions & 9 deletions include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,6 @@ static inline ExitStatus wait_process(pid_t _pid, int options) {
struct ProcessPool;
struct Worker;

struct WorkerGlobal {
bool run_always;
bool shutdown;
uint32_t max_request;
Worker *worker;
time_t exit_time;
};

struct Worker {
pid_t pid;
WorkerId id;
Expand All @@ -120,6 +112,11 @@ struct Worker {
bool redirect_stdin;
bool redirect_stderr;

bool run_always;
bool shutdown;
uint32_t max_request;
time_t exit_time;

/**
* worker status, IDLE or BUSY
*/
Expand Down Expand Up @@ -325,5 +322,10 @@ static sw_inline int swoole_kill(pid_t __pid, int __sig) {
return kill(__pid, __sig);
}

extern SW_THREAD_LOCAL swoole::WorkerGlobal SwooleWG; // Worker Global Variable
typedef swoole::ProtocolType swProtocolType;

extern SW_THREAD_LOCAL swoole::Worker *g_worker_instance;

static inline swoole::Worker *sw_worker() {
return g_worker_instance;
}
4 changes: 4 additions & 0 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,10 @@ class Server {
#endif
}

bool if_forward_message(Session *session) {
return session->reactor_id != swoole_get_process_id();
}

Worker *get_worker(uint16_t worker_id) {
// Event Worker
if (worker_id < worker_num) {
Expand Down
1 change: 1 addition & 0 deletions scripts/make.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ COMPILE_PARAMS="--enable-openssl \
--enable-cares \
--enable-swoole-pgsql \
--enable-swoole-thread \
--enable-trace-log \
--with-swoole-odbc=unixODBC,/usr \
--enable-swoole-sqlite"

Expand Down
4 changes: 2 additions & 2 deletions src/core/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,11 @@ void Logger::put(int level, const char *content, size_t length) {

n = sw_snprintf(log_str,
SW_LOG_BUFFER_SIZE,
"[%.*s %c%d.%d]\t%s\t%.*s\n",
"[%.*s %c%ld.%d]\t%s\t%.*s\n",
static_cast<int>(l_data_str),
date_str,
process_flag,
SwooleG.pid,
pthread_self(),
process_id,
level_str,
static_cast<int>(length),
Expand Down
2 changes: 1 addition & 1 deletion src/os/process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker
out.mtype = worker->id + 1;
}

while (pool->running && !SwooleWG.shutdown && task_n > 0) {
while (pool->running && !worker->shutdown && task_n > 0) {
/**
* fetch task
*/
Expand Down
25 changes: 19 additions & 6 deletions src/server/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ BaseFactory::BaseFactory(Server *server) : Factory(server) {}
BaseFactory::~BaseFactory() {}

bool BaseFactory::start() {
SwooleWG.run_always = true;
sw_worker()->run_always = true;
return true;
}

Expand Down Expand Up @@ -91,8 +91,14 @@ bool BaseFactory::dispatch(SendData *task) {
}
}

server_->message_bus.pass(task);
server_->worker_accept_event(&server_->message_bus.get_buffer()->info);
#ifdef SW_THREAD
MessageBus *bus = sw_likely(server_->is_thread_mode()) ? &server_->get_thread(swoole_get_thread_id())->message_bus
: &server_->message_bus;
#else
MessageBus *bus = &server_->message_bus;
#endif
bus->pass(task);
server_->worker_accept_event(&bus->get_buffer()->info);

return true;
}
Expand Down Expand Up @@ -137,7 +143,11 @@ bool BaseFactory::end(SessionId session_id, int flags) {
return false;
}

if (session->reactor_id != swoole_get_process_id()) {
if (server_->if_forward_message(session)) {
swoole_trace_log(SW_TRACE_SERVER, "session_id=%ld, fd=%d, session->reactor_id=%d",
session_id,
session->fd,
session->reactor_id);
Worker *worker = server_->get_worker(session->reactor_id);
if (worker->pipe_master->send_async((const char *) &_send.info, sizeof(_send.info)) < 0) {
swoole_sys_warning("failed to send %lu bytes to pipe_master", sizeof(_send.info));
Expand Down Expand Up @@ -204,8 +214,11 @@ bool BaseFactory::finish(SendData *data) {
SessionId session_id = data->info.fd;

Session *session = server_->get_session(session_id);
if (session->reactor_id != swoole_get_process_id()) {
swoole_trace("session->reactor_id=%d, SwooleG.process_id=%d", session->reactor_id, server_->get_worker_id());
if (server_->if_forward_message(session)) {
swoole_trace_log(SW_TRACE_SERVER, "session_id=%ld, fd=%d, session->reactor_id=%d",
session_id,
session->fd,
session->reactor_id);
Worker *worker = server_->gs->event_workers.get_worker(session->reactor_id);
EventData proxy_msg{};

Expand Down
16 changes: 8 additions & 8 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,11 +562,11 @@ void Server::destroy_worker(Worker *worker) {
*/
void Server::init_worker(Worker *worker) {
if (max_request < 1) {
SwooleWG.run_always = true;
worker->run_always = true;
} else {
SwooleWG.max_request = max_request;
worker->max_request = max_request;
if (max_request_grace > 0) {
SwooleWG.max_request += swoole_system_random(1, max_request_grace);
worker->max_request += swoole_system_random(1, max_request_grace);
}
}
worker->start_time = ::time(nullptr);
Expand Down Expand Up @@ -878,7 +878,7 @@ bool Server::shutdown() {
}
} else {
gs->event_workers.running = 0;
stop_async_worker(SwooleWG.worker);
stop_async_worker(sw_worker());
return true;
}
}
Expand Down Expand Up @@ -1051,7 +1051,7 @@ bool Server::command(WorkerId process_id,
if (is_process_mode() && !is_master()) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_INVALID_PARAMS, "command() can only be used in master process");
return false;
} else if (is_base_mode() && SwooleWG.worker->id != 0) {
} else if (is_base_mode() && sw_worker()->id != 0) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_INVALID_PARAMS, "command() can only be used in worker process 0");
return false;
}
Expand Down Expand Up @@ -1185,8 +1185,8 @@ bool Server::send(SessionId session_id, const void *data, uint32_t length) {
sw_atomic_fetch_add(&port->gs->response_count, 1);
sw_atomic_fetch_add(&port->gs->total_send_bytes, length);
}
if (SwooleWG.worker) {
SwooleWG.worker->response_count++;
if (sw_worker()) {
sw_worker()->response_count++;
}
return true;
}
Expand Down Expand Up @@ -1302,7 +1302,7 @@ int Server::send_to_connection(SendData *_send) {
assert(fd % reactor_num == SwooleTG.id);
}

if (is_base_mode() && conn->overflow) {
if (!is_process_mode() && conn->overflow) {
if (send_yield) {
swoole_set_last_error(SW_ERROR_OUTPUT_SEND_YIELD);
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/server/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ void ProcessFactory::kill_event_workers() {
}

SW_LOOP_N(server_->worker_num) {
swoole_trace("kill worker#%d[pid=%d]", workers[i].id, workers[i].pid);
swoole_trace_log(SW_TRACE_SERVER, "kill worker#%d[pid=%d]", server_->workers[i].id, server_->workers[i].pid);
swoole_kill(server_->workers[i].pid, SIGTERM);
}
SW_LOOP_N(server_->worker_num) {
swoole_trace("wait worker#%d[pid=%d]", workers[i].id, workers[i].pid);
swoole_trace_log(SW_TRACE_SERVER, "wait worker#%d[pid=%d]", server_->workers[i].id, server_->workers[i].pid);
if (swoole_waitpid(server_->workers[i].pid, &status, 0) < 0) {
swoole_sys_warning("waitpid(%d) failed", server_->workers[i].pid);
}
Expand Down Expand Up @@ -139,7 +139,7 @@ pid_t ProcessFactory::spawn_user_worker(Worker *worker) {
else if (pid == 0) {
swoole_set_process_type(SW_PROCESS_USERWORKER);
swoole_set_process_id(worker->id);
SwooleWG.worker = worker;
g_worker_instance = worker;
worker->pid = SwooleG.pid;
server_->onUserWorkerStart(server_, worker);
exit(0);
Expand Down
8 changes: 4 additions & 4 deletions src/server/reactor_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ static int ReactorProcess_onPipeRead(Reactor *reactor, Event *event) {
break;
}
case SW_SERVER_EVENT_COMMAND_REQUEST: {
serv->call_command_handler(serv->message_bus, SwooleWG.worker->id, serv->get_worker(0)->pipe_master);
serv->call_command_handler(serv->message_bus, sw_worker()->id, serv->get_worker(0)->pipe_master);
break;
}
case SW_SERVER_EVENT_COMMAND_RESPONSE: {
Expand All @@ -170,10 +170,10 @@ int Server::worker_main_loop(ProcessPool *pool, Worker *worker) {
swoole_set_process_id(worker->id);

if (serv->max_request > 0) {
SwooleWG.run_always = false;
worker->run_always = false;
}
SwooleWG.max_request = serv->max_request;
SwooleWG.worker = worker;
worker->max_request = serv->max_request;
g_worker_instance = worker;
SwooleTG.id = 0;

serv->init_worker(worker);
Expand Down
Loading

0 comments on commit b2fe971

Please sign in to comment.