diff --git a/src/ConfigManager.php b/src/ConfigManager.php index e6e5b733..f87ce35e 100644 --- a/src/ConfigManager.php +++ b/src/ConfigManager.php @@ -72,6 +72,6 @@ private function overrideHandlerConfig(AbstractHandler $handler): void return; } - $this->replace($overrideConfig); + $this->replace($overrideConfig->toArray()); } } diff --git a/src/Connectors/AbstractConfig.php b/src/Connectors/AbstractConfig.php index 706cf81f..ad35c13f 100644 --- a/src/Connectors/AbstractConfig.php +++ b/src/Connectors/AbstractConfig.php @@ -6,9 +6,9 @@ abstract class AbstractConfig { - protected function getBrokerConfig(string $brokerId): array + protected function getBrokerConfig(string $configName, string $brokerId): array { - if (!$brokerConfig = config("kafka.brokers.{$brokerId}")) { + if (!$brokerConfig = config($configName.".brokers.{$brokerId}")) { throw new ConfigurationException("Broker '{$brokerId}' configuration not found"); } diff --git a/src/Connectors/Consumer/Config.php b/src/Connectors/Consumer/Config.php index 901f2f59..3e153216 100644 --- a/src/Connectors/Consumer/Config.php +++ b/src/Connectors/Consumer/Config.php @@ -39,10 +39,10 @@ class Config extends AbstractConfig public function make(array $options, array $arguments): ConfigManager { - $configName = $arguments['config_name'] ?? 'kafka'; + $configName = $options['config_name'] ?? 'kafka'; $topicConfig = $this->getTopicConfig($configName, $arguments['topic']); $consumerConfig = $this->getConsumerConfig($topicConfig, $arguments['consumer_group']); - $brokerConfig = $this->getBrokerConfig($topicConfig['broker']); + $brokerConfig = $this->getBrokerConfig($configName, $topicConfig['broker']); $schemaConfig = $this->getSchemaConfig($configName, $arguments['topic']); $config = array_merge( $topicConfig, diff --git a/src/Connectors/Producer/Config.php b/src/Connectors/Producer/Config.php index dd4afb70..34a4d843 100644 --- a/src/Connectors/Producer/Config.php +++ b/src/Connectors/Producer/Config.php @@ -4,7 +4,7 @@ use Metamorphosis\ConfigManager; use Metamorphosis\Connectors\AbstractConfig; use Metamorphosis\Exceptions\ConfigurationException; -use Metamorphosis\TopicHandler\Producer\ConfigOptions; +use Metamorphosis\TopicHandler\ConfigOptions; class Config extends AbstractConfig { @@ -52,7 +52,7 @@ public function makeByTopic(string $topicId): ConfigManager config('kafka.middlewares.producer', []), $topicConfig['producer']['middlewares'] ?? [] ); - $brokerConfig = $this->getBrokerConfig($topicConfig['broker']); + $brokerConfig = $this->getBrokerConfig('kafka', $topicConfig['broker']); $schemaConfig = $this->getSchemaConfig('kafka', $topicId); $config = array_merge($topicConfig, $brokerConfig, $schemaConfig); diff --git a/src/TopicHandler/Producer/ConfigOptions.php b/src/TopicHandler/ConfigOptions.php similarity index 96% rename from src/TopicHandler/Producer/ConfigOptions.php rename to src/TopicHandler/ConfigOptions.php index 7197cbf1..c4a8fe1d 100644 --- a/src/TopicHandler/Producer/ConfigOptions.php +++ b/src/TopicHandler/ConfigOptions.php @@ -1,5 +1,5 @@ broker; } - public function getPartition(): ?int + public function getPartition(): int { return $this->partition; } @@ -173,6 +173,7 @@ public function toArray(): array 'auth' => $broker['auth'] ?? null, 'timeout' => $this->getTimeout(), 'is_async' => $this->isAsync(), + 'partition' => $this->getPartition(), 'required_acknowledgment' => $this->isRequiredAcknowledgment(), 'max_poll_records' => $this->getMaxPollRecords(), 'flush_attempts' => $this->getFlushAttempts(), diff --git a/src/TopicHandler/ConfigOptionsFactory.php b/src/TopicHandler/ConfigOptionsFactory.php new file mode 100644 index 00000000..aa3aa838 --- /dev/null +++ b/src/TopicHandler/ConfigOptionsFactory.php @@ -0,0 +1,43 @@ +getTopic($configName, $topicName); + $broker = config($configName.'.brokers.'.$brokerName); + + $params = array_merge($topic, compact('broker')); + + return $this->makeConfigOptions($params); + } + + public function makeByConfigNameWithSchema( + string $configName, + string $topicName, + string $brokerName, + string $schemaName + ): ConfigOptions { + $topic = $this->getTopic($configName, $topicName); + $broker = config($configName.'.brokers.'.$brokerName); + $avroSchema = config($configName.'.avro_schemas.'.$schemaName); + + $params = array_merge($topic, compact('broker', 'avroSchema')); + + return $this->makeConfigOptions($params); + } + + private function makeConfigOptions($params): ConfigOptions + { + return app(ConfigOptions::class, $params); + } + + private function getTopic(string $configName, string $topicName): array + { + $topic = config($configName.'.topics.'.$topicName); + $topic['topicId'] = $topic['topic_id']; + + return $topic; + } +} diff --git a/src/TopicHandler/Consumer/AbstractHandler.php b/src/TopicHandler/Consumer/AbstractHandler.php index f56a5415..ee17ee17 100644 --- a/src/TopicHandler/Consumer/AbstractHandler.php +++ b/src/TopicHandler/Consumer/AbstractHandler.php @@ -3,17 +3,18 @@ use Exception; use Metamorphosis\Exceptions\ResponseWarningException; +use Metamorphosis\TopicHandler\ConfigOptions; abstract class AbstractHandler implements Handler { /** * Merge and override config from kafka file. * - * @var array + * @var ConfigOptions */ private $configOptions; - public function __construct(array $configOptions = []) + public function __construct(ConfigOptions $configOptions = null) { $this->configOptions = $configOptions; } @@ -30,7 +31,7 @@ public function finished(): void { } - public function getConfigOptions(): array + public function getConfigOptions(): ?ConfigOptions { return $this->configOptions; } diff --git a/src/TopicHandler/Producer/AbstractProducer.php b/src/TopicHandler/Producer/AbstractProducer.php index 7f401bc6..ea1c8223 100644 --- a/src/TopicHandler/Producer/AbstractProducer.php +++ b/src/TopicHandler/Producer/AbstractProducer.php @@ -3,6 +3,7 @@ use Metamorphosis\Exceptions\JsonException; use Metamorphosis\Record\ProducerRecord; +use Metamorphosis\TopicHandler\ConfigOptions; class AbstractProducer implements HandlerInterface { @@ -51,8 +52,8 @@ public function createRecord(): ProducerRecord $record = $this->encodeRecord($record); } - $topic = $this->getTopic(); - $partition = $this->getPartition(); + $topic = $this->getConfigOptions()->getTopicId(); + $partition = $this->getConfigOptions()->getPartition(); $key = $this->getKey(); return new ProducerRecord($record, $topic, $partition, $key); diff --git a/tests/Integration/Dummies/MessageProducerWithConfigOptions.php b/tests/Integration/Dummies/MessageProducerWithConfigOptions.php new file mode 100644 index 00000000..c00152c5 --- /dev/null +++ b/tests/Integration/Dummies/MessageProducerWithConfigOptions.php @@ -0,0 +1,34 @@ + $message->topic_name, + 'payload' => $message->payload, + 'key' => $message->key, + 'partition' => $message->partition, + ]); + } + + public function failed(Message $message): void + { + Log::error('Unable to delivery record to broker.', [ + 'topic' => $message->topic_name, + 'payload' => $message->payload, + 'key' => $message->key, + 'partition' => $message->partition, + 'error' => $message->err, + ]); + + throw new RuntimeException('error sending message!'); + } +} diff --git a/tests/Integration/ProducerWithConfigOptionsTest.php b/tests/Integration/ProducerWithConfigOptionsTest.php new file mode 100644 index 00000000..8ab318c5 --- /dev/null +++ b/tests/Integration/ProducerWithConfigOptionsTest.php @@ -0,0 +1,121 @@ +haveAHandlerConfigured(); + + // I Expect That + $this->myMessagesHaveBeenProduced(); + + // When I + $this->haveSomeRandomMessageProduced(); + + // I Expect That + $this->myMessagesHaveBeenLogged(); + + // When I + $this->runTheConsumer(); + } + + protected function runTheConsumer(): void + { + $dummy = new MessageConsumer($this->configOptions); + $this->instance('\App\Kafka\Consumers\ConsumerOverride', $dummy); + config([ + 'kafka_new_config' => [ + 'brokers' => [ + 'override' => [ + 'connections' => 'kafka:9092', + ], + ], + 'topics' => [ + 'default' => [ + 'broker' => 'override', + 'consumer' => [ + 'consumer_groups' => [ + 'test-consumer-group' => [ + 'handler' => '\App\Kafka\Consumers\ConsumerOverride', + 'offset_reset' => 'earliest', + ], + ], + ], + ], + ], + ], + ]); + $this->artisan( + 'kafka:consume', + [ + 'topic' => 'default', + 'consumer_group' => 'test-consumer-group', + '--timeout' => 20000, + '--times' => 2, + '--config_name' => 'kafka_new_config', + ] + ); + } + + protected function haveAHandlerConfigured(): void + { + $this->configOptions = new ConfigOptions( + 'sale_order_override', + ['connections' => 'kafka:9092'], + 0, + [], + [], + 20000, + false, + true, + 600, + 10 + ); + } + + private function haveSomeRandomMessageProduced(): void + { + $saleOrderProducer = app( + MessageProducerWithConfigOptions::class, + [ + 'record' => ['saleOrderId' => 'SALE_ORDER_ID'], + 'configOptions' => $this->configOptions, + 'key' => 1, + ] + ); + + $saleOrderDispatcher = Metamorphosis::build($saleOrderProducer); + $saleOrderDispatcher->handle($saleOrderProducer->createRecord()); + } + + private function myMessagesHaveBeenLogged() + { + Log::shouldReceive('alert') + ->with('{"saleOrderId":"SALE_ORDER_ID"}'); + } + + private function myMessagesHaveBeenProduced() + { + Log::shouldReceive('info') + ->with('Record successfully sent to broker.', [ + 'topic' => 'sale_order_override', + 'payload' => '{"saleOrderId":"SALE_ORDER_ID"}', + 'key' => '1', + 'partition' => 0, + ]); + } +} diff --git a/tests/Unit/ConfigManagerTest.php b/tests/Unit/ConfigManagerTest.php index 971ed6bb..1720f877 100644 --- a/tests/Unit/ConfigManagerTest.php +++ b/tests/Unit/ConfigManagerTest.php @@ -2,6 +2,7 @@ namespace Tests\Unit; use Metamorphosis\ConfigManager; +use Metamorphosis\TopicHandler\ConfigOptions; use Metamorphosis\TopicHandler\Consumer\AbstractHandler; use Mockery as m; use Tests\LaravelTestCase; @@ -22,16 +23,25 @@ public function testShouldOverrideConfig(): void ], 'topic_id' => 'kafka-test', ]; - $overrideConfig = [ - 'topic_id' => 'kafka-override', - ]; + $configOptions = new ConfigOptions( + 'kafka-override', + ['connections' => 'kafka:9092'], + 1, + [], + [], + 200, + false, + true, + 200, + 1 + ); $configManager = new ConfigManager(); // Expectations $handler->expects() ->getConfigOptions() - ->andReturn($overrideConfig); + ->andReturn($configOptions); // Actions $configManager->set($config); diff --git a/tests/Unit/Connectors/Consumer/ConfigTest.php b/tests/Unit/Connectors/Consumer/ConfigTest.php index dc15a07d..94517b02 100644 --- a/tests/Unit/Connectors/Consumer/ConfigTest.php +++ b/tests/Unit/Connectors/Consumer/ConfigTest.php @@ -3,14 +3,19 @@ use Metamorphosis\Connectors\Consumer\Config; use Metamorphosis\Exceptions\ConfigurationException; +use Metamorphosis\TopicHandler\ConfigOptions; +use Mockery as m; use Tests\LaravelTestCase; +use Tests\Unit\Dummies\ConsumerHandlerDummy; class ConfigTest extends LaravelTestCase { public function testShouldValidateConsumerConfig(): void { // Set - config(['kafka.topics.default.consumer.consumer_groups.test-consumer-group.handler' => '\Tests\Unit\Dummies\ConsumerHandlerDummy']); + config(['kafka.topics.default.consumer.consumer_groups.test-consumer-group.handler' => ConsumerHandlerDummy::class]); + $consumerHandler = $this->instance(ConsumerHandlerDummy::class, m::mock(ConsumerHandlerDummy::class)); + $configOptions = m::mock(ConfigOptions::class); $config = new Config(); $options = [ @@ -30,7 +35,7 @@ public function testShouldValidateConsumerConfig(): void 'offset_reset' => 'earliest', 'offset' => 0, 'partition' => 0, - 'handler' => '\Tests\Unit\Dummies\ConsumerHandlerDummy', + 'handler' => 'Tests\Unit\Dummies\ConsumerHandlerDummy', 'timeout' => 20000, 'consumer_group' => 'test-consumer-group', 'connections' => 'kafka:9092', @@ -44,6 +49,17 @@ public function testShouldValidateConsumerConfig(): void 'request_options' => [], ]; + // Expectations + $consumerHandler->expects() + ->getConfigOptions() + ->andReturn($configOptions); + + $configOptions->expects() + ->toArray() + ->andReturn([ + 'topic_id' => 'kafka-override', + ]); + // Actions $configManager = $config->make($options, $arguments); diff --git a/tests/Unit/Connectors/Producer/ConfigTest.php b/tests/Unit/Connectors/Producer/ConfigTest.php index d59dbfcd..c4ba520b 100644 --- a/tests/Unit/Connectors/Producer/ConfigTest.php +++ b/tests/Unit/Connectors/Producer/ConfigTest.php @@ -3,7 +3,7 @@ use Metamorphosis\Connectors\Producer\Config; use Metamorphosis\Exceptions\ConfigurationException; -use Metamorphosis\TopicHandler\Producer\ConfigOptions; +use Metamorphosis\TopicHandler\ConfigOptions; use Tests\LaravelTestCase; class ConfigTest extends LaravelTestCase diff --git a/tests/Unit/Dummies/ConsumerHandlerDummy.php b/tests/Unit/Dummies/ConsumerHandlerDummy.php index 373d4936..08eb14a0 100644 --- a/tests/Unit/Dummies/ConsumerHandlerDummy.php +++ b/tests/Unit/Dummies/ConsumerHandlerDummy.php @@ -7,11 +7,6 @@ class ConsumerHandlerDummy extends AbstractHandler { - public function __construct() - { - parent::__construct(['topic_id' => 'kafka-override']); - } - public function handle(RecordInterface $data): void { } diff --git a/tests/Unit/ProducerTest.php b/tests/Unit/ProducerTest.php index 95b9c32b..aaf3a512 100644 --- a/tests/Unit/ProducerTest.php +++ b/tests/Unit/ProducerTest.php @@ -8,9 +8,9 @@ use Metamorphosis\Middlewares\Handler\Dispatcher; use Metamorphosis\Middlewares\Handler\Producer as ProducerMiddleware; use Metamorphosis\Producer; +use Metamorphosis\TopicHandler\ConfigOptions; use Metamorphosis\TopicHandler\Producer\AbstractHandler; use Metamorphosis\TopicHandler\Producer\AbstractProducer; -use Metamorphosis\TopicHandler\Producer\ConfigOptions; use Mockery as m; use RdKafka\Producer as KafkaProducer; use RdKafka\ProducerTopic; diff --git a/tests/Unit/TopicHandler/ConfigOptionsFactoryTest.php b/tests/Unit/TopicHandler/ConfigOptionsFactoryTest.php new file mode 100644 index 00000000..6cc77cf4 --- /dev/null +++ b/tests/Unit/TopicHandler/ConfigOptionsFactoryTest.php @@ -0,0 +1,84 @@ + 'kafka-test', + 'connections' => 'kafka:9092', + 'auth' => [ + 'type' => 'ssl', + 'ca' => '/var/www/html/vendor/orchestra/testbench-core/laravel/storage/ca.pem', + 'certificate' => '/var/www/html/vendor/orchestra/testbench-core/laravel/storage/kafka.cert', + 'key' => '/var/www/html/vendor/orchestra/testbench-core/laravel/storage/kafka.key', + ], + 'timeout' => 1000, + 'is_async' => true, + 'partition' => 0, + 'required_acknowledgment' => false, + 'max_poll_records' => 500, + 'flush_attempts' => 10, + 'middlewares' => [], + 'avro_schema' => [ + 'url' => '', + 'ssl_verify' => true, + 'request_options' => [ + 'headers' => [ + 'Authorization' => [ + 'Basic Og==', + ], + ], + ], + ], + ]; + // Actions + $result = $factory->makeByConfigNameWithSchema( + 'kafka', + 'default', + 'default', + 'default' + ); + + // Assertions + $this->assertSame($expected, $result->toArray()); + } + + public function testShouldMakeConfigOptionWithoutAvro(): void + { + // Set + config(['kafka.brokers.new' => ['connections' => 'localhost:9092']]); + $factory = new ConfigOptionsFactory(); + + $expected = [ + 'topic_id' => 'kafka-test', + 'connections' => 'localhost:9092', + 'auth' => null, + 'timeout' => 1000, + 'is_async' => true, + 'partition' => 0, + 'required_acknowledgment' => false, + 'max_poll_records' => 500, + 'flush_attempts' => 10, + 'middlewares' => [], + 'avro_schema' => [], + ]; + + // Actions + $result = $factory->makeByConfigName( + 'kafka', + 'default', + 'new' + ); + + // Assertions + $this->assertSame($expected, $result->toArray()); + } +} diff --git a/tests/Unit/TopicHandler/ConfigOptionsTest.php b/tests/Unit/TopicHandler/ConfigOptionsTest.php new file mode 100644 index 00000000..60305442 --- /dev/null +++ b/tests/Unit/TopicHandler/ConfigOptionsTest.php @@ -0,0 +1,45 @@ + 'kafka:9092'], + 1, + [], + [], + 200, + false, + true, + 200, + 1 + ); + + $expected = [ + 'topic_id' => 'topic-id', + 'connections' => 'kafka:9092', + 'auth' => null, + 'timeout' => 200, + 'is_async' => false, + 'partition' => 1, + 'required_acknowledgment' => true, + 'max_poll_records' => 200, + 'flush_attempts' => 1, + 'middlewares' => [], + 'avro_schema' => [], + ]; + + // Actions + $result = $configOptions->toArray(); + + // Expectations + $this->assertSame($expected, $result); + } +}