diff --git a/.gitignore b/.gitignore
index 4320cfaa4..93ba65b1c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ release.properties
*.iml
.classpath
.project
+.factorypath
.settings/
.vscode/
.attach_pid*
diff --git a/examples/pom.xml b/examples/pom.xml
index eb3acaada..109de6829 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,6 +28,7 @@
        <module>spring-reactive</module>
        <module>spring-rsocket</module>
        <module>spring-function</module>
+        <module>spring-kafka</module>
diff --git a/examples/spring-kafka/README.md b/examples/spring-kafka/README.md
new file mode 100644
index 000000000..84fe0e9b0
--- /dev/null
+++ b/examples/spring-kafka/README.md
@@ -0,0 +1,32 @@
+# Spring Kafka + CloudEvents sample
+
+## Build
+
+```shell
+mvn package
+```
+
+## Start Consumer
+
+```shell
+mvn spring-boot:run
+```
+
+You can try sending a request using any Kafka client, or by running the integration tests in this project. Send an event to the "in" topic and the application echoes a CloudEvent back on the "out" topic. The listener is implemented like this (the request and response are modelled directly as a `CloudEvent`):
+
+```java
+@KafkaListener(id = "listener", topics = "in", clientIdPrefix = "demo")
+@SendTo("out")
+public CloudEvent listen(CloudEvent event) {
+ return ...;
+}
+```
+
+To make that work, we need to register the Kafka message converter as a `@Bean`:
+
+```java
+@Bean
+public CloudEventRecordMessageConverter recordMessageConverter() {
+ return new CloudEventRecordMessageConverter();
+}
+```
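+
+Because Spring Boot also wires that converter into the auto-configured `KafkaTemplate`, application code can publish a `CloudEvent` directly. Here is a minimal sketch modelled on the integration tests in this module (the topic name and attribute values are only examples):
+
+```java
+@Autowired
+private KafkaTemplate<byte[], byte[]> template;
+
+public void send() {
+    CloudEvent event = CloudEventBuilder.v1()
+            .withId(UUID.randomUUID().toString())
+            .withSource(URI.create("https://spring.io/events"))
+            .withType("io.spring.event")
+            .withDataContentType("application/json")
+            .withData("{\"value\":\"Dave\"}".getBytes())
+            .build();
+    template.send(MessageBuilder.withPayload(event).setHeader(KafkaHeaders.TOPIC, "in").build());
+}
+```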
diff --git a/examples/spring-kafka/pom.xml b/examples/spring-kafka/pom.xml
new file mode 100644
index 000000000..327669a5a
--- /dev/null
+++ b/examples/spring-kafka/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>cloudevents-examples</artifactId>
+        <groupId>io.cloudevents</groupId>
+        <version>2.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>cloudevents-spring-kafka-example</artifactId>
+
+    <properties>
+        <spring-boot.version>2.4.3</spring-boot.version>
+        <testcontainers.version>1.15.2</testcontainers.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-dependencies</artifactId>
+                <version>${spring-boot.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>testcontainers-bom</artifactId>
+                <version>${testcontainers.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-spring</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-json-jackson</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>${spring-boot.version}</version>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/examples/spring-kafka/src/main/java/io/cloudevents/examples/spring/DemoApplication.java b/examples/spring-kafka/src/main/java/io/cloudevents/examples/spring/DemoApplication.java
new file mode 100644
index 000000000..bba99b8be
--- /dev/null
+++ b/examples/spring-kafka/src/main/java/io/cloudevents/examples/spring/DemoApplication.java
@@ -0,0 +1,58 @@
+package io.cloudevents.examples.spring;
+
+import java.net.URI;
+import java.util.UUID;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.TopicBuilder;
+import org.springframework.messaging.handler.annotation.SendTo;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.spring.kafka.CloudEventRecordMessageConverter;
+
+@SpringBootApplication
+public class DemoApplication {
+
+ public static void main(String[] args) throws Exception {
+ SpringApplication.run(DemoApplication.class, args);
+ }
+
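+ // Consume CloudEvents from the "in" topic (binary or structured mode, handled by the
+ // converter registered below) and echo a copy with a fresh id and a fixed source/type to "out".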
+ @KafkaListener(id = "listener", topics = "in", clientIdPrefix = "demo")
+ @SendTo("out")
+ public CloudEvent listen(CloudEvent event) {
+ System.err.println("Echo: " + event);
+ return CloudEventBuilder.from(event).withId(UUID.randomUUID().toString())
+ .withSource(URI.create("https://spring.io/foos")).withType("io.spring.event.Foo")
+ .withData(event.getData().toBytes()).build();
+ }
+
+ @Bean
+ public NewTopic topicOut() {
+ return TopicBuilder.name("out").build();
+ }
+
+ @Bean
+ public NewTopic topicIn() {
+ return TopicBuilder.name("in").build();
+ }
+
+ @Configuration
+ public static class CloudEventMessageConverterConfiguration {
+ /**
+ * Register a RecordMessageConverter for Spring Kafka to pick up and use to
+ * convert between CloudEvent and Message payloads.
+ */
+ @Bean
+ public CloudEventRecordMessageConverter recordMessageConverter() {
+ return new CloudEventRecordMessageConverter();
+ }
+
+ }
+
+}
diff --git a/examples/spring-kafka/src/main/java/io/cloudevents/examples/spring/Foo.java b/examples/spring-kafka/src/main/java/io/cloudevents/examples/spring/Foo.java
new file mode 100644
index 000000000..679c38df3
--- /dev/null
+++ b/examples/spring-kafka/src/main/java/io/cloudevents/examples/spring/Foo.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.examples.spring;
+
+class Foo {
+
+ private String value;
+
+ public Foo() {
+ }
+
+ public Foo(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return this.value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return "Foo [value=" + this.value + "]";
+ }
+
+}
\ No newline at end of file
diff --git a/examples/spring-kafka/src/main/resources/application.properties b/examples/spring-kafka/src/main/resources/application.properties
new file mode 100644
index 000000000..a8296f04a
--- /dev/null
+++ b/examples/spring-kafka/src/main/resources/application.properties
@@ -0,0 +1,4 @@
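+# The CloudEvent record converter reads and writes raw byte[] record values, so plain
+# byte-array (de)serializers are configured for both the producer and the consumer.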
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
diff --git a/examples/spring-kafka/src/test/java/io/cloudevents/examples/spring/DemoApplicationTests.java b/examples/spring-kafka/src/test/java/io/cloudevents/examples/spring/DemoApplicationTests.java
new file mode 100644
index 000000000..c71ba012c
--- /dev/null
+++ b/examples/spring-kafka/src/test/java/io/cloudevents/examples/spring/DemoApplicationTests.java
@@ -0,0 +1,165 @@
+package io.cloudevents.examples.spring;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.net.URI;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.util.TestPropertyValues;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.util.MimeType;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = "spring.kafka.consumer.auto-offset-reset=earliest")
+@ContextConfiguration(initializers = DemoApplicationTests.Initializer.class)
+public class DemoApplicationTests {
+
+ @Autowired
+ private KafkaTemplate<byte[], byte[]> kafka;
+
+ @Autowired
+ private KafkaListenerConfiguration listener;
+
+ @BeforeEach
+ public void clear() {
+ listener.queue.clear();
+ }
+
+ @Test
+ void echoWithKafkaStyleHeaders() throws Exception {
+
+ kafka.send(MessageBuilder.withPayload("{\"value\":\"Dave\"}".getBytes()) //
+ .setHeader(KafkaHeaders.TOPIC, "in") //
+ .setHeader("ce_id", "12345") //
+ .setHeader("ce_specversion", "1.0") //
+ .setHeader("ce_type", "io.spring.event") //
+ .setHeader("ce_source", "https://spring.io/events") //
+ .setHeader("ce_datacontenttype", MimeType.valueOf("application/json")) //
+ .build());
+
+ Message<byte[]> response = listener.queue.poll(2000, TimeUnit.MILLISECONDS);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getPayload()).isEqualTo("{\"value\":\"Dave\"}".getBytes());
+
+ MessageHeaders headers = response.getHeaders();
+
+ assertThat(headers.get("ce-id")).isNotNull();
+ assertThat(headers.get("ce-source")).isNotNull();
+ assertThat(headers.get("ce-type")).isNotNull();
+
+ assertThat(headers.get("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.get("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.get("ce-source")).isEqualTo("https://spring.io/foos");
+
+ }
+
+ @Test
+ void echoWithCanonicalHeaders() throws Exception {
+
+ kafka.send(MessageBuilder.withPayload("{\"value\":\"Dave\"}".getBytes()) //
+ .setHeader(KafkaHeaders.TOPIC, "in") //
+ .setHeader("ce-id", "12345") //
+ .setHeader("ce-specversion", "1.0") //
+ .setHeader("ce-type", "io.spring.event") //
+ .setHeader("ce-source", "https://spring.io/events") //
+ .setHeader("ce-datacontenttype", MimeType.valueOf("application/json")) //
+ .build());
+
+ Message<byte[]> response = listener.queue.poll(2000, TimeUnit.MILLISECONDS);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getPayload()).isEqualTo("{\"value\":\"Dave\"}".getBytes());
+
+ MessageHeaders headers = response.getHeaders();
+
+ assertThat(headers.get("ce-id")).isNotNull();
+ assertThat(headers.get("ce-source")).isNotNull();
+ assertThat(headers.get("ce-type")).isNotNull();
+
+ assertThat(headers.get("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.get("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.get("ce-source")).isEqualTo("https://spring.io/foos");
+
+ }
+
+ @Test
+ void echoWithStructured() throws Exception {
+
+ CloudEvent event = CloudEventBuilder.v1() //
+ .withId("12345") //
+ .withSource(URI.create("https://spring.io/events")) //
+ .withType("io.spring.event") //
+ .withDataContentType("application/json") //
+ .withData("{\"value\":\"Dave\"}".getBytes()).build();
+
+ kafka.send(MessageBuilder.withPayload(event) //
+ .setHeader(KafkaHeaders.TOPIC, "in") //
+ .setHeader("contentType", MimeType.valueOf("application/cloudevents+json")) //
+ .build());
+
+ Message<byte[]> response = listener.queue.poll(2000, TimeUnit.MILLISECONDS);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getPayload()).isEqualTo("{\"value\":\"Dave\"}".getBytes());
+
+ MessageHeaders headers = response.getHeaders();
+
+ assertThat(headers.get("ce-id")).isNotNull();
+ assertThat(headers.get("ce-source")).isNotNull();
+ assertThat(headers.get("ce-type")).isNotNull();
+
+ assertThat(headers.get("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.get("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.get("ce-source")).isEqualTo("https://spring.io/foos");
+
+ }
+
+ @TestConfiguration
+ static class KafkaListenerConfiguration {
+
+ private ArrayBlockingQueue<Message<byte[]>> queue = new ArrayBlockingQueue<>(1);
+
+ @KafkaListener(id = "test", topics = "out", clientIdPrefix = "test")
+ public void listen(Message<byte[]> message) {
+ System.err.println(message);
+ queue.add(message);
+ }
+
+ }
+
+ public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+
+ private static KafkaContainer kafka;
+
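+ // Start one Kafka container for the whole test class and point Spring Kafka at it via
+ // spring.kafka.bootstrap-servers before the application context is created.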
+ static {
+ kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("5.4.3")) //
+ .withNetwork(null); // .withReuse(true);
+ kafka.start();
+ }
+
+ @Override
+ public void initialize(ConfigurableApplicationContext context) {
+ TestPropertyValues.of("spring.kafka.bootstrap-servers=" + kafka.getBootstrapServers()).applyTo(context);
+ }
+
+ }
+}
diff --git a/examples/spring-kafka/src/test/java/io/cloudevents/examples/spring/StructuredApplicationTests.java b/examples/spring-kafka/src/test/java/io/cloudevents/examples/spring/StructuredApplicationTests.java
new file mode 100644
index 000000000..6cdcd2bbc
--- /dev/null
+++ b/examples/spring-kafka/src/test/java/io/cloudevents/examples/spring/StructuredApplicationTests.java
@@ -0,0 +1,112 @@
+package io.cloudevents.examples.spring;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.net.URI;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.util.TestPropertyValues;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.util.MimeType;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = "spring.kafka.consumer.auto-offset-reset=earliest")
+@ContextConfiguration(initializers = StructuredApplicationTests.Initializer.class)
+public class StructuredApplicationTests {
+
+ @Autowired
+ private KafkaTemplate<byte[], byte[]> kafka;
+
+ @Autowired
+ private KafkaListenerConfiguration listener;
+
+ @BeforeEach
+ public void clear() {
+ listener.queue.clear();
+ }
+
+ @Test
+ void echo() throws Exception {
+
+ CloudEvent event = CloudEventBuilder.v1() //
+ .withId("12345") //
+ .withSource(URI.create("https://spring.io/events")) //
+ .withType("io.spring.event") //
+ .withDataContentType("application/json") //
+ .withData("{\"value\":\"Dave\"}".getBytes()).build();
+
+ kafka.send(MessageBuilder.withPayload(event) //
+ .setHeader(KafkaHeaders.TOPIC, "in") //
+ .setHeader("contentType", MimeType.valueOf("application/cloudevents+json")) //
+ .build());
+
+ Message<CloudEvent> response = listener.queue.poll(2000, TimeUnit.MILLISECONDS);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getPayload().getData().toBytes()).isEqualTo("{\"value\":\"Dave\"}".getBytes());
+ assertThat(response.getPayload().getId()).isNotEqualTo("12345");
+ assertThat(response.getPayload().getType()).isEqualTo("io.spring.event.Foo");
+ assertThat(response.getPayload().getSource().toString()).isEqualTo("https://spring.io/foos");
+
+ MessageHeaders headers = response.getHeaders();
+
+ assertThat(headers.get("ce-id")).isNotNull();
+ assertThat(headers.get("ce_id")).isNull();
+ assertThat(headers.get("ce-source")).isNotNull();
+ assertThat(headers.get("ce-type")).isNotNull();
+
+ assertThat(headers.get("ce-id")).isNotEqualTo("12345");
+ assertThat(headers.get("ce-type")).isEqualTo("io.spring.event.Foo");
+ assertThat(headers.get("ce-source")).isEqualTo("https://spring.io/foos");
+ assertThat(headers.get("ce-datacontenttype")).isEqualTo("application/json");
+
+ }
+
+ @TestConfiguration
+ static class KafkaListenerConfiguration {
+
+ private ArrayBlockingQueue<Message<CloudEvent>> queue = new ArrayBlockingQueue<>(1);
+
+ @KafkaListener(id = "structured", topics = "out", clientIdPrefix = "structured")
+ public void listen(Message<CloudEvent> message) {
+ System.err.println(message);
+ queue.add(message);
+ }
+
+ }
+
+ public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+
+ private static KafkaContainer kafka;
+
+ static {
+ kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("5.4.3")) //
+ .withNetwork(null); // .withReuse(true);
+ kafka.start();
+ }
+
+ @Override
+ public void initialize(ConfigurableApplicationContext context) {
+ TestPropertyValues.of("spring.kafka.bootstrap-servers=" + kafka.getBootstrapServers()).applyTo(context);
+ }
+
+ }
+}
diff --git a/examples/spring-reactive/README.md b/examples/spring-reactive/README.md
index 0d64449b5..d4c58ac95 100644
--- a/examples/spring-reactive/README.md
+++ b/examples/spring-reactive/README.md
@@ -36,7 +36,7 @@ curl -v -H'Content-type: application/cloudevents+json' \
http://localhost:8080/event
```
-The `/event endpoint is implemented like this (the request and response are modelled directly as a `CloudEvent`):
+The `/event` endpoint is implemented like this (the request and response are modelled directly as a `CloudEvent`):
```java
@PostMapping("/event")
diff --git a/pom.xml b/pom.xml
index a8db0c08b..519fe0075 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,6 +166,7 @@
https://qpid.apache.org/releases/qpid-proton-j-0.33.7/api/
https://fasterxml.github.io/jackson-databind/javadoc/2.10/
+
diff --git a/spring/pom.xml b/spring/pom.xml
index 53fa97189..e00009471 100644
--- a/spring/pom.xml
+++ b/spring/pom.xml
@@ -33,6 +33,7 @@
        <module-name>io.cloudevents.spring</module-name>
        <spring-boot.version>2.4.3</spring-boot.version>
+        <spring-cloud-function.version>3.1.2</spring-cloud-function.version>
@@ -63,6 +64,17 @@
            <artifactId>spring-messaging</artifactId>
            <optional>true</optional>
        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-function-context</artifactId>
+            <version>${spring-cloud-function.version}</version>
+            <optional>true</optional>
+        </dependency>
        <dependency>
            <groupId>io.cloudevents</groupId>
            <artifactId>cloudevents-core</artifactId>
@@ -115,4 +127,3 @@
-
diff --git a/spring/src/main/java/io/cloudevents/spring/kafka/CloudEventRecordMessageConverter.java b/spring/src/main/java/io/cloudevents/spring/kafka/CloudEventRecordMessageConverter.java
new file mode 100644
index 000000000..04a67e7e9
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/kafka/CloudEventRecordMessageConverter.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2020-Present The CloudEvents Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.kafka;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.converter.MessagingMessageConverter;
+import org.springframework.kafka.support.converter.RecordMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.spring.messaging.CloudEventContextUtils;
+import io.cloudevents.spring.messaging.CloudEventMessageConverter;
+import io.cloudevents.spring.messaging.CloudEventsHeaders;
+
+public class CloudEventRecordMessageConverter implements RecordMessageConverter {
+
+ private RecordMessageConverter delegate = new MessagingMessageConverter();
+
+ private CloudEventMessageConverter converter = new CloudEventMessageConverter();
+
+ private static final String KAFKA_PREFIX = "ce_";
+
+ @Override
+ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
+ Type payloadType) {
+ Message<?> message = canonicalize(delegate.toMessage(record, acknowledgment, consumer, payloadType));
+ if (payloadType.equals(CloudEvent.class)) {
+ CloudEvent event = (CloudEvent) converter.fromMessage(message, CloudEvent.class);
+ message = MessageBuilder.withPayload(event).copyHeaders(message.getHeaders()).build();
+ }
+ return message;
+ }
+
+ @Override
+ public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
+ if (message.getPayload() instanceof CloudEvent) {
+ CloudEvent payload = (CloudEvent) message.getPayload();
+ Map<String, Object> map = CloudEventContextUtils.toMap(payload);
+ message = MessageBuilder.withPayload(payload.getData().toBytes()).copyHeaders(message.getHeaders())
+ .copyHeaders(map).build();
+ }
+ return delegate.fromMessage(message, defaultTopic);
+ }
+
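+ // Kafka transports binary-mode CloudEvent attributes as "ce_"-prefixed record headers with
+ // byte[] values; rewrite them to the canonical "ce-" form with String values so the
+ // messaging CloudEventMessageConverter can read them.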
+ private static Message<?> canonicalize(Message<?> message) {
+ Map<String, Object> headers = new HashMap<>(message.getHeaders());
+ for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
+ if (entry.getKey().startsWith(KAFKA_PREFIX)) {
+ headers.remove(entry.getKey());
+ headers.put(CloudEventsHeaders.CE_PREFIX + entry.getKey().substring(KAFKA_PREFIX.length()),
+ stringValue(entry.getValue()));
+ } else {
+ headers.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return MessageBuilder.createMessage(message.getPayload(), new MessageHeaders(headers));
+ }
+
+ private static String stringValue(Object value) {
+ if (value instanceof byte[]) {
+ return new String((byte[]) value);
+ }
+ return value.toString();
+ }
+
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/kafka/package-info.java b/spring/src/main/java/io/cloudevents/spring/kafka/package-info.java
new file mode 100644
index 000000000..99511b8c6
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/kafka/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Provides classes related to working with Cloud Events within the context of Spring and
+ * Kafka.
+ */
+package io.cloudevents.spring.kafka;
diff --git a/spring/src/main/java/io/cloudevents/spring/messaging/CloudEventMessageConverter.java b/spring/src/main/java/io/cloudevents/spring/messaging/CloudEventMessageConverter.java
index 68c41f572..f68c07795 100644
--- a/spring/src/main/java/io/cloudevents/spring/messaging/CloudEventMessageConverter.java
+++ b/spring/src/main/java/io/cloudevents/spring/messaging/CloudEventMessageConverter.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * https://www.apache.org/licenses/LICENSE-2.0
+ * https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -31,9 +31,9 @@
/**
* A {@link MessageConverter} that can translate to and from a {@link Message
- * Message<byte[]>} or {@link Message Message<String>} and a {@link CloudEvent}. The
- * {@link CloudEventContext} is canonicalized, with key names given a {@code ce-} prefix in the
- * {@link MessageHeaders}.
+ * Message<byte[]>} or {@link Message Message<String>} and a
+ * {@link CloudEvent}. The {@link CloudEventContext} is canonicalized, with key
+ * names given a {@code ce-} prefix in the {@link MessageHeaders}.
*
* @author Dave Syer
*/
@@ -67,7 +67,7 @@ private MessageReader createMessageReader(Message<?> message) {
private String version(MessageHeaders message) {
if (message.containsKey(CloudEventsHeaders.SPEC_VERSION)) {
- return message.get(CloudEventsHeaders.SPEC_VERSION).toString();
+ return stringValue(message.get(CloudEventsHeaders.SPEC_VERSION));
}
return null;
}
@@ -81,21 +81,24 @@ private MessageReader structuredMessageReader(Message<?> message, EventFormat fo
}
private String contentType(MessageHeaders message) {
- if (message.containsKey(MessageHeaders.CONTENT_TYPE)) {
- return message.get(MessageHeaders.CONTENT_TYPE).toString();
- }
- if (message.containsKey(CloudEventsHeaders.CONTENT_TYPE)) {
- return message.get(CloudEventsHeaders.CONTENT_TYPE).toString();
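+ // Use the Spring contentType header only when no ce-datacontenttype header is present:
+ // if the CloudEvents attribute is there, the message is binary-mode and contentType does
+ // not describe a structured event envelope.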
+ if (message.containsKey(MessageHeaders.CONTENT_TYPE) && !message.containsKey(CloudEventsHeaders.CONTENT_TYPE)) {
+ return stringValue(message.get(MessageHeaders.CONTENT_TYPE));
}
return null;
}
+ private String stringValue(Object value) {
+ if (value instanceof byte[]) {
+ return new String((byte[])value);
+ }
+ return value.toString();
+ }
+
private byte[] getBinaryData(Message<?> message) {
Object payload = message.getPayload();
if (payload instanceof byte[]) {
return (byte[]) payload;
- }
- else if (payload instanceof String) {
+ } else if (payload instanceof String) {
return ((String) payload).getBytes(Charset.defaultCharset());
}
return null;
diff --git a/spring/src/main/java/io/cloudevents/spring/messaging/MessageBinaryMessageReader.java b/spring/src/main/java/io/cloudevents/spring/messaging/MessageBinaryMessageReader.java
index 8989fbe8c..59ba7f95d 100644
--- a/spring/src/main/java/io/cloudevents/spring/messaging/MessageBinaryMessageReader.java
+++ b/spring/src/main/java/io/cloudevents/spring/messaging/MessageBinaryMessageReader.java
@@ -15,15 +15,18 @@
*/
package io.cloudevents.spring.messaging;
+import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
+import org.springframework.messaging.MessageHeaders;
+
import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import static io.cloudevents.spring.messaging.CloudEventsHeaders.CE_PREFIX;
-import static org.springframework.messaging.MessageHeaders.CONTENT_TYPE;
+import static io.cloudevents.spring.messaging.CloudEventsHeaders.CONTENT_TYPE;
/**
* Utility for converting maps (message headers) to `CloudEvent` contexts.
@@ -33,11 +36,14 @@
*/
class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl {
- private final Map<String, Object> headers;
+ private final Map<String, Object> headers = new HashMap<>();
public MessageBinaryMessageReader(SpecVersion version, Map<String, Object> headers, byte[] payload) {
super(version, payload == null ? null : BytesCloudEventData.wrap(payload));
- this.headers = headers;
+ this.headers.putAll(headers);
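+ // If only Spring's generic contentType header is present, expose its value under the
+ // CloudEvents content-type header key so it is picked up as the event's datacontenttype.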
+ if (headers.containsKey(MessageHeaders.CONTENT_TYPE) && !headers.containsKey(CONTENT_TYPE)) {
+ this.headers.put(CONTENT_TYPE, headers.get(MessageHeaders.CONTENT_TYPE));
+ }
}
public MessageBinaryMessageReader(SpecVersion version, Map<String, Object> headers) {
@@ -67,6 +73,9 @@ protected void forEachHeader(BiConsumer<String, Object> fn) {
@Override
protected String toCloudEventsValue(Object value) {
+ if (value instanceof byte[]) {
+ return new String((byte[])value);
+ }
return value.toString();
}
diff --git a/spring/src/test/java/io/cloudevents/spring/kafka/CloudEventRecordMessageConverterTests.java b/spring/src/test/java/io/cloudevents/spring/kafka/CloudEventRecordMessageConverterTests.java
new file mode 100644
index 000000000..f09bf60b1
--- /dev/null
+++ b/spring/src/test/java/io/cloudevents/spring/kafka/CloudEventRecordMessageConverterTests.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.kafka;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Test;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+
+/**
+ * @author Dave Syer
+ *
+ */
+class CloudEventRecordMessageConverterTests {
+
+ private static final String JSON = "{\"specversion\":\"1.0\"," //
+ + "\"id\":\"12345\"," //
+ + "\"source\":\"https://spring.io/events\"," //
+ + "\"type\":\"io.spring.event\"," //
+ + "\"datacontenttype\":\"application/json\"," //
+ + "\"data\":{\"value\":\"Dave\"}" //
+ + "}";
+
+ private CloudEventRecordMessageConverter converter = new CloudEventRecordMessageConverter();
+
+ @Test
+ void structuredCloudEventMessage() {
+ ConsumerRecord<?, ?> record = new ConsumerRecord<>("in", 0, 0, null, JSON.getBytes());
+ record.headers().add(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json".getBytes());
+ Message<?> message = converter.toMessage(record, null, null, CloudEvent.class);
+ assertThat(message).isNotNull();
+ CloudEvent event = (CloudEvent) message.getPayload();
+ assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
+ assertThat(event.getId()).isEqualTo("12345");
+ assertThat(event.getDataContentType()).isEqualTo("application/json");
+ assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
+ assertThat(event.getType()).isEqualTo("io.spring.event");
+ }
+
+ @Test
+ void binaryCloudEventMessage() {
+ ConsumerRecord<?, ?> record = new ConsumerRecord<>("in", 0, 0, null, "{\"value\":\"Dave\"}".getBytes());
+ record.headers() //
+ .add(MessageHeaders.CONTENT_TYPE, "application/json".getBytes()) //
+ .add("ce_specversion", "1.0".getBytes()) //
+ .add("ce_id", "12345".getBytes()) //
+ .add("ce_source", "https://spring.io/events".getBytes()) //
+ .add("ce_type", "io.spring.event".getBytes());
+ Message<?> message = converter.toMessage(record, null, null, CloudEvent.class);
+ assertThat(message).isNotNull();
+ CloudEvent event = (CloudEvent) message.getPayload();
+ assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
+ assertThat(event.getId()).isEqualTo("12345");
+ assertThat(event.getDataContentType()).isEqualTo("application/json");
+ assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
+ assertThat(event.getType()).isEqualTo("io.spring.event");
+ }
+
+ @Test
+ void binaryNonCloudEventMessage() {
+ ConsumerRecord<?, ?> record = new ConsumerRecord<>("in", 0, 0, null, "{\"value\":\"Dave\"}".getBytes());
+ record.headers() //
+ .add(MessageHeaders.CONTENT_TYPE, "application/json".getBytes()) //
+ .add("ce_specversion", "1.0".getBytes()) //
+ .add("ce_id", "12345".getBytes()) //
+ .add("ce_source", "https://spring.io/events".getBytes()) //
+ .add("ce_type", "io.spring.event".getBytes());
+ Message<?> message = converter.toMessage(record, null, null, String.class);
+ assertThat(message).isNotNull();
+ // TODO: should it be a String?
+ assertThat(message.getPayload()).isEqualTo("{\"value\":\"Dave\"}".getBytes());
+ Map<String, Object> headers = message.getHeaders();
+ assertThat(headers.get("ce-id")).isEqualTo("12345");
+ assertThat(headers.get("ce-specversion")).isEqualTo("1.0");
+ assertThat(headers.get("ce-source")).isEqualTo("https://spring.io/events");
+ assertThat(headers.get("ce-type")).isEqualTo("io.spring.event");
+ }
+}
diff --git a/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventContextUtilsTests.java b/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventContextUtilsTests.java
index a8fc19286..5b6e1c388 100644
--- a/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventContextUtilsTests.java
+++ b/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventContextUtilsTests.java
@@ -32,7 +32,7 @@ public void testWithEmpty() {
@Test
public void testWithPrefix() {
Map<String, Object> headers = new HashMap<>();
- headers.put("ce-scpecversion", "1.0");
+ headers.put("ce-specversion", "1.0");
headers.put("ce-id", "A234-1234-1234");
headers.put("ce-source", "https://spring.io/");
headers.put("ce-type", "org.springframework");
@@ -48,7 +48,7 @@ public void testWithPrefix() {
@Test
public void testExtensionsWithPrefix() {
Map<String, Object> headers = new HashMap<>();
- headers.put("ce-scpecversion", "1.0");
+ headers.put("ce-specversion", "1.0");
headers.put("ce-id", "A234-1234-1234");
headers.put("ce-source", "https://spring.io/");
headers.put("ce-type", "org.springframework");
diff --git a/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventMessageConverterTests.java b/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventMessageConverterTests.java
index eaa84e1ae..e57f0605c 100644
--- a/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventMessageConverterTests.java
+++ b/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventMessageConverterTests.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * https://www.apache.org/licenses/LICENSE-2.0
+ * https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -83,6 +83,25 @@ void validCloudEvent() {
assertThat(event.getType()).isEqualTo("io.spring.event");
}
+ @Test
+ void cloudEventWithContentType() {
+ // Sometimes you get a message that already has headers, but was originally a structured
+ // CloudEvent in the incoming transport, and we need to be able to extract the event
+ // safely.
+ Message<?> message = MessageBuilder.withPayload("{}").setHeader("ce-specversion", "1.0")
+ .setHeader("ce-id", "12345").setHeader("ce-source", "https://spring.io/events")
+ .setHeader("ce-type", "io.spring.event")
+ .setHeader("ce-datacontenttype", "application/json")
+ .setHeader(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json").build();
+ CloudEvent event = (CloudEvent) converter.fromMessage(message, CloudEvent.class);
+ assertThat(event).isNotNull();
+ assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
+ assertThat(event.getId()).isEqualTo("12345");
+ assertThat(event.getDataContentType()).isEqualTo("application/json");
+ assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
+ assertThat(event.getType()).isEqualTo("io.spring.event");
+ }
+
@Test
void structuredCloudEvent() {
byte[] payload = JSON.getBytes();
@@ -92,11 +111,26 @@ void structuredCloudEvent() {
assertThat(event).isNotNull();
assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
assertThat(event.getId()).isEqualTo("12345");
+ assertThat(event.getDataContentType()).isEqualTo("application/json");
assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
assertThat(event.getType()).isEqualTo("io.spring.event");
}
@Test
+ void structuredCloudEventBinaryHeader() {
+ byte[] payload = JSON.getBytes();
+ Message<?> message = MessageBuilder.withPayload(payload)
+ .setHeader(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json".getBytes()).build();
+ CloudEvent event = (CloudEvent) converter.fromMessage(message, CloudEvent.class);
+ assertThat(event).isNotNull();
+ assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
+ assertThat(event.getId()).isEqualTo("12345");
+ assertThat(event.getDataContentType()).isEqualTo("application/json");
+ assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
+ assertThat(event.getType()).isEqualTo("io.spring.event");
+ }
+
+ @Test
void structuredCloudEventStringPayload() {
Message<?> message = MessageBuilder.withPayload(JSON)
.setHeader(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json").build();