Skip to content

Commit

Permalink
Added php_socket support for thread data container
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Sep 20, 2024
1 parent 17465c3 commit 99cb543
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 9 deletions.
21 changes: 21 additions & 0 deletions examples/thread/socket.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php


use Swoole\Thread;
use Swoole\Thread\Lock;
use Swoole\Thread\Map;

$args = Thread::getArguments();

if (empty($args)) {
$map = new Map();
$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
$map['socket'] = $sock;
$thread = new Thread(__FILE__, $map);
echo "main thread\n";
$thread->join();
} else {
$map = $args[0];
$sock = $map['socket'];
$retval = socket_connect($sock, '127.0.0.1', 80);
}
18 changes: 18 additions & 0 deletions ext-src/php_swoole_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,24 @@ static sw_inline void sw_zval_free(zval *val) {
efree(val);
}

#ifdef SWOOLE_SOCKETS_SUPPORT
static inline bool sw_zval_is_php_socket(zval *val) {
return instanceof_function(Z_OBJCE_P(val), socket_ce);
}
#endif

static inline bool sw_zval_is_co_socket(zval *val) {
return instanceof_function(Z_OBJCE_P(val), swoole_socket_coro_ce);
}

static inline bool sw_zval_is_client(zval *val) {
return instanceof_function(Z_OBJCE_P(val), swoole_client_ce);
}

static inline bool sw_zval_is_process(zval *val) {
return instanceof_function(Z_OBJCE_P(val), swoole_process_ce);
}

//----------------------------------Constant API------------------------------------

