Skip to content

Commit

Permalink
Discard invalid consumer records
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored and knative-prow-robot committed Oct 16, 2024
1 parent d1e6cb4 commit 57ad189
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ public RecordDispatcherMutatorChain(final RecordDispatcher next, final CloudEven

@Override
public Future<Void> dispatch(ConsumerRecord<Object, CloudEvent> record) {
return next.dispatch(
KafkaConsumerRecordUtils.copyRecordAssigningValue(record, cloudEventMutator.apply(record)));
final var n = cloudEventMutator.apply(record);
if (n == record.value()) {
return next.dispatch(record);
}
return next.dispatch(KafkaConsumerRecordUtils.copyRecordAssigningValue(record, n));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cl

@Override
public CloudEvent apply(ConsumerRecord<Object, CloudEvent> record) {
if (record.value() instanceof InvalidCloudEvent) {
return record.value();
}
final var builder = CloudEventBuilder.from(record.value());
applyKafkaMetadata(builder, record.partition(), record.offset());
applyCloudEventOverrides(builder);
Expand Down

0 comments on commit 57ad189

Please sign in to comment.