Skip to content

Commit

Permalink
Merge pull request #22 from tomaj/sqs
Browse files Browse the repository at this point in the history
Sqs driver
  • Loading branch information
tomaj authored Sep 5, 2016
2 parents d327689 + 3510263 commit 8979ed9
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ Updates should follow the [Keep a CHANGELOG](http://keepachangelog.com/) princip

## [Unreleased][unreleased]

## 1.1.0 - 2016-09-05

### Added

* Amazon SQS driver

## 1.0.0 - 2016-09-02

### Added
Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
"jakub-onderka/php-parallel-lint": "^0.9.0",
"ext-redis": "*",
"ext-zmq": "*",
"predis/predis": "*"
"predis/predis": "*",
"aws/aws-sdk-php": "2.*"
},
"suggest": {
"monolog/monolog": "Basic logger implements psr/logger",
"ext-redis": "Allow use for redis driver with native redis php extension",
"ext-zmq": "Allow use for ZeroMQ driver with native zmq php extension",
"npredis/predis": "Allow use for redis driver with php package Predis",
"aws/aws-sdk-php": "Allow use Amazon SQS driver",
"php-amqplib/php-amqplib": "Allow using rabbimq as driver"
},
"autoload": {
Expand Down
25 changes: 25 additions & 0 deletions examples/sqs/emitter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

use Tomaj\Hermes\Driver\AmazonSqsDriver;
use Tomaj\Hermes\Dispatcher;
use Tomaj\Hermes\Message;
use Aws\Sqs\SqsClient;

require_once __DIR__ . '/../../vendor/autoload.php';

$client = SqsClient::factory([
'version' => 'latest',
'region' => 'eu-west-1',
'key' => '*key*',
'secret' => '*secret*',
]);

$driver = new AmazonSqsDriver($client, '*queueName*');
$dispatcher = new Dispatcher($driver);
$counter = 1;
while (true) {
$dispatcher->emit(new Message('type1', ['message' => $counter]));
echo "Emited message $counter\n";
$counter++;
sleep(1);
}
22 changes: 22 additions & 0 deletions examples/sqs/processor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

use Tomaj\Hermes\Driver\AmazonSqsDriver;
use Tomaj\Hermes\Dispatcher;
use Tomaj\Hermes\Handler\EchoHandler;
use Aws\Sqs\SqsClient;

require_once __DIR__ . '/../../vendor/autoload.php';

$client = SqsClient::factory([
'version' => 'latest',
'region' => 'eu-west-1',
'key' => '*key*',
'secret' => '*secret*',
]);

$driver = new AmazonSqsDriver($client, '*queueName*');
$dispatcher = new Dispatcher($driver);

$dispatcher->registerHandler('type1', new EchoHandler());

$dispatcher->handle();
124 changes: 124 additions & 0 deletions src/Driver/AmazonSqsDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

namespace Tomaj\Hermes\Driver;

use Closure;
use Exception;
use Tomaj\Hermes\MessageInterface;
use Tomaj\Hermes\MessageSerializer;
use Aws\Sqs\SqsClient;

class AmazonSqsDriver implements DriverInterface
{
use SerializerAwareTrait;

/**
* @var SqsClient
*/
private $client;

/**
* @var string
*/
private $queueName;

/**
* string
*/
private $queueUrl;

/**
* integer
*/
private $sleepInterval = 0;

/**
* Create new Amazon SQS driver.
*
* You have to create aws client instnace and provide it to this driver.
* You can use service builder or factory method.
*
* <code>
* use Aws\Sqs\SqsClient;
*
* $client = SqsClient::factory(array(
* 'profile' => '<profile in your aws credentials file>',
* 'region' => '<region name>'
* ));
* </code>
*
* or
*
* <code>
* use Aws\Common\Aws;
*
* // Create a service builder using a configuration file
* $aws = Aws::factory('/path/to/my_config.json');
*
* // Get the client from the builder by namespace
* $client = $aws->get('Sqs');
* </code>
*
* More examples see: https://docs.aws.amazon.com/aws-sdk-php/v2/guide/service-sqs.html
*
*
* @see examples/sqs folder
*
* @param AMQPChannel $client
* @param string $queueName
* @param array $queueAttributes
*/
public function __construct(SqsClient $client, $queueName, $queueAttributes = [])
{
$this->client = $client;
$this->queueName = $queueName;
$this->serializer = new MessageSerializer();

$result = $client->createQueue([
'QueueName' => $queueName,
'Attributes' => $queueAttributes,
]);
$this->queueUrl = $result->get('QueueUrl');
}

/**
* {@inheritdoc}
*/
public function send(MessageInterface $message)
{
$this->client->sendMessage([
'QueueUrl' => $this->queueUrl,
'MessageBody' => $this->serializer->serialize($message),
]);
}

/**
* {@inheritdoc}
*/
public function wait(Closure $callback)
{
while (true) {
$result = $this->client->receiveMessage(array(
'QueueUrl' => $this->queueUrl,
'WaitTimeSeconds' => 20,
));

$messages = $result['Messages'];

if ($messages) {
foreach ($messages as $message) {
$hermesMessage = $this->serializer->unserialize($message['Body']);
$callback($hermesMessage);
$result = $this->client->deleteMessage(array(
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $message['ReceiptHandle'],
));
}
}

if ($this->sleepInterval) {
sleep($this->sleepInterval);
}
}
}
}

0 comments on commit 8979ed9

Please sign in to comment.