Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Thread\Barrier #5380

Merged
merged 4 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ext-src/php_swoole.cc
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ PHP_MINIT_FUNCTION(swoole) {
php_swoole_thread_minit(module_number);
php_swoole_thread_atomic_minit(module_number);
php_swoole_thread_lock_minit(module_number);
php_swoole_thread_barrier_minit(module_number);
php_swoole_thread_queue_minit(module_number);
php_swoole_thread_map_minit(module_number);
php_swoole_thread_arraylist_minit(module_number);
Expand Down
1 change: 1 addition & 0 deletions ext-src/php_swoole_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ void php_swoole_name_resolver_minit(int module_number);
void php_swoole_thread_minit(int module_number);
void php_swoole_thread_atomic_minit(int module_number);
void php_swoole_thread_lock_minit(int module_number);
void php_swoole_thread_barrier_minit(int module_number);
void php_swoole_thread_queue_minit(int module_number);
void php_swoole_thread_map_minit(int module_number);
void php_swoole_thread_arraylist_minit(int module_number);
Expand Down
8 changes: 8 additions & 0 deletions ext-src/stubs/php_swoole_thread_barrier.stub.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php
namespace Swoole\Thread {
class Barrier {
public function __construct(int $count) {}
public function wait(): void {}
public function __wakeup(): void {}
}
}
11 changes: 11 additions & 0 deletions ext-src/stubs/php_swoole_thread_barrier_arginfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: eac62993bfb3fbd87587a8e6997c16bca7dc5dbc */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Swoole_Thread_Barrier___construct, 0, 0, 1)
ZEND_ARG_TYPE_INFO(0, count, IS_LONG, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Swoole_Thread_Barrier_wait, 0, 0, IS_VOID, 0)
ZEND_END_ARG_INFO()

#define arginfo_class_Swoole_Thread_Barrier___wakeup arginfo_class_Swoole_Thread_Barrier_wait
4 changes: 0 additions & 4 deletions ext-src/swoole_thread_atomic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,15 @@ static PHP_METHOD(swoole_thread_atomic, set);
static PHP_METHOD(swoole_thread_atomic, cmpset);
static PHP_METHOD(swoole_thread_atomic, wait);
static PHP_METHOD(swoole_thread_atomic, wakeup);
#ifdef SW_THREAD
static PHP_METHOD(swoole_thread_atomic, __wakeup);
#endif

static PHP_METHOD(swoole_thread_atomic_long, __construct);
static PHP_METHOD(swoole_thread_atomic_long, add);
static PHP_METHOD(swoole_thread_atomic_long, sub);
static PHP_METHOD(swoole_thread_atomic_long, get);
static PHP_METHOD(swoole_thread_atomic_long, set);
static PHP_METHOD(swoole_thread_atomic_long, cmpset);
#ifdef SW_THREAD
static PHP_METHOD(swoole_thread_atomic_long, __wakeup);
#endif
SW_EXTERN_C_END

// clang-format off
Expand Down
153 changes: 153 additions & 0 deletions ext-src/swoole_thread_barrier.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
+----------------------------------------------------------------------+
| Swoole |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| [email protected] so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Tianfeng Han <[email protected]> |
+----------------------------------------------------------------------+
*/

#include "php_swoole_private.h"
#include "php_swoole_thread.h"
#include "swoole_memory.h"
#include "swoole_lock.h"

#ifdef SW_THREAD

BEGIN_EXTERN_C()
#include "stubs/php_swoole_thread_barrier_arginfo.h"
END_EXTERN_C()

using swoole::Barrier;

static zend_class_entry *swoole_thread_barrier_ce;
static zend_object_handlers swoole_thread_barrier_handlers;

struct BarrierResource : public ThreadResource {
Barrier barrier_;
BarrierResource(int count) : ThreadResource() {
barrier_.init(false, count);
}
void wait() {
barrier_.wait();
}
~BarrierResource() {
barrier_.destroy();
}
};

struct BarrierObject {
BarrierResource *barrier;
zend_object std;
};

static sw_inline BarrierObject *php_swoole_thread_barrier_fetch_object(zend_object *obj) {
return (BarrierObject *) ((char *) obj - swoole_thread_barrier_handlers.offset);
}

static BarrierResource *php_swoole_thread_barrier_get_ptr(zval *zobject) {
return php_swoole_thread_barrier_fetch_object(Z_OBJ_P(zobject))->barrier;
}

static BarrierResource *php_swoole_thread_barrier_get_and_check_ptr(zval *zobject) {
BarrierResource *barrier = php_swoole_thread_barrier_get_ptr(zobject);
if (!barrier) {
php_swoole_fatal_error(E_ERROR, "must call constructor first");
}
return barrier;
}

