Skip to content

Commit

Permalink
Refactor iouring (#5551)
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFreeman authored Oct 31, 2024
1 parent cd7ff6f commit ca9d9d7
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 289 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/iouring.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on: [push, pull_request]
jobs:
test-linux:
if: "!contains(github.event.head_commit.message, '--filter=') || contains(github.event.head_commit.message, '[iouring]')"
runs-on: ubuntu-latest
runs-on: ubuntu-24.04
strategy:
fail-fast: false
matrix:
Expand Down
9 changes: 5 additions & 4 deletions ext-src/php_swoole.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "php_swoole_library.h"
#include "php_swoole_process.h"
#include "php_swoole_thread.h"
#include "swoole_iouring.h"

BEGIN_EXTERN_C()
#include "zend_exceptions.h"
Expand Down Expand Up @@ -61,7 +62,7 @@ END_EXTERN_C()
using swoole::Server;
using swoole::network::Socket;
#ifdef SW_USE_IOURING
using swoole::AsyncIouring;
using swoole::Iouring;
#endif

ZEND_DECLARE_MODULE_GLOBALS(swoole)
Expand Down Expand Up @@ -723,8 +724,8 @@ PHP_MINIT_FUNCTION(swoole) {
* iouring
*/
#ifdef SW_USE_IOURING
SW_REGISTER_LONG_CONSTANT("SWOOLE_IOURING_DEFAULT", AsyncIouring::SW_IOURING_DEFAULT);
SW_REGISTER_LONG_CONSTANT("SWOOLE_IOURING_SQPOLL", AsyncIouring::SW_IOURING_SQPOLL);
SW_REGISTER_LONG_CONSTANT("SWOOLE_IOURING_DEFAULT", Iouring::SW_IOURING_DEFAULT);
SW_REGISTER_LONG_CONSTANT("SWOOLE_IOURING_SQPOLL", Iouring::SW_IOURING_SQPOLL);
#endif

// clang-format on
Expand Down Expand Up @@ -1068,7 +1069,7 @@ PHP_RINIT_FUNCTION(swoole) {
* This would cause php_swoole_load_library function not to execute correctly, so it must be replaced
* with the execute_ex function.
*/
void (*old_zend_execute_ex)(zend_execute_data * execute_data) = nullptr;
void (*old_zend_execute_ex)(zend_execute_data *execute_data) = nullptr;
if (UNEXPECTED(zend_execute_ex != execute_ex)) {
old_zend_execute_ex = zend_execute_ex;
zend_execute_ex = execute_ex;
Expand Down
8 changes: 2 additions & 6 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@
#include <functional>
#include <mutex>

#ifdef SW_USE_IOURING
#include <liburing.h>
#endif

typedef unsigned long ulong_t;

#ifndef PRId64
Expand Down Expand Up @@ -221,7 +217,7 @@ struct Address;
} // namespace network
class AsyncThreads;
#ifdef SW_USE_IOURING
class AsyncIouring;
class Iouring;
#endif
namespace async {
class ThreadPool;
Expand Down Expand Up @@ -701,7 +697,7 @@ struct ThreadGlobal {
MessageBus *message_bus;
AsyncThreads *async_threads;
#ifdef SW_USE_IOURING
AsyncIouring *async_iouring;
Iouring *iouring;
#endif
uint32_t signal_listener_num;
uint32_t co_signal_listener_num;
Expand Down
106 changes: 0 additions & 106 deletions include/swoole_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
#include <atomic>
#include <queue>

#ifdef SW_USE_IOURING
#include <liburing.h>
#endif

#ifndef O_DIRECT
#define O_DIRECT 040000
#endif
Expand All @@ -45,32 +41,16 @@ struct AsyncRequest {

struct AsyncEvent {
size_t task_id;
#ifdef SW_USE_IOURING
size_t count;
#endif
uint8_t canceled;
int error;
/**
* input & output
*/
std::shared_ptr<AsyncRequest> data;
#ifdef SW_USE_IOURING
const char *pathname;
const char *pathname2;
struct statx *statxbuf;
void *rbuf;
const void *wbuf;
#endif
/**
* output
*/
ssize_t retval;
#ifdef SW_USE_IOURING
int fd;
int flags;
int opcode;
mode_t mode;
#endif
/**
* internal use only
*/
Expand Down Expand Up @@ -148,92 +128,6 @@ class AsyncThreads {
static int callback(Reactor *reactor, Event *event);
};

#ifdef SW_USE_IOURING
class AsyncIouring {
private:
int ring_fd;
uint64_t task_num = 0;
uint64_t entries = 8192;
struct io_uring ring;
std::queue<AsyncEvent *> waiting_tasks;
network::Socket *iou_socket = nullptr;
Reactor *reactor = nullptr;

inline struct io_uring_sqe *get_iouring_sqe() {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
// We need to reset the values of each sqe structure so that they can be used in a loop.
if (sqe) {
memset(sqe, 0, sizeof(struct io_uring_sqe));
}
return sqe;
}

inline bool submit_iouring_sqe(AsyncEvent *event) {
int ret = io_uring_submit(&ring);

if (ret < 0) {
errno = -ret;
if (ret == -EAGAIN) {
waiting_tasks.push(event);
return true;
}
return false;
}

task_num++;
return true;
}

public:
AsyncIouring(Reactor *reactor_);
~AsyncIouring();

enum opcodes {
SW_IORING_OP_OPENAT = IORING_OP_OPENAT,
SW_IORING_OP_CLOSE = IORING_OP_CLOSE,
SW_IORING_OP_STATX = IORING_OP_STATX,
SW_IORING_OP_READ = IORING_OP_READ,
SW_IORING_OP_WRITE = IORING_OP_WRITE,
SW_IORING_OP_RENAMEAT = IORING_OP_RENAMEAT,
SW_IORING_OP_UNLINKAT = IORING_OP_UNLINKAT,
SW_IORING_OP_MKDIRAT = IORING_OP_MKDIRAT,

SW_IORING_OP_FSTAT = 1000,
SW_IORING_OP_LSTAT = 1001,
SW_IORING_OP_UNLINK_FILE = 1002,
SW_IORING_OP_UNLINK_DIR = 1003,
SW_IORING_OP_FSYNC = 1004,
SW_IORING_OP_FDATASYNC = 1005,
};

enum flags {
SW_IOURING_DEFAULT = 0,
SW_IOURING_SQPOLL = IORING_SETUP_SQPOLL,
};

void add_event();
void delete_event();
bool wakeup();
bool open(AsyncEvent *event);
bool close(AsyncEvent *event);
bool wr(AsyncEvent *event);
bool statx(AsyncEvent *event);
bool mkdir(AsyncEvent *event);
bool unlink(AsyncEvent *event);
bool rename(AsyncEvent *event);
bool fsync(AsyncEvent *event);
inline bool is_empty_waiting_tasks() {
return waiting_tasks.size() == 0;
}

inline uint64_t get_task_num() {
return task_num;
}

static int callback(Reactor *reactor, Event *event);
};
#endif

namespace async {

typedef void (*Handler)(AsyncEvent *event);
Expand Down
16 changes: 0 additions & 16 deletions include/swoole_coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,22 +301,6 @@ class Coroutine {
namespace coroutine {
bool async(async::Handler handler, AsyncEvent &event, double timeout = -1);
bool async(const std::function<void(void)> &fn, double timeout = -1);
#ifdef SW_USE_IOURING
int async(AsyncIouring::opcodes opcode,
const char *pathname,
const char *pathname2 = nullptr,
mode_t mode = 0,
int flags = 0,
struct statx *statxbuf = nullptr,
double timeout = -1);
int async(AsyncIouring::opcodes opcode,
int fd,
void *rbuf = nullptr,
const void *wbuf = nullptr,
struct statx *statxbuf = nullptr,
size_t count = 0,
double timeout = -1);
#endif
bool run(const CoroutineFunc &fn, void *arg = nullptr);
} // namespace coroutine
//-------------------------------------------------------------------------------
Expand Down
145 changes: 145 additions & 0 deletions include/swoole_iouring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
+----------------------------------------------------------------------+
| Swoole |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| [email protected] so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: NathanFreeman <[email protected]> |
+----------------------------------------------------------------------+
*/

#ifndef SWOOLE_SRC_SWOOLE_IOURING_H
#define SWOOLE_SRC_SWOOLE_IOURING_H

#include "swoole_coroutine.h"

#ifdef SW_USE_IOURING
#include <liburing.h>

using swoole::Coroutine;

enum Opcodes {
SW_IORING_OP_OPENAT = IORING_OP_OPENAT,
SW_IORING_OP_CLOSE = IORING_OP_CLOSE,
SW_IORING_OP_STATX = IORING_OP_STATX,
SW_IORING_OP_READ = IORING_OP_READ,
SW_IORING_OP_WRITE = IORING_OP_WRITE,
SW_IORING_OP_RENAMEAT = IORING_OP_RENAMEAT,
SW_IORING_OP_UNLINKAT = IORING_OP_UNLINKAT,
SW_IORING_OP_MKDIRAT = IORING_OP_MKDIRAT,

SW_IORING_OP_FSTAT = 1000,
SW_IORING_OP_LSTAT = 1001,
SW_IORING_OP_UNLINK_FILE = 1002,
SW_IORING_OP_UNLINK_DIR = 1003,
SW_IORING_OP_FSYNC = 1004,
SW_IORING_OP_FDATASYNC = 1005,
};

namespace swoole {
struct IouringEvent {
int fd;
int flags;
int opcode;
mode_t mode;
uint64_t count; // share with offset
ssize_t result;
void *rbuf;
Coroutine *coroutine;
const void *wbuf;
const char *pathname;
const char *pathname2;
struct statx *statxbuf;
uint8_t canceled = 0;
};

class Iouring {
private:
int ring_fd;
uint64_t task_num = 0;
uint64_t entries = 8192;
struct io_uring ring;
std::queue<IouringEvent *> waiting_tasks;
network::Socket *iou_socket = nullptr;
Reactor *reactor = nullptr;

void add_event();
void delete_event();
bool wakeup();
bool open(IouringEvent *event);
bool close(IouringEvent *event);
bool wr(IouringEvent *event);
bool statx(IouringEvent *event);
bool mkdir(IouringEvent *event);
bool unlink(IouringEvent *event);
bool rename(IouringEvent *event);
bool fsync(IouringEvent *event);
int dispatch(IouringEvent *event);

inline struct io_uring_sqe *get_iouring_sqe() {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
// We need to reset the values of each sqe structure so that they can be used in a loop.
if (sqe) {
memset(sqe, 0, sizeof(struct io_uring_sqe));
}
return sqe;
}

inline bool submit_iouring_sqe(IouringEvent *event) {
int ret = io_uring_submit(&ring);

if (ret < 0) {
errno = -ret;
if (ret == -EAGAIN) {
waiting_tasks.push(event);
return true;
}
return false;
}

task_num++;
return true;
}

static Iouring *create_iouring();

public:
Iouring(Reactor *reactor_);
~Iouring();

enum flags {
SW_IOURING_DEFAULT = 0,
SW_IOURING_SQPOLL = IORING_SETUP_SQPOLL,
};

inline bool is_empty_waiting_tasks() {
return waiting_tasks.size() == 0;
}

inline uint64_t get_task_num() {
return task_num;
}

static int async(Opcodes type,
int fd = 0,
uint64_t count = 0,
void *rbuf = nullptr,
const void *wbuf = nullptr,
struct statx *statxbuf = nullptr);
static int async(Opcodes type,
const char *pathname = nullptr,
const char *pathname2 = nullptr,
struct statx *statxbuf = nullptr,
int flags = 0,
mode_t mode = 0);
static int callback(Reactor *reactor, Event *event);
};
}; // namespace swoole
#endif
#endif
Loading

0 comments on commit ca9d9d7

Please sign in to comment.