Skip to content

Commit

Permalink
fix tests [3]
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Sep 14, 2024
1 parent 90a1190 commit 7cd6dd2
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 21 deletions.
64 changes: 57 additions & 7 deletions core-tests/src/os/process_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,72 @@ using namespace swoole;

static void test_func(ProcessPool &pool) {
EventData data{};
data.info.len = strlen(TEST_JPG_MD5SUM);
memcpy(data.data, TEST_JPG_MD5SUM, data.info.len);
size_t size = swoole_system_random(1024, 4096);
String rmem(size);
rmem.append_random_bytes(size - 1);
rmem.append("\0");

data.info.len = size;
memcpy(data.data, rmem.value(), size);

int worker_id = -1;
ASSERT_EQ(pool.dispatch_blocking(&data, &worker_id), SW_OK);

pool.running = true;
pool.ptr = &rmem;
SwooleWG.run_always = true;
pool.main_loop(&pool, pool.get_worker(0));
pool.destroy();
}

static void test_func_task_protocol(ProcessPool &pool) {
pool.set_protocol(SW_PROTOCOL_TASK);
pool.onTask = [](ProcessPool *pool, Worker *worker, EventData *task) -> int {
pool->running = false;
usleep(10000);
EXPECT_MEMEQ(task->data, TEST_JPG_MD5SUM, task->info.len);
return 0;
};
pool.main_loop(&pool, pool.get_worker(0));
pool.destroy();
test_func(pool);
}

static void test_func_message_protocol(ProcessPool &pool) {
pool.set_protocol(SW_PROTOCOL_MESSAGE);
pool.onMessage = [](ProcessPool *pool, RecvData *rdata) {
pool->running = false;
String *_data = (String *) pool->ptr;
usleep(10000);
EXPECT_MEMEQ(_data->str, rdata->data, rdata->info.len);
};
test_func(pool);
}

static void test_func_stream_protocol(ProcessPool &pool) {
pool.set_protocol(SW_PROTOCOL_STREAM);
pool.onMessage = [](ProcessPool *pool, RecvData *rdata) {
pool->running = false;
String *_data = (String *) pool->ptr;
EventData *msg = (EventData *) rdata->data;
usleep(10000);
EXPECT_MEMEQ(_data->str, msg->data, msg->len());
};
test_func(pool);
}

TEST(process_pool, tcp) {
ProcessPool pool{};
ASSERT_EQ(pool.create(1, 0, SW_IPC_SOCKET), SW_OK);
ASSERT_EQ(pool.listen(TEST_HOST, TEST_PORT, 128), SW_OK);

test_func(pool);
test_func_task_protocol(pool);
}

TEST(process_pool, unix_sock) {
ProcessPool pool{};
signal(SIGPIPE, SIG_IGN);
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);

test_func(pool);
test_func_task_protocol(pool);
}

TEST(process_pool, tcp_raw) {
Expand Down Expand Up @@ -72,7 +108,21 @@ TEST(process_pool, msgqueue) {
ProcessPool pool{};
ASSERT_EQ(pool.create(1, 0x9501, SW_IPC_MSGQUEUE), SW_OK);

test_func(pool);
test_func_task_protocol(pool);
}

TEST(process_pool, message_protocol) {
ProcessPool pool{};
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);

test_func_message_protocol(pool);
}

TEST(process_pool, stream_protocol) {
ProcessPool pool{};
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);

test_func_stream_protocol(pool);
}

constexpr int magic_number = 99900011;
Expand Down
7 changes: 7 additions & 0 deletions include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,14 @@ struct ProcessPool {
int listen(const char *host, int port, int blacklog);
int schedule();
bool is_worker_running(Worker *worker);

static void kill_timeout_worker(Timer *timer, TimerNode *tnode);

private:
static int run_with_task_protocol(ProcessPool *pool, Worker *worker);
static int run_with_stream_protocol(ProcessPool *pool, Worker *worker);
static int run_with_message_protocol(ProcessPool *pool, Worker *worker);
static int run_async(ProcessPool *pool, Worker *worker);
};
}; // namespace swoole

Expand Down
37 changes: 23 additions & 14 deletions src/os/process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ namespace swoole {
using network::Socket;
using network::Stream;

static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker *worker);
static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worker *worker);
static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Worker *worker);
static int ProcessPool_worker_loop_async(ProcessPool *pool, Worker *worker);

