From 8881a390f318a5f83731baa983188c8d822842f3 Mon Sep 17 00:00:00 2001 From: pramod444 Date: Thu, 30 May 2024 18:15:20 +0530 Subject: [PATCH] [G2P-2355] Added helm-chart for debezium and updated all the connectors. --- charts/debezium/.helmignore | 23 ++++++ charts/debezium/Chart.yaml | 6 ++ charts/debezium/README.md | 3 + charts/debezium/templates/_helpers.tpl | 13 ++++ charts/debezium/templates/deployment.yaml | 67 +++++++++++++++++ charts/debezium/values.yaml | 21 ++++++ .../es-connectors/10.g2p_program.api | 24 +++--- .../es-connectors/20.res_partner.api | 26 +++---- .../30.g2p_program_membership.api | 58 +++++++-------- .../31.g2p_program_registrant_info.api | 42 +++++------ .../32.g2p_program_assessment.api | 52 ++++++------- .../40.g2p_program_membership_deduplicate.api | 22 +++--- .../es-connectors/50.g2p_program_fund.api | 24 +++--- .../es-connectors/60.g2p_entitlement.api | 74 +++++++++---------- .../es-connectors/70.g2p_payment_batch.api | 24 +++--- .../es-connectors/80.g2p_payment.api | 26 +++---- 16 files changed, 319 insertions(+), 186 deletions(-) create mode 100644 charts/debezium/.helmignore create mode 100644 charts/debezium/Chart.yaml create mode 100644 charts/debezium/README.md create mode 100644 charts/debezium/templates/_helpers.tpl create mode 100644 charts/debezium/templates/deployment.yaml create mode 100644 charts/debezium/values.yaml diff --git a/charts/debezium/.helmignore b/charts/debezium/.helmignore new file mode 100644 index 0000000..0e8a0eb --- /dev/null +++ b/charts/debezium/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/charts/debezium/Chart.yaml b/charts/debezium/Chart.yaml new file mode 100644 index 0000000..87b890c --- /dev/null +++ b/charts/debezium/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: debezium +description: Debezium Helm chart +type: application +version: 0.0.0-develop +appVersion: 1.7.1.Final diff --git a/charts/debezium/README.md b/charts/debezium/README.md new file mode 100644 index 0000000..a0d12d8 --- /dev/null +++ b/charts/debezium/README.md @@ -0,0 +1,3 @@ +# opensearch-debezium Helm Chart + +This chart bootstraps a deployment of a opensearch-debezium diff --git a/charts/debezium/templates/_helpers.tpl b/charts/debezium/templates/_helpers.tpl new file mode 100644 index 0000000..eb1a670 --- /dev/null +++ b/charts/debezium/templates/_helpers.tpl @@ -0,0 +1,13 @@ +{{- define "debezium.name" -}} +{{- .Release.Name | trimPrefix "debezium-" -}} +{{- end }} + +{{- define "debezium.selectorLabels" -}} +app: {{ .Chart.Name }} +connector: {{ include "debezium.name" . }} +{{- end }} + +{{- define "debezium.labels" -}} +{{ include "debezium.selectorLabels" . }} +managed-by: {{ .Release.Service | lower }} +{{- end }} diff --git a/charts/debezium/templates/deployment.yaml b/charts/debezium/templates/deployment.yaml new file mode 100644 index 0000000..a2f02f6 --- /dev/null +++ b/charts/debezium/templates/deployment.yaml @@ -0,0 +1,67 @@ +{{ if .Values.debezium.enabled }} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: debezium-deploy + labels: + app: debezium +spec: + replicas: {{ .Values.debezium.replicaCount }} + selector: + matchLabels: + app: debezium + template: + metadata: + labels: + app: debezium + spec: + containers: + - name: debezium-container + image: {{ .Values.debezium.image }} + imagePullPolicy: {{ .Values.debezium.imagePullPolicy }} + env: + - name: BOOTSTRAP_SERVERS + value: {{ .Values.debezium.kafka.server }} + - name: GROUP_ID + value: {{ .Values.debezium.kafka.groupId | quote }} + - name: OFFSET_STORAGE_TOPIC + value: {{ .Values.debezium.kafka.topics.offset }} + - name: CONFIG_STORAGE_TOPIC + value: {{ .Values.debezium.kafka.topics.config }} + - name: STATUS_STORAGE_TOPIC + value: {{ .Values.debezium.kafka.topics.status }} + ports: + - containerPort: {{ .Values.debezium.port }} + - name: debezium-fixer + image: {{ .Values.debezium.fixer_container.image }} + imagePullPolicy: {{ .Values.debezium.fixer_container.imagePullPolicy }} + env: + - name: MY_POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: STARTUP_DELAY + value: {{ .Values.debezium.fixer_container.startup_delay }} + - name: PROBE_TIME_PERIOD + value: {{ .Values.debezium.fixer_container.probe_time_period }} + - name: ERROR_LEVEL1 + value: {{ .Values.debezium.fixer_container.error_level1 | quote }} + - name: ERROR_LEVEL2 + value: {{ .Values.debezium.fixer_container.error_level2 | quote }} +--- +apiVersion: v1 +kind: Service +metadata: + name: debezium-service + labels: + app: debezium-service +spec: + type: ClusterIP + ports: + - name: debezium + protocol: TCP + port: {{ .Values.debezium.port }} + selector: + app: debezium + + {{ end }} diff --git a/charts/debezium/values.yaml b/charts/debezium/values.yaml new file mode 100644 index 0000000..2937ca8 --- /dev/null +++ b/charts/debezium/values.yaml @@ -0,0 +1,21 @@ +debezium: + enabled: true + replicaCount: 1 + image: debezium/connect:1.7.0.Final + imagePullPolicy: Always + port: 8083 + fixer_container: + image: mosipid/debezium-fixer:1.2.0.1-B2 + imagePullPolicy: Always + startup_delay: "30s" + probe_time_period: "30s" + error_level1: "failed" + error_level2: "PSQLException: An I/O error occurred while sending to the backend" + kafka: + server: "kafka:9092" + groupId: "1" + topics: + offset: debez-connect-offsets + config: debez-connect-configs + status: debez-connect-status + diff --git a/scripts/kafka-connect/es-connectors/10.g2p_program.api b/scripts/kafka-connect/es-connectors/10.g2p_program.api index 1a5a69e..d861373 100644 --- a/scripts/kafka-connect/es-connectors/10.g2p_program.api +++ b/scripts/kafka-connect/es-connectors/10.g2p_program.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="g2p_program_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.g2p_program" +OS_INDICES="$DB_PREFIX_INDEX.public.g2p_program" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,23 +44,23 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert04.field": "date_ended", "transforms.tsconvert04.input.type": "days_epoch", "transforms.tsconvert04.output.format": "yyyy-MM-dd", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" diff --git a/scripts/kafka-connect/es-connectors/20.res_partner.api b/scripts/kafka-connect/es-connectors/20.res_partner.api index 06c2e0f..e5df078 100644 --- a/scripts/kafka-connect/es-connectors/20.res_partner.api +++ b/scripts/kafka-connect/es-connectors/20.res_partner.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="res_partner_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.res_partner" +OS_INDICES="$DB_PREFIX_INDEX.public.res_partner" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,28 +44,28 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert04.field": "birthdate", "transforms.tsconvert04.input.type": "days_epoch", "transforms.tsconvert04.output.format": "yyyy-MM-dd", - "transforms.tsconvert05.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert05.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert05.field": "registration_date", "transforms.tsconvert05.input.type": "days_epoch", "transforms.tsconvert05.output.format": "yyyy-MM-dd", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" diff --git a/scripts/kafka-connect/es-connectors/30.g2p_program_membership.api b/scripts/kafka-connect/es-connectors/30.g2p_program_membership.api index 7b7b6e4..ce33b64 100644 --- a/scripts/kafka-connect/es-connectors/30.g2p_program_membership.api +++ b/scripts/kafka-connect/es-connectors/30.g2p_program_membership.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="g2p_program_membership_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.g2p_program_membership" +OS_INDICES="$DB_PREFIX_INDEX.public.g2p_program_membership" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,51 +44,51 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert04.field": "enrollment_date", "transforms.tsconvert04.input.type": "micro_sec", - "transforms.join01.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join01.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program\"', - "transforms.join01.es.url": '\"$ES_URL\"', - "transforms.join01.es.input.query.add.keyword": "false", + "transforms.join01.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join01.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program\"', + "transforms.join01.os.url": '\"$OS_URL\"', + "transforms.join01.os.input.query.add.keyword": "false", "transforms.join01.input.fields": "program_id", "transforms.join01.input.default.values": "null", - "transforms.join01.es.input.fields": "id", - "transforms.join01.es.output.field": "name", + "transforms.join01.os.input.fields": "id", + "transforms.join01.os.output.field": "name", "transforms.join01.output.field": "program_name", - "transforms.join02.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join02.es.index": '\"$DB_PREFIX_INDEX.public.res_partner\"', - "transforms.join02.es.url": '\"$ES_URL\"', - "transforms.join02.es.input.query.add.keyword": "false", + "transforms.join02.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join02.os.index": '\"$DB_PREFIX_INDEX.public.res_partner\"', + "transforms.join02.os.url": '\"$OS_URL\"', + "transforms.join02.os.input.query.add.keyword": "false", "transforms.join02.input.fields": "partner_id", "transforms.join02.input.default.values": "null", - "transforms.join02.es.input.fields": "id", - "transforms.join02.es.output.field": "create_date", + "transforms.join02.os.input.fields": "id", + "transforms.join02.os.output.field": "create_date", "transforms.join02.output.field": "registrant_create_datetime", - "transforms.join03.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join03.es.index": '\"$DB_PREFIX_INDEX.public.res_partner\"', - "transforms.join03.es.url": '\"$ES_URL\"', + "transforms.join03.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join03.os.index": '\"$DB_PREFIX_INDEX.public.res_partner\"', + "transforms.join03.os.url": '\"$OS_URL\"', "transforms.join03.input.fields": "partner_id", "transforms.join03.input.default.values": "null", - "transforms.join03.es.input.fields": "id", - "transforms.join03.es.output.field": "gender", + "transforms.join03.os.input.fields": "id", + "transforms.join03.os.output.field": "gender", "transforms.join03.output.field": "gender", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" } diff --git a/scripts/kafka-connect/es-connectors/31.g2p_program_registrant_info.api b/scripts/kafka-connect/es-connectors/31.g2p_program_registrant_info.api index 8617928..76c8d8f 100644 --- a/scripts/kafka-connect/es-connectors/31.g2p_program_registrant_info.api +++ b/scripts/kafka-connect/es-connectors/31.g2p_program_registrant_info.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="g2p_program_registrant_info_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.g2p_program_registrant_info" +OS_INDICES="$DB_PREFIX_INDEX.public.g2p_program_registrant_info" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,36 +44,36 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.join01.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join01.es.index": '\"$DB_PREFIX_INDEX.public.res_partner\"', - "transforms.join01.es.url": '\"$ES_URL\"', + "transforms.join01.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join01.os.index": '\"$DB_PREFIX_INDEX.public.res_partner\"', + "transforms.join01.os.url": '\"$OS_URL\"', "transforms.join01.input.fields": "registrant_id", "transforms.join01.input.default.values": "null", - "transforms.join01.es.input.fields": "id", - "transforms.join01.es.output.field": "create_date", + "transforms.join01.os.input.fields": "id", + "transforms.join01.os.output.field": "create_date", "transforms.join01.output.field": "registrant_create_datetime", - "transforms.join02.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join02.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program_membership\"', - "transforms.join02.es.url": '\"$ES_URL\"', + "transforms.join02.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join02.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program_membership\"', + "transforms.join02.os.url": '\"$OS_URL\"', "transforms.join02.input.fields": "program_membership_id", "transforms.join02.input.default.values": "null", - "transforms.join02.es.input.fields": "id", - "transforms.join02.es.output.field": "create_date", + "transforms.join02.os.input.fields": "id", + "transforms.join02.os.output.field": "create_date", "transforms.join02.output.field": "membership_create_datetime", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" } diff --git a/scripts/kafka-connect/es-connectors/32.g2p_program_assessment.api b/scripts/kafka-connect/es-connectors/32.g2p_program_assessment.api index 216d6de..017e66c 100644 --- a/scripts/kafka-connect/es-connectors/32.g2p_program_assessment.api +++ b/scripts/kafka-connect/es-connectors/32.g2p_program_assessment.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="g2p_program_assessment_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.g2p_program_assessment" +OS_INDICES="$DB_PREFIX_INDEX.public.g2p_program_assessment" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,45 +44,45 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.join01.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join01.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program_registrant_info\"', - "transforms.join01.es.url": '\"$ES_URL\"', + "transforms.join01.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join01.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program_registrant_info\"', + "transforms.join01.os.url": '\"$OS_URL\"', "transforms.join01.input.fields": "program_membership_id", "transforms.join01.input.default.values": "null", - "transforms.join01.es.input.fields": "program_membership_id", - "transforms.join01.es.output.field": "registrant_create_datetime", + "transforms.join01.os.input.fields": "program_membership_id", + "transforms.join01.os.output.field": "registrant_create_datetime", "transforms.join01.output.field": "registrant_create_datetime", - "transforms.join02.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join02.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program_registrant_info\"', - "transforms.join02.es.url": '\"$ES_URL\"', + "transforms.join02.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join02.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program_registrant_info\"', + "transforms.join02.os.url": '\"$OS_URL\"', "transforms.join02.input.fields": "program_membership_id", "transforms.join02.input.default.values": "null", - "transforms.join02.es.input.fields": "program_membership_id", - "transforms.join02.es.output.field": "membership_create_datetime", + "transforms.join02.os.input.fields": "program_membership_id", + "transforms.join02.os.output.field": "membership_create_datetime", "transforms.join02.output.field": "membership_create_datetime", - "transforms.join03.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join03.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program_registrant_info\"', - "transforms.join03.es.url": '\"$ES_URL\"', + "transforms.join03.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join03.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program_registrant_info\"', + "transforms.join03.os.url": '\"$OS_URL\"', "transforms.join03.input.fields": "program_membership_id", "transforms.join03.input.default.values": "null", - "transforms.join03.es.input.fields": "program_membership_id", - "transforms.join03.es.output.field": "create_datetime", + "transforms.join03.os.input.fields": "program_membership_id", + "transforms.join03.os.output.field": "create_datetime", "transforms.join03.output.field": "application_create_datetime", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" } diff --git a/scripts/kafka-connect/es-connectors/40.g2p_program_membership_deduplicate.api b/scripts/kafka-connect/es-connectors/40.g2p_program_membership_deduplicate.api index 758f30e..8e03c8a 100644 --- a/scripts/kafka-connect/es-connectors/40.g2p_program_membership_deduplicate.api +++ b/scripts/kafka-connect/es-connectors/40.g2p_program_membership_deduplicate.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="g2p_program_membership_duplicate_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.g2p_program_membership_duplicate" +OS_INDICES="$DB_PREFIX_INDEX.public.g2p_program_membership_duplicate" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,18 +44,18 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" } diff --git a/scripts/kafka-connect/es-connectors/50.g2p_program_fund.api b/scripts/kafka-connect/es-connectors/50.g2p_program_fund.api index 712538c..23c8507 100644 --- a/scripts/kafka-connect/es-connectors/50.g2p_program_fund.api +++ b/scripts/kafka-connect/es-connectors/50.g2p_program_fund.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="g2p_program_fund_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.g2p_program_fund" +OS_INDICES="$DB_PREFIX_INDEX.public.g2p_program_fund" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,23 +44,23 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert04.field": "date_posted", "transforms.tsconvert04.input.type": "days_epoch", "transforms.tsconvert04.output.format": "yyyy-MM-dd", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" } diff --git a/scripts/kafka-connect/es-connectors/60.g2p_entitlement.api b/scripts/kafka-connect/es-connectors/60.g2p_entitlement.api index 8341301..e78b006 100644 --- a/scripts/kafka-connect/es-connectors/60.g2p_entitlement.api +++ b/scripts/kafka-connect/es-connectors/60.g2p_entitlement.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="g2p_entitlement_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.g2p_entitlement" +OS_INDICES="$DB_PREFIX_INDEX.public.g2p_entitlement" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,68 +44,68 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert04.field": "date_approved", "transforms.tsconvert04.input.type": "days_epoch", "transforms.tsconvert04.output.format": "yyyy-MM-dd", - "transforms.join01.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join01.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program_membership\"', - "transforms.join01.es.url": '\"$ES_URL\"', + "transforms.join01.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join01.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program_membership\"', + "transforms.join01.os.url": '\"$OS_URL\"', "transforms.join01.input.fields": "partner_id,program_id", "transforms.join01.input.default.values": "null,null", - "transforms.join01.es.input.fields": "partner_id,program_id", - "transforms.join01.es.output.field": "id", + "transforms.join01.os.input.fields": "partner_id,program_id", + "transforms.join01.os.output.field": "id", "transforms.join01.output.field": "program_membership_id", - "transforms.join02.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join02.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program_assessment\"', - "transforms.join02.es.url": '\"$ES_URL\"', + "transforms.join02.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join02.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program_assessment\"', + "transforms.join02.os.url": '\"$OS_URL\"', "transforms.join02.input.fields": "program_membership_id", "transforms.join02.input.default.values": "null", - "transforms.join02.es.input.fields": "program_membership_id", - "transforms.join02.es.output.field": "registrant_create_datetime", + "transforms.join02.os.input.fields": "program_membership_id", + "transforms.join02.os.output.field": "registrant_create_datetime", "transforms.join02.output.field": "registrant_create_datetime", - "transforms.join03.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join03.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program_assessment\"', - "transforms.join03.es.url": '\"$ES_URL\"', + "transforms.join03.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join03.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program_assessment\"', + "transforms.join03.os.url": '\"$OS_URL\"', "transforms.join03.input.fields": "program_membership_id", "transforms.join03.input.default.values": "null", - "transforms.join03.es.input.fields": "program_membership_id", - "transforms.join03.es.output.field": "membership_create_datetime", + "transforms.join03.os.input.fields": "program_membership_id", + "transforms.join03.os.output.field": "membership_create_datetime", "transforms.join03.output.field": "membership_create_datetime", - "transforms.join04.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join04.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program_assessment\"', - "transforms.join04.es.url": '\"$ES_URL\"', + "transforms.join04.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join04.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program_assessment\"', + "transforms.join04.os.url": '\"$OS_URL\"', "transforms.join04.input.fields": "program_membership_id", "transforms.join04.input.default.values": "null", - "transforms.join04.es.input.fields": "program_membership_id", - "transforms.join04.es.output.field": "application_create_datetime", + "transforms.join04.os.input.fields": "program_membership_id", + "transforms.join04.os.output.field": "application_create_datetime", "transforms.join04.output.field": "application_create_datetime", - "transforms.join05.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value", - "transforms.join05.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program_assessment\"', - "transforms.join05.es.url": '\"$ES_URL\"', + "transforms.join05.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value", + "transforms.join05.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program_assessment\"', + "transforms.join05.os.url": '\"$OS_URL\"', "transforms.join05.input.fields": "program_membership_id", "transforms.join05.input.default.values": "null", - "transforms.join05.es.input.fields": "program_membership_id", - "transforms.join05.es.output.field": "create_datetime", + "transforms.join05.os.input.fields": "program_membership_id", + "transforms.join05.os.output.field": "create_datetime", "transforms.join05.output.field": "assessment_create_datetime", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" } diff --git a/scripts/kafka-connect/es-connectors/70.g2p_payment_batch.api b/scripts/kafka-connect/es-connectors/70.g2p_payment_batch.api index 97b82b1..90e3356 100644 --- a/scripts/kafka-connect/es-connectors/70.g2p_payment_batch.api +++ b/scripts/kafka-connect/es-connectors/70.g2p_payment_batch.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="g2p_payment_batch_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.g2p_payment_batch" +OS_INDICES="$DB_PREFIX_INDEX.public.g2p_payment_batch" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,22 +44,22 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert04.field": "stats_datetime", "transforms.tsconvert04.input.type": "micro_sec", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" } diff --git a/scripts/kafka-connect/es-connectors/80.g2p_payment.api b/scripts/kafka-connect/es-connectors/80.g2p_payment.api index 8e0f4c9..c1556a6 100644 --- a/scripts/kafka-connect/es-connectors/80.g2p_payment.api +++ b/scripts/kafka-connect/es-connectors/80.g2p_payment.api @@ -4,26 +4,26 @@ #DB_HOSTNAME= #DB_PASS= #DB_PREFIX_INDEX= -#ES_URL= +#OS_URL= -ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name +OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name CONN_NAME="g2p_payment_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table -ES_INDICES="$DB_PREFIX_INDEX.public.g2p_payment" +OS_INDICES="$DB_PREFIX_INDEX.public.g2p_payment" curl \ -X POST \ - http://$ES_CONN_URL/connectors \ + http://$OS_CONN_URL/connectors \ -H 'Content-Type: application/json' \ -d \ '{ "name": '\"$CONN_NAME\"', "config": { - "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "name": '\"$CONN_NAME\"', - "connection.url": '\"$ES_URL\"', + "connection.url": '\"$OS_URL\"', "tasks.max": "1", - "topics": '\"$ES_INDICES\"', + "topics": '\"$OS_INDICES\"', "key.ignore": "false", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", @@ -44,26 +44,26 @@ curl \ "transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn", "transforms.debezExtract.add.fields.prefix": "source_", - "transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert01.field": "source_ts_ms", - "transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert02.field": "create_date", "transforms.tsconvert02.input.type": "micro_sec", - "transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert03.field": "write_date", "transforms.tsconvert03.input.type": "micro_sec", - "transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert04.field": "issuance_date", "transforms.tsconvert04.input.type": "micro_sec", - "transforms.tsconvert05.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value", + "transforms.tsconvert05.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value", "transforms.tsconvert05.field": "payment_date", "transforms.tsconvert05.input.type": "micro_sec", - "transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value", + "transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value", "transforms.tsSelect.ts.order": "write_date,create_date", "transforms.tsSelect.output.field": "@timestamp_gen" }