diff --git a/core-tests/src/server/server.cpp b/core-tests/src/server/server.cpp index e67b63a90f6..19ae3732d17 100644 --- a/core-tests/src/server/server.cpp +++ b/core-tests/src/server/server.cpp @@ -549,6 +549,8 @@ TEST(server, task_worker) { EXPECT_EQ(serv->get_tasking_num(), 1); EXPECT_EQ(string(task->data, task->info.len), string(packet)); serv->gs->task_workers.running = 0; + serv->gs->task_count++; + serv->gs->tasking_num--; return 0; }; @@ -558,9 +560,7 @@ TEST(server, task_worker) { thread t1([&serv]() { SwooleWG.run_always = true; serv.gs->task_workers.running = 1; - serv.gs->tasking_num++; serv.gs->task_workers.main_loop(&serv.gs->task_workers, &serv.gs->task_workers.workers[0]); - serv.gs->tasking_num--; EXPECT_EQ(serv.get_tasking_num(), 0); serv.gs->tasking_num--; EXPECT_EQ(serv.get_tasking_num(), 0); @@ -578,12 +578,13 @@ TEST(server, task_worker) { int _dst_worker_id = 0; - ASSERT_GE(serv.gs->task_workers.dispatch(&buf, &_dst_worker_id), 0); + ASSERT_TRUE(serv.task(&buf, &_dst_worker_id)); + ASSERT_EQ(serv.gs->task_count, 1); t1.join(); serv.gs->task_workers.destroy(); - ASSERT_EQ(serv.gs->) + ASSERT_EQ(serv.gs->task_count, 2); } // PHP_METHOD(swoole_server, task) @@ -603,8 +604,7 @@ TEST(server, task_worker2) { serv.onTask = [](Server *serv, swEventData *task) -> int { EXPECT_EQ(string(task->data, task->info.len), string(packet)); - int ret = serv->reply_task_result(task->data, task->info.len, 0, task); - EXPECT_GT(ret, 0); + EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task)); return 0; }; @@ -652,8 +652,7 @@ TEST(server, task_worker3) { serv.onTask = [](Server *serv, swEventData *task) -> int { EXPECT_EQ(string(task->data, task->info.len), string(packet)); - int ret = serv->reply_task_result(task->data, task->info.len, 0, task); - EXPECT_GT(ret, 0); + EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task)); return 0; }; @@ -701,8 +700,7 @@ TEST(server, task_worker4) { serv.onTask = [](Server *serv, swEventData *task) -> int { EXPECT_EQ(string(task->data, task->info.len), string(packet)); - int ret = serv->reply_task_result(task->data, task->info.len, 0, task); - EXPECT_GT(ret, 0); + EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task)); return 0; }; @@ -770,8 +768,7 @@ TEST(server, task_worker5) { ifs.close(); EXPECT_EQ(string(resp), string(data)); - int ret = serv->reply_task_result(resp, SW_IPC_MAX_SIZE * 2, 0, task); - EXPECT_GT(ret, 0); + EXPECT_TRUE(serv->finish(resp, SW_IPC_MAX_SIZE * 2, 0, task)); return 0; }; diff --git a/ext-src/swoole_server.cc b/ext-src/swoole_server.cc index 476cbe692cc..458170e9cb1 100644 --- a/ext-src/swoole_server.cc +++ b/ext-src/swoole_server.cc @@ -85,7 +85,7 @@ static void php_swoole_server_onWorkerError(Server *serv, Worker *worker, const static void php_swoole_server_onManagerStart(Server *serv); static void php_swoole_server_onManagerStop(Server *serv); -static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task); +static bool php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task); static TaskId php_swoole_server_task_pack(zval *data, EventData *task); static bool php_swoole_server_task_unpack(zval *zresult, EventData *task_result); static int php_swoole_server_dispatch_func(Server *serv, Connection *conn, SendData *data); @@ -1025,13 +1025,12 @@ void ServerObject::register_callback() { } } -static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task) { +static bool php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task) { int flags = 0; smart_str serialized_data = {}; php_serialize_data_t var_hash; char *data_str; size_t data_len = 0; - int ret; // need serialize if (Z_TYPE_P(zdata) != IS_STRING) { @@ -1049,9 +1048,9 @@ static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *c data_len = Z_STRLEN_P(zdata); } - ret = serv->reply_task_result(data_str, data_len, flags, current_task); + bool success = serv->finish(data_str, data_len, flags, current_task); smart_str_free(&serialized_data); - return ret; + return success; } static void php_swoole_server_onPipeMessage(Server *serv, EventData *req) { @@ -3101,7 +3100,7 @@ static PHP_METHOD(swoole_server, taskwait) { RETURN_FALSE; } } else { - auto retval = serv->task_sync(&buf, (int *) &dst_worker_id, timeout); + auto retval = serv->task_sync(&buf, (int *) &dst_worker_id, timeout); if (!retval) { RETURN_FALSE; } @@ -3299,7 +3298,7 @@ static PHP_METHOD(swoole_server, taskCo) { zval *ztask; SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(ztasks), ztask) { - EventData buf; + EventData buf; task_id = php_swoole_server_task_pack(ztask, &buf); if (task_id < 0) { php_swoole_fatal_error(E_WARNING, "failed to pack task"); @@ -3503,7 +3502,7 @@ static PHP_METHOD(swoole_server, finish) { Z_PARAM_ZVAL(zdata) ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); - SW_CHECK_RETURN(php_swoole_server_task_finish(serv, zdata, nullptr)); + RETURN_BOOL(php_swoole_server_task_finish(serv, zdata, nullptr)); } static PHP_METHOD(swoole_server_task, finish) { @@ -3520,7 +3519,7 @@ static PHP_METHOD(swoole_server_task, finish) { ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); DataHead *info = php_swoole_server_task_get_info(ZEND_THIS); - SW_CHECK_RETURN(php_swoole_server_task_finish(serv, zdata, (EventData *) info)); + RETURN_BOOL(php_swoole_server_task_finish(serv, zdata, (EventData *) info)); } static PHP_METHOD(swoole_server_task, pack) { diff --git a/include/swoole_server.h b/include/swoole_server.h index 4585584b9b0..f9c95627f9e 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -1003,7 +1003,7 @@ class Server { } EventData *get_task_result() { - return &(task_results[swoole_get_process_id()]); + return &(task_results[swoole_get_process_id()]); } WorkerId get_task_src_worker_id(EventData *task) { @@ -1371,7 +1371,6 @@ class Server { } ssize_t send_to_reactor_thread(const EventData *ev_data, size_t sendn, SessionId session_id); - int reply_task_result(const char *data, size_t data_len, int flags, EventData *current_task); bool send(SessionId session_id, const void *data, uint32_t length); bool sendfile(SessionId session_id, const char *file, uint32_t l_file, off_t offset, size_t length); @@ -1387,6 +1386,7 @@ class Server { const Command::Callback &fn); bool task(EventData *task, int *dst_worker_id, bool blocking = false); + bool finish(const char *data, size_t data_len, int flags, EventData *current_task); bool task_sync(EventData *task, int *dst_worker_id, double timeout); bool send_pipe_message(WorkerId worker_id, EventData *msg); diff --git a/src/server/master.cc b/src/server/master.cc index 06498be4238..9a8be720ea0 100644 --- a/src/server/master.cc +++ b/src/server/master.cc @@ -528,8 +528,8 @@ int Server::create_task_workers() { } /* - * For Server::task_sync(), create notify pipe and result shared memory. - */ + * For Server::task_sync(), create notify pipe and result shared memory. + */ task_results = (EventData *) sw_shm_calloc(worker_num, sizeof(EventData)); if (!task_results) { swoole_warning("malloc[task_result] failed"); diff --git a/src/server/task_worker.cc b/src/server/task_worker.cc index 2e106542f1d..4cbc202ccaf 100644 --- a/src/server/task_worker.cc +++ b/src/server/task_worker.cc @@ -325,23 +325,21 @@ static int TaskWorker_loop_async(ProcessPool *pool, Worker *worker) { /** * Send the task result to worker */ -int Server::reply_task_result(const char *data, size_t data_len, int flags, EventData *current_task) { - EventData buf; - sw_memset_zero(&buf.info, sizeof(buf.info)); +bool Server::finish(const char *data, size_t data_len, int flags, EventData *current_task) { if (task_worker_num < 1) { swoole_warning("cannot use Server::task()/Server::finish() method, because no set [task_worker_num]"); - return SW_ERR; + return false; } if (current_task == nullptr) { current_task = last_task; } if (current_task->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) { swoole_warning("Server::task()/Server::finish() is not supported in onPipeMessage callback"); - return SW_ERR; + return false; } if (current_task->info.ext_flags & SW_TASK_NOREPLY) { swoole_warning("Server::finish() can only be used in the worker process"); - return SW_ERR; + return false; } uint16_t source_worker_id = current_task->info.reactor_id; @@ -349,16 +347,17 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even if (worker == nullptr) { swoole_warning("invalid worker_id[%d]", source_worker_id); - return SW_ERR; + return false; } ssize_t retval; // for swoole_server_task if (current_task->info.ext_flags & SW_TASK_NONBLOCK) { // write to file + EventData buf; if (!task_pack(&buf, data, data_len)) { swoole_warning("large task pack failed()"); - return SW_ERR; + return false; } // callback function if (current_task->info.ext_flags & SW_TASK_CALLBACK) { @@ -396,6 +395,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even char *_tmpfile = result->data + 4; File file(_tmpfile, O_APPEND | O_WRONLY); if (file.ready()) { + EventData buf; if (!task_pack(&buf, data, data_len)) { swoole_warning("large task pack failed()"); buf.info.len = 0; @@ -414,7 +414,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even // unlock worker worker->lock->unlock(); swoole_warning("large task pack failed()"); - return SW_ERR; + return false; } result->info.ext_flags |= flags; result->info.type = SW_SERVER_EVENT_FINISH; @@ -442,6 +442,6 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even swoole_sys_warning("send result to worker failed"); } } - return retval; + return true; } } // namespace swoole