Skip to content

Commit

Permalink
Merge pull request #47 from lalithkota/develop
Browse files Browse the repository at this point in the history
OS Connector: Index Write method updated. Debezium Connector: Tombsto…
  • Loading branch information
lalithkota authored Nov 2, 2024
2 parents b1d8dda + a1db072 commit b88bea8
Show file tree
Hide file tree
Showing 21 changed files with 81 additions and 52 deletions.
3 changes: 2 additions & 1 deletion scripts/pbms/debezium-connectors/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"topic.prefix": "${DB_PREFIX_INDEX}",
"table.include.list": "public.res_partner,public.g2p_program,public.g2p_program_fund,public.g2p_program_membership,public.g2p_program_membership_duplicate,public.g2p_program_registrant_info,public.g2p_program_assessment,public.g2p_entitlement,public.g2p_payment_batch,public.g2p_payment",
"heartbeat.interval.ms": "${DEFAULT_DEBEZIUM_CONNECTOR_HEARTBEAT_MS}",
"decimal.handling.mode": "double"
"decimal.handling.mode": "double",
"tombstones.on.delete": false
},
"wait_after_init_secs": 20
}
3 changes: 2 additions & 1 deletion scripts/pbms/opensearch-connectors/10.g2p_program.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
3 changes: 2 additions & 1 deletion scripts/pbms/opensearch-connectors/20.res_partner.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.res_partner",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_membership",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_registrant_info",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_assessment",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_membership_duplicate",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
3 changes: 2 additions & 1 deletion scripts/pbms/opensearch-connectors/50.g2p_program_fund.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_fund",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
3 changes: 2 additions & 1 deletion scripts/pbms/opensearch-connectors/60.g2p_entitlement.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_entitlement",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
3 changes: 2 additions & 1 deletion scripts/pbms/opensearch-connectors/70.g2p_payment_batch.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_payment_batch",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
3 changes: 2 additions & 1 deletion scripts/pbms/opensearch-connectors/80.g2p_payment.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_payment",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
5 changes: 3 additions & 2 deletions scripts/social-registry/debezium-connectors/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
"database.password": "${DB_PASS}",
"database.dbname": "${DB_NAME}",
"topic.prefix": "${DB_PREFIX_INDEX}",
"table.include.list": "public.res_partner,public.g2p_id_type,public.g2p_reg_id",
"table.include.list": "public.g2p_region,public.g2p_id_type,public.res_partner,public.g2p_reg_id",
"heartbeat.interval.ms": "${DEFAULT_DEBEZIUM_CONNECTOR_HEARTBEAT_MS}",
"decimal.handling.mode": "double"
"decimal.handling.mode": "double",
"tombstones.on.delete": false
},
"wait_after_init_secs": 20
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{
"name": "g2p_id_type_${DB_PREFIX_INDEX}",
"name": "id_type_${DB_PREFIX_INDEX}",
"config": {
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"connection.url": "${OPENSEARCH_URL}",
"connection.username": "${OPENSEARCH_USERNAME}",
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_id_type",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand All @@ -41,5 +42,5 @@
"transforms.tsSelect.ts.order": "write_date,create_date,source_ts_ms",
"transforms.tsSelect.output.field": "@timestamp_gen"
},
"wait_after_init_secs": 15
"wait_after_init_secs": 20
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "g2p_id_type_history_${DB_PREFIX_INDEX}",
"name": "id_type_history_${DB_PREFIX_INDEX}",
"config": {
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"connection.url": "${OPENSEARCH_URL}",
Expand All @@ -24,7 +24,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.res_partner",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -24,7 +25,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand All @@ -50,5 +51,6 @@
"transforms.tsSelect.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampSelector${dollar}Value",
"transforms.tsSelect.ts.order": "write_date,create_date,source_ts_ms",
"transforms.tsSelect.output.field": "@timestamp_gen"
}
},
"wait_after_init_secs": 60
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.renameTopic.type": "org.openg2p.reporting.kafka.connect.transforms.RenameTopic",
"transforms.renameTopic.topic": "res_partner"
Expand Down
Loading

0 comments on commit b88bea8

Please sign in to comment.