From a1db072b4fadaa820f0d63f571cdcafda9e7e03b Mon Sep 17 00:00:00 2001 From: Lalith Kota Date: Sat, 2 Nov 2024 22:38:29 +0530 Subject: [PATCH] OS Connector: Index Write method updated. Debezium Connector: Tombstones removed Signed-off-by: Lalith Kota --- scripts/pbms/debezium-connectors/default.json | 3 +- .../opensearch-connectors/10.g2p_program.json | 3 +- .../opensearch-connectors/20.res_partner.json | 3 +- .../21.res_partner_history.json | 2 +- .../30.g2p_program_membership.json | 3 +- .../31.g2p_program_registrant_info.json | 3 +- ...2.g2p_program_registrant_info_history.json | 2 +- .../33.g2p_program_assessment.json | 3 +- ...40.g2p_program_membership_deduplicate.json | 3 +- .../50.g2p_program_fund.json | 3 +- .../60.g2p_entitlement.json | 3 +- .../70.g2p_payment_batch.json | 3 +- .../opensearch-connectors/80.g2p_payment.json | 3 +- .../debezium-connectors/default.json | 5 ++- .../{05.g2p_id_type.json => 05.id_type.json} | 7 ++-- ...e_history.json => 06.id_type_history.json} | 4 +- ...0.res_partner.json => 11.res_partner.json} | 6 ++- ...story.json => 12.res_partner_history.json} | 2 +- ...dedupe.json => 13.res_partner_dedupe.json} | 2 +- .../{15.g2p_reg_id.json => 14.reg_id.json} | 42 ++++++++++++------- ...id_history.json => 15.reg_id_history.json} | 28 ++++++------- 21 files changed, 81 insertions(+), 52 deletions(-) rename scripts/social-registry/opensearch-connectors/{05.g2p_id_type.json => 05.id_type.json} (89%) rename scripts/social-registry/opensearch-connectors/{06.g2p_id_type_history.json => 06.id_type_history.json} (92%) rename scripts/social-registry/opensearch-connectors/{10.res_partner.json => 11.res_partner.json} (92%) rename scripts/social-registry/opensearch-connectors/{11.res_partner_history.json => 12.res_partner_history.json} (95%) rename scripts/social-registry/opensearch-connectors/{12.res_partner_dedupe.json => 13.res_partner_dedupe.json} (92%) rename scripts/social-registry/opensearch-connectors/{15.g2p_reg_id.json => 14.reg_id.json} (63%) rename scripts/social-registry/opensearch-connectors/{16.g2p_reg_id_history.json => 15.reg_id_history.json} (90%) diff --git a/scripts/pbms/debezium-connectors/default.json b/scripts/pbms/debezium-connectors/default.json index fab6745..b432581 100644 --- a/scripts/pbms/debezium-connectors/default.json +++ b/scripts/pbms/debezium-connectors/default.json @@ -14,7 +14,8 @@ "topic.prefix": "${DB_PREFIX_INDEX}", "table.include.list": "public.res_partner,public.g2p_program,public.g2p_program_fund,public.g2p_program_membership,public.g2p_program_membership_duplicate,public.g2p_program_registrant_info,public.g2p_program_assessment,public.g2p_entitlement,public.g2p_payment_batch,public.g2p_payment", "heartbeat.interval.ms": "${DEFAULT_DEBEZIUM_CONNECTOR_HEARTBEAT_MS}", - "decimal.handling.mode": "double" + "decimal.handling.mode": "double", + "tombstones.on.delete": false }, "wait_after_init_secs": 20 } diff --git a/scripts/pbms/opensearch-connectors/10.g2p_program.json b/scripts/pbms/opensearch-connectors/10.g2p_program.json index 68df4e3..0b88745 100644 --- a/scripts/pbms/opensearch-connectors/10.g2p_program.json +++ b/scripts/pbms/opensearch-connectors/10.g2p_program.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_program", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/20.res_partner.json b/scripts/pbms/opensearch-connectors/20.res_partner.json index 736c375..d539564 100644 --- a/scripts/pbms/opensearch-connectors/20.res_partner.json +++ b/scripts/pbms/opensearch-connectors/20.res_partner.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.res_partner", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/21.res_partner_history.json b/scripts/pbms/opensearch-connectors/21.res_partner_history.json index 8db78d4..7c1dee5 100644 --- a/scripts/pbms/opensearch-connectors/21.res_partner_history.json +++ b/scripts/pbms/opensearch-connectors/21.res_partner_history.json @@ -24,7 +24,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/30.g2p_program_membership.json b/scripts/pbms/opensearch-connectors/30.g2p_program_membership.json index 1178ca4..5e2624d 100644 --- a/scripts/pbms/opensearch-connectors/30.g2p_program_membership.json +++ b/scripts/pbms/opensearch-connectors/30.g2p_program_membership.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_program_membership", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/31.g2p_program_registrant_info.json b/scripts/pbms/opensearch-connectors/31.g2p_program_registrant_info.json index 2e26329..945b2b7 100644 --- a/scripts/pbms/opensearch-connectors/31.g2p_program_registrant_info.json +++ b/scripts/pbms/opensearch-connectors/31.g2p_program_registrant_info.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_program_registrant_info", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/32.g2p_program_registrant_info_history.json b/scripts/pbms/opensearch-connectors/32.g2p_program_registrant_info_history.json index cf7f74a..904ce34 100644 --- a/scripts/pbms/opensearch-connectors/32.g2p_program_registrant_info_history.json +++ b/scripts/pbms/opensearch-connectors/32.g2p_program_registrant_info_history.json @@ -24,7 +24,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/33.g2p_program_assessment.json b/scripts/pbms/opensearch-connectors/33.g2p_program_assessment.json index 0461bbd..64f98fd 100644 --- a/scripts/pbms/opensearch-connectors/33.g2p_program_assessment.json +++ b/scripts/pbms/opensearch-connectors/33.g2p_program_assessment.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_program_assessment", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/40.g2p_program_membership_deduplicate.json b/scripts/pbms/opensearch-connectors/40.g2p_program_membership_deduplicate.json index 6653084..14b1f9d 100644 --- a/scripts/pbms/opensearch-connectors/40.g2p_program_membership_deduplicate.json +++ b/scripts/pbms/opensearch-connectors/40.g2p_program_membership_deduplicate.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_program_membership_duplicate", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/50.g2p_program_fund.json b/scripts/pbms/opensearch-connectors/50.g2p_program_fund.json index d3dbd28..22cb9fa 100644 --- a/scripts/pbms/opensearch-connectors/50.g2p_program_fund.json +++ b/scripts/pbms/opensearch-connectors/50.g2p_program_fund.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_program_fund", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/60.g2p_entitlement.json b/scripts/pbms/opensearch-connectors/60.g2p_entitlement.json index b43195b..f72e293 100644 --- a/scripts/pbms/opensearch-connectors/60.g2p_entitlement.json +++ b/scripts/pbms/opensearch-connectors/60.g2p_entitlement.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_entitlement", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/70.g2p_payment_batch.json b/scripts/pbms/opensearch-connectors/70.g2p_payment_batch.json index 70f9e66..7bf6b6a 100644 --- a/scripts/pbms/opensearch-connectors/70.g2p_payment_batch.json +++ b/scripts/pbms/opensearch-connectors/70.g2p_payment_batch.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_payment_batch", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/pbms/opensearch-connectors/80.g2p_payment.json b/scripts/pbms/opensearch-connectors/80.g2p_payment.json index 9710f70..ebe2329 100644 --- a/scripts/pbms/opensearch-connectors/80.g2p_payment.json +++ b/scripts/pbms/opensearch-connectors/80.g2p_payment.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_payment", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/social-registry/debezium-connectors/default.json b/scripts/social-registry/debezium-connectors/default.json index 844a270..ae2ca3c 100644 --- a/scripts/social-registry/debezium-connectors/default.json +++ b/scripts/social-registry/debezium-connectors/default.json @@ -12,9 +12,10 @@ "database.password": "${DB_PASS}", "database.dbname": "${DB_NAME}", "topic.prefix": "${DB_PREFIX_INDEX}", - "table.include.list": "public.res_partner,public.g2p_id_type,public.g2p_reg_id", + "table.include.list": "public.g2p_region,public.g2p_id_type,public.res_partner,public.g2p_reg_id", "heartbeat.interval.ms": "${DEFAULT_DEBEZIUM_CONNECTOR_HEARTBEAT_MS}", - "decimal.handling.mode": "double" + "decimal.handling.mode": "double", + "tombstones.on.delete": false }, "wait_after_init_secs": 20 } diff --git a/scripts/social-registry/opensearch-connectors/05.g2p_id_type.json b/scripts/social-registry/opensearch-connectors/05.id_type.json similarity index 89% rename from scripts/social-registry/opensearch-connectors/05.g2p_id_type.json rename to scripts/social-registry/opensearch-connectors/05.id_type.json index 1e2d3f8..68af9a5 100644 --- a/scripts/social-registry/opensearch-connectors/05.g2p_id_type.json +++ b/scripts/social-registry/opensearch-connectors/05.id_type.json @@ -1,5 +1,5 @@ { - "name": "g2p_id_type_${DB_PREFIX_INDEX}", + "name": "id_type_${DB_PREFIX_INDEX}", "config": { "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "connection.url": "${OPENSEARCH_URL}", @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_id_type", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", @@ -41,5 +42,5 @@ "transforms.tsSelect.ts.order": "write_date,create_date,source_ts_ms", "transforms.tsSelect.output.field": "@timestamp_gen" }, - "wait_after_init_secs": 15 + "wait_after_init_secs": 20 } diff --git a/scripts/social-registry/opensearch-connectors/06.g2p_id_type_history.json b/scripts/social-registry/opensearch-connectors/06.id_type_history.json similarity index 92% rename from scripts/social-registry/opensearch-connectors/06.g2p_id_type_history.json rename to scripts/social-registry/opensearch-connectors/06.id_type_history.json index b40b23d..54818a6 100644 --- a/scripts/social-registry/opensearch-connectors/06.g2p_id_type_history.json +++ b/scripts/social-registry/opensearch-connectors/06.id_type_history.json @@ -1,5 +1,5 @@ { - "name": "g2p_id_type_history_${DB_PREFIX_INDEX}", + "name": "id_type_history_${DB_PREFIX_INDEX}", "config": { "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "connection.url": "${OPENSEARCH_URL}", @@ -24,7 +24,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/social-registry/opensearch-connectors/10.res_partner.json b/scripts/social-registry/opensearch-connectors/11.res_partner.json similarity index 92% rename from scripts/social-registry/opensearch-connectors/10.res_partner.json rename to scripts/social-registry/opensearch-connectors/11.res_partner.json index 6dcb69a..45e824a 100644 --- a/scripts/social-registry/opensearch-connectors/10.res_partner.json +++ b/scripts/social-registry/opensearch-connectors/11.res_partner.json @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.res_partner", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -24,7 +25,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", @@ -50,5 +51,6 @@ "transforms.tsSelect.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampSelector${dollar}Value", "transforms.tsSelect.ts.order": "write_date,create_date,source_ts_ms", "transforms.tsSelect.output.field": "@timestamp_gen" - } + }, + "wait_after_init_secs": 60 } diff --git a/scripts/social-registry/opensearch-connectors/11.res_partner_history.json b/scripts/social-registry/opensearch-connectors/12.res_partner_history.json similarity index 95% rename from scripts/social-registry/opensearch-connectors/11.res_partner_history.json rename to scripts/social-registry/opensearch-connectors/12.res_partner_history.json index 9d196aa..e32112c 100644 --- a/scripts/social-registry/opensearch-connectors/11.res_partner_history.json +++ b/scripts/social-registry/opensearch-connectors/12.res_partner_history.json @@ -24,7 +24,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/social-registry/opensearch-connectors/12.res_partner_dedupe.json b/scripts/social-registry/opensearch-connectors/13.res_partner_dedupe.json similarity index 92% rename from scripts/social-registry/opensearch-connectors/12.res_partner_dedupe.json rename to scripts/social-registry/opensearch-connectors/13.res_partner_dedupe.json index da05e54..922223e 100644 --- a/scripts/social-registry/opensearch-connectors/12.res_partner_dedupe.json +++ b/scripts/social-registry/opensearch-connectors/13.res_partner_dedupe.json @@ -27,7 +27,7 @@ "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.renameTopic.type": "org.openg2p.reporting.kafka.connect.transforms.RenameTopic", "transforms.renameTopic.topic": "res_partner" diff --git a/scripts/social-registry/opensearch-connectors/15.g2p_reg_id.json b/scripts/social-registry/opensearch-connectors/14.reg_id.json similarity index 63% rename from scripts/social-registry/opensearch-connectors/15.g2p_reg_id.json rename to scripts/social-registry/opensearch-connectors/14.reg_id.json index 0151996..ec559de 100644 --- a/scripts/social-registry/opensearch-connectors/15.g2p_reg_id.json +++ b/scripts/social-registry/opensearch-connectors/14.reg_id.json @@ -1,5 +1,5 @@ { - "name": "g2p_reg_id_${DB_PREFIX_INDEX}", + "name": "reg_id_${DB_PREFIX_INDEX}", "config": { "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "connection.url": "${OPENSEARCH_URL}", @@ -7,6 +7,7 @@ "connection.password": "${OPENSEARCH_PASSWORD}", "tasks.max": "1", "topics": "${DB_PREFIX_INDEX}.public.g2p_reg_id", + "index.write.method": "upsert", "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -18,29 +19,39 @@ "behavior.on.malformed.documents": "warn", "behavior.on.version.conflict": "warn", - "transforms": "keyExtId,valExt,join01,insertBack1,insertBack2,tsconvert01,tsconvert02,tsconvert03,tsSelect", + "transforms": "keyExtId,join01,join02,insertBack1,insertBack2,valExt,tsconvert01,tsconvert02,tsconvert03,tsSelect", "transforms.keyExtId.type": "org.apache.kafka.connect.transforms.ExtractField${dollar}Key", "transforms.keyExtId.field": "id", - "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", - "transforms.join01.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewField${dollar}Value", - "transforms.join01.input.fields": "id_type", - "transforms.join01.output.fields": "id_type_name", + "transforms.join01.input.fields": "payload.before.id_type", + "transforms.join01.output.fields": "before_id_type_name", + "transforms.join01.output.default.value": "", "transforms.join01.es.index": "${DB_PREFIX_INDEX}.public.g2p_id_type", "transforms.join01.es.input.fields": "id", "transforms.join01.es.output.fields": "name", - "transforms.join01.es.url": "${OPENSEARCH_URL}", "transforms.join01.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}", + "transforms.join01.es.url": "${OPENSEARCH_URL}", "transforms.join01.es.username": "${OPENSEARCH_USERNAME}", "transforms.join01.es.password": "${OPENSEARCH_PASSWORD}", + "transforms.join02.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewField${dollar}Value", + "transforms.join02.input.fields": "payload.after.id_type", + "transforms.join02.output.fields": "after_id_type_name", + "transforms.join02.output.default.value": "", + "transforms.join02.es.index": "${DB_PREFIX_INDEX}.public.g2p_id_type", + "transforms.join02.es.input.fields": "id", + "transforms.join02.es.output.fields": "name", + "transforms.join02.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}", + "transforms.join02.es.url": "${OPENSEARCH_URL}", + "transforms.join02.es.username": "${OPENSEARCH_USERNAME}", + "transforms.join02.es.password": "${OPENSEARCH_PASSWORD}", + "transforms.insertBack1.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewFieldInsertBack${dollar}Value", - "transforms.insertBack1.id.expr": ".partner_id", - "transforms.insertBack1.condition": ".id_type_name == \"NATIONAL ID\"", - "transforms.insertBack1.value": "{reg_id_NATIONAL_ID: .value}", + "transforms.insertBack1.id.expr": ".payload.before.partner_id", + "transforms.insertBack1.condition": "(.payload.before.partner_id != null) and (.before_id_type_name != null) and (.before_id_type_name != \"\")", + "transforms.insertBack1.value": "{(\"reg_id_\" + (.before_id_type_name | gsub(\"\\\\s+\"; \"_\"))): null}", "transforms.insertBack1.es.index": "${DB_PREFIX_INDEX}.public.res_partner", "transforms.insertBack1.es.url": "${OPENSEARCH_URL}", "transforms.insertBack1.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}", @@ -48,15 +59,18 @@ "transforms.insertBack1.es.password": "${OPENSEARCH_PASSWORD}", "transforms.insertBack2.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewFieldInsertBack${dollar}Value", - "transforms.insertBack2.id.expr": ".partner_id", - "transforms.insertBack2.condition": ".id_type_name == \"NATIONAL ID TOKEN\"", - "transforms.insertBack2.value": "{reg_id_NATIONAL_ID_TOKEN: .value}", + "transforms.insertBack2.id.expr": ".payload.after.partner_id", + "transforms.insertBack2.condition": "(.payload.after.partner_id != null) and (.after_id_type_name != null) and (.after_id_type_name != \"\")", + "transforms.insertBack2.value": "{(\"reg_id_\" + (.after_id_type_name | gsub(\"\\\\s+\"; \"_\"))): .payload.after.value}", "transforms.insertBack2.es.index": "${DB_PREFIX_INDEX}.public.res_partner", "transforms.insertBack2.es.url": "${OPENSEARCH_URL}", "transforms.insertBack2.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}", "transforms.insertBack2.es.username": "${OPENSEARCH_USERNAME}", "transforms.insertBack2.es.password": "${OPENSEARCH_PASSWORD}", + "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms,id_type_name: .after_id_type_name} else null end)", + "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", diff --git a/scripts/social-registry/opensearch-connectors/16.g2p_reg_id_history.json b/scripts/social-registry/opensearch-connectors/15.reg_id_history.json similarity index 90% rename from scripts/social-registry/opensearch-connectors/16.g2p_reg_id_history.json rename to scripts/social-registry/opensearch-connectors/15.reg_id_history.json index fbb1356..94d3e67 100644 --- a/scripts/social-registry/opensearch-connectors/16.g2p_reg_id_history.json +++ b/scripts/social-registry/opensearch-connectors/15.reg_id_history.json @@ -1,5 +1,5 @@ { - "name": "g2p_reg_id_history_${DB_PREFIX_INDEX}", + "name": "reg_id_history_${DB_PREFIX_INDEX}", "config": { "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "connection.url": "${OPENSEARCH_URL}", @@ -18,24 +18,13 @@ "behavior.on.malformed.documents": "warn", "behavior.on.version.conflict": "warn", - "transforms": "keyExtId,valExt,join01,tsconvert01,tsconvert02,tsconvert03,tsSelect,renameTopic", + "transforms": "keyExtId,valExt,tsconvert01,tsconvert02,tsconvert03,join01,tsSelect,renameTopic", "transforms.keyExtId.type": "org.apache.kafka.connect.transforms.ExtractField${dollar}Key", "transforms.keyExtId.field": "id", "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value", - "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", - - "transforms.join01.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewField${dollar}Value", - "transforms.join01.input.fields": "id_type", - "transforms.join01.output.fields": "id_type_name", - "transforms.join01.es.index": "${DB_PREFIX_INDEX}.public.g2p_id_type", - "transforms.join01.es.input.fields": "id", - "transforms.join01.es.output.fields": "name", - "transforms.join01.es.url": "${OPENSEARCH_URL}", - "transforms.join01.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}", - "transforms.join01.es.username": "${OPENSEARCH_USERNAME}", - "transforms.join01.es.password": "${OPENSEARCH_PASSWORD}", + "transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)", "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value", "transforms.tsconvert01.field": "source_ts_ms", @@ -48,6 +37,17 @@ "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", + "transforms.join01.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewField${dollar}Value", + "transforms.join01.input.fields": "id_type", + "transforms.join01.output.fields": "id_type_name", + "transforms.join01.es.index": "${DB_PREFIX_INDEX}.public.g2p_id_type", + "transforms.join01.es.input.fields": "id", + "transforms.join01.es.output.fields": "name", + "transforms.join01.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}", + "transforms.join01.es.url": "${OPENSEARCH_URL}", + "transforms.join01.es.username": "${OPENSEARCH_USERNAME}", + "transforms.join01.es.password": "${OPENSEARCH_PASSWORD}", + "transforms.tsSelect.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampSelector${dollar}Value", "transforms.tsSelect.ts.order": "write_date,create_date,source_ts_ms", "transforms.tsSelect.output.field": "@timestamp_gen",