void ProcessPool::kill_timeout_worker(Timer *timer, TimerNode *tnode) {
uint32_t i;
pid_t reload_worker_pid = 0;
Expand Down Expand Up @@ -113,7 +108,7 @@ int ProcessPool::create(uint32_t _worker_num, key_t _msgqueue_key, swIPCMode _ip

map_ = new std::unordered_map<pid_t, Worker *>;
ipc_mode = _ipc_mode;
main_loop = ProcessPool_worker_loop_with_task_protocol;
main_loop = run_with_task_protocol;
protocol_type_ = SW_PROTOCOL_TASK;
max_packet_size_ = SW_INPUT_BUFFER_SIZE;

Expand Down Expand Up @@ -207,13 +202,13 @@ int ProcessPool::listen(const char *host, int port, int blacklog) {
void ProcessPool::set_protocol(enum ProtocolType _protocol_type) {
switch (_protocol_type) {
case SW_PROTOCOL_TASK:
main_loop = ProcessPool_worker_loop_with_task_protocol;
main_loop = run_with_task_protocol;
break;
case SW_PROTOCOL_STREAM:
main_loop = ProcessPool_worker_loop_with_stream_protocol;
main_loop = run_with_stream_protocol;
break;
case SW_PROTOCOL_MESSAGE:
main_loop = ProcessPool_worker_loop_with_message_protocol;
main_loop = run_with_message_protocol;
break;
default:
abort();
Expand All @@ -234,7 +229,7 @@ int ProcessPool::start_check() {
swoole_set_process_type(SW_PROCESS_MASTER);

if (async) {
main_loop = ProcessPool_worker_loop_async;
main_loop = run_async;
}

SW_LOOP_N(worker_num) {
Expand Down Expand Up @@ -507,7 +502,7 @@ bool ProcessPool::is_worker_running(Worker *worker) {
return running && !SwooleWG.shutdown && !worker->has_exceeded_max_request();
}

static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker *worker) {
int ProcessPool::run_with_task_protocol(ProcessPool *pool, Worker *worker) {
struct {
long mtype;
EventData buf;
Expand Down Expand Up @@ -626,7 +621,7 @@ static int ProcessPool_recv_message(Reactor *reactor, Event *event) {
return SW_OK;
}

static int ProcessPool_worker_loop_async(ProcessPool *pool, Worker *worker) {
int ProcessPool::run_async(ProcessPool *pool, Worker *worker) {
if (pool->ipc_mode == SW_IPC_UNIXSOCK && pool->onMessage) {
swoole_event_add(worker->pipe_worker, SW_EVENT_READ);
if (pool->message_bus) {
Expand All @@ -642,7 +637,7 @@ static int ProcessPool_worker_loop_async(ProcessPool *pool, Worker *worker) {
return swoole_event_wait();
}

static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worker *worker) {
int ProcessPool::run_with_stream_protocol(ProcessPool *pool, Worker *worker) {
ssize_t n;
RecvData msg{};
msg.info.reactor_id = -1;
Expand All @@ -652,6 +647,10 @@ static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worke
pool->stream_info_->response_buffer = new String(SW_BUFFER_SIZE_STD);
}

if (pool->ipc_mode == SW_IPC_UNIXSOCK && pool->message_bus == nullptr) {
pool->create_message_bus();
}

QueueNode *outbuf = (QueueNode *) pool->packet_buffer;
outbuf->mtype = 0;

Expand Down Expand Up @@ -739,7 +738,7 @@ static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worke
return SW_OK;
}

static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Worker *worker) {
int ProcessPool::run_with_message_protocol(ProcessPool *pool, Worker *worker) {
auto fn = [&]() -> int {
if (worker->pipe_worker->wait_event(-1, SW_EVENT_READ) < 0) {
return errno == EINTR ? 0 : -1;
Expand All @@ -758,6 +757,16 @@ static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Work
return 1;
};

if (pool->ipc_mode != SW_IPC_UNIXSOCK) {
swoole_error_log(
SW_LOG_WARNING, SW_ERROR_OPERATION_NOT_SUPPORT, "not support, ipc_mode must be SW_IPC_UNIXSOCK");
return SW_ERR;
}

if (pool->message_bus == nullptr) {
pool->create_message_bus();
}

worker->pipe_worker->dont_restart = 1;

while (pool->is_worker_running(worker)) {
Expand Down

0 comments on commit 7cd6dd2

Please sign in to comment.