Skip to content

Commit

Permalink
Optimize server code [2]
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Sep 13, 2024
1 parent 44688fc commit 57a4671
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 88 deletions.
4 changes: 2 additions & 2 deletions core-tests/src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ TEST(server, schedule) {
ASSERT_EQ(SW_OK, ret);

for (uint32_t i = 0; i < serv.worker_num; i++) {
serv.workers[i].status = SW_WORKER_BUSY;
serv.workers[i].set_status_to_busy();
}

std::set<int> _worker_id_set;
Expand All @@ -51,7 +51,7 @@ TEST(server, schedule) {
ASSERT_EQ(_worker_id_set.size(), serv.worker_num);

for (uint32_t i = 1; i < serv.worker_num - 1; i++) {
serv.workers[i].status = SW_WORKER_IDLE;
serv.workers[i].set_status_to_idle();
}

_worker_id_set.clear();
Expand Down
4 changes: 4 additions & 0 deletions ext-src/swoole_process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,13 @@ static void process_pool_onMessage(ProcessPool *pool, RecvData *msg) {
ZVAL_STRINGL(&args[1], data, length);
}
}
auto *worker = sw_worker();
worker->set_status_to_busy();
if (UNEXPECTED(!zend::function::call(pp->onMessage, 2, args, nullptr, pp->enable_coroutine))) {
php_swoole_error(E_WARNING, "%s->onMessage handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
}
worker->add_request_count();
worker->set_status_to_idle();
zval_ptr_dtor(&args[1]);
}

Expand Down
24 changes: 13 additions & 11 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,7 @@ static int php_swoole_server_onTask(Server *serv, EventData *req) {
} else {
argc = 4;
argv[0] = *zserv;
ZVAL_LONG(&argv[1], (zend_long) req->info.fd);
ZVAL_LONG(&argv[1], (zend_long) serv->get_task_id(req));
ZVAL_LONG(&argv[2], (zend_long) req->info.reactor_id);
argv[3] = zresult.value;
}
Expand Down Expand Up @@ -1304,14 +1304,14 @@ static int php_swoole_server_onTask(Server *serv, EventData *req) {
static int php_swoole_server_onFinish(Server *serv, EventData *req) {
zval *zserv = php_swoole_server_zval_ptr(serv);
ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv));
TaskId task_id = serv->get_task_id(req);

zend::Variable zresult;
if (!php_swoole_server_task_unpack(zresult.ptr(), req)) {
return SW_ERR;
}

if (req->info.ext_flags & SW_TASK_COROUTINE) {
TaskId task_id = req->info.fd;
auto task_co_iterator = server_object->property->task_coroutine_map.find(task_id);
if (task_co_iterator == server_object->property->task_coroutine_map.end()) {
swoole_error_log(SW_LOG_WARNING, SW_ERROR_TASK_TIMEOUT, "task[%ld] has expired", task_id);
Expand Down Expand Up @@ -1349,7 +1349,7 @@ static int php_swoole_server_onFinish(Server *serv, EventData *req) {

zend_fcall_info_cache *fci_cache = nullptr;
if (req->info.ext_flags & SW_TASK_CALLBACK) {
auto callback_iterator = server_object->property->task_callbacks.find(req->info.fd);
auto callback_iterator = server_object->property->task_callbacks.find(task_id);
if (callback_iterator == server_object->property->task_callbacks.end()) {
req->info.ext_flags = req->info.ext_flags & (~SW_TASK_CALLBACK);
} else {
Expand All @@ -1372,7 +1372,7 @@ static int php_swoole_server_onFinish(Server *serv, EventData *req) {
zval *object = &args[1];
object_init_ex(object, swoole_server_task_result_ce);
zend_update_property_long(
swoole_server_task_result_ce, SW_Z8_OBJ_P(object), ZEND_STRL("task_id"), (zend_long) req->info.fd);
swoole_server_task_result_ce, SW_Z8_OBJ_P(object), ZEND_STRL("task_id"), (zend_long) task_id);
zend_update_property_long(swoole_server_task_result_ce,
SW_Z8_OBJ_P(object),
ZEND_STRL("task_worker_id"),
Expand All @@ -1382,7 +1382,7 @@ static int php_swoole_server_onFinish(Server *serv, EventData *req) {
zend_update_property(swoole_server_task_result_ce, SW_Z8_OBJ_P(object), ZEND_STRL("data"), zresult.ptr());
argc = 2;
} else {
ZVAL_LONG(&args[1], req->info.fd);
ZVAL_LONG(&args[1], (zend_long) task_id);
args[2] = zresult.value;
argc = 3;
}
Expand All @@ -1392,7 +1392,7 @@ static int php_swoole_server_onFinish(Server *serv, EventData *req) {
}
if (req->info.ext_flags & SW_TASK_CALLBACK) {
sw_zend_fci_cache_discard(fci_cache);
server_object->property->task_callbacks.erase(req->info.fd);
server_object->property->task_callbacks.erase(task_id);
}
if (serv->event_object) {
zval_ptr_dtor(&args[1]);
Expand Down Expand Up @@ -3076,7 +3076,7 @@ static PHP_METHOD(swoole_server, taskwait) {
}

int _dst_worker_id = (int) dst_worker_id;
TaskId task_id = buf.info.fd;
TaskId task_id = serv->get_task_id(&buf);

// coroutine
if (swoole_coroutine_is_in()) {
Expand Down Expand Up @@ -3125,7 +3125,7 @@ static PHP_METHOD(swoole_server, taskwait) {
break;
}
if (pipe->read(&notify, sizeof(notify)) > 0) {
if (task_result->info.fd != task_id) {
if (serv->get_task_id(task_result) != task_id) {
continue;
}
zval zresult;
Expand Down Expand Up @@ -3264,7 +3264,7 @@ static PHP_METHOD(swoole_server, taskWaitMulti) {

do {
EventData *result = (EventData *) (content->str + content->offset);
TaskId task_id = result->info.fd;
TaskId task_id = serv->get_task_id(result);
zval zresult;
if (!php_swoole_server_task_unpack(&zresult, result)) {
goto _next;
Expand Down Expand Up @@ -3409,12 +3409,14 @@ static PHP_METHOD(swoole_server, task) {
RETURN_FALSE;
}

TaskId task_id = serv->get_task_id(&buf);

if (!serv->is_worker()) {
buf.info.ext_flags |= SW_TASK_NOREPLY;
} else if (fci.size) {
buf.info.ext_flags |= SW_TASK_CALLBACK;
sw_zend_fci_cache_persist(&fci_cache);
server_object->property->task_callbacks[buf.info.fd] = fci_cache;
server_object->property->task_callbacks[task_id] = fci_cache;
}

buf.info.ext_flags |= SW_TASK_NONBLOCK;
Expand All @@ -3423,7 +3425,7 @@ static PHP_METHOD(swoole_server, task) {
sw_atomic_fetch_add(&serv->gs->tasking_num, 1);

if (serv->gs->task_workers.dispatch(&buf, &_dst_worker_id) >= 0) {
RETURN_LONG(buf.info.fd);
RETURN_LONG(task_id);
}

sw_atomic_fetch_sub(&serv->gs->tasking_num, 1);
Expand Down
16 changes: 16 additions & 0 deletions include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ struct Worker {
status = _status;
}

void set_status_to_idle() {
set_status(SW_WORKER_IDLE);
}

void set_status_to_busy() {
set_status(SW_WORKER_BUSY);
}

void add_request_count() {
request_count++;
}

bool is_busy() {
return status == SW_WORKER_BUSY;
}
Expand Down Expand Up @@ -282,6 +294,10 @@ struct ProcessPool {
return iter->second;
}

TaskId get_task_id(EventData *task) {
return task->info.fd;
}

void set_max_packet_size(uint32_t _max_packet_size) {
max_packet_size_ = _max_packet_size;
}
Expand Down
7 changes: 6 additions & 1 deletion include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ class ThreadFactory : public BaseFactory {
void at_thread_exit(Worker *worker);
void create_message_bus();
void destroy_message_bus();

public:
ThreadFactory(Server *server);
~ThreadFactory();
Expand Down Expand Up @@ -996,6 +997,10 @@ class Server {
int get_idle_task_worker_num();
int get_task_count();

TaskId get_task_id(EventData *task) {
return gs->task_workers.get_task_id(task);
}

int get_minfd() {
return gs->min_fd;
}
Expand Down Expand Up @@ -1544,7 +1549,7 @@ class Server {
uint32_t key = 0;
SW_LOOP_N(worker_num + 1) {
key = sw_atomic_fetch_add(&worker_round_id, 1) % worker_num;
if (workers[key].status == SW_WORKER_IDLE) {
if (workers[key].is_idle()) {
found = true;
break;
}
Expand Down
52 changes: 41 additions & 11 deletions src/os/process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ int ProcessPool::schedule() {

for (i = 0; i < worker_num + 1; i++) {
target_worker_id = sw_atomic_fetch_add(&round_id, 1) % worker_num;
if (workers[target_worker_id].status == SW_WORKER_IDLE) {
if (workers[target_worker_id].is_idle()) {
found = 1;
break;
}
Expand Down Expand Up @@ -468,6 +468,7 @@ pid_t ProcessPool::spawn(Worker *worker) {
swoole_set_process_type(SW_PROCESS_WORKER);
swoole_set_process_id(worker->id);
SwooleWG.worker = worker;
SwooleWG.run_always = true;
if (async) {
if (swoole_event_init(SW_EVENTLOOP_WAIT_EXIT) < 0) {
exit(254);
Expand Down Expand Up @@ -526,10 +527,7 @@ static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker
EventData buf;
} out{};

ssize_t n = 0, ret;
if (pool->get_max_request() <= 0) {
SwooleWG.run_always = 1;
}
ssize_t n = 0;

/**
* Use from_fd save the task_worker->id
Expand Down Expand Up @@ -591,11 +589,10 @@ static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker
swoole_warning("bad task packet, The received data-length[%ld] is inconsistent with the packet-length[%ld]",
n,
out.buf.info.len + sizeof(out.buf.info));
continue;
} else if (pool->onTask(pool, worker, &out.buf) < 0) {
swoole_warning("[Worker#%d] the execution of task#%ld has failed", worker->id, pool->get_task_id(&out.buf));
}

ret = pool->onTask(pool, worker, &out.buf);

if (pool->use_socket && pool->stream_info_->last_connection) {
int _end = 0;
pool->stream_info_->last_connection->send_blocking((void *) &_end, sizeof(_end));
Expand All @@ -611,7 +608,7 @@ static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker
}

if (worker->has_exceeded_max_request()) {
break;
break;
}
}
return SW_OK;
Expand Down Expand Up @@ -758,7 +755,7 @@ static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worke
}

if (worker->has_exceeded_max_request()) {
break;
break;
}
}
return SW_OK;
Expand Down Expand Up @@ -801,7 +798,7 @@ static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Work
return SW_OK;
}
if (worker->has_exceeded_max_request()) {
break;
break;
}
}

Expand Down Expand Up @@ -994,4 +991,37 @@ void ProcessPool::destroy() {
sw_mem_pool()->free(workers);
}

bool Worker::has_exceeded_max_request() {
return !SwooleWG.run_always && request_count >= SwooleWG.max_request;
}

ssize_t Worker::send_pipe_message(const void *buf, size_t n, int flags) {
Socket *pipe_sock;

if (flags & SW_PIPE_MASTER) {
pipe_sock = pipe_master;
} else {
pipe_sock = pipe_worker;
}

// message-queue
if (pool->use_msgqueue) {
struct {
long mtype;
EventData buf;
} msg;

msg.mtype = id + 1;
memcpy(&msg.buf, buf, n);

return pool->queue->push((QueueNode *) &msg, n) ? n : -1;
}

if ((flags & SW_PIPE_NONBLOCK) && swoole_event_is_available()) {
return swoole_event_write(pipe_sock, buf, n);
} else {
return pipe_sock->send_blocking(buf, n);
}
}

} // namespace swoole
4 changes: 2 additions & 2 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2056,7 +2056,7 @@ int Server::get_idle_worker_num() {

for (i = 0; i < worker_num; i++) {
Worker *worker = get_worker(i);
if (worker->status == SW_WORKER_IDLE) {
if (worker->is_idle()) {
idle_worker_num++;
}
}
Expand All @@ -2069,7 +2069,7 @@ int Server::get_idle_task_worker_num() {

for (uint32_t i = worker_num; i < (worker_num + task_worker_num); i++) {
Worker *worker = get_worker(i);
if (worker->status == SW_WORKER_IDLE) {
if (worker->is_idle()) {
idle_worker_num++;
}
}
Expand Down
20 changes: 12 additions & 8 deletions src/server/task_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static int TaskWorker_onTask(ProcessPool *pool, Worker *worker, EventData *task)
Server *serv = (Server *) pool->ptr;
serv->last_task = task;

worker->status = SW_WORKER_BUSY;
worker->set_status_to_busy();
if (task->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) {
serv->onPipeMessage(serv, task);
} else if (task->info.type == SW_SERVER_EVENT_SHUTDOWN) {
Expand All @@ -95,9 +95,13 @@ static int TaskWorker_onTask(ProcessPool *pool, Worker *worker, EventData *task)
ret = TaskWorker_call_command_handler(pool, task);
} else {
ret = serv->onTask(serv, task);
worker->request_count++;
/**
* only server task as requests,
* do not increase the count for pipeline communication and command processing.
*/
worker->add_request_count();
}
worker->status = SW_WORKER_IDLE;
worker->set_status_to_idle();

return ret;
}
Expand Down Expand Up @@ -202,7 +206,7 @@ static void TaskWorker_onStart(ProcessPool *pool, Worker *worker) {

worker->start_time = ::time(nullptr);
worker->request_count = 0;
worker->status = SW_WORKER_IDLE;
worker->set_status_to_idle();
/**
* task_max_request
*/
Expand Down Expand Up @@ -248,7 +252,7 @@ static int TaskWorker_onPipeReceive(Reactor *reactor, Event *event) {
static int TaskWorker_loop_async(ProcessPool *pool, Worker *worker) {
Server *serv = (Server *) pool->ptr;
Socket *socket = worker->pipe_worker;
worker->status = SW_WORKER_IDLE;
worker->set_status_to_idle();

socket->set_nonblock();
sw_reactor()->ptr = pool;
Expand Down Expand Up @@ -316,10 +320,10 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even
uint32_t _len = htonl(data_len);
retval = worker->pool->stream_info_->last_connection->send_blocking((void *) &_len, sizeof(_len));
if (retval > 0) {
retval = worker->pool->stream_info_->last_connection->send_blocking(data, data_len);
retval = worker->pool->stream_info_->last_connection->send_blocking(data, data_len);
}
} else {
retval = send_to_worker_from_worker(worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER);
retval = send_to_worker_from_worker(worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER);
}
} else {
uint64_t flag = 1;
Expand Down Expand Up @@ -367,7 +371,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even
worker->lock->unlock();

while (1) {
retval = pipe->write(&flag, sizeof(flag));
retval = pipe->write(&flag, sizeof(flag));
auto _sock = pipe->get_socket(true);
if (retval < 0 && _sock->catch_write_error(errno) == SW_WAIT) {
if (_sock->wait_event(-1, SW_EVENT_WRITE) == 0) {
Expand Down
Loading

0 comments on commit 57a4671

Please sign in to comment.