curl -XPUT "http://4b83662dcda14aaa87323ca4cbbde779.containerhost:9244/index_template/rmoff_ais_01" -H 'Content-Type: application/json' -d'{ "index_patterns": [ "ship_status_reports", "reefers_and_vessels" ], "template": { "mappings": { "dynamic_templates": [ { "locations": { "match": "*_LOCATION", "mapping": { "type": "geo_point" } } }, { "dates": { "match": "*_TS", "mapping": { "type": "date" } } } ] } }}'
DROP CONNECTOR IF EXISTS SINK_ELASTIC_SHIP_STATUS_REPORTS_01; CREATE SINK CONNECTOR SINK_ELASTIC_SHIP_STATUS_REPORTS_01 WITH ( 'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', 'key.converter' = 'io.confluent.connect.avro.AvroConverter', 'key.converter.schema.registry.url' = 'http://schema-registry:8081', 'value.converter' = 'io.confluent.connect.avro.AvroConverter', 'value.converter.schema.registry.url' = 'http://schema-registry:8081', 'connection.url' = 'http://elasticsearch:9200', 'type.name' = '_doc', 'topics.regex' = 'SHIP_STATUS_REPORTS_V.', 'key.ignore' = 'true', 'schema.ignore' = 'false', 'behavior.on.malformed.documents' = 'ignore', 'transforms' = 'topicname', 'transforms.topicname.type' = 'org.apache.kafka.connect.transforms.RegexRouter', 'transforms.topicname.regex' = '.', 'transforms.topicname.replacement' = 'ship_status_reports' );
Check status [source,sql]
DESCRIBE CONNECTOR SINK_ELASTIC_SHIP_STATUS_REPORTS_01;
Check docs [source,bash]
docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/ship*?h=idx,docsCount"
ship_status_reports 79
Check mapping [source,bash]
curl -s "http://localhost:9200/ship_status_reports/_mapping" | jq '.ship_status_reports.mappings.properties|.SHIP_LOCATION, .STATUS_TS' {
"type": "geo_point" } { "type": "date" }
CREATE SINK CONNECTOR SINK_ELASTIC_SHIP_REEFERS_AND_VESSELS_00 WITH ( 'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', 'key.converter' = 'io.confluent.connect.avro.AvroConverter', 'key.converter.schema.registry.url' = 'http://schema-registry:8081', 'value.converter' = 'io.confluent.connect.avro.AvroConverter', 'value.converter.schema.registry.url' = 'http://schema-registry:8081', 'connection.url' = 'http://elasticsearch:9200', 'type.name' = '_doc', 'topics.regex' = 'REEFERS_AND_VESSELS_.*', 'key.ignore' = 'true', 'schema.ignore' = 'false', 'behavior.on.malformed.documents' = 'ignore', 'behavior.on.null.values' = 'DELETE', 'transforms' = 'topicname', 'transforms.topicname.type' = 'org.apache.kafka.connect.transforms.RegexRouter', 'transforms.topicname.regex' = '(.*)\_V[0-9]{2}$', 'transforms.topicname.replacement' = '$1' );