Skip to content

Commit

Permalink
Merge pull request #106 from leroy-merlin-br/allow-consume-single-mes…
Browse files Browse the repository at this point in the history
…sage

Allow consume message without using command
  • Loading branch information
djonasm authored Nov 19, 2021
2 parents 36e70c8 + c4e66bb commit 0ed85a5
Show file tree
Hide file tree
Showing 53 changed files with 1,178 additions and 498 deletions.
2 changes: 1 addition & 1 deletion src/Connectors/Consumer/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ protected static function requiresPartition(AbstractConfigManager $configManager
return !is_null($partition) && $partition >= 0;
}

private static function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface
public static function getConsumer(bool $autoCommit, AbstractConfigManager $configManager): ConsumerInterface
{
if (self::requiresPartition($configManager)) {
return app(LowLevel::class)->getConsumer($autoCommit, $configManager);
Expand Down
7 changes: 6 additions & 1 deletion src/Connectors/Consumer/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,15 @@ public function getConsumer(): ConsumerInterface
return $this->consumer;
}

public function consume(): ?Message
{
return $this->getConsumer()->consume();
}

public function handleMessage(): void
{
try {
if ($response = $this->consumer->consume()) {
if ($response = $this->consume()) {
$record = app(ConsumerRecord::class, compact('response'));
$this->dispatcher->handle($record);
$this->commit();
Expand Down
8 changes: 3 additions & 5 deletions src/Connectors/Producer/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Metamorphosis\Connectors\AbstractConfig;
use Metamorphosis\Exceptions\ConfigurationException;
use Metamorphosis\ProducerConfigManager;
use Metamorphosis\TopicHandler\ConfigOptions;
use Metamorphosis\TopicHandler\ConfigOptions\Producer as ProducerConfigOptions;

class Config extends AbstractConfig
{
Expand Down Expand Up @@ -38,12 +38,10 @@ class Config extends AbstractConfig
'ssl_verify' => false,
];

public function make(ConfigOptions $configOptions): AbstractConfigManager
public function make(ProducerConfigOptions $configOptions): AbstractConfigManager
{
$configManager = app(ProducerConfigManager::class);
$data = $configOptions->toArray();
unset($data['handler']);
$configManager->set($data);
$configManager->set($configOptions->toArray());

return $configManager;
}
Expand Down
41 changes: 41 additions & 0 deletions src/Consumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php
namespace Metamorphosis;

use Metamorphosis\Connectors\Consumer\Factory;
use Metamorphosis\Consumers\ConsumerInterface;
use Metamorphosis\Middlewares\Handler\Dispatcher;
use Metamorphosis\Record\ConsumerRecord;
use Metamorphosis\Record\RecordInterface;
use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConsumerConfigOptions;

class Consumer
{
/**
* @var ConsumerInterface
*/
private $consumer;

/**
* @var Dispatcher
*/
private $dispatcher;

public function __construct(ConsumerConfigManager $configManager, ConsumerConfigOptions $configOptions)
{
$configManager->set($configOptions->toArray());

$this->consumer = Factory::getConsumer(true, $configManager);
$this->dispatcher = new Dispatcher($configManager->middlewares());
}

public function consume(): ?RecordInterface
{
if ($response = $this->consumer->consume()) {
$record = app(ConsumerRecord::class, compact('response'));

return $this->dispatcher->handle($record);
}

return null;
}
}
6 changes: 3 additions & 3 deletions src/Middlewares/AvroSchemaDecoder.php
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
<?php
namespace Metamorphosis\Middlewares;

use Closure;
use Metamorphosis\AbstractConfigManager;
use Metamorphosis\Avro\ClientFactory;
use Metamorphosis\Avro\Serializer\Decoders\DecoderInterface;
use Metamorphosis\Avro\Serializer\MessageDecoder;
use Metamorphosis\Exceptions\ConfigurationException;
use Metamorphosis\Middlewares\Handler\MiddlewareHandlerInterface;
use Metamorphosis\Record\RecordInterface;

class AvroSchemaDecoder implements MiddlewareInterface
Expand All @@ -31,10 +31,10 @@ public function __construct(AbstractConfigManager $configManager, ClientFactory
$this->decoder = new MessageDecoder($factory->make($configManager));
}

public function process(RecordInterface $record, MiddlewareHandlerInterface $handler): void
public function process(RecordInterface $record, Closure $next)
{
$record->setPayload($this->decoder->decodeMessage($record->getPayload()));

$handler->handle($record);
return $next($record);
}
}
7 changes: 4 additions & 3 deletions src/Middlewares/AvroSchemaMixedEncoder.php
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
<?php
namespace Metamorphosis\Middlewares;

use Closure;
use Metamorphosis\AbstractConfigManager;
use Metamorphosis\Avro\CachedSchemaRegistryClient;
use Metamorphosis\Avro\ClientFactory;
use Metamorphosis\Avro\Serializer\Encoders\SchemaId;
use Metamorphosis\Exceptions\ConfigurationException;
use Metamorphosis\Middlewares\Handler\MiddlewareHandlerInterface;
use Metamorphosis\Record\RecordInterface;

/**
Expand Down Expand Up @@ -43,7 +43,7 @@ public function __construct(SchemaId $schemaIdEncoder, ClientFactory $factory, A
$this->configManager = $configManager;
}

public function process(RecordInterface $record, MiddlewareHandlerInterface $handler): void
public function process(RecordInterface $record, Closure $next)
{
$topic = $this->configManager->get('topic_id');
$schema = $this->schemaRegistry->getBySubjectAndVersion("{$topic}-value", 'latest');
Expand All @@ -52,6 +52,7 @@ public function process(RecordInterface $record, MiddlewareHandlerInterface $han
$encodedPayload = $this->schemaIdEncoder->encode($schema, $arrayPayload);

$record->setPayload($encodedPayload);
$handler->handle($record);

return $next($record);
}
}
2 changes: 1 addition & 1 deletion src/Middlewares/Handler/AbstractMiddlewareHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ public function __construct(iterable $queue)
/**
* Handles the current entry in the middleware queue and advances.
*/
abstract public function handle(RecordInterface $record): void;
abstract public function handle(RecordInterface $record);
}
3 changes: 2 additions & 1 deletion src/Middlewares/Handler/Consumer.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php
namespace Metamorphosis\Middlewares\Handler;

use Closure;
use Metamorphosis\Middlewares\MiddlewareInterface;
use Metamorphosis\Record\RecordInterface;
use Metamorphosis\TopicHandler\Consumer\Handler as ConsumerTopicHandler;
Expand All @@ -17,7 +18,7 @@ public function __construct(ConsumerTopicHandler $consumerTopicHandler)
$this->consumerTopicHandler = $consumerTopicHandler;
}

public function process(RecordInterface $record, MiddlewareHandlerInterface $handler): void
public function process(RecordInterface $record, Closure $next)
{
$this->consumerTopicHandler->handle($record);
}
Expand Down
5 changes: 3 additions & 2 deletions src/Middlewares/Handler/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

class Dispatcher extends AbstractMiddlewareHandler
{
public function handle(RecordInterface $record): void
public function handle(RecordInterface $record)
{
reset($this->queue);
$iterator = new Iterator($this->queue);
$iterator->handle($record);

return $iterator->handle($record);
}
}
8 changes: 6 additions & 2 deletions src/Middlewares/Handler/Iterator.php
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
<?php
namespace Metamorphosis\Middlewares\Handler;

use Closure;
use Metamorphosis\Middlewares\MiddlewareInterface;
use Metamorphosis\Record\RecordInterface;

class Iterator extends AbstractMiddlewareHandler
{
public function handle(RecordInterface $record): void
public function handle(RecordInterface $record)
{
$closure = Closure::fromCallable([$this, 'handle']);
$entry = current($this->queue);
$middleware = $entry;
next($this->queue);

if ($middleware instanceof MiddlewareInterface) {
$middleware->process($record, $this);
return $middleware->process($record, $closure);
}

return $record;
}
}
2 changes: 1 addition & 1 deletion src/Middlewares/Handler/MiddlewareHandlerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

interface MiddlewareHandlerInterface
{
public function handle(RecordInterface $record): void;
public function handle(RecordInterface $record);
}
3 changes: 2 additions & 1 deletion src/Middlewares/Handler/Producer.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php
namespace Metamorphosis\Middlewares\Handler;

use Closure;
use Metamorphosis\Middlewares\MiddlewareInterface;
use Metamorphosis\Producer\Poll;
use Metamorphosis\Record\RecordInterface;
Expand Down Expand Up @@ -30,7 +31,7 @@ public function __construct(ProducerTopic $topic, Poll $poll, int $partition)
$this->partition = $partition;
}

public function process(RecordInterface $record, MiddlewareHandlerInterface $handler): void
public function process(RecordInterface $record, Closure $next): void
{
$this->topic->produce($this->getPartition($record), 0, $record->getPayload(), $record->getKey());

Expand Down
6 changes: 3 additions & 3 deletions src/Middlewares/JsonDecode.php
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
<?php
namespace Metamorphosis\Middlewares;

use Closure;
use Exception;
use Metamorphosis\Middlewares\Handler\MiddlewareHandlerInterface;
use Metamorphosis\Record\RecordInterface;

class JsonDecode implements MiddlewareInterface
{
public function process(RecordInterface $record, MiddlewareHandlerInterface $handler): void
public function process(RecordInterface $record, Closure $next)
{
$payload = json_decode($record->getPayload(), true);

Expand All @@ -19,6 +19,6 @@ public function process(RecordInterface $record, MiddlewareHandlerInterface $han

$record->setPayload($payload);

$handler->handle($record);
return $next($record);
}
}
6 changes: 3 additions & 3 deletions src/Middlewares/Log.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php
namespace Metamorphosis\Middlewares;

use Metamorphosis\Middlewares\Handler\MiddlewareHandlerInterface;
use Closure;
use Metamorphosis\Record\RecordInterface;
use Psr\Log\LoggerInterface;

Expand All @@ -17,12 +17,12 @@ public function __construct(LoggerInterface $log)
$this->log = $log;
}

public function process(RecordInterface $record, MiddlewareHandlerInterface $handler): void
public function process(RecordInterface $record, Closure $next)
{
$this->log->info('Processing kafka record: '.$record->getPayload(), [
'original' => (array) $record->getOriginal(),
]);

$handler->handle($record);
return $next($record);
}
}
7 changes: 5 additions & 2 deletions src/Middlewares/MiddlewareInterface.php
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
<?php
namespace Metamorphosis\Middlewares;

use Metamorphosis\Middlewares\Handler\MiddlewareHandlerInterface;
use Closure;
use Metamorphosis\Record\RecordInterface;

interface MiddlewareInterface
{
public function process(RecordInterface $record, MiddlewareHandlerInterface $handler): void;
/**
* @return mixed
*/
public function process(RecordInterface $record, Closure $next);
}
9 changes: 9 additions & 0 deletions src/TopicHandler/ConfigOptions/Auth/AuthInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php
namespace Metamorphosis\TopicHandler\ConfigOptions\Auth;

interface AuthInterface
{
public function toArray(): array;

public function getType(): string;
}
11 changes: 11 additions & 0 deletions src/TopicHandler/ConfigOptions/Auth/EnumType.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php
namespace Metamorphosis\TopicHandler\ConfigOptions\Auth;

class EnumType
{
public const NONE_TYPE = 'none';

public const SSL_TYPE = 'ssl';

public const SASL_SSL_TYPE = 'sasl_ssl';
}
15 changes: 15 additions & 0 deletions src/TopicHandler/ConfigOptions/Auth/None.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php
namespace Metamorphosis\TopicHandler\ConfigOptions\Auth;

class None implements AuthInterface
{
public function toArray(): array
{
return [];
}

public function getType(): string
{
return EnumType::NONE_TYPE;
}
}
57 changes: 57 additions & 0 deletions src/TopicHandler/ConfigOptions/Auth/SaslSsl.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php
namespace Metamorphosis\TopicHandler\ConfigOptions\Auth;

class SaslSsl implements AuthInterface
{
/**
* @var string
*/
private $mechanisms;

/**
* @var string
*/
private $username;

/**
* @var string
*/
private $password;

public function __construct(string $mechanisms, string $username, string $password)
{
$this->mechanisms = $mechanisms;
$this->username = $username;
$this->password = $password;
}

public function getPassword(): string
{
return $this->password;
}

public function getUsername(): string
{
return $this->username;
}

public function getMechanisms(): string
{
return $this->mechanisms;
}

public function toArray(): array
{
return [
'type' => $this->getType(),
'mechanisms' => $this->getMechanisms(),
'username' => $this->getUsername(),
'password' => $this->getPassword(),
];
}

public function getType(): string
{
return EnumType::SASL_SSL_TYPE;
}
}
Loading

0 comments on commit 0ed85a5

Please sign in to comment.