Skip to content

Commit

Permalink
feat(kafka): Add network support for Kafka container
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastienDegodez committed Dec 15, 2024
1 parent 0d86bda commit 0e7bb9d
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 7 deletions.
51 changes: 47 additions & 4 deletions src/Testcontainers.Kafka/KafkaBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using System.Collections.Generic;
using System.Linq;

namespace Testcontainers.Kafka;

/// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" />
Expand All @@ -9,11 +12,15 @@ public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer
public const ushort KafkaPort = 9092;

public const ushort BrokerPort = 9093;

public const ushort ControllerPort = 9094;

public const ushort ZookeeperPort = 2181;

public const string StartupScriptFilePath = "/testcontainers.sh";


private const string ProtocolPrefix = "TC";

/// <summary>
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
/// </summary>
Expand Down Expand Up @@ -43,6 +50,37 @@ public override KafkaContainer Build()
return new KafkaContainer(DockerResourceConfiguration);
}

/// <summary>
/// Add a listener in the format host:port.
/// Host will be included as a network alias.
/// Use it to register additional connections to the Kafka within the same container network.
///
/// Default listeners: PLAINTEXT://0.0.0.0:9092, BROKER://0.0.0.0:9093, CONTROLLER://0.0.0.0:9094
/// </summary>
/// <param name="kafka"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public KafkaBuilder WithListener(string kafka)
{
var host = kafka.Split(':')[0];

var index = (DockerResourceConfiguration.Listeners ?? new List<string>()).Count();
var protocol = $"{ProtocolPrefix}-{index}";
var listener = $"{protocol}://{kafka}";
var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";

var currentListeners = this.DockerResourceConfiguration.Environments["KAFKA_LISTENERS"];
var currentListenersSecurityProtocolMap = this.DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"];

return this.Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners:new List<string>{ listener }, advertisedListeners: new List<string>{ listener }))
.WithEnvironment(new Dictionary<string, string>
{
{ "KAFKA_LISTENERS", $"{currentListeners},{string.Join(",", listener)}" },
{ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", $"{currentListenersSecurityProtocolMap},{string.Join(",", listenerSecurityProtocolMap)}" }
})
.WithNetworkAliases(host);
}

/// <inheritdoc />
protected override KafkaBuilder Init()
{
Expand All @@ -51,8 +89,10 @@ protected override KafkaBuilder Init()
.WithPortBinding(KafkaPort, true)
.WithPortBinding(BrokerPort, true)
.WithPortBinding(ZookeeperPort, true)
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KafkaPort + ",BROKER://0.0.0.0:" + BrokerPort)
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
.WithEnvironment("KAFKA_NODE_ID", "1")
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort)
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
Expand All @@ -67,6 +107,8 @@ protected override KafkaBuilder Init()
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("\\[KafkaServer id=\\d+\\] started"))
.WithStartupCallback((container, ct) =>
{
var additionalAdvertisedListeners =
(container.AdvertisedListeners != null && container.AdvertisedListeners.Any()) ? "," + string.Join(",", container.AdvertisedListeners) : "";
const char lf = '\n';
var startupScript = new StringBuilder();
startupScript.Append("#!/bin/bash");
Expand All @@ -79,7 +121,7 @@ protected override KafkaBuilder Init()
startupScript.Append(lf);
startupScript.Append("zookeeper-server-start zookeeper.properties &");
startupScript.Append(lf);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + additionalAdvertisedListeners);
startupScript.Append(lf);
startupScript.Append("echo '' > /etc/confluent/docker/ensure");
startupScript.Append(lf);
Expand All @@ -105,4 +147,5 @@ protected override KafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfigur
{
return new KafkaBuilder(new KafkaConfiguration(oldValue, newValue));
}

}
21 changes: 19 additions & 2 deletions src/Testcontainers.Kafka/KafkaConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;

namespace Testcontainers.Kafka;

/// <inheritdoc cref="ContainerConfiguration" />
[PublicAPI]
public sealed class KafkaConfiguration : ContainerConfiguration
{
public IEnumerable<string> AdvertisedListeners { get; }
public IEnumerable<string> Listeners { get; }

/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
public KafkaConfiguration()
public KafkaConfiguration(IEnumerable<string> listeners = null, IEnumerable<string> advertisedListeners = null)
{
if ( listeners != null)
{
this.Listeners = [..listeners];
}
if (advertisedListeners != null)
{
this.AdvertisedListeners = [..advertisedListeners];
}
}

/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
Expand Down Expand Up @@ -49,5 +64,7 @@ public KafkaConfiguration(KafkaConfiguration resourceConfiguration)
public KafkaConfiguration(KafkaConfiguration oldValue, KafkaConfiguration newValue)
: base(oldValue, newValue)
{
this.Listeners = BuildConfiguration.Combine<IEnumerable<string>>(oldValue.Listeners, newValue.Listeners);
this.AdvertisedListeners = BuildConfiguration.Combine<IEnumerable<string>>(oldValue.AdvertisedListeners, newValue.AdvertisedListeners);
}
}
6 changes: 6 additions & 0 deletions src/Testcontainers.Kafka/KafkaContainer.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
using System.Collections.Generic;

namespace Testcontainers.Kafka;

/// <inheritdoc cref="DockerContainer" />
[PublicAPI]
public sealed class KafkaContainer : DockerContainer
{
private KafkaConfiguration _configuration;
internal IEnumerable<string> AdvertisedListeners => this._configuration.AdvertisedListeners;
/// <summary>
/// Initializes a new instance of the <see cref="KafkaContainer" /> class.
/// </summary>
/// <param name="configuration">The container configuration.</param>
public KafkaContainer(KafkaConfiguration configuration)
: base(configuration)
{
this._configuration = configuration;
}

/// <summary>
Expand All @@ -21,4 +26,5 @@ public string GetBootstrapAddress()
{
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString();
}

}
60 changes: 60 additions & 0 deletions tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System.Collections.Generic;
using System.Text;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;

namespace Testcontainers.Kafka;

public sealed class KafkaContainerNetworkTest : IAsyncLifetime
{
private INetwork _network;
private KafkaContainer _kafkaContainer;

private IContainer _kCatContainer;
public async Task InitializeAsync()
{
_network = new NetworkBuilder().Build();
_kafkaContainer = new KafkaBuilder()
.WithImage("confluentinc/cp-kafka")
.WithNetwork(_network)
.WithListener("kafka:19092")
.Build();

_kCatContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-kcat")
.WithNetwork(_network)
.WithCommand("-c", "tail -f /dev/null")
.WithEntrypoint("sh")
.WithResourceMapping(Encoding.Default.GetBytes("Message produced by kcat"), "/data/msgs.txt")
.Build();

await _kCatContainer.StartAsync();
await _kafkaContainer.StartAsync();
}

public Task DisposeAsync()
{
return _kafkaContainer.DisposeAsync().AsTask();
}

[Fact]
public async Task TestUsageWithListener()
{
// kcat producer
await _kCatContainer.ExecAsync(new List<string>()
{
"kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt"
});


// kcat consumer
var kCatResult = await _kCatContainer.ExecAsync(new List<string>()
{
"kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1"
});

Assert.Contains("Message produced by kcat", kCatResult.Stdout);
}

}
2 changes: 1 addition & 1 deletion tests/Testcontainers.Kafka.Tests/KafkaContainerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public Task DisposeAsync()
}

[Fact]
[Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))]
//[Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))]
public async Task ConsumerReturnsProducerMessage()
{
// Given
Expand Down

0 comments on commit 0e7bb9d

Please sign in to comment.