-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
46 changed files
with
3,583 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# Set the default behavior, in case people don't have core.autocrlf set. | ||
* text=auto | ||
|
||
# Set EOL for PHP project files | ||
*.php text eol=lf | ||
|
||
# Denote all files that are truly binary and should not be modified. | ||
*.eot binary | ||
*.ttf binary | ||
*.woff binary | ||
*.woff2 binary |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
name: Tests | ||
|
||
on: | ||
push: ~ | ||
pull_request: ~ | ||
|
||
jobs: | ||
phpunit: | ||
name: PHPUnit on ${{ matrix.php-versions }} ${{ matrix.composer-flags }} | ||
runs-on: ubuntu-latest | ||
continue-on-error: ${{ !matrix.stable }} | ||
strategy: | ||
matrix: | ||
php-versions: ['8.2', '8.3', '8.4'] | ||
stable: [true] | ||
coverage: [true] | ||
composer-flags: [''] | ||
|
||
steps: | ||
- uses: actions/checkout@v2 | ||
with: | ||
fetch-depth: 0 | ||
|
||
- uses: shivammathur/setup-php@v2 | ||
with: | ||
php-version: ${{ matrix.php-versions }} | ||
extensions: curl, mbstring | ||
coverage: xdebug | ||
tools: composer:v2 | ||
|
||
- run: composer update --no-progress ${{ matrix.composer-flags }} | ||
|
||
- run: vendor/bin/phpunit --no-coverage | ||
if: ${{ !matrix.coverage }} | ||
|
||
- run: vendor/bin/phpunit --coverage-text --coverage-clover=coverage.clover | ||
if: ${{ matrix.coverage }} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
/.phpunit.cache | ||
/vendor/ | ||
composer.lock | ||
|
||
# Dev environment | ||
.idea |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
# Changelog | ||
|
||
All notable changes to this project will be documented in this file. | ||
|
||
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) | ||
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). | ||
|
||
## [1.0.0-beta1] - 2024-11-19 | ||
|
||
Initial version |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,254 @@ | ||
# Berlioz Queue Manager | ||
|
||
**Berlioz Queue Manager** is responsible for processing jobs from a queue using a job handler. It supports advanced | ||
features like memory and time limits, signal handling, and customizable worker options. | ||
|
||
For more information, and use of Berlioz Framework, go to website and online documentation : | ||
https://getberlioz.com | ||
|
||
## Installation | ||
|
||
### Composer | ||
|
||
You can install **Berlioz Queue Manager** with [Composer](https://getcomposer.org/), it's the recommended installation. | ||
|
||
```bash | ||
$ composer require berlioz/queue-manager | ||
``` | ||
|
||
### Dependencies | ||
|
||
* **PHP** ^8.2 | ||
* Packages: | ||
* **berlioz/helpers** | ||
* **psr/clock** | ||
* **psr/container** | ||
* **psr/log** | ||
* **psr/event-dispatcher** | ||
|
||
## Usage | ||
|
||
### Jobs | ||
|
||
#### `JobDescriptorInterface` | ||
|
||
- **Definition**: Represents jobs that are ready to be pushed into a queue. | ||
- **Example**: Defining the structure and payload of a task before queueing it. | ||
- **Note**: A generic `JobDescriptor` class is available for creating new messages quickly. However, you can extend or override this class to provide additional control, such as custom payload validation or specific job behaviors. | ||
|
||
#### `JobInterface` | ||
|
||
- **Definition**: Manages jobs that have been consumed from a queue. | ||
- **Example**: Handling retries, deleting jobs after processing, or releasing jobs back into the queue. | ||
|
||
#### `JobForQueue` | ||
|
||
- **Definition**: Ensures specific jobs are routed to designated queues. | ||
- **Example**: Assigning priority tasks to a high-priority queue. | ||
|
||
|
||
### Jobs handlers | ||
|
||
The `JobHandlerManager` is a central component for managing multiple job handlers in the **Berlioz Queue Manager**. It implements the `JobHandlerInterface` and acts as a dispatcher, delegating job processing to the appropriate handler based on the job's name. | ||
|
||
```php | ||
use Berlioz\QueueManager\Job\JobHandlerManager; | ||
|
||
$manager = new JobHandlerManager($container, $defaultHandler); | ||
$manager->addHandler('email', EmailJobHandler::class); | ||
$manager->addHandler('report', ReportJobHandler::class); | ||
|
||
$job = new Job('email'); // Example job with name 'email' | ||
$manager->handle($job); // Delegates to EmailJobHandler | ||
``` | ||
|
||
The `JobHandlerInterface` defines the contract for handling jobs in the **Berlioz Queue Manager**. Implementing this interface allows you to define how specific jobs should be processed. | ||
|
||
|
||
Below is an example implementation of a `JobHandlerInterface` for consuming and processing a job named `"foo"`: | ||
|
||
```php | ||
use Berlioz\QueueManager\Handler\JobHandlerInterface; | ||
use Berlioz\QueueManager\Job\JobInterface; | ||
use Berlioz\QueueManager\Exception\QueueManagerException; | ||
|
||
class FooJobHandler implements JobHandlerInterface | ||
{ | ||
public function handle(JobInterface $job): void | ||
{ | ||
if ($job->getName() !== 'foo') { | ||
throw new QueueManagerException('Invalid job name'); | ||
} | ||
|
||
// Process the job | ||
$payload = $job->getPayload(); | ||
echo "Processing job 'foo' with payload: " . json_encode($payload); | ||
} | ||
} | ||
``` | ||
|
||
### Worker | ||
|
||
The `Worker` class is the main part of the **Berlioz Queue Manager** and is responsible for processing jobs from a queue | ||
using a job handler. | ||
|
||
```php | ||
use Berlioz\QueueManager\Queue\MemoryQueue; | ||
use Berlioz\QueueManager\Worker; | ||
use Berlioz\QueueManager\WorkerOptions; | ||
use Berlioz\QueueManager\Handler\JobHandlerManager; | ||
use Psr\Log\NullLogger; | ||
|
||
// Create a Job Handler Manager | ||
$jobHandler = new JobHandlerManager($container); | ||
|
||
// Initialize the Worker | ||
$worker = new Worker($jobHandler); | ||
|
||
// Optionally, set a logger | ||
$worker->setLogger(new NullLogger()); | ||
|
||
// Configure worker options | ||
$options = new WorkerOptions( | ||
name: 'worker', // Worker name | ||
limit: 10, // Max jobs to execute | ||
memoryLimit: 128, // Memory limit in MB | ||
timeLimit: 60, // Time limit in seconds | ||
killFilePath: 10, // File to kill process | ||
stopNoJob: true, // Stop if no job | ||
sleep: 2 // Sleep time between jobs in seconds | ||
); | ||
|
||
// Create a queue instance | ||
$queue = new MemoryQueue(); | ||
|
||
// Run the worker | ||
$exitCode = $worker->run($queue, $options); | ||
``` | ||
|
||
### Queues | ||
|
||
#### DbQueue | ||
|
||
The `DbQueue` is a durable implementation of a queue that uses a database to store jobs persistently. This ensures that jobs remain available even in the event of application or server restarts. By leveraging a database, the `DbQueue` provides reliability and durability, making it suitable for production environments where job data must not be lost. | ||
|
||
**Key Characteristics**: | ||
|
||
- **Durable Storage**: Jobs are stored in a relational or NoSQL database, ensuring persistence and fault tolerance. | ||
- **Transactional Guarantees**: Can leverage database transactions to ensure that job insertion, processing, and deletion are atomic operations. | ||
- **Scalability**: With proper indexing and optimization, the `DbQueue` can handle large volumes of jobs efficiently. | ||
- **Use Cases**: | ||
- Applications that require guaranteed delivery and processing of jobs. | ||
- Scenarios where jobs must survive server or application crashes. | ||
- Environments where job metadata (e.g., retries, priorities) must be tracked over time. | ||
|
||
While `DbQueue` offers durability and reliability, its performance may be impacted by database latency compared to in-memory queues. It is best suited for scenarios where persistence and fault tolerance are prioritized over low-latency operations. | ||
|
||
```php | ||
use Berlioz\QueueManager\Queue\DbQueue; | ||
use Hector\Connection\Connection; | ||
|
||
$dbConnection = new Connection('mysql://localhost:3306'); | ||
$queue = new DbQueue( | ||
connection: $dbConnection, // Database connection | ||
name: 'default', // Queue name | ||
tableName: 'queue_jobs', // Name of MySQL table | ||
maxAttempts: 5, // Maximum attempts of a job | ||
); | ||
``` | ||
|
||
Example of schema for MySQL: | ||
|
||
```mysql | ||
CREATE TABLE `queue_jobs` ( | ||
`job_id` int unsigned NOT NULL AUTO_INCREMENT, | ||
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, | ||
`queue` varchar(128) NOT NULL DEFAULT 'default', | ||
`availability_time` timestamp NOT NULL, | ||
`attempts` int unsigned NOT NULL DEFAULT '0', | ||
`lock_time` timestamp NULL DEFAULT NULL, | ||
`payload` json NOT NULL, | ||
PRIMARY KEY (`job_id`), | ||
KEY `INDEX_job` (`queue`,`availability_time`,`lock_time`,`attempts`) | ||
) ENGINE=InnoDB; | ||
``` | ||
|
||
If you want to keep the jobs treated: | ||
|
||
```mysql | ||
CREATE TABLE `queue_jobs_done` ( | ||
`job_id` int unsigned NOT NULL, | ||
`create_time` timestamp NOT NULL, | ||
`queue` varchar(128) NOT NULL, | ||
`availability_time` timestamp NOT NULL, | ||
`attempts` int unsigned NOT NULL, | ||
`lock_time` timestamp NOT NULL, | ||
`payload` json NOT NULL, | ||
PRIMARY KEY (`job_id`), | ||
KEY `INDEX_job` (`queue`,`lock_time`) | ||
) ENGINE=InnoDB; | ||
|
||
-- Trigger to make automatic insert into `queue_jobs_done` | ||
-- the deleted done jobs into `queue_jobs`. | ||
|
||
DELIMITER $$ | ||
CREATE | ||
TRIGGER `queue_jobs_AFTER_DELETE` AFTER DELETE ON `queue_jobs` | ||
FOR EACH ROW BEGIN | ||
INSERT INTO `queue_jobs_done` (`job_id`, `create_time`, `queue`, `availability_time`, `attempts`, `lock_time`, `payload`) | ||
VALUES (OLD.`job_id`, OLD.`create_time`, OLD.`queue`, OLD.`availability_time`, OLD.`attempts`, OLD.`lock_time`, OLD.`payload`); | ||
END;$$ | ||
DELIMITER ; | ||
``` | ||
|
||
#### Memory queue | ||
|
||
The `MemoryQueue` is a lightweight, ephemeral implementation of a queue that stores jobs in memory for the duration of the script's execution. This queue is particularly useful for testing, development, or scenarios where persistent storage is not required. Since the jobs are stored in memory, they are lost when the script ends, making it unsuitable for production environments where job persistence is critical. | ||
|
||
**Key Characteristics**: | ||
|
||
- **Ephemeral Nature**: Jobs exist only during the script's runtime. | ||
- **Fast and Lightweight**: No external dependencies or storage overhead. | ||
- **Use Cases**: | ||
- Unit testing or local development. | ||
- Short-lived tasks that do not require durability. | ||
- Simulating job execution flows without external systems. | ||
|
||
The `MemoryQueue` provides all the standard operations of a queue, such as pushing jobs, consuming jobs, and checking the size of the queue, while maintaining a simple in-memory data structure to manage these operations. However, since it lacks durability, it should be used with caution and only in scenarios where the transient nature of the data is acceptable. | ||
|
||
```php | ||
use Berlioz\QueueManager\Queue\MemoryQueue; | ||
|
||
$queue = new MemoryQueue( | ||
name: 'default', // Queue name | ||
retryTime: 30, // Time to wait after failed job | ||
); | ||
``` | ||
|
||
#### AwsSqsQueue | ||
|
||
The `AwsSqsQueue` is an implementation of a queue that integrates with Amazon Simple Queue Service (SQS), a fully managed message queuing service provided by AWS. This queue leverages the scalability, durability, and distributed nature of SQS to handle job storage and delivery in a reliable and fault-tolerant manner. | ||
|
||
**Key Characteristics**: | ||
|
||
- **Fully Managed**: Offloads the operational complexity of managing infrastructure, scaling, and maintenance. | ||
- **Highly Durable**: Messages are redundantly stored across multiple data centers, ensuring data durability and availability. | ||
- **Scalable**: Capable of handling an unlimited number of messages and automatically scaling to meet demand. | ||
- **Low Overhead**: Removes the need for a dedicated queue server or database. | ||
- **Use Cases**: | ||
- Distributed systems requiring reliable asynchronous communication. | ||
- Scenarios with high message throughput or unpredictable traffic spikes. | ||
- Cloud-native applications leveraging other AWS services like Lambda or EC2. | ||
|
||
With features like visibility timeouts, message delays, and dead-letter queues, `AwsSqsQueue` provides robust mechanisms for handling complex workflows and ensuring job delivery. However, since it is a cloud-based service, its performance depends on network latency and AWS's regional availability. | ||
|
||
```php | ||
use Aws\Sqs\SqsClient; | ||
use Berlioz\QueueManager\Queue\AwsSqsQueue; | ||
|
||
$queue = new AwsSqsQueue( | ||
sqsClient: new SqsClient(...), // Database connection | ||
name: 'default', // Queue name | ||
queueUrl: '...', // AWS queue URL | ||
); | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
{ | ||
"name": "berlioz/queue-manager", | ||
"description": "Queue manager", | ||
"type": "library", | ||
"minimum-stability": "beta", | ||
"license": "MIT", | ||
"homepage": "https://getberlioz.com", | ||
"authors": [ | ||
{ | ||
"name": "Ronan Giron", | ||
"email": "[email protected]" | ||
} | ||
], | ||
"autoload": { | ||
"psr-4": { | ||
"Berlioz\\QueueManager\\": "src/" | ||
} | ||
}, | ||
"autoload-dev": { | ||
"psr-4": { | ||
"Berlioz\\QueueManager\\Tests\\": "tests/" | ||
} | ||
}, | ||
"require": { | ||
"php": "^8.2", | ||
"berlioz/helpers": "^1.0", | ||
"psr/clock": "^1.0", | ||
"psr/container": "^1.0 || ^2.0", | ||
"psr/log": "^1.0 || ^2.0 || ^3.0" | ||
}, | ||
"require-dev": { | ||
"phpunit/phpunit": "^11.0", | ||
"phpstan/phpstan": "^1.10", | ||
"aws/aws-sdk-php": "^3.316", | ||
"hectororm/query": "^1.0" | ||
}, | ||
"suggest": { | ||
"ext-pcntl": "Tu use signals to exit worker", | ||
"aws/aws-sdk-php": "To use AWS SQS service", | ||
"hectororm/query": "To use database queue" | ||
}, | ||
"scripts": { | ||
"test": "vendor/bin/phpunit --colors=never --stderr", | ||
"analyse": "vendor/bin/phpstan analyse src tests" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" bootstrap="./tests/bootstrap.php" colors="true" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/11.0/phpunit.xsd" cacheDirectory=".phpunit.cache"> | ||
<testsuites> | ||
<testsuite name="Berlioz Event Manager test suite"> | ||
<directory suffix="Test.php">./tests</directory> | ||
</testsuite> | ||
</testsuites> | ||
<source> | ||
<include> | ||
<directory suffix=".php">src</directory> | ||
</include> | ||
</source> | ||
</phpunit> |
Oops, something went wrong.