static void php_swoole_thread_barrier_free_object(zend_object *object) {
BarrierObject *bo = php_swoole_thread_barrier_fetch_object(object);
zend_long resource_id = zend::object_get_long(object, ZEND_STRL("id"));
if (bo->barrier && php_swoole_thread_resource_free(resource_id, bo->barrier)) {
delete bo->barrier;
bo->barrier = nullptr;
}
zend_object_std_dtor(object);
}

static zend_object *php_swoole_thread_barrier_create_object(zend_class_entry *ce) {
BarrierObject *bo = (BarrierObject *) zend_object_alloc(sizeof(BarrierObject), ce);
zend_object_std_init(&bo->std, ce);
object_properties_init(&bo->std, ce);
bo->std.handlers = &swoole_thread_barrier_handlers;
return &bo->std;
}

SW_EXTERN_C_BEGIN
static PHP_METHOD(swoole_thread_barrier, __construct);
static PHP_METHOD(swoole_thread_barrier, wait);
static PHP_METHOD(swoole_thread_barrier, __wakeup);
SW_EXTERN_C_END

// clang-format off
static const zend_function_entry swoole_thread_barrier_methods[] =
{
PHP_ME(swoole_thread_barrier, __construct, arginfo_class_Swoole_Thread_Barrier___construct, ZEND_ACC_PUBLIC)
PHP_ME(swoole_thread_barrier, wait, arginfo_class_Swoole_Thread_Barrier_wait, ZEND_ACC_PUBLIC)
PHP_ME(swoole_thread_barrier, __wakeup, arginfo_class_Swoole_Thread_Barrier___wakeup, ZEND_ACC_PUBLIC)
PHP_FE_END
};
// clang-format on

void php_swoole_thread_barrier_minit(int module_number) {
SW_INIT_CLASS_ENTRY(swoole_thread_barrier, "Swoole\\Thread\\Barrier", nullptr, swoole_thread_barrier_methods);
zend_declare_property_long(swoole_thread_barrier_ce, ZEND_STRL("id"), 0, ZEND_ACC_PUBLIC);
SW_SET_CLASS_CLONEABLE(swoole_thread_barrier, sw_zend_class_clone_deny);
SW_SET_CLASS_UNSET_PROPERTY_HANDLER(swoole_thread_barrier, sw_zend_class_unset_property_deny);
SW_SET_CLASS_CUSTOM_OBJECT(swoole_thread_barrier,
php_swoole_thread_barrier_create_object,
php_swoole_thread_barrier_free_object,
BarrierObject,
std);
}

static PHP_METHOD(swoole_thread_barrier, __construct) {
auto bo = php_swoole_thread_barrier_fetch_object(Z_OBJ_P(ZEND_THIS));
if (bo->barrier != nullptr) {
zend_throw_error(NULL, "Constructor of %s can only be called once", SW_Z_OBJCE_NAME_VAL_P(ZEND_THIS));
RETURN_FALSE;
}

zend_long count;
ZEND_PARSE_PARAMETERS_START(1, 1)
Z_PARAM_LONG(count)
ZEND_PARSE_PARAMETERS_END();

if (count < 2) {
zend_throw_exception(
swoole_exception_ce, "The parameter $count must be greater than 1", SW_ERROR_INVALID_PARAMS);
RETURN_FALSE;
}

bo->barrier = new BarrierResource(count);
auto resource_id = php_swoole_thread_resource_insert(bo->barrier);
zend_update_property_long(swoole_thread_barrier_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("id"), resource_id);
RETURN_TRUE;
}

static PHP_METHOD(swoole_thread_barrier, wait) {
BarrierResource *barrier = php_swoole_thread_barrier_get_and_check_ptr(ZEND_THIS);
if (barrier) {
barrier->wait();
}
}

static PHP_METHOD(swoole_thread_barrier, __wakeup) {
auto bo = php_swoole_thread_barrier_fetch_object(Z_OBJ_P(ZEND_THIS));
zend_long resource_id = zend::object_get_long(ZEND_THIS, ZEND_STRL("id"));
bo->barrier = static_cast<BarrierResource *>(php_swoole_thread_resource_fetch(resource_id));
if (!bo->barrier) {
zend_throw_exception(swoole_exception_ce, EMSG_NO_RESOURCE, ECODE_NO_RESOURCE);
return;
}
}
#endif
4 changes: 0 additions & 4 deletions ext-src/swoole_thread_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ using swoole::RWLock;
static zend_class_entry *swoole_thread_lock_ce;
static zend_object_handlers swoole_thread_lock_handlers;

