Skip to content

Commit

Permalink
Merge pull request #97 from leroy-merlin-br/fix/abstract-producer-config
Browse files Browse the repository at this point in the history
Fix - Dynamic Config
  • Loading branch information
djonasm authored Oct 14, 2021
2 parents d475141 + ea8c51b commit 6c1484b
Show file tree
Hide file tree
Showing 17 changed files with 380 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/ConfigManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ private function overrideHandlerConfig(AbstractHandler $handler): void
return;
}

$this->replace($overrideConfig);
$this->replace($overrideConfig->toArray());
}
}
4 changes: 2 additions & 2 deletions src/Connectors/AbstractConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

abstract class AbstractConfig
{
protected function getBrokerConfig(string $brokerId): array
protected function getBrokerConfig(string $configName, string $brokerId): array
{
if (!$brokerConfig = config("kafka.brokers.{$brokerId}")) {
if (!$brokerConfig = config($configName.".brokers.{$brokerId}")) {
throw new ConfigurationException("Broker '{$brokerId}' configuration not found");
}

Expand Down
4 changes: 2 additions & 2 deletions src/Connectors/Consumer/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class Config extends AbstractConfig

public function make(array $options, array $arguments): ConfigManager
{
$configName = $arguments['config_name'] ?? 'kafka';
$configName = $options['config_name'] ?? 'kafka';
$topicConfig = $this->getTopicConfig($configName, $arguments['topic']);
$consumerConfig = $this->getConsumerConfig($topicConfig, $arguments['consumer_group']);
$brokerConfig = $this->getBrokerConfig($topicConfig['broker']);
$brokerConfig = $this->getBrokerConfig($configName, $topicConfig['broker']);
$schemaConfig = $this->getSchemaConfig($configName, $arguments['topic']);
$config = array_merge(
$topicConfig,
Expand Down
4 changes: 2 additions & 2 deletions src/Connectors/Producer/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use Metamorphosis\ConfigManager;
use Metamorphosis\Connectors\AbstractConfig;
use Metamorphosis\Exceptions\ConfigurationException;
use Metamorphosis\TopicHandler\Producer\ConfigOptions;
use Metamorphosis\TopicHandler\ConfigOptions;

class Config extends AbstractConfig
{
Expand Down Expand Up @@ -52,7 +52,7 @@ public function makeByTopic(string $topicId): ConfigManager
config('kafka.middlewares.producer', []),
$topicConfig['producer']['middlewares'] ?? []
);
$brokerConfig = $this->getBrokerConfig($topicConfig['broker']);
$brokerConfig = $this->getBrokerConfig('kafka', $topicConfig['broker']);
$schemaConfig = $this->getSchemaConfig('kafka', $topicId);
$config = array_merge($topicConfig, $brokerConfig, $schemaConfig);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?php
namespace Metamorphosis\TopicHandler\Producer;
namespace Metamorphosis\TopicHandler;

class ConfigOptions
{
Expand Down Expand Up @@ -90,7 +90,7 @@ class ConfigOptions
private $timeout;

/**
* @var int|null
* @var int
*/
private $partition;

Expand All @@ -102,7 +102,7 @@ class ConfigOptions
public function __construct(
string $topicId,
array $broker,
?int $partition = null,
int $partition = 0,
array $avroSchema = [],
array $middlewares = [],
int $timeout = 1000,
Expand Down Expand Up @@ -158,7 +158,7 @@ public function getBroker(): array
return $this->broker;
}

public function getPartition(): ?int
public function getPartition(): int
{
return $this->partition;
}
Expand All @@ -173,6 +173,7 @@ public function toArray(): array
'auth' => $broker['auth'] ?? null,
'timeout' => $this->getTimeout(),
'is_async' => $this->isAsync(),
'partition' => $this->getPartition(),
'required_acknowledgment' => $this->isRequiredAcknowledgment(),
'max_poll_records' => $this->getMaxPollRecords(),
'flush_attempts' => $this->getFlushAttempts(),
Expand Down
43 changes: 43 additions & 0 deletions src/TopicHandler/ConfigOptionsFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php
namespace Metamorphosis\TopicHandler;

class ConfigOptionsFactory
{
public function makeByConfigName(string $configName, string $topicName, string $brokerName): ConfigOptions
{
$topic = $this->getTopic($configName, $topicName);
$broker = config($configName.'.brokers.'.$brokerName);

$params = array_merge($topic, compact('broker'));

return $this->makeConfigOptions($params);
}

public function makeByConfigNameWithSchema(
string $configName,
string $topicName,
string $brokerName,
string $schemaName
): ConfigOptions {
$topic = $this->getTopic($configName, $topicName);
$broker = config($configName.'.brokers.'.$brokerName);
$avroSchema = config($configName.'.avro_schemas.'.$schemaName);

$params = array_merge($topic, compact('broker', 'avroSchema'));

return $this->makeConfigOptions($params);
}

private function makeConfigOptions($params): ConfigOptions
{
return app(ConfigOptions::class, $params);
}

private function getTopic(string $configName, string $topicName): array
{
$topic = config($configName.'.topics.'.$topicName);
$topic['topicId'] = $topic['topic_id'];

return $topic;
}
}
7 changes: 4 additions & 3 deletions src/TopicHandler/Consumer/AbstractHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@

use Exception;
use Metamorphosis\Exceptions\ResponseWarningException;
use Metamorphosis\TopicHandler\ConfigOptions;

abstract class AbstractHandler implements Handler
{
/**
* Merge and override config from kafka file.
*
* @var array
* @var ConfigOptions
*/
private $configOptions;

public function __construct(array $configOptions = [])
public function __construct(ConfigOptions $configOptions = null)
{
$this->configOptions = $configOptions;
}
Expand All @@ -30,7 +31,7 @@ public function finished(): void
{
}

public function getConfigOptions(): array
public function getConfigOptions(): ?ConfigOptions
{
return $this->configOptions;
}
Expand Down
5 changes: 3 additions & 2 deletions src/TopicHandler/Producer/AbstractProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use Metamorphosis\Exceptions\JsonException;
use Metamorphosis\Record\ProducerRecord;
use Metamorphosis\TopicHandler\ConfigOptions;

class AbstractProducer implements HandlerInterface
{
Expand Down Expand Up @@ -51,8 +52,8 @@ public function createRecord(): ProducerRecord
$record = $this->encodeRecord($record);
}

$topic = $this->getTopic();
$partition = $this->getPartition();
$topic = $this->getConfigOptions()->getTopicId();
$partition = $this->getConfigOptions()->getPartition();
$key = $this->getKey();

return new ProducerRecord($record, $topic, $partition, $key);
Expand Down
34 changes: 34 additions & 0 deletions tests/Integration/Dummies/MessageProducerWithConfigOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php
namespace Tests\Integration\Dummies;

use Illuminate\Support\Facades\Log;
use Metamorphosis\TopicHandler\Producer\AbstractProducer;
use Metamorphosis\TopicHandler\Producer\HandleableResponseInterface;
use RdKafka\Message;
use RuntimeException;

class MessageProducerWithConfigOptions extends AbstractProducer implements HandleableResponseInterface
{
public function success(Message $message): void
{
Log::info('Record successfully sent to broker.', [
'topic' => $message->topic_name,
'payload' => $message->payload,
'key' => $message->key,
'partition' => $message->partition,
]);
}

public function failed(Message $message): void
{
Log::error('Unable to delivery record to broker.', [
'topic' => $message->topic_name,
'payload' => $message->payload,
'key' => $message->key,
'partition' => $message->partition,
'error' => $message->err,
]);

throw new RuntimeException('error sending message!');
}
}
121 changes: 121 additions & 0 deletions tests/Integration/ProducerWithConfigOptionsTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
<?php
namespace Tests\Integration;

use Illuminate\Support\Facades\Log;
use Metamorphosis\Facades\Metamorphosis;
use Metamorphosis\TopicHandler\ConfigOptions;
use Tests\Integration\Dummies\MessageConsumer;
use Tests\Integration\Dummies\MessageProducerWithConfigOptions;
use Tests\LaravelTestCase;

class ProducerWithConfigOptionsTest extends LaravelTestCase
{
/**
* @var ConfigOptions
*/
private $configOptions;

public function testShouldRunAProducerMessagesWithConfigOptions(): void
{
// Given That I
$this->haveAHandlerConfigured();

// I Expect That
$this->myMessagesHaveBeenProduced();

// When I
$this->haveSomeRandomMessageProduced();

// I Expect That
$this->myMessagesHaveBeenLogged();

// When I
$this->runTheConsumer();
}

protected function runTheConsumer(): void
{
$dummy = new MessageConsumer($this->configOptions);
$this->instance('\App\Kafka\Consumers\ConsumerOverride', $dummy);
config([
'kafka_new_config' => [
'brokers' => [
'override' => [
'connections' => 'kafka:9092',
],
],
'topics' => [
'default' => [
'broker' => 'override',
'consumer' => [
'consumer_groups' => [
'test-consumer-group' => [
'handler' => '\App\Kafka\Consumers\ConsumerOverride',
'offset_reset' => 'earliest',
],
],
],
],
],
],
]);
$this->artisan(
'kafka:consume',
[
'topic' => 'default',
'consumer_group' => 'test-consumer-group',
'--timeout' => 20000,
'--times' => 2,
'--config_name' => 'kafka_new_config',
]
);
}

protected function haveAHandlerConfigured(): void
{
$this->configOptions = new ConfigOptions(
'sale_order_override',
['connections' => 'kafka:9092'],
0,
[],
[],
20000,
false,
true,
600,
10
);
}

private function haveSomeRandomMessageProduced(): void
{
$saleOrderProducer = app(
MessageProducerWithConfigOptions::class,
[
'record' => ['saleOrderId' => 'SALE_ORDER_ID'],
'configOptions' => $this->configOptions,
'key' => 1,
]
);

$saleOrderDispatcher = Metamorphosis::build($saleOrderProducer);
$saleOrderDispatcher->handle($saleOrderProducer->createRecord());
}

private function myMessagesHaveBeenLogged()
{
Log::shouldReceive('alert')
->with('{"saleOrderId":"SALE_ORDER_ID"}');
}

private function myMessagesHaveBeenProduced()
{
Log::shouldReceive('info')
->with('Record successfully sent to broker.', [
'topic' => 'sale_order_override',
'payload' => '{"saleOrderId":"SALE_ORDER_ID"}',
'key' => '1',
'partition' => 0,
]);
}
}
18 changes: 14 additions & 4 deletions tests/Unit/ConfigManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace Tests\Unit;

use Metamorphosis\ConfigManager;
use Metamorphosis\TopicHandler\ConfigOptions;
use Metamorphosis\TopicHandler\Consumer\AbstractHandler;
use Mockery as m;
use Tests\LaravelTestCase;
Expand All @@ -22,16 +23,25 @@ public function testShouldOverrideConfig(): void
],
'topic_id' => 'kafka-test',
];
$overrideConfig = [
'topic_id' => 'kafka-override',
];
$configOptions = new ConfigOptions(
'kafka-override',
['connections' => 'kafka:9092'],
1,
[],
[],
200,
false,
true,
200,
1
);

$configManager = new ConfigManager();

// Expectations
$handler->expects()
->getConfigOptions()
->andReturn($overrideConfig);
->andReturn($configOptions);

// Actions
$configManager->set($config);
Expand Down
Loading

0 comments on commit 6c1484b

Please sign in to comment.