diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapper.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapper.java index f48a0d65d..c6c172191 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapper.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapper.java @@ -31,13 +31,27 @@ public class MinimalAndMessageAttributesHeaderMapper implements HeaderMapper { private final MinimalHeaderMapper minimalHeaderMapping = new MinimalHeaderMapper(); + private static final String HEADER_PROJECT_ID = "GCPProjectId"; + private static final String HEADER_TOPIC_ID = "PubSubTopicId"; + private static final String HEADER_SUBSCRIPTION_ID = "PubSubSubscriptionId"; + @Override public Map mapHeaders(final PubSubMessageData source) { val miniMap = minimalHeaderMapping.mapHeaders(source); + val extraMap = mapExtra(source); val headMap = source.getMessage().getAttributesMap(); return ImmutableMap.builder() .putAll(miniMap) + .putAll(extraMap) .putAll(headMap) .build(); } + + private Map mapExtra(PubSubMessageData source) { + return Map.of( + HEADER_PROJECT_ID, source.getSourcePartition().getProjectId(), + HEADER_TOPIC_ID, source.getSourcePartition().getTopicId(), + HEADER_SUBSCRIPTION_ID, source.getSourcePartition().getSubscriptionId() + ); + } } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapperTest.java b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapperTest.java index 0ce4e8d33..568f7bff9 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapperTest.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapperTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -41,7 +42,7 @@ class MinimalAndMessageAttributesHeaderMapperTest { private static final Map HEADERS_MAP = Map.of("attr1", "value1", "attr2", "value2"); - @Mock + @Mock(answer = Answers.RETURNS_DEEP_STUBS) private PubSubMessageData pubSubMessageData; @Mock @@ -55,18 +56,27 @@ void setup() { } @Test - void testGetHeaders() { + void testMapHeaders() { + when(pubsubMessage.getPublishTime()).thenReturn(Timestamp.newBuilder().setSeconds(PUBLISH_TIME_INSTANT .getEpochSecond()).build()); when(pubsubMessage.getAttributesMap()).thenReturn(HEADERS_MAP); when(pubSubMessageData.getMessage()).thenReturn(pubsubMessage); + when(pubSubMessageData.getSourcePartition().getProjectId()).thenReturn("test-project"); + when(pubSubMessageData.getSourcePartition().getTopicId()).thenReturn("test-topic"); + when(pubSubMessageData.getSourcePartition().getSubscriptionId()).thenReturn("test-subscription"); Map result = minimalAndMessageAttributesHeaderMapping.mapHeaders(pubSubMessageData); assertEquals( ImmutableMap.builder() .put("PublishTimestamp", String.valueOf(PUBLISH_TIME_INSTANT.getEpochSecond())) - .putAll(HEADERS_MAP).build(), + .putAll(HEADERS_MAP) + .put("GCPProjectId", "test-project") // Include expected values from mapExtra + .put("PubSubTopicId", "test-topic") + .put("PubSubSubscriptionId", "test-subscription") + .build(), result); } + }