diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java index e50806a0..301d6921 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java @@ -21,14 +21,11 @@ import org.swisspush.gateleen.core.validation.ValidationStatus; import org.swisspush.gateleen.validation.ValidationException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.regex.Pattern; -import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory; - /** * Handler class for all Kafka related requests. * @@ -56,41 +53,41 @@ public class KafkaHandler extends ConfigurationResourceConsumer { private boolean initialized = false; - /** @deprecated Use {@link #builder()} */ - @Deprecated - public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, - KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { - this(configurationResourceManager, null, repository, kafkaMessageSender, - configResourceUri, streamingPath); - } - - /** @deprecated Use {@link #builder()} */ - @Deprecated - public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, - KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, - String streamingPath) { - this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender, - configResourceUri, streamingPath, new HashMap<>()); - } - - /** @deprecated Use {@link #builder()} */ - @Deprecated - public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, - KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { - - this(configurationResourceManager, null, repository, kafkaMessageSender, - configResourceUri, streamingPath, properties); - } - - /** @deprecated Use {@link #builder()} */ - @Deprecated - public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, - KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { - this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager, - kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath, - properties); - log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!"); - } +// /** @deprecated Use {@link #builder()} */ +// @Deprecated +// public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, +// KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { +// this(configurationResourceManager, null, repository, kafkaMessageSender, +// configResourceUri, streamingPath); +// } +// +// /** @deprecated Use {@link #builder()} */ +// @Deprecated +// public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, +// KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, +// String streamingPath) { +// this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender, +// configResourceUri, streamingPath, new HashMap<>()); +// } +// +// /** @deprecated Use {@link #builder()} */ +// @Deprecated +// public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, +// KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { +// +// this(configurationResourceManager, null, repository, kafkaMessageSender, +// configResourceUri, streamingPath, properties); +// } +// +// /** @deprecated Use {@link #builder()} */ +// @Deprecated +// public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, +// KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { +// this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager, +// kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath, +// properties); +// log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!"); +// } /** Use {@link #builder()} to get an instance. */ KafkaHandler( @@ -181,7 +178,7 @@ public boolean handle(final HttpServerRequest request) { // TODO refactor away this callback-hell (Counts for the COMPLETE method // surrounding this line, named 'KafkaHandler.handle()', NOT only // those lines below). - kafkaProducerRecordBuilder.buildRecords(topic, payload).compose((List> kafkaProducerRecords) -> { + kafkaProducerRecordBuilder.buildRecordsAsync(topic, payload).compose((List> kafkaProducerRecords) -> { maybeValidate(request, kafkaProducerRecords).onComplete(validationEvent -> { if(validationEvent.succeeded()) { if(validationEvent.result().isSuccess()) { @@ -208,6 +205,7 @@ public boolean handle(final HttpServerRequest request) { return; } log.error("TODO error handling", exceptionFactory.newException(ex)); + respondWith(StatusCode.INTERNAL_SERVER_ERROR, ex.getMessage(), request); }); }); return true; diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java index 9c2a7ddb..c7a850a1 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java @@ -51,7 +51,7 @@ class KafkaProducerRecordBuilder { * @return A list of {@link KafkaProducerRecord}s created from the provided payload * @throws ValidationException when the payload is not valid (missing properties, wrong types, etc.) */ - Future>> buildRecords(String topic, Buffer payload) { + Future>> buildRecordsAsync(String topic, Buffer payload) { return Future.succeededFuture().compose((Void v) -> { JsonObject payloadObj; try { @@ -82,9 +82,9 @@ Future>> buildRecords(String topic, Buf }); } - /** @deprecated Use {@link #buildRecords(String, Buffer)}. */ + /** @deprecated Use {@link #buildRecordsAsync(String, Buffer)}. */ @Deprecated - static List> buildRecordsBlocking(String topic, Buffer payload) throws ValidationException { + static List> buildRecords(String topic, Buffer payload) throws ValidationException { List> kafkaProducerRecords = new ArrayList<>(); JsonObject payloadObj; try { diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java index 4dc44624..e2ba6751 100644 --- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java @@ -22,7 +22,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; -import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecordsBlocking; +import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords; /** * Test class for the {@link KafkaMessageSender} @@ -47,7 +47,7 @@ public void sendSingleMessage(TestContext context) throws ValidationException { Async async = context.async(); String topic = "myTopic"; final List> records = - buildRecordsBlocking(topic, Buffer.buffer(buildSingleRecordPayload("someKey").encode())); + buildRecords(topic, Buffer.buffer(buildSingleRecordPayload("someKey").encode())); when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic))); @@ -64,7 +64,7 @@ public void sendSingleMessageWithoutKey(TestContext context) throws ValidationEx Async async = context.async(); String topic = "myTopic"; final List> records = - buildRecordsBlocking(topic, Buffer.buffer(buildSingleRecordPayload(null).encode())); + buildRecords(topic, Buffer.buffer(buildSingleRecordPayload(null).encode())); when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic))); @@ -81,7 +81,7 @@ public void sendMultipleMessages(TestContext context) throws ValidationException Async async = context.async(); String topic = "myTopic"; final List> records = - buildRecordsBlocking(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode())); + buildRecords(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode())); when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic))); @@ -105,7 +105,7 @@ public void sendMultipleMessagesWithFailingMessage(TestContext context) throws V Async async = context.async(); String topic = "myTopic"; final List> records = - buildRecordsBlocking(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode())); + buildRecords(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode())); when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic))); when(producer.send(eq(records.get(1)))).thenReturn(Future.failedFuture("Message with key '" + records.get(1).key() + "' failed.")); diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java index 0c03ec09..3cb7a63c 100644 --- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java @@ -17,7 +17,7 @@ import java.util.List; -import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecordsBlocking; +import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords; /** * Test class for the {@link KafkaProducerRecordBuilder} @@ -34,34 +34,34 @@ public class KafkaProducerRecordBuilderTest { public void buildRecordsInvalidJson() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Error while parsing payload"); - buildRecordsBlocking("myTopic", Buffer.buffer("notValidJson")); + buildRecords("myTopic", Buffer.buffer("notValidJson")); } @Test public void buildRecordsMissingRecordsArray() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Missing 'records' array"); - buildRecordsBlocking("myTopic", Buffer.buffer("{}")); + buildRecords("myTopic", Buffer.buffer("{}")); } @Test public void buildRecordsNotArray() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects"); - buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}")); + buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}")); } @Test public void buildRecordsInvalidRecordsType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects"); - buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [123]}")); + buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}")); } @Test public void buildRecordsEmptyRecordsArray(TestContext context) throws ValidationException { final List> records = - buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": []}")); + buildRecords("myTopic", Buffer.buffer("{\"records\": []}")); context.assertTrue(records.isEmpty()); } @@ -69,42 +69,42 @@ public void buildRecordsEmptyRecordsArray(TestContext context) throws Validation public void buildRecordsInvalidKeyType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'key' must be of type String"); - buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}")); + buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}")); } @Test public void buildRecordsInvalidValueType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'value' must be of type JsonObject"); - buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}")); + buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}")); } @Test public void buildRecordsMissingValue() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'value' is required"); - buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\":[{}]}")); + buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}")); } @Test public void buildRecordsInvalidHeadersType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'headers' must be of type JsonObject"); - buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}")); + buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}")); } @Test public void buildRecordsInvalidHeadersValueType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'headers' must be of type JsonObject holding String values only"); - buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}")); + buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}")); } @Test public void buildRecordsValidNoKeyNoHeaders(TestContext context) throws ValidationException { JsonObject payload = buildPayload(null, null); final List> records = - buildRecordsBlocking("myTopic", Buffer.buffer(payload.encode())); + buildRecords("myTopic", Buffer.buffer(payload.encode())); context.assertFalse(records.isEmpty()); context.assertEquals(1, records.size()); context.assertEquals("myTopic", records.get(0).topic()); @@ -118,7 +118,7 @@ public void buildRecordsValidNoKeyNoHeaders(TestContext context) throws Validati public void buildRecordsValidWithKeyNoHeaders(TestContext context) throws ValidationException { JsonObject payload = buildPayload("someKey", null); final List> records = - buildRecordsBlocking("myTopic", Buffer.buffer(payload.encode())); + buildRecords("myTopic", Buffer.buffer(payload.encode())); context.assertFalse(records.isEmpty()); context.assertEquals(1, records.size()); context.assertEquals("myTopic", records.get(0).topic()); @@ -135,7 +135,7 @@ public void buildRecordsValidWithKeyWithHeaders(TestContext context) throws Vali .add("header_1", "value_1") .add("header_2", "value_2")); final List> records = - buildRecordsBlocking("myTopic", Buffer.buffer(payload.encode())); + buildRecords("myTopic", Buffer.buffer(payload.encode())); context.assertFalse(records.isEmpty()); context.assertEquals(1, records.size()); context.assertEquals("myTopic", records.get(0).topic());