Skip to content

Commit

Permalink
Fix issue with mutable Kafka config
Browse files Browse the repository at this point in the history
  • Loading branch information
iancooper committed Dec 24, 2023
1 parent c42e32c commit 3728bf3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,22 @@ public KafkaMessageConsumer(
SecurityProtocol = configuration.SecurityProtocol.HasValue ? (Confluent.Kafka.SecurityProtocol?)((int) configuration.SecurityProtocol.Value) : null,
SslCaLocation = configuration.SslCaLocation
};
_consumerConfig = new ConsumerConfig(_clientConfig)


// We repeat properties because copying them from the ClientConfig modifies the ClientConfig in place
_consumerConfig = new ConsumerConfig()
{
GroupId = groupId,
BootstrapServers = string.Join(",", configuration.BootStrapServers),
ClientId = configuration.Name,
Debug = configuration.Debug,
SaslMechanism = configuration.SaslMechanisms.HasValue ? (Confluent.Kafka.SaslMechanism?)((int)configuration.SaslMechanisms.Value) : null,
SaslKerberosPrincipal = configuration.SaslKerberosPrincipal,
SaslUsername = configuration.SaslUsername,
SaslPassword = configuration.SaslPassword,
SecurityProtocol = configuration.SecurityProtocol.HasValue ? (Confluent.Kafka.SecurityProtocol?)((int) configuration.SecurityProtocol.Value) : null,
SslCaLocation = configuration.SslCaLocation,
GroupId = groupId,
AutoOffsetReset = offsetDefault,
BootstrapServers = string.Join(",", configuration.BootStrapServers),
SessionTimeoutMs = sessionTimeoutMs,
MaxPollIntervalMs = maxPollIntervalMs,
EnablePartitionEof = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,20 @@ public KafkaMessageProducer(

};

_producerConfig = new ProducerConfig(_clientConfig)
//We repeat properties because copying them from them to the producer config updates in client config in place
_producerConfig = new ProducerConfig()
{
Acks = (Confluent.Kafka.Acks)((int)publication.Replication),
BootstrapServers = string.Join(",", configuration.BootStrapServers),
ClientId = configuration.Name,
Debug = configuration.Debug,
SaslMechanism = configuration.SaslMechanisms.HasValue ? (Confluent.Kafka.SaslMechanism?)((int)configuration.SaslMechanisms.Value) : null,
SaslKerberosPrincipal = configuration.SaslKerberosPrincipal,
SaslUsername = configuration.SaslUsername,
SaslPassword = configuration.SaslPassword,
SecurityProtocol = configuration.SecurityProtocol.HasValue ? (Confluent.Kafka.SecurityProtocol?)((int)configuration.SecurityProtocol.Value) : null,
SslCaLocation = configuration.SslCaLocation,
SslKeyLocation = configuration.SslKeystoreLocation,
BatchNumMessages = publication.BatchNumberMessages,
EnableIdempotence = publication.EnableIdempotence,
EnableDeliveryReports = true, //don't change this, we need it for the callback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class KafkaProducerRegistryFactory : IAmAProducerRegistryFactory
/// that determines how we publish to Kafka and the parameters of any topic if required.
/// </summary>
/// <param name="globalConfiguration">Configures how we connect to the broker</param>
/// <param name="publication">How do we publish, both producer parameters and topic configuration</param>
/// <param name="publications">The list of topics that we want to publish to</param>
public KafkaProducerRegistryFactory(
KafkaMessagingGatewayConfiguration globalConfiguration,
IEnumerable<KafkaPublication> publications)
Expand Down

0 comments on commit 3728bf3

Please sign in to comment.