Skip to content

Commit

Permalink
[kafka] remove unnecessary boxing (#3725)
Browse files Browse the repository at this point in the history
  • Loading branch information
lachmatt authored Oct 25, 2024
1 parent 69b795d commit 3dd8d2a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ internal static CallTargetState OnMethodBegin<TTarget, TTopicPartition, TMessage
var activity = KafkaInstrumentation.StartProducerActivity(topicPartition.DuckCast<ITopicPartition>(), message, instance.DuckCast<INamedClient>()!);
if (activity is not null)
{
KafkaInstrumentation.InjectContext<TTopicPartition>(message, activity);
KafkaInstrumentation.InjectContext<TTopicPartition, TMessage>(message, activity);
return new CallTargetState(activity);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal static CallTargetState OnMethodBegin<TTarget, TTopicPartition, TMessage
var activity = KafkaInstrumentation.StartProducerActivity(topicPartition.DuckCast<ITopicPartition>(), message, instance.DuckCast<INamedClient>()!);
if (activity is not null)
{
KafkaInstrumentation.InjectContext<TTopicPartition>(message, activity);
KafkaInstrumentation.InjectContext<TTopicPartition, TMessage>(message, activity);
// Store delivery handler as state.
return new CallTargetState(activity, deliveryHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@ internal static class KafkaInstrumentation
return activity;
}

public static Activity? StartProducerActivity(
ITopicPartition partition,
IKafkaMessage message,
INamedClient producer)
public static Activity? StartProducerActivity<TTopicPartition, TMessage, TClient>(
TTopicPartition partition,
TMessage message,
TClient producer)
where TTopicPartition : ITopicPartition
where TMessage : IKafkaMessage
where TClient : INamedClient
{
string? spanName = null;
if (!string.IsNullOrEmpty(partition.Topic))
Expand All @@ -87,7 +90,8 @@ internal static class KafkaInstrumentation
return activity;
}

public static void InjectContext<TTopicPartition>(IKafkaMessage message, Activity activity)
public static void InjectContext<TTopicPartition, TMessage>(TMessage message, Activity activity)
where TMessage : IKafkaMessage
{
message.Headers ??= MessageHeadersHelper<TTopicPartition>.Create();
Propagators.DefaultTextMapPropagator.Inject(
Expand Down Expand Up @@ -161,7 +165,8 @@ private static IEnumerable<string> MessageHeaderValueGetter(IConsumeResult? mess
return Enumerable.Empty<string>();
}

private static void MessageHeaderValueSetter(IKafkaMessage msg, string key, string val)
private static void MessageHeaderValueSetter<TMessage>(TMessage msg, string key, string val)
where TMessage : IKafkaMessage
{
msg.Headers?.Remove(key);
msg.Headers?.Add(key, Encoding.UTF8.GetBytes(val));
Expand Down

0 comments on commit 3dd8d2a

Please sign in to comment.