Skip to content

Commit

Permalink
Optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Sep 13, 2024
1 parent 93dbf3b commit 0a6950f
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 35 deletions.
21 changes: 9 additions & 12 deletions core-tests/src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,8 @@ TEST(server, task_worker) {
EXPECT_EQ(serv->get_tasking_num(), 1);
EXPECT_EQ(string(task->data, task->info.len), string(packet));
serv->gs->task_workers.running = 0;
serv->gs->task_count++;
serv->gs->tasking_num--;
return 0;
};

Expand All @@ -558,9 +560,7 @@ TEST(server, task_worker) {
thread t1([&serv]() {
SwooleWG.run_always = true;
serv.gs->task_workers.running = 1;
serv.gs->tasking_num++;
serv.gs->task_workers.main_loop(&serv.gs->task_workers, &serv.gs->task_workers.workers[0]);
serv.gs->tasking_num--;
EXPECT_EQ(serv.get_tasking_num(), 0);
serv.gs->tasking_num--;
EXPECT_EQ(serv.get_tasking_num(), 0);
Expand All @@ -578,12 +578,13 @@ TEST(server, task_worker) {

int _dst_worker_id = 0;

ASSERT_GE(serv.gs->task_workers.dispatch(&buf, &_dst_worker_id), 0);
ASSERT_TRUE(serv.task(&buf, &_dst_worker_id));
ASSERT_EQ(serv.gs->task_count, 1);

t1.join();
serv.gs->task_workers.destroy();

ASSERT_EQ(serv.gs->)
ASSERT_EQ(serv.gs->task_count, 2);
}

// PHP_METHOD(swoole_server, task)
Expand All @@ -603,8 +604,7 @@ TEST(server, task_worker2) {

serv.onTask = [](Server *serv, swEventData *task) -> int {
EXPECT_EQ(string(task->data, task->info.len), string(packet));
int ret = serv->reply_task_result(task->data, task->info.len, 0, task);
EXPECT_GT(ret, 0);
EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task));
return 0;
};

Expand Down Expand Up @@ -652,8 +652,7 @@ TEST(server, task_worker3) {

serv.onTask = [](Server *serv, swEventData *task) -> int {
EXPECT_EQ(string(task->data, task->info.len), string(packet));
int ret = serv->reply_task_result(task->data, task->info.len, 0, task);
EXPECT_GT(ret, 0);
EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task));
return 0;
};

Expand Down Expand Up @@ -701,8 +700,7 @@ TEST(server, task_worker4) {

serv.onTask = [](Server *serv, swEventData *task) -> int {
EXPECT_EQ(string(task->data, task->info.len), string(packet));
int ret = serv->reply_task_result(task->data, task->info.len, 0, task);
EXPECT_GT(ret, 0);
EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task));
return 0;
};

Expand Down Expand Up @@ -770,8 +768,7 @@ TEST(server, task_worker5) {
ifs.close();

EXPECT_EQ(string(resp), string(data));
int ret = serv->reply_task_result(resp, SW_IPC_MAX_SIZE * 2, 0, task);
EXPECT_GT(ret, 0);
EXPECT_TRUE(serv->finish(resp, SW_IPC_MAX_SIZE * 2, 0, task));
return 0;
};

Expand Down
17 changes: 8 additions & 9 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static void php_swoole_server_onWorkerError(Server *serv, Worker *worker, const
static void php_swoole_server_onManagerStart(Server *serv);
static void php_swoole_server_onManagerStop(Server *serv);

static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task);
static bool php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task);
static TaskId php_swoole_server_task_pack(zval *data, EventData *task);
static bool php_swoole_server_task_unpack(zval *zresult, EventData *task_result);
static int php_swoole_server_dispatch_func(Server *serv, Connection *conn, SendData *data);
Expand Down Expand Up @@ -1025,13 +1025,12 @@ void ServerObject::register_callback() {
}
}

static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task) {
static bool php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task) {
int flags = 0;
smart_str serialized_data = {};
php_serialize_data_t var_hash;
char *data_str;
size_t data_len = 0;
int ret;

// need serialize
if (Z_TYPE_P(zdata) != IS_STRING) {
Expand All @@ -1049,9 +1048,9 @@ static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *c
data_len = Z_STRLEN_P(zdata);
}

ret = serv->reply_task_result(data_str, data_len, flags, current_task);
bool success = serv->finish(data_str, data_len, flags, current_task);
smart_str_free(&serialized_data);
return ret;
return success;
}

