Skip to content

Commit

Permalink
Discard invalid consumer records (#4128)
Browse files Browse the repository at this point in the history
* Discard invalid consumer records

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add unit tests

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Oct 16, 2024
1 parent 1d50d7d commit 265839a
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ public RecordDispatcherMutatorChain(final RecordDispatcher next, final CloudEven

@Override
public Future<Void> dispatch(ConsumerRecord<Object, CloudEvent> record) {
return next.dispatch(
KafkaConsumerRecordUtils.copyRecordAssigningValue(record, cloudEventMutator.apply(record)));
final var n = cloudEventMutator.apply(record);
if (n == record.value()) {
return next.dispatch(record);
}
return next.dispatch(KafkaConsumerRecordUtils.copyRecordAssigningValue(record, n));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cl

@Override
public CloudEvent apply(ConsumerRecord<Object, CloudEvent> record) {
if (record.value() instanceof InvalidCloudEvent) {
return record.value();
}
final var builder = CloudEventBuilder.from(record.value());
applyKafkaMetadata(builder, record.partition(), record.offset());
applyCloudEventOverrides(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dev.knative.eventing.kafka.broker.dispatcher.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
Expand All @@ -24,12 +25,14 @@
import static org.mockito.Mockito.when;

import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.vertx.core.Future;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -74,4 +77,22 @@ public void shouldCloseInner() {
assertThat(succeeded.succeeded()).isTrue();
verify(next, times(1)).close();
}

@Test
public void shouldNotThrowOnInvalidCloudEvent() {
final var next = mock(RecordDispatcher.class);

final var called = new AtomicInteger(0);
final var given = new InvalidCloudEvent(null);
final var chain = new RecordDispatcherMutatorChain(next, in -> {
assertThat(in.value()).isSameAs(given);
called.incrementAndGet();
return given;
});

final var givenRecord = new ConsumerRecord<>("t1", 0, 0, (Object) "abc", (CloudEvent) given);

assertThatCode(() -> chain.dispatch(givenRecord)).doesNotThrowAnyException();
assertThat(called.get()).isEqualTo(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.time.OffsetDateTime;
Expand Down Expand Up @@ -56,6 +58,30 @@ public void shouldAddExtensions() {
assertThat(got).isEqualTo(expected.build());
}

@Test
public void shouldNotThrowOnInvalidCloudEvent() {
final var extensions = Map.of(
"a", "foo",
"b", "bar");
final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder()
.putAllExtensions(extensions)
.build();

final var mutator = new CloudEventOverridesMutator(ceOverrides);

final var given = new InvalidCloudEvent(null);

CloudEvent ce = null;
assertThatCode(() -> {
mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));
})
.doesNotThrowAnyException();

final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));

assertThat(got).isSameAs(given);
}

@Test
public void shouldAddKafkaExtensionsWhenNoOverrides() {
final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder()
Expand Down

0 comments on commit 265839a

Please sign in to comment.