diff --git a/src/Testcontainers.Kafka/KafkaBuilder.cs b/src/Testcontainers.Kafka/KafkaBuilder.cs index 210129972..1093d572c 100644 --- a/src/Testcontainers.Kafka/KafkaBuilder.cs +++ b/src/Testcontainers.Kafka/KafkaBuilder.cs @@ -1,3 +1,6 @@ +using System.Collections.Generic; +using System.Linq; + namespace Testcontainers.Kafka; /// @@ -9,11 +12,15 @@ public sealed class KafkaBuilder : ContainerBuilder /// Initializes a new instance of the class. /// @@ -43,6 +50,37 @@ public override KafkaContainer Build() return new KafkaContainer(DockerResourceConfiguration); } + /// + /// Add a listener in the format host:port. + /// Host will be included as a network alias. + /// Use it to register additional connections to the Kafka within the same container network. + /// + /// Default listeners: PLAINTEXT://0.0.0.0:9092, BROKER://0.0.0.0:9093, CONTROLLER://0.0.0.0:9094 + /// + /// + /// + /// + public KafkaBuilder WithListener(string kafka) + { + var host = kafka.Split(':')[0]; + + var index = (DockerResourceConfiguration.Listeners ?? new List()).Count(); + var protocol = $"{ProtocolPrefix}-{index}"; + var listener = $"{protocol}://{kafka}"; + var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT"; + + var currentListeners = this.DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]; + var currentListenersSecurityProtocolMap = this.DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]; + + return this.Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners:new List{ listener }, advertisedListeners: new List{ listener })) + .WithEnvironment(new Dictionary + { + { "KAFKA_LISTENERS", $"{currentListeners},{string.Join(",", listener)}" }, + { "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", $"{currentListenersSecurityProtocolMap},{string.Join(",", listenerSecurityProtocolMap)}" } + }) + .WithNetworkAliases(host); + } + /// protected override KafkaBuilder Init() { @@ -51,8 +89,10 @@ protected override KafkaBuilder Init() .WithPortBinding(KafkaPort, true) .WithPortBinding(BrokerPort, true) .WithPortBinding(ZookeeperPort, true) - .WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KafkaPort + ",BROKER://0.0.0.0:" + BrokerPort) - .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT") + .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}") + .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") + .WithEnvironment("KAFKA_NODE_ID", "1") + .WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort) .WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") .WithEnvironment("KAFKA_BROKER_ID", "1") .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") @@ -67,6 +107,8 @@ protected override KafkaBuilder Init() .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("\\[KafkaServer id=\\d+\\] started")) .WithStartupCallback((container, ct) => { + var additionalAdvertisedListeners = + (container.AdvertisedListeners != null && container.AdvertisedListeners.Any()) ? "," + string.Join(",", container.AdvertisedListeners) : ""; const char lf = '\n'; var startupScript = new StringBuilder(); startupScript.Append("#!/bin/bash"); @@ -79,7 +121,7 @@ protected override KafkaBuilder Init() startupScript.Append(lf); startupScript.Append("zookeeper-server-start zookeeper.properties &"); startupScript.Append(lf); - startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort); + startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + additionalAdvertisedListeners); startupScript.Append(lf); startupScript.Append("echo '' > /etc/confluent/docker/ensure"); startupScript.Append(lf); @@ -105,4 +147,5 @@ protected override KafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfigur { return new KafkaBuilder(new KafkaConfiguration(oldValue, newValue)); } + } \ No newline at end of file diff --git a/src/Testcontainers.Kafka/KafkaConfiguration.cs b/src/Testcontainers.Kafka/KafkaConfiguration.cs index f741cb306..994c34530 100644 --- a/src/Testcontainers.Kafka/KafkaConfiguration.cs +++ b/src/Testcontainers.Kafka/KafkaConfiguration.cs @@ -1,16 +1,31 @@ +using System.Collections; +using System.Collections.Generic; +using System.Linq; + namespace Testcontainers.Kafka; /// [PublicAPI] public sealed class KafkaConfiguration : ContainerConfiguration { + public IEnumerable AdvertisedListeners { get; } + public IEnumerable Listeners { get; } + /// /// Initializes a new instance of the class. /// - public KafkaConfiguration() + public KafkaConfiguration(IEnumerable listeners = null, IEnumerable advertisedListeners = null) { + if ( listeners != null) + { + this.Listeners = [..listeners]; + } + if (advertisedListeners != null) + { + this.AdvertisedListeners = [..advertisedListeners]; + } } - + /// /// Initializes a new instance of the class. /// @@ -49,5 +64,7 @@ public KafkaConfiguration(KafkaConfiguration resourceConfiguration) public KafkaConfiguration(KafkaConfiguration oldValue, KafkaConfiguration newValue) : base(oldValue, newValue) { + this.Listeners = BuildConfiguration.Combine>(oldValue.Listeners, newValue.Listeners); + this.AdvertisedListeners = BuildConfiguration.Combine>(oldValue.AdvertisedListeners, newValue.AdvertisedListeners); } } \ No newline at end of file diff --git a/src/Testcontainers.Kafka/KafkaContainer.cs b/src/Testcontainers.Kafka/KafkaContainer.cs index 41407fa7d..477c6c920 100644 --- a/src/Testcontainers.Kafka/KafkaContainer.cs +++ b/src/Testcontainers.Kafka/KafkaContainer.cs @@ -1,9 +1,13 @@ +using System.Collections.Generic; + namespace Testcontainers.Kafka; /// [PublicAPI] public sealed class KafkaContainer : DockerContainer { + private KafkaConfiguration _configuration; + internal IEnumerable AdvertisedListeners => this._configuration.AdvertisedListeners; /// /// Initializes a new instance of the class. /// @@ -11,6 +15,7 @@ public sealed class KafkaContainer : DockerContainer public KafkaContainer(KafkaConfiguration configuration) : base(configuration) { + this._configuration = configuration; } /// @@ -21,4 +26,5 @@ public string GetBootstrapAddress() { return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString(); } + } \ No newline at end of file diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs new file mode 100644 index 000000000..27b49e1d0 --- /dev/null +++ b/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs @@ -0,0 +1,60 @@ +using System.Collections.Generic; +using System.Text; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; +using DotNet.Testcontainers.Networks; + +namespace Testcontainers.Kafka; + +public sealed class KafkaContainerNetworkTest : IAsyncLifetime +{ + private INetwork _network; + private KafkaContainer _kafkaContainer; + + private IContainer _kCatContainer; + public async Task InitializeAsync() + { + _network = new NetworkBuilder().Build(); + _kafkaContainer = new KafkaBuilder() + .WithImage("confluentinc/cp-kafka") + .WithNetwork(_network) + .WithListener("kafka:19092") + .Build(); + + _kCatContainer = new ContainerBuilder() + .WithImage("confluentinc/cp-kcat") + .WithNetwork(_network) + .WithCommand("-c", "tail -f /dev/null") + .WithEntrypoint("sh") + .WithResourceMapping(Encoding.Default.GetBytes("Message produced by kcat"), "/data/msgs.txt") + .Build(); + + await _kCatContainer.StartAsync(); + await _kafkaContainer.StartAsync(); + } + + public Task DisposeAsync() + { + return _kafkaContainer.DisposeAsync().AsTask(); + } + + [Fact] + public async Task TestUsageWithListener() + { + // kcat producer + await _kCatContainer.ExecAsync(new List() + { + "kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt" + }); + + + // kcat consumer + var kCatResult = await _kCatContainer.ExecAsync(new List() + { + "kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1" + }); + + Assert.Contains("Message produced by kcat", kCatResult.Stdout); + } + +} \ No newline at end of file diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerTest.cs index 33220a78c..f0e1a4225 100644 --- a/tests/Testcontainers.Kafka.Tests/KafkaContainerTest.cs +++ b/tests/Testcontainers.Kafka.Tests/KafkaContainerTest.cs @@ -15,7 +15,7 @@ public Task DisposeAsync() } [Fact] - [Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))] + //[Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))] public async Task ConsumerReturnsProducerMessage() { // Given