#ifdef SW_THREAD
struct LockResource : public ThreadResource {
Lock *lock_;
LockResource(int type) : ThreadResource() {
Expand All @@ -62,7 +61,6 @@ struct LockResource : public ThreadResource {
delete lock_;
}
};
#endif

struct LockObject {
LockResource *lock;
Expand Down Expand Up @@ -113,9 +111,7 @@ static PHP_METHOD(swoole_thread_lock, lock_read);
static PHP_METHOD(swoole_thread_lock, trylock_read);
static PHP_METHOD(swoole_thread_lock, unlock);
static PHP_METHOD(swoole_thread_lock, destroy);
#ifdef SW_THREAD
static PHP_METHOD(swoole_thread_lock, __wakeup);
#endif
SW_EXTERN_C_END

// clang-format off
Expand Down
2 changes: 1 addition & 1 deletion include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ typedef unsigned long ulong_t;
#define SW_ASSERT(e)
#define SW_ASSERT_1BYTE(v)
#endif
#define SW_START_SLEEP usleep(100000) // sleep 1s,wait fork and pthread_create
#define SW_START_SLEEP usleep(100000) // sleep 0.1s, wait fork and pthread_create

#ifdef SW_THREAD
#define SW_THREAD_LOCAL thread_local
Expand Down
19 changes: 19 additions & 0 deletions include/swoole_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,23 @@ class SpinLock : public Lock {
int trylock() override;
};
#endif

#if defined(HAVE_PTHREAD_BARRIER) && !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
#define SW_USE_PTHREAD_BARRIER
#endif

struct Barrier {
#ifdef SW_USE_PTHREAD_BARRIER
pthread_barrier_t barrier_;
pthread_barrierattr_t barrier_attr_;
bool shared_;
#else
sw_atomic_t count_;
sw_atomic_t barrier_;
#endif
void init(bool shared, int count);
void wait();
void destroy();
};

} // namespace swoole
9 changes: 2 additions & 7 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,7 @@ struct ServerGS {

sw_atomic_t spinlock;

#ifdef HAVE_PTHREAD_BARRIER
pthread_barrier_t manager_barrier;
pthread_barrierattr_t manager_barrier_attr;
#endif
Barrier manager_barrier;

ProcessPool task_workers;
ProcessPool event_workers;
Expand Down Expand Up @@ -858,9 +855,7 @@ class Server {
std::shared_ptr<std::vector<std::string>> http_index_files = nullptr;
std::shared_ptr<std::unordered_set<std::string>> http_compression_types = nullptr;

#ifdef HAVE_PTHREAD_BARRIER
pthread_barrier_t reactor_thread_barrier = {};
#endif
Barrier reactor_thread_barrier = {};

/**
* temporary directory for HTTP uploaded file.
Expand Down
63 changes: 63 additions & 0 deletions src/lock/barrier.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
+----------------------------------------------------------------------+
| Swoole |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| [email protected] so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Tianfeng Han <[email protected]> |
+----------------------------------------------------------------------+
*/

#include "swoole.h"
#include "swoole_lock.h"

namespace swoole {

#define BARRIER_USEC 10000

void Barrier::init(bool shared, int count) {
#ifdef SW_USE_PTHREAD_BARRIER
if (shared) {
pthread_barrierattr_setpshared(&barrier_attr_, PTHREAD_PROCESS_SHARED);
pthread_barrier_init(&barrier_, &barrier_attr_, count);
} else {
pthread_barrier_init(&barrier_, nullptr, count);
}
shared_ = shared;
#else
barrier_ = 0;
count_ = count;
#endif
}

void Barrier::wait() {
#ifdef SW_USE_PTHREAD_BARRIER
pthread_barrier_wait(&barrier_);
#else
sw_atomic_add_fetch(&barrier_, 1);
SW_LOOP {
if (barrier_ == count_) {
break;
}
usleep(BARRIER_USEC);
sw_atomic_memory_barrier();
}
#endif
}

void Barrier::destroy() {
#ifdef SW_USE_PTHREAD_BARRIER
pthread_barrier_destroy(&barrier_);
if (shared_) {
pthread_barrierattr_destroy(&barrier_attr_);
}
#endif
}

}; // namespace swoole
7 changes: 1 addition & 6 deletions src/server/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,7 @@ void Manager::wait(Server *_server) {
int sigid = SIGTERM;
procctl(P_PID, 0, PROC_PDEATHSIG_CTL, &sigid);
#endif

#if defined(HAVE_PTHREAD_BARRIER) && !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
pthread_barrier_wait(&_server->gs->manager_barrier);
#else
SW_START_SLEEP;
#endif
_server->gs->manager_barrier.wait();
}

if (_server->isset_hook(Server::HOOK_MANAGER_START)) {
Expand Down
Loading
Loading