Skip to content

Commit

Permalink
Fix iouring error (#5270)
Browse files Browse the repository at this point in the history
* fix iouring error

* Fix macro error
  • Loading branch information
NathanFreeman authored Mar 11, 2024
1 parent a0f4ae1 commit 9f1c48a
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 55 deletions.
7 changes: 7 additions & 0 deletions src/coroutine/iouring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ int swoole_coroutine_iouring_open(const char *pathname, int flags, mode_t mode)
return async(AsyncIouring::SW_IORING_OP_OPENAT, pathname, nullptr, mode, flags);
}

int swoole_coroutine_iouring_close_file(int fd) {
if (sw_unlikely(is_no_coro())) {
return close(fd);
}
return async(AsyncIouring::SW_IORING_OP_CLOSE, fd);
}

ssize_t swoole_coroutine_iouring_read(int sockfd, void *buf, size_t count) {
if (sw_unlikely(is_no_coro())) {
return read(sockfd, buf, count);
Expand Down
104 changes: 51 additions & 53 deletions src/os/iouring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,70 +104,68 @@ void AsyncIouring::delete_event() {
}

bool AsyncIouring::wakeup() {
unsigned num = entries * 2;
unsigned num = 8192;
struct io_uring_cqe *cqes[num];
unsigned count = get_iouring_cqes(cqes, num);
if (count == 0) {
return true;
}
if (count < 0) {
return false;
}
size_t cqes_size = num * sizeof(struct io_uring_cqe *);
unsigned count = 0;

unsigned i = 0;
AsyncEvent *tasks[count];
void *data = nullptr;
AsyncEvent *task = nullptr;
struct io_uring_cqe *cqe = nullptr;
for (i = 0; i < count; i++) {
cqe = cqes[i];
data = get_iouring_cqe_data(cqe);
task = reinterpret_cast<AsyncEvent *>(data);
task->retval = (cqe->res >= 0 ? cqe->res : -1);
if (cqe->res < 0) {
errno = abs(cqe->res);
}
tasks[i] = task;
task_num--;
}
finish_iouring_cqes(count);

AsyncEvent *waitEvent = nullptr;
for (i = 0; i < count; i++) {
task = tasks[i];
if (is_empty_wait_events()) {
task->callback(task);
continue;
}

waitEvent = waitEvents.front();
waitEvents.pop();
if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_OPENAT) {
open(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_CLOSE) {
close(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_FSTAT ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_LSTAT) {
statx(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_READ ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_WRITE) {
wr(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_RENAMEAT) {
rename(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_UNLINK_FILE ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_UNLINK_DIR) {
unlink(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_MKDIRAT) {
mkdir(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_FSYNC ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_FDATASYNC) {
fsync(waitEvent);
while (true) {
memset(cqes, 0, cqes_size);
count = get_iouring_cqes(cqes, num);
if (count == 0) {
return true;
}

task->callback(task);
}
for (i = 0; i < count; i++) {
cqe = cqes[i];
data = get_iouring_cqe_data(cqe);
task = reinterpret_cast<AsyncEvent *>(data);
task->retval = (cqe->res >= 0 ? cqe->res : -1);
if (cqe->res < 0) {
errno = abs(cqe->res);
}

task_num--;

if (is_empty_wait_events()) {
task->callback(task);
continue;
}

waitEvent = waitEvents.front();
waitEvents.pop();
if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_OPENAT) {
open(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_CLOSE) {
close(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_FSTAT ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_LSTAT) {
statx(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_READ ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_WRITE) {
wr(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_RENAMEAT) {
rename(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_UNLINK_FILE ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_UNLINK_DIR) {
unlink(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_MKDIRAT) {
mkdir(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_FSYNC ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_FDATASYNC) {
fsync(waitEvent);
}

return true;
task->callback(task);
}
finish_iouring_cqes(count);
}
}

bool AsyncIouring::open(AsyncEvent *event) {
Expand Down
18 changes: 17 additions & 1 deletion tests/swoole_runtime/file_hook/iouring.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ require __DIR__ . '/../../include/skipif.inc';
<?php
use Swoole\Runtime;
use function Swoole\Coroutine\run;
use Swoole\Coroutine\WaitGroup;
require __DIR__ . '/../../include/bootstrap.php';

Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
run(function(){
$results = [];
for ($i = 1; $i <= 10000; $i++) {
$results[$i] = random_bytes(rand(8192, 8192 * 3));
}

run(function() use ($results) {
$filesize = 1048576;
$content = random_bytes($filesize);
$fileName = '/tmp/test_file';
Expand Down Expand Up @@ -47,6 +53,16 @@ run(function(){
rmdir($directory);
Assert::true(!is_dir($directory));

$waitGroup = new WaitGroup();
for ($i = 1; $i <= 10000; $i++) {
go(function() use ($waitGroup, $i, $results){
$waitGroup->add();
file_put_contents('/tmp/file'.$i, $results[$i]);
Assert::true($results[$i] == file_get_contents('/tmp/file'.$i));
$waitGroup->done();
});
}
$waitGroup->wait();
echo 'SUCCESS';
});
?>
Expand Down
2 changes: 1 addition & 1 deletion thirdparty/php/streams/plain_wrapper.c
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ static php_stream *php_plain_files_stream_opener(php_stream_wrapper *wrapper,
return stream;
}

return sw_php_stream_fopen_rel(path, mode, opened_path, options STREAMS_REL_CC);
return sw_php_stream_fopen_rel(path, mode, opened_path, options);
}

static int php_plain_files_url_stater(
Expand Down

0 comments on commit 9f1c48a

Please sign in to comment.