Skip to content

Commit

Permalink
Scrapped msgpack task message serialization option.
Browse files Browse the repository at this point in the history
Because for both msgpack PHP extension or `rybakit/msgpack` Composer
package there seem to be unsolvable problems with differentiating of
how to serialize empty lists vs empty dicts (PHP only has arrays
for both use cases).
  • Loading branch information
smuuf committed Sep 16, 2023
1 parent 76f854d commit 115332d
Show file tree
Hide file tree
Showing 20 changed files with 87 additions and 135 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@
tests/**/output/**
!tests/**/output/.gitkeep

# Tests - Python compiled bytecode cache.
*.pyc

# Installed dependencies.
vendor
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
A modern PHP client library for [Celery - Distributed Task Queue](https://docs.celeryq.dev).

## Requirements
- PHP 8.0+
- PHP 8.1+

## Example

Expand Down Expand Up @@ -36,7 +36,9 @@ $task = new TaskSignature(
// Send the task into Celery.
$asyncResult = $celery->sendTask($task);

// Wait for the result and retrieve it.
// Wait for the result (up to 10 seconds by default) and return it.
// Alternatively a \Smuuf\CeleryForPhp\Exc\CeleryTimeoutException exception will
// be thrown if the task won't finish in time.
$result = $asyncResult->get();
// $result === 9
```
14 changes: 12 additions & 2 deletions bin/tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ set -e

cd $(dirname $0)/..

TEST_PATH="${1:-./tests/suite}"

# Test configuration
export CELERYFORPHP_TASK_SERIALIZER='json'
export CELERYFORPHP_TASK_MESSAGE_PROTOCOL_VERSION=2

function _info {
echo -e ""
echo -e "$@"
echo -e ""
}


function _compose {
docker compose -f ./tests/infra/docker/test-services.yml $@
}
Expand All @@ -21,12 +26,17 @@ _compose up \
--quiet-pull \
--build

_info "Tests will use this configuration:"
echo CELERYFORPHP_TASK_SERIALIZER=$CELERYFORPHP_TASK_SERIALIZER
echo CELERYFORPHP_TASK_MESSAGE_PROTOCOL_VERSION=$CELERYFORPHP_TASK_MESSAGE_PROTOCOL_VERSION

_info "Running tests"
php ./vendor/nette/tester/src/tester \
-C `# Use system-wide php-ini` \
--coverage ./tests/output/coverage.html \
--coverage-src ./src \
-p phpdbg \
./tests/suite;
$TEST_PATH;

_info "Stopping test infrastructure services"
_compose down --timeout 2
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@
"predis/predis": "^2"
},
"suggest": {
"predis/predis": "Adds support for the predis, PHP client library for Redis, backend"
"predis/predis": "Adds support for Predis - a PHP library providing a Redis result backend"
}
}
4 changes: 2 additions & 2 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions src/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ public static function createDefault(): self {
* Name of the Celery control exchange (used for sending control commands,
* such as 'revoke', to Celery workers).
* Default value is 'celery' but can be specified to some other name.
* @param int $messageProtocolVersion
* Celery message protocol version to use.
* @param int $taskMessageProtocolVersion
* Celery task message protocol version to use.
* See https://docs.celeryq.dev/en/stable/internals/protocol.html
* @param ISerializer $taskSerializer
* Serializer to use when serializing task message body. If not
* specified, `JsonSerializer` is used by default.
*/
public function __construct(
private int $messageProtocolVersion = MessageBuilder::MESSAGE_PROTOCOL_V2,
private int $taskMessageProtocolVersion = MessageBuilder::MESSAGE_PROTOCOL_V2,
private ?ISerializer $taskSerializer = null,
private ?ITaskIdFactory $taskIdFactory = null,
private string $controlExchangeName = 'celery',
Expand All @@ -46,8 +46,8 @@ public function __construct(
throw new InvalidArgumentException('Control exchange name must be a non-empty string');
}

if (!in_array($this->messageProtocolVersion, MessageBuilder::VALID_MESSAGE_PROTOCOL_VERSIONS, true)) {
throw new InvalidArgumentException("Invalid message protocol version '{$this->messageProtocolVersion}'");
if (!in_array($this->taskMessageProtocolVersion, MessageBuilder::VALID_MESSAGE_PROTOCOL_VERSIONS, true)) {
throw new InvalidArgumentException("Invalid message protocol version '{$this->taskMessageProtocolVersion}'");
}

}
Expand All @@ -61,7 +61,7 @@ public function getControlExchangeName(): string {
}

public function getTaskProtocolVersion(): int {
return $this->messageProtocolVersion;
return $this->taskMessageProtocolVersion;
}

public function getTaskSerializer(): ISerializer {
Expand Down
49 changes: 0 additions & 49 deletions src/Serializers/MsgpackSerializer.php

This file was deleted.

48 changes: 40 additions & 8 deletions tests/bootstrap.php
Original file line number Diff line number Diff line change
@@ -1,21 +1,53 @@
<?php

use Predis\Client as PredisClient;

use Smuuf\CeleryForPhp\Celery;
use Smuuf\CeleryForPhp\Config;
use Smuuf\CeleryForPhp\Brokers\RedisBroker;
use Smuuf\CeleryForPhp\Drivers\PredisDriver;
use Smuuf\CeleryForPhp\Backends\RedisBackend;
use Smuuf\CeleryForPhp\Serializers\JsonSerializer;

require __DIR__ . '/../vendor/autoload.php';

\Tester\Environment::setup();

class TestEnv {
class CeleryFactory {

public static function getCelery(): Celery {

$envConfig = self::readEnv();

$config = new Config(
taskMessageProtocolVersion: $envConfig['task_message_protocol_version'],
taskSerializer: match ($envConfig['serializer']) {
'json' => new JsonSerializer(),
},
);

private static string $redisUri;
$predis = new PredisClient(['host' => '127.0.0.1']);
$redisDriver = new PredisDriver($predis);
$broker = new RedisBroker($redisDriver);
$backend = new RedisBackend($redisDriver);

return new Celery($broker, $backend, $config);

public static function init(): void {
self::$redisUri = '127.0.0.1';
}

public static function getRedisUri(): string {
return self::$redisUri;
/**
* @return array{
* serializer: string,
* message_protocol: int,
* }
*/
private static function readEnv(): array {

return [
'serializer' => getenv('CELERYFORPHP_TASK_SERIALIZER') ?: '',
'task_message_protocol_version' => (int) getenv('CELERYFORPHP_TASK_MESSAGE_PROTOCOL_VERSION'),
];

}

}

TestEnv::init();
8 changes: 4 additions & 4 deletions tests/infra/celery-app/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ FROM python:3.11-alpine
RUN adduser my_user --disabled-password
USER my_user

RUN pip install --user celery "celery[redis,msgpack]"
RUN pip install --user celery "celery[redis]"

RUN mkdir ~/celery-app
COPY main.py start.sh ~/celery-app/
RUN mkdir /home/my_user/celery-app
COPY main.py start.sh /home/my_user/celery-app/

WORKDIR ~/celery-app
WORKDIR /home/my_user/celery-app
ENTRYPOINT ["sh", "./start.sh"]
Binary file removed tests/infra/celery-app/__pycache__/main.pypy39.pyc
Binary file not shown.
2 changes: 0 additions & 2 deletions tests/infra/celery-app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@

CONNECTION = 'redis://127.0.0.1'
BROKER = CONNECTION
#BROKER = 'amqp://myuser:mypassword@localhost:5672/myvhost'


class Config:
task_serializer = 'json'
#task_serializer = 'msgpack'


number = Union[int, float]
Expand Down
2 changes: 1 addition & 1 deletion tests/suite/Backends.RedisBackend.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use Smuuf\CeleryForPhp\Drivers\PredisDriver;

require __DIR__ . '/../bootstrap.php';

$predis = new PredisClient(['host' => TestEnv::getRedisUri()]);
$predis = new PredisClient(['host' => '127.0.0.1']);
$redisDriver = new PredisDriver($predis);

$x = serialize($predis);
Expand Down
5 changes: 2 additions & 3 deletions tests/suite/Brokers.RedisBroker.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use Tester\Assert;

use Predis\Client as PredisClient;

use Smuuf\CeleryForPhp\TaskSignature;
use Smuuf\CeleryForPhp\Brokers\RedisBroker;
use Smuuf\CeleryForPhp\DeliveryInfo;
use Smuuf\CeleryForPhp\Brokers\RedisBroker;
use Smuuf\CeleryForPhp\Drivers\PredisDriver;
use Smuuf\CeleryForPhp\Messaging\CeleryMessage;
use Smuuf\CeleryForPhp\Serializers\JsonSerializer;
Expand All @@ -24,7 +23,7 @@ function _prepare_random_queue(): string {

}

$predis = new PredisClient(['host' => TestEnv::getRedisUri()]);
$predis = new PredisClient(['host' => '127.0.0.1']);
$redisDriver = new PredisDriver($predis);
$broker = new RedisBroker($redisDriver);

Expand Down
13 changes: 1 addition & 12 deletions tests/suite/Integration/RealTask.countdown.phpt
Original file line number Diff line number Diff line change
@@ -1,26 +1,15 @@
<?php

use Tester\Assert;
use Predis\Client as PredisClient;

use Smuuf\CeleryForPhp\Celery;
use Smuuf\CeleryForPhp\AsyncResult;
use Smuuf\CeleryForPhp\TaskSignature;
use Smuuf\CeleryForPhp\Backends\RedisBackend;
use Smuuf\CeleryForPhp\Brokers\RedisBroker;
use Smuuf\CeleryForPhp\Drivers\PredisDriver;
use Smuuf\CeleryForPhp\Exc\InvalidArgumentException;
use Smuuf\CeleryForPhp\State;

require __DIR__ . '/../../bootstrap.php';

$predis = new PredisClient(['host' => TestEnv::getRedisUri()]);
$redisDriver = new PredisDriver($predis);

$c = new Celery(
new RedisBroker($redisDriver),
new RedisBackend($redisDriver),
);
$c = CeleryFactory::getCelery();

// Call real-life Python Celery's task.
$ts = new TaskSignature('main.add');
Expand Down
8 changes: 4 additions & 4 deletions tests/suite/Integration/RealTask.eta.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use Smuuf\CeleryForPhp\State;

require __DIR__ . '/../../bootstrap.php';

$predis = new PredisClient(['host' => TestEnv::getRedisUri()]);
$predis = new PredisClient(['host' => '127.0.0.1']);
$redisDriver = new PredisDriver($predis);

$c = new Celery(
Expand Down Expand Up @@ -49,13 +49,13 @@ function test_task_with_eta($eta, int $wait): void {

}

test_task_with_eta('now + 3 seconds', 6);
test_task_with_eta('now + 3 seconds', 9);
test_task_with_eta(new \DateTime('now + 2 seconds'), 6);

Assert::exception(function() {
test_task_with_eta('just some garbage', 1);
test_task_with_eta('just some garbage', 3);
}, InvalidArgumentException::class, '#cannot convert#i');

Assert::exception(function() {
test_task_with_eta(['wtf lol'], 1);
test_task_with_eta(['wtf lol'], 3);
}, \TypeError::class);
15 changes: 2 additions & 13 deletions tests/suite/Integration/RealTask.task.justWait.phpt
Original file line number Diff line number Diff line change
@@ -1,25 +1,14 @@
<?php

use Tester\Assert;
use Predis\Client as PredisClient;

use Smuuf\CeleryForPhp\Celery;
use Smuuf\CeleryForPhp\State;
use Smuuf\CeleryForPhp\TaskSignature;
use Smuuf\CeleryForPhp\Exc\CeleryTimeoutException;
use Smuuf\CeleryForPhp\Backends\RedisBackend;
use Smuuf\CeleryForPhp\Brokers\RedisBroker;
use Smuuf\CeleryForPhp\Drivers\PredisDriver;
use Smuuf\CeleryForPhp\State;

require __DIR__ . '/../../bootstrap.php';

$predis = new PredisClient(['host' => TestEnv::getRedisUri()]);
$redisDriver = new PredisDriver($predis);

$c = new Celery(
new RedisBroker($redisDriver),
new RedisBackend($redisDriver),
);
$c = CeleryFactory::getCelery();

// Call real-life Python Celery's task.
$ts = new TaskSignature('main.just_wait');
Expand Down
2 changes: 1 addition & 1 deletion tests/suite/Integration/RealTask.task.justWait.revoke.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use Smuuf\CeleryForPhp\Helpers\Signals;

require __DIR__ . '/../../bootstrap.php';

$predis = new PredisClient(['host' => TestEnv::getRedisUri()]);
$predis = new PredisClient(['host' => '127.0.0.1']);
$redisDriver = new PredisDriver($predis);

$c = new Celery(
Expand Down
Loading

0 comments on commit 115332d

Please sign in to comment.