Some sinks will require a schema. An example of this is the JDBC Sink.
Create source data, serialised in varying ways:
JsonConverter, `schemas.enable=false`
- a.k.a. throw away your schemas, I want to make life difficult for anyone using this data ;-)
curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d '{
    "name": "source-debezium-orders-01",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "": "43",
      "": "asgard",
      "table.whitelist": "demo.orders",
      "database.history.kafka.bootstrap.servers": "kafka:29092",
      "database.history.kafka.topic": "dbhistory.demo",
      "decimal.handling.mode": "double",
      "include.schema.changes": "true",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "transforms": "unwrap,addTopicPrefix",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
      "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.addTopicPrefix.regex": "(.*)",
      "transforms.addTopicPrefix.replacement": "mysql-debezium-json-no-schema-$1"
    }
  }'
JsonConverter, `schemas.enable=true`
- a.k.a. I want to keep my schemas (but don’t care about bloated message size)
curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d '{
    "name": "source-debezium-orders-02",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "": "44",
      "": "asgard",
      "table.whitelist": "demo.orders",
      "database.history.kafka.bootstrap.servers": "kafka:29092",
      "database.history.kafka.topic": "dbhistory.demo",
      "decimal.handling.mode": "double",
      "include.schema.changes": "true",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "true",
      "transforms": "unwrap,addTopicPrefix",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
      "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.addTopicPrefix.regex": "(.*)",
      "transforms.addTopicPrefix.replacement": "mysql-debezium-json-with-schema-$1"
    }
  }'
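To see which topic names the `addTopicPrefix` transform produces, the rewrite can be approximated in the shell. This is only a sketch: `route_topic` is a hypothetical helper, and `sed -E` stands in for the Java regex engine that RegexRouter actually uses (the two agree for a simple pattern like `(.*)`, but not in general).

```shell
# Approximate RegexRouter: if the whole topic name matches the regex,
# rewrite it using the replacement; otherwise leave the name unchanged.
# Hypothetical helper for illustration; not part of Kafka Connect.
route_topic() {
  topic="$1"
  regex="$2"
  replacement="$3"
  # RegexRouter only rewrites on a full match, hence the ^...$ anchors.
  # The replacement uses sed's \1 syntax rather than Java's $1.
  printf '%s\n' "$topic" | sed -E "s/^${regex}\$/${replacement}/"
}

# The two source connectors use regex (.*) with different prefixes:
route_topic 'asgard.demo.ORDERS' '(.*)' 'mysql-debezium-json-no-schema-\1'
route_topic 'asgard.demo.ORDERS' '(.*)' 'mysql-debezium-json-with-schema-\1'
```

The resulting names are the ones the sink connectors in this demo subscribe to through their `topics` setting.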
Create a JDBC sink
Reading JSON data with no schema data (and `schemas.enable=false`, as is correct)
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "sink_postgres_00_json",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "mysql-debezium-json-no-schema-asgard.demo.ORDERS",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "connection.url": "jdbc:postgresql://postgres:5432/",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "auto.create": "true",
      "auto.evolve": "true",
      "pk.mode": "none",
      "": "sink_postgres_00_json"
    }
  }'
Reading JSON data with schema data (and `schemas.enable=true`, as is correct)
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "sink_postgres_01",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "mysql-debezium-json-with-schema-asgard.demo.ORDERS",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "true",
      "connection.url": "jdbc:postgresql://postgres:5432/",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "auto.create": "true",
      "auto.evolve": "true",
      "pk.mode": "none",
      "errors.tolerance": "all",
      "": "sink_postgres_01"
    }
  }'
Reading JSON data with no schema data (and `schemas.enable=true`, incorrectly)
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "sink_postgres_02",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "mysql-debezium-json-no-schema-asgard.demo.ORDERS",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "true",
      "connection.url": "jdbc:postgresql://postgres:5432/",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "auto.create": "true",
      "auto.evolve": "true",
      "pk.mode": "none",
      "": "sink_postgres_01"
    }
  }'
Two of the three sinks will be `FAILED`:
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort | grep sink
sink-neo4j-orders-00 | RUNNING | RUNNING
sink_postgres_00_json | RUNNING | FAILED
sink_postgres_01 | RUNNING | RUNNING
sink_postgres_02 | RUNNING | FAILED
Sink 00 (Reading JSON data with no schema data (and `schemas.enable=false`, as is correct)):
curl -s "http://localhost:8083/connectors/sink_postgres_00_json/status" | \
  jq '.tasks[0].trace'
[...] org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: sink_postgres_00_json [...]
Sink 02 (Reading JSON data with no schema data (and `schemas.enable=true`, incorrectly)):
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration
Solutions? For both sinks we have no schema data.
- In the case of sink_00 we admit the fact (`schemas.enable=false`) and Kafka Connect complains that there is no schema.
- In the case of sink_02 we pretend that there is a schema (`schemas.enable=true`) and Kafka Connect spots our ruse and tells us that the JSON we’ve given it does not match that required (`schema` and `payload` as top-level elements).
So what to do? Serialise the data with a schema. Either change the way the data is produced to include a schema (e.g. Avro, or JSON with `schemas.enable=true`), OR use stream processing to apply a schema and reserialise the data.
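To make the difference concrete, here is a sketch of the two message shapes. The field names come from the orders data used in this demo, but only two fields are shown and the values are made up; the `print_*` helpers are hypothetical and exist only to display the samples.

```shell
# What JsonConverter writes with schemas.enable=false: the bare record only.
print_plain_json() {
  cat <<'EOF'
{"id": 1, "make": "Ford"}
EOF
}

# What JsonConverter writes (and expects to read) with schemas.enable=true:
# a "schema" element describing the fields, plus the record under "payload".
print_json_with_schema() {
  cat <<'EOF'
{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      {"field": "id",   "type": "int32",  "optional": false},
      {"field": "make", "type": "string", "optional": true}
    ]
  },
  "payload": {"id": 1, "make": "Ford"}
}
EOF
}

print_plain_json
print_json_with_schema
```

The second shape repeats the schema in every message, which is where the extra message size comes from.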
To apply a schema and reserialise the data for consumption by Kafka Connect, you can use KSQL:
docker-compose exec ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while [ $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) -eq 000 ] ; do echo -e $(date) "KSQL Server HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) " (waiting for 200)" ; sleep 5 ; done; ksql http://ksql-server:8088'
CREATE STREAM ORDERS_JSON (
    id INT,
    order_id INT,
    customer_id INT,
    order_total_usd DOUBLE,
    make VARCHAR,
    model VARCHAR,
    delivery_city VARCHAR,
    delivery_company VARCHAR,
    delivery_address VARCHAR,
    CREATE_TS VARCHAR,
    UPDATE_TS VARCHAR
  ) WITH (
    KAFKA_TOPIC='mysql-debezium-json-no-schema-asgard.demo.ORDERS',
    VALUE_FORMAT='JSON'
  );

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM ORDERS_AVRO WITH (
    VALUE_FORMAT='AVRO',
    KAFKA_TOPIC='asgard.demo.ORDERS-avro'
  ) AS
  SELECT * FROM ORDERS_JSON;
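Once the `ORDERS_AVRO` stream is running, its value schema should be registered in the Schema Registry. One way to confirm this is sketched below. It assumes the registry is reachable from the host at `http://localhost:8081` (this demo only shows the in-network address `http://schema-registry:8081`) and that the default subject naming of `<topic>-value` is in use. `latest_value_schema` is a hypothetical helper name.

```shell
# Assumed host-side address of the Schema Registry; override as needed.
SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_URL:-http://localhost:8081}"

# Fetch the latest registered value schema for a topic.
# Assumes the default subject naming strategy (<topic>-value).
latest_value_schema() {
  topic="$1"
  curl -s "${SCHEMA_REGISTRY_URL}/subjects/${topic}-value/versions/latest"
}

# Usage (requires the demo stack to be running):
#   latest_value_schema 'asgard.demo.ORDERS-avro' | jq '.schema | fromjson'
```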
Stream the reserialised and schema-enriched data:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "sink_postgres_03_avro",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "asgard.demo.ORDERS-avro",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "key.converter": "",
      "connection.url": "jdbc:postgresql://postgres:5432/",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "auto.create": "true",
      "auto.evolve": "true",
      "pk.mode": "none",
      "": "sink_postgres_03_avro"
    }
  }'
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
select * from sink_postgres_03_avro ORDER BY CREATE_TS DESC LIMIT 2;
Single Message Transforms (SMT) can be used to apply transformations including:
- Changing the topic name (n.b. often used by sinks to define the target object name)
- Dropping fields
- Renaming fields
The example here is on a sink connector, but SMTs are equally applicable to source connectors. Its transforms:
- Remove the key from its struct
- Remove part of the topic name
- Append a timestamp to the topic name (useful for time-based indices in Elasticsearch etc.)
- Rename a field
- Drop a field
both get omitted from the target data
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "sink-elastic-orders-01",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "mysql-debezium-asgard.demo.ORDERS",
"key.ignore": "false",
"schema.ignore": "true",
"": "",
"connection.url": "http://elasticsearch:9200",
"transforms": "dropTopicPrefix,extractKey,addDateToTopic,renameField,dropField",
"transforms.addDateToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addDateToTopic.topic.format": "${topic}-${timestamp}",
"transforms.addDateToTopic.timestamp.format": "yyyyMM",
"transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameField.renames": "delivery_address:shipping_address",
"transforms.dropField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropField.blacklist": "CREATE_TS"
}
}'
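The effect of `addDateToTopic` on the topic name can be approximated in the shell. This is a sketch only: `timestamp_route` is a hypothetical helper, it takes the record timestamp as epoch seconds (Kafka records carry milliseconds), and it hard-codes a year-and-month suffix in UTC in place of the configurable `timestamp.format`. The input topic name assumes the `mysql-debezium-` prefix has already been removed by the earlier transform.

```shell
# Approximate TimestampRouter with topic.format=${topic}-${timestamp}
# and a year+month timestamp format. Hypothetical helper, not Connect code.
timestamp_route() {
  topic="$1"
  epoch_seconds="$2"
  # GNU date takes -d @SECONDS; BSD/macOS date takes -r SECONDS.
  suffix="$(date -u -d "@${epoch_seconds}" +%Y%m 2>/dev/null \
            || date -u -r "${epoch_seconds}" +%Y%m)"
  printf '%s-%s\n' "$topic" "$suffix"
}

# A record written on 2019-05-04 (UTC):
timestamp_route 'asgard.demo.ORDERS' 1557000000
```

The Elasticsearch index listing in this section shows the same name in lowercase (`asgard.demo.orders-201905`).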
Inspect the data in Elasticsearch:
curl -s http://localhost:9200/_cat/indices
green open .kibana_task_manager AhFACVWpRby6kZwYFwM68w 1 0 2 0 12.5kb 12.5kb
green open .kibana_1 xTC-RMxZSj-KcF22zmEoZA 1 0 5 0 22.9kb 22.9kb
yellow open asgard.demo.orders-201905 qzMvZH8DQWKkLjr1yFB-Bw 5 1 3338 0 1.3mb 1.3mb
yellow open mysql-debezium-asgard.demo.orders l5dwQAfjRkWfhTP7EZRFrw 5 1 0 0 1.2kb 1.2kb
echo '{ "size": 1, "sort": [ { "UPDATE_TS": { "order": "desc" } } ] }' |\
http http://localhost:9200/asgard.demo.orders-201905/_search
curl -s http://localhost:9200/asgard.demo.orders-201905/_search \
-H 'content-type: application/json' \
-d '{ "size": 1, "sort": [ { "UPDATE_TS": { "order": "desc" } } ] }' |\
jq '.'
Check the status of the connector
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
Output should be similar to
sink-elastic-orders-01 | RUNNING | RUNNING
source-debezium-orders-00 | RUNNING | RUNNING
Force a failure:
$ docker-compose stop mysql
Stopping mysql ... done
Check the status of the connector again
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
Output should be similar to
sink-elastic-orders-01 | RUNNING | RUNNING
source-debezium-orders-00 | RUNNING | FAILED
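For scripting, the same status endpoint can be turned into a pass/fail check. The sketch below is deliberately crude: it string-matches the JSON rather than parsing it, and `has_failed_state` is a hypothetical helper name. `CONNECT_URL` defaults to the address used throughout this demo.

```shell
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Succeeds (exit 0) if the connector or any of its tasks reports FAILED.
has_failed_state() {
  connector_name="$1"
  curl -s "${CONNECT_URL}/connectors/${connector_name}/status" \
    | grep -Eq '"state"[[:space:]]*:[[:space:]]*"FAILED"'
}

# Usage (requires the demo stack to be running):
#   if has_failed_state source-debezium-orders-00; then
#     echo "source-debezium-orders-00 has FAILED"
#   fi
```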
Now let’s see why. We could use the REST API, which may or may not give a useful trace:
curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | \
jq '.tasks[0].trace'
"org.apache.kafka.connect.errors.ConnectException\n\tat io.debezium.connector.mysql.AbstractReader.wrap(\n\tat io.debezium.connector.mysql.AbstractReader.failed(\n\tat io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$\n\tat\nCaused by:\n\tat\n\tat\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(\n\t... 3 more\n"
(it’s useful, but not so readable)
Best is to crack open the Kafka Connect worker log
docker-compose logs -f kafka-connect
Then search from the end of it in reverse (I use GNU Screen to make this very easy) and look for ERROR
First reverse hit will be the task dying
Task is being killed and will not recover until manually restarted
Second reverse hit will be the cause of the task dying, often a stack trace that you’ll need to pick through
org.apache.kafka.connect.errors.ConnectException … Caused by: at
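The log search above, and the manual restart that the log message calls for, can be sketched as shell helpers. `last_errors` and `restart_task` are hypothetical names. The restart call uses the Kafka Connect REST API's task restart endpoint, and `CONNECT_URL` is assumed to be the address used elsewhere in this demo.

```shell
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the last N lines containing ERROR from a log read on stdin.
last_errors() {
  count="${1:-2}"
  grep 'ERROR' | tail -n "$count"
}

# Ask Kafka Connect to restart one task of a connector (task 0 by default).
restart_task() {
  connector_name="$1"
  task_id="${2:-0}"
  curl -s -X POST \
    "${CONNECT_URL}/connectors/${connector_name}/tasks/${task_id}/restart"
}

# Usage (requires the demo stack to be running):
#   docker-compose logs kafka-connect | last_errors 2
#   docker-compose start mysql
#   restart_task source-debezium-orders-00 0
```

Restarting only helps once the underlying cause (here, the stopped MySQL container) has been fixed; otherwise the task is likely to fail again.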
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_02",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"topic.prefix": "postgres-00-",
"": 1000,
"table.whitelist" : "test1,test2",
"": "create_ts",
"validate.non.null": false
}
}'
$ curl -s "http://localhost:8083/connectors/jdbc_source_postgres_02/status" | \
jq '.tasks[]'
{
  "id": 0,
  "state": "RUNNING",
  "worker_id": "kafka-connect-01:8083"
}
{
  "id": 1,
  "state": "RUNNING",
  "worker_id": "kafka-connect-02:8083"
}
docker-compose stop kafka-connect-02
$ curl -s "http://localhost:8083/connectors/jdbc_source_postgres_02/status" | \
jq '.tasks[]'
{
  "id": 0,
  "state": "RUNNING",
  "worker_id": "kafka-connect-01:8083"
}
{
  "id": 1,
  "state": "RUNNING",
  "worker_id": "kafka-connect-01:8083"
}