From 6b6d597f1794b2f4d07edc937f677587add53eaa Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Sat, 21 Sep 2024 17:46:34 -0700 Subject: [PATCH] Add origination time to buffer event and populate the partition key Signed-off-by: Souvik Bose --- .../converter/KinesisRecordConverter.java | 19 ++++++++++++++++--- .../MetadataKeyAttributes.java | 5 +++-- .../processor/KinesisRecordProcessor.java | 7 ++----- .../converter/KinesisRecordConverterTest.java | 5 +++-- .../processor/KinesisRecordProcessorTest.java | 19 +++++++++++++------ 5 files changed, 37 insertions(+), 18 deletions(-) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/{processor => converter}/MetadataKeyAttributes.java (53%) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java index 5a70b95c10..c21dec4c21 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java @@ -12,11 +12,13 @@ import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.record.Record; import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; @@ -29,10 +31,21 @@ public KinesisRecordConverter(final InputCodec codec) { this.codec = codec; } - public List> convert(List kinesisClientRecords) throws IOException { + public List> convert(List kinesisClientRecords, + final String streamName) throws IOException { List> records = new ArrayList<>(); - for (KinesisClientRecord record : kinesisClientRecords) { - processRecord(record, records::add); + for (KinesisClientRecord kinesisClientRecord : kinesisClientRecords) { + processRecord(kinesisClientRecord, record -> { + records.add(record); + Event event = record.getData(); + EventMetadata eventMetadata = event.getMetadata(); + eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, + streamName.toLowerCase()); + eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE, kinesisClientRecord.partitionKey()); + final Instant externalOriginationTime = kinesisClientRecord.approximateArrivalTimestamp(); + event.getEventHandle().setExternalOriginationTime(externalOriginationTime); + event.getMetadata().setExternalOriginationTime(externalOriginationTime); + }); } return records; } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/MetadataKeyAttributes.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/MetadataKeyAttributes.java similarity index 53% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/MetadataKeyAttributes.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/MetadataKeyAttributes.java index e2debba54e..6ef99ddcd3 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/MetadataKeyAttributes.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/MetadataKeyAttributes.java @@ -8,8 +8,9 @@ * */ -package org.opensearch.dataprepper.plugins.kinesis.source.processor; +package org.opensearch.dataprepper.plugins.kinesis.source.converter; public class MetadataKeyAttributes { - static final String KINESIS_STREAM_NAME_METADATA_ATTRIBUTE = "kinesis_stream_name"; + public static final String KINESIS_STREAM_NAME_METADATA_ATTRIBUTE = "stream_name"; + public static final String KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key"; } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java index 6df0760ca3..a488cfb1ea 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java @@ -18,7 +18,6 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; @@ -162,15 +161,13 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { // Track the records for checkpoint purpose kinesisCheckpointerTracker.addRecordForCheckpoint(extendedSequenceNumber, processRecordsInput.checkpointer()); - List> records = kinesisRecordConverter.convert(processRecordsInput.records()); + List> records = kinesisRecordConverter.convert(processRecordsInput.records(), streamIdentifier.streamName()); int eventCount = 0; for (Record record: records) { Event event = record.getData(); acknowledgementSetOpt.ifPresent(acknowledgementSet -> acknowledgementSet.add(event)); - EventMetadata eventMetadata = event.getMetadata(); - eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, - streamIdentifier.streamName().toLowerCase()); + bufferAccumulator.add(record); eventCount++; } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java index 6b0646e993..2598745b2f 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java @@ -41,6 +41,7 @@ import static org.mockito.Mockito.verify; public class KinesisRecordConverterTest { + private static final String streamId = "stream-1"; @Test void setup() throws IOException { @@ -52,7 +53,7 @@ void setup() throws IOException { KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() .data(ByteBuffer.wrap(sample_record_data.getBytes())) .build(); - kinesisRecordConverter.convert(List.of(kinesisClientRecord)); + kinesisRecordConverter.convert(List.of(kinesisClientRecord), streamId); verify(codec, times(1)).parse(any(InputStream.class), any(Consumer.class)); } @@ -79,7 +80,7 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException { KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() .data(ByteBuffer.wrap(writer.toString().getBytes())) .build(); - List> events = kinesisRecordConverter.convert(List.of(kinesisClientRecord)); + List> events = kinesisRecordConverter.convert(List.of(kinesisClientRecord), streamId); assertEquals(events.size(), numRecords); } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index ea002e27e9..692703bb9f 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -32,6 +32,7 @@ import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; import org.opensearch.dataprepper.plugins.kinesis.source.converter.KinesisRecordConverter; +import org.opensearch.dataprepper.plugins.kinesis.source.converter.MetadataKeyAttributes; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; @@ -189,9 +190,10 @@ void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied() List> records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); records.add(record); - when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); @@ -235,9 +237,10 @@ public void testProcessRecordsWithoutAcknowledgementsEnabled() List> records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); records.add(record); - when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); @@ -285,9 +288,10 @@ void testProcessRecordsWithAcknowledgementsEnabled() List> records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); records.add(record); - when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); @@ -339,9 +343,10 @@ void testProcessRecordsWithNDJsonInputCodec() List> records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); records.add(record); - when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); @@ -381,9 +386,10 @@ void testProcessRecordsNoThrowException() List> records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); records.add(record); - when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records); final Throwable exception = mock(RuntimeException.class); doThrow(exception).when(bufferAccumulator).add(any(Record.class)); @@ -405,9 +411,10 @@ void testProcessRecordsBufferFlushNoThrowException() List> records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); records.add(record); - when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records); final Throwable exception = mock(RuntimeException.class); doThrow(exception).when(bufferAccumulator).flush();