#define SW_REGISTER_NULL_CONSTANT(name) REGISTER_NULL_CONSTANT(name, CONST_CS | CONST_PERSISTENT)
Expand Down
1 change: 1 addition & 0 deletions ext-src/php_swoole_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ enum {
IS_BARRIER = 84,
IS_ATOMIC = 85,
IS_ATOMIC_LONG = 86,
IS_PHP_SOCKET = 96,
IS_CO_SOCKET = 97,
IS_STREAM_SOCKET = 98,
IS_SERIALIZED_OBJECT = 99,
Expand Down
8 changes: 4 additions & 4 deletions ext-src/swoole_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,14 @@ int php_swoole_convert_to_fd(zval *zsocket) {
}
case IS_OBJECT: {
zval *zfd = nullptr;
if (instanceof_function(Z_OBJCE_P(zsocket), swoole_socket_coro_ce)) {
if (sw_zval_is_co_socket(zsocket)) {
zfd = sw_zend_read_property_ex(Z_OBJCE_P(zsocket), zsocket, SW_ZSTR_KNOWN(SW_ZEND_STR_FD), 0);
} else if (instanceof_function(Z_OBJCE_P(zsocket), swoole_client_ce)) {
} else if (sw_zval_is_client(zsocket)) {
zfd = sw_zend_read_property_ex(Z_OBJCE_P(zsocket), zsocket, SW_ZSTR_KNOWN(SW_ZEND_STR_SOCK), 0);
} else if (instanceof_function(Z_OBJCE_P(zsocket), swoole_process_ce)) {
} else if (sw_zval_is_process(zsocket)) {
zfd = sw_zend_read_property_ex(Z_OBJCE_P(zsocket), zsocket, SW_ZSTR_KNOWN(SW_ZEND_STR_PIPE), 0);
#ifdef SWOOLE_SOCKETS_SUPPORT
} else if (instanceof_function(Z_OBJCE_P(zsocket), socket_ce)) {
} else if (sw_zval_is_php_socket(zsocket)) {
php_socket *php_sock = SW_Z_SOCKET_P(zsocket);
if (IS_INVALID_SOCKET(php_sock)) {
php_swoole_fatal_error(E_WARNING, "contains a closed socket");
Expand Down
2 changes: 1 addition & 1 deletion ext-src/swoole_http_response.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ static PHP_METHOD(swoole_http_response, create) {
php_swoole_fatal_error(E_WARNING, "parameter $2 must be valid connection session id");
RETURN_FALSE;
}
} else if (instanceof_function(Z_OBJCE_P(zobject), swoole_socket_coro_ce)) {
} else if (sw_zval_is_co_socket(zobject)) {
sock = php_swoole_get_socket(zobject);
fd = sock->get_fd();
} else {
Expand Down
45 changes: 41 additions & 4 deletions ext-src/swoole_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ void php_swoole_thread_stream_create(zval *return_value, zend_long sockfd) {
void php_swoole_thread_co_socket_create(zval *return_value, zend_long sockfd, swSocketType type) {
int newfd = dup(sockfd);
if (newfd < 0) {
_error:
object_init_ex(return_value, swoole_thread_error_ce);
zend::object_set(return_value, ZEND_STRL("code"), errno);
return;
Expand All @@ -425,11 +426,27 @@ void php_swoole_thread_co_socket_create(zval *return_value, zend_long sockfd, sw
if (sockobj) {
ZVAL_OBJ(return_value, sockobj);
} else {
// never here
abort();
goto _error;
}
}

#ifdef SWOOLE_SOCKETS_SUPPORT
void php_swoole_thread_php_socket_create(zval *return_value, zend_long sockfd) {
int newfd = dup(sockfd);
if (newfd < 0) {
_error:
object_init_ex(return_value, swoole_thread_error_ce);
zend::object_set(return_value, ZEND_STRL("code"), errno);
return;
}
object_init_ex(return_value, socket_ce);
auto retsock = Z_SOCKET_P(return_value);
if (!socket_import_file_descriptor(newfd, retsock)) {
goto _error;
}
}
#endif

static PHP_METHOD(swoole_thread, getTsrmInfo) {
array_init(return_value);
add_assoc_bool(return_value, "is_main_thread", tsrm_is_main_thread());
Expand Down Expand Up @@ -477,14 +494,29 @@ void ArrayItem::store(zval *zvalue) {
break;
}
case IS_OBJECT: {
if (instanceof_function(Z_OBJCE_P(zvalue), swoole_socket_coro_ce)) {
if (sw_zval_is_co_socket(zvalue)) {
value.socket.fd = php_swoole_thread_co_socket_cast(zvalue, &value.socket.type);
type = IS_CO_SOCKET;
if (value.socket.fd == -1) {
zend_throw_exception(swoole_exception_ce, "failed to convert to socket fd", errno);
}
break;
}
#ifdef SWOOLE_SOCKETS_SUPPORT
else if (sw_zval_is_php_socket(zvalue)) {
php_socket *php_sock = SW_Z_SOCKET_P(zvalue);
if (php_sock->bsd_socket == -1) {
zend_throw_exception(swoole_exception_ce, "invalid socket fd", EBADF);
break;
}
value.socket.fd = dup(php_sock->bsd_socket);
if (value.socket.fd == -1) {
zend_throw_exception(swoole_exception_ce, "failed to dup socket fd", errno);
}
type = IS_PHP_SOCKET;
break;
}
#endif
CAST_OBJ_TO_RESOURCE(arraylist, IS_ARRAYLIST)
CAST_OBJ_TO_RESOURCE(map, IS_MAP)
CAST_OBJ_TO_RESOURCE(queue, IS_QUEUE)
Expand Down Expand Up @@ -579,6 +611,11 @@ void ArrayItem::fetch(zval *return_value) {
case IS_CO_SOCKET:
php_swoole_thread_co_socket_create(return_value, value.socket.fd, value.socket.type);
break;
#ifdef SWOOLE_SOCKETS_SUPPORT
case IS_PHP_SOCKET:
php_swoole_thread_php_socket_create(return_value, value.socket.fd);
break;
#endif
case IS_SERIALIZED_OBJECT:
php_swoole_unserialize(value.serialized_object, return_value);
break;
Expand All @@ -591,7 +628,7 @@ void ArrayItem::release() {
if (type == IS_STRING) {
zend_string_release(value.str);
value.str = nullptr;
} else if (type == IS_STREAM_SOCKET || type == IS_CO_SOCKET) {
} else if (type == IS_STREAM_SOCKET || type == IS_CO_SOCKET || type == IS_PHP_SOCKET) {
close(value.socket.fd);
value.socket.fd = -1;
} else if (type == IS_SERIALIZED_OBJECT) {
Expand Down
60 changes: 60 additions & 0 deletions tests/swoole_thread/php_socket.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
--TEST--
swoole_thread: php_socket
--SKIPIF--
<?php
require __DIR__ . '/../include/skipif.inc';
skip_if_nts();
?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

use Swoole\Thread;
use Swoole\Thread\Queue;
use SwooleTest\ThreadManager;

$tm = new ThreadManager();
$tm->initFreePorts(increment: crc32(__FILE__) % 1000);

$tm->parentFunc = function () use ($tm) {
$queue = new Queue();
$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1);
socket_bind($sock, '127.0.0.1', $tm->getFreePort());
$queue->push($sock);
$thread = new Thread(__FILE__, $queue, 0);
var_dump('main thread');
$thread->join();
};

$tm->childFunc = function ($queue, $id) use ($tm) {
if ($id === 0) {
var_dump('child thread 0');
$svr_sock = $queue->pop();
socket_listen($svr_sock, 128);
$thread = new Thread(__FILE__, $queue, 1);
$conn = socket_accept($svr_sock);
socket_write($conn, "Swoole: hello world\n");
socket_close($conn);
socket_close($svr_sock);
$thread->join();
} else {
var_dump('child thread 1');
$sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_connect($sock, '127.0.0.1', $tm->getFreePort());
socket_send($sock, "hello world", 0, 0);
socket_recv($sock, $buf, 1024, 0);
Assert::eq($buf, "Swoole: hello world\n");
socket_close($sock);
}
exit(0);
};

$tm->run();
echo "Done\n";
?>
--EXPECT--
string(11) "main thread"
string(14) "child thread 0"
string(14) "child thread 1"
Done

0 comments on commit 99cb543

Please sign in to comment.