static void php_swoole_server_onPipeMessage(Server *serv, EventData *req) {
Expand Down Expand Up @@ -3101,7 +3100,7 @@ static PHP_METHOD(swoole_server, taskwait) {
RETURN_FALSE;
}
} else {
auto retval = serv->task_sync(&buf, (int *) &dst_worker_id, timeout);
auto retval = serv->task_sync(&buf, (int *) &dst_worker_id, timeout);
if (!retval) {
RETURN_FALSE;
}
Expand Down Expand Up @@ -3299,7 +3298,7 @@ static PHP_METHOD(swoole_server, taskCo) {

zval *ztask;
SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(ztasks), ztask) {
EventData buf;
EventData buf;
task_id = php_swoole_server_task_pack(ztask, &buf);
if (task_id < 0) {
php_swoole_fatal_error(E_WARNING, "failed to pack task");
Expand Down Expand Up @@ -3503,7 +3502,7 @@ static PHP_METHOD(swoole_server, finish) {
Z_PARAM_ZVAL(zdata)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

SW_CHECK_RETURN(php_swoole_server_task_finish(serv, zdata, nullptr));
RETURN_BOOL(php_swoole_server_task_finish(serv, zdata, nullptr));
}

static PHP_METHOD(swoole_server_task, finish) {
Expand All @@ -3520,7 +3519,7 @@ static PHP_METHOD(swoole_server_task, finish) {
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

DataHead *info = php_swoole_server_task_get_info(ZEND_THIS);
SW_CHECK_RETURN(php_swoole_server_task_finish(serv, zdata, (EventData *) info));
RETURN_BOOL(php_swoole_server_task_finish(serv, zdata, (EventData *) info));
}

static PHP_METHOD(swoole_server_task, pack) {
Expand Down
4 changes: 2 additions & 2 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ class Server {
}

EventData *get_task_result() {
return &(task_results[swoole_get_process_id()]);
return &(task_results[swoole_get_process_id()]);
}

WorkerId get_task_src_worker_id(EventData *task) {
Expand Down Expand Up @@ -1371,7 +1371,6 @@ class Server {
}

ssize_t send_to_reactor_thread(const EventData *ev_data, size_t sendn, SessionId session_id);
int reply_task_result(const char *data, size_t data_len, int flags, EventData *current_task);

bool send(SessionId session_id, const void *data, uint32_t length);
bool sendfile(SessionId session_id, const char *file, uint32_t l_file, off_t offset, size_t length);
Expand All @@ -1387,6 +1386,7 @@ class Server {
const Command::Callback &fn);

bool task(EventData *task, int *dst_worker_id, bool blocking = false);
bool finish(const char *data, size_t data_len, int flags, EventData *current_task);
bool task_sync(EventData *task, int *dst_worker_id, double timeout);
bool send_pipe_message(WorkerId worker_id, EventData *msg);

Expand Down
4 changes: 2 additions & 2 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,8 @@ int Server::create_task_workers() {
}

/*
* For Server::task_sync(), create notify pipe and result shared memory.
*/
* For Server::task_sync(), create notify pipe and result shared memory.
*/
task_results = (EventData *) sw_shm_calloc(worker_num, sizeof(EventData));
if (!task_results) {
swoole_warning("malloc[task_result] failed");
Expand Down
20 changes: 10 additions & 10 deletions src/server/task_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,40 +325,39 @@ static int TaskWorker_loop_async(ProcessPool *pool, Worker *worker) {
/**
* Send the task result to worker
*/
int Server::reply_task_result(const char *data, size_t data_len, int flags, EventData *current_task) {
EventData buf;
sw_memset_zero(&buf.info, sizeof(buf.info));
bool Server::finish(const char *data, size_t data_len, int flags, EventData *current_task) {
if (task_worker_num < 1) {
swoole_warning("cannot use Server::task()/Server::finish() method, because no set [task_worker_num]");
return SW_ERR;
return false;
}
if (current_task == nullptr) {
current_task = last_task;
}
if (current_task->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) {
swoole_warning("Server::task()/Server::finish() is not supported in onPipeMessage callback");
return SW_ERR;
return false;
}
if (current_task->info.ext_flags & SW_TASK_NOREPLY) {
swoole_warning("Server::finish() can only be used in the worker process");
return SW_ERR;
return false;
}

uint16_t source_worker_id = current_task->info.reactor_id;
Worker *worker = get_worker(source_worker_id);

if (worker == nullptr) {
swoole_warning("invalid worker_id[%d]", source_worker_id);
return SW_ERR;
return false;
}

ssize_t retval;
// for swoole_server_task
if (current_task->info.ext_flags & SW_TASK_NONBLOCK) {
// write to file
EventData buf;
if (!task_pack(&buf, data, data_len)) {
swoole_warning("large task pack failed()");
return SW_ERR;
return false;
}
// callback function
if (current_task->info.ext_flags & SW_TASK_CALLBACK) {
Expand Down Expand Up @@ -396,6 +395,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even
char *_tmpfile = result->data + 4;
File file(_tmpfile, O_APPEND | O_WRONLY);
if (file.ready()) {
EventData buf;
if (!task_pack(&buf, data, data_len)) {
swoole_warning("large task pack failed()");
buf.info.len = 0;
Expand All @@ -414,7 +414,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even
// unlock worker
worker->lock->unlock();
swoole_warning("large task pack failed()");
return SW_ERR;
return false;
}
result->info.ext_flags |= flags;
result->info.type = SW_SERVER_EVENT_FINISH;
Expand Down Expand Up @@ -442,6 +442,6 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even
swoole_sys_warning("send result to worker failed");
}
}
return retval;
return true;
}
} // namespace swoole

0 comments on commit 0a6950f

Please sign in to comment.