Skip to content

Commit

Permalink
[G2P-2355] Added helm-chart for debezium and updated all the connectors.
Browse files Browse the repository at this point in the history
  • Loading branch information
pramod444 committed May 30, 2024
1 parent 046bf8b commit 8881a39
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 186 deletions.
23 changes: 23 additions & 0 deletions charts/debezium/.helmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/
6 changes: 6 additions & 0 deletions charts/debezium/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
apiVersion: v2
name: debezium
description: Debezium Helm chart
type: application
version: 0.0.0-develop
appVersion: 1.7.1.Final
3 changes: 3 additions & 0 deletions charts/debezium/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# opensearch-debezium Helm Chart

This chart bootstraps a deployment of a opensearch-debezium
13 changes: 13 additions & 0 deletions charts/debezium/templates/_helpers.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{{- define "debezium.name" -}}
{{- .Release.Name | trimPrefix "debezium-" -}}
{{- end }}

{{- define "debezium.selectorLabels" -}}
app: {{ .Chart.Name }}
connector: {{ include "debezium.name" . }}
{{- end }}

{{- define "debezium.labels" -}}
{{ include "debezium.selectorLabels" . }}
managed-by: {{ .Release.Service | lower }}
{{- end }}
67 changes: 67 additions & 0 deletions charts/debezium/templates/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{{ if .Values.debezium.enabled }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: debezium-deploy
labels:
app: debezium
spec:
replicas: {{ .Values.debezium.replicaCount }}
selector:
matchLabels:
app: debezium
template:
metadata:
labels:
app: debezium
spec:
containers:
- name: debezium-container
image: {{ .Values.debezium.image }}
imagePullPolicy: {{ .Values.debezium.imagePullPolicy }}
env:
- name: BOOTSTRAP_SERVERS
value: {{ .Values.debezium.kafka.server }}
- name: GROUP_ID
value: {{ .Values.debezium.kafka.groupId | quote }}
- name: OFFSET_STORAGE_TOPIC
value: {{ .Values.debezium.kafka.topics.offset }}
- name: CONFIG_STORAGE_TOPIC
value: {{ .Values.debezium.kafka.topics.config }}
- name: STATUS_STORAGE_TOPIC
value: {{ .Values.debezium.kafka.topics.status }}
ports:
- containerPort: {{ .Values.debezium.port }}
- name: debezium-fixer
image: {{ .Values.debezium.fixer_container.image }}
imagePullPolicy: {{ .Values.debezium.fixer_container.imagePullPolicy }}
env:
- name: MY_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: STARTUP_DELAY
value: {{ .Values.debezium.fixer_container.startup_delay }}
- name: PROBE_TIME_PERIOD
value: {{ .Values.debezium.fixer_container.probe_time_period }}
- name: ERROR_LEVEL1
value: {{ .Values.debezium.fixer_container.error_level1 | quote }}
- name: ERROR_LEVEL2
value: {{ .Values.debezium.fixer_container.error_level2 | quote }}
---
apiVersion: v1
kind: Service
metadata:
name: debezium-service
labels:
app: debezium-service
spec:
type: ClusterIP
ports:
- name: debezium
protocol: TCP
port: {{ .Values.debezium.port }}
selector:
app: debezium

{{ end }}
21 changes: 21 additions & 0 deletions charts/debezium/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
debezium:
enabled: true
replicaCount: 1
image: debezium/connect:1.7.0.Final
imagePullPolicy: Always
port: 8083
fixer_container:
image: mosipid/debezium-fixer:1.2.0.1-B2
imagePullPolicy: Always
startup_delay: "30s"
probe_time_period: "30s"
error_level1: "failed"
error_level2: "PSQLException: An I/O error occurred while sending to the backend"
kafka:
server: "kafka:9092"
groupId: "1"
topics:
offset: debez-connect-offsets
config: debez-connect-configs
status: debez-connect-status

24 changes: 12 additions & 12 deletions scripts/kafka-connect/es-connectors/10.g2p_program.api
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,26 @@
#DB_HOSTNAME=
#DB_PASS=
#DB_PREFIX_INDEX=
#ES_URL=
#OS_URL=

ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name
OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name

CONN_NAME="g2p_program_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table
ES_INDICES="$DB_PREFIX_INDEX.public.g2p_program"
OS_INDICES="$DB_PREFIX_INDEX.public.g2p_program"

curl \
-X POST \
http://$ES_CONN_URL/connectors \
http://$OS_CONN_URL/connectors \
-H 'Content-Type: application/json' \
-d \
'{
"name": '\"$CONN_NAME\"',
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"name": '\"$CONN_NAME\"',
"connection.url": '\"$ES_URL\"',
"connection.url": '\"$OS_URL\"',
"tasks.max": "1",
"topics": '\"$ES_INDICES\"',
"topics": '\"$OS_INDICES\"',
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -44,23 +44,23 @@ curl \
"transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn",
"transforms.debezExtract.add.fields.prefix": "source_",

"transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert01.field": "source_ts_ms",

"transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert02.field": "create_date",
"transforms.tsconvert02.input.type": "micro_sec",

"transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert03.field": "write_date",
"transforms.tsconvert03.input.type": "micro_sec",

"transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert04.field": "date_ended",
"transforms.tsconvert04.input.type": "days_epoch",
"transforms.tsconvert04.output.format": "yyyy-MM-dd",

"transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value",
"transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value",
"transforms.tsSelect.ts.order": "write_date,create_date",
"transforms.tsSelect.output.field": "@timestamp_gen"

Expand Down
26 changes: 13 additions & 13 deletions scripts/kafka-connect/es-connectors/20.res_partner.api
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,26 @@
#DB_HOSTNAME=
#DB_PASS=
#DB_PREFIX_INDEX=
#ES_URL=
#OS_URL=

ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name
OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name

CONN_NAME="res_partner_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table
ES_INDICES="$DB_PREFIX_INDEX.public.res_partner"
OS_INDICES="$DB_PREFIX_INDEX.public.res_partner"

curl \
-X POST \
http://$ES_CONN_URL/connectors \
http://$OS_CONN_URL/connectors \
-H 'Content-Type: application/json' \
-d \
'{
"name": '\"$CONN_NAME\"',
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"name": '\"$CONN_NAME\"',
"connection.url": '\"$ES_URL\"',
"connection.url": '\"$OS_URL\"',
"tasks.max": "1",
"topics": '\"$ES_INDICES\"',
"topics": '\"$OS_INDICES\"',
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -44,28 +44,28 @@ curl \
"transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn",
"transforms.debezExtract.add.fields.prefix": "source_",

"transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert01.field": "source_ts_ms",

"transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert02.field": "create_date",
"transforms.tsconvert02.input.type": "micro_sec",

"transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert03.field": "write_date",
"transforms.tsconvert03.input.type": "micro_sec",

"transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert04.field": "birthdate",
"transforms.tsconvert04.input.type": "days_epoch",
"transforms.tsconvert04.output.format": "yyyy-MM-dd",

"transforms.tsconvert05.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert05.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert05.field": "registration_date",
"transforms.tsconvert05.input.type": "days_epoch",
"transforms.tsconvert05.output.format": "yyyy-MM-dd",

"transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value",
"transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value",
"transforms.tsSelect.ts.order": "write_date,create_date",
"transforms.tsSelect.output.field": "@timestamp_gen"

Expand Down
58 changes: 29 additions & 29 deletions scripts/kafka-connect/es-connectors/30.g2p_program_membership.api
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,26 @@
#DB_HOSTNAME=
#DB_PASS=
#DB_PREFIX_INDEX=
#ES_URL=
#OS_URL=

ES_CONN_URL='es-connect:8083'; # needn't change .. this is the ES-connector service name
OS_CONN_URL='os-connect:8083'; # needn't change .. this is the OS-connector service name

CONN_NAME="g2p_program_membership_$DB_PREFIX_INDEX"; # change this.. give unique name for each db/table
ES_INDICES="$DB_PREFIX_INDEX.public.g2p_program_membership"
OS_INDICES="$DB_PREFIX_INDEX.public.g2p_program_membership"

curl \
-X POST \
http://$ES_CONN_URL/connectors \
http://$OS_CONN_URL/connectors \
-H 'Content-Type: application/json' \
-d \
'{
"name": '\"$CONN_NAME\"',
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"name": '\"$CONN_NAME\"',
"connection.url": '\"$ES_URL\"',
"connection.url": '\"$OS_URL\"',
"tasks.max": "1",
"topics": '\"$ES_INDICES\"',
"topics": '\"$OS_INDICES\"',
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -44,51 +44,51 @@ curl \
"transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn",
"transforms.debezExtract.add.fields.prefix": "source_",

"transforms.tsconvert01.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert01.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert01.field": "source_ts_ms",

"transforms.tsconvert02.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert02.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert02.field": "create_date",
"transforms.tsconvert02.input.type": "micro_sec",

"transforms.tsconvert03.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert03.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert03.field": "write_date",
"transforms.tsconvert03.input.type": "micro_sec",

"transforms.tsconvert04.type": "io.mosip.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert04.type": "io.openg2p.kafka.connect.transforms.TimestampConverterAdv$Value",
"transforms.tsconvert04.field": "enrollment_date",
"transforms.tsconvert04.input.type": "micro_sec",

"transforms.join01.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value",
"transforms.join01.es.index": '\"$DB_PREFIX_INDEX.public.g2p_program\"',
"transforms.join01.es.url": '\"$ES_URL\"',
"transforms.join01.es.input.query.add.keyword": "false",
"transforms.join01.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value",
"transforms.join01.os.index": '\"$DB_PREFIX_INDEX.public.g2p_program\"',
"transforms.join01.os.url": '\"$OS_URL\"',
"transforms.join01.os.input.query.add.keyword": "false",
"transforms.join01.input.fields": "program_id",
"transforms.join01.input.default.values": "null",
"transforms.join01.es.input.fields": "id",
"transforms.join01.es.output.field": "name",
"transforms.join01.os.input.fields": "id",
"transforms.join01.os.output.field": "name",
"transforms.join01.output.field": "program_name",

"transforms.join02.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value",
"transforms.join02.es.index": '\"$DB_PREFIX_INDEX.public.res_partner\"',
"transforms.join02.es.url": '\"$ES_URL\"',
"transforms.join02.es.input.query.add.keyword": "false",
"transforms.join02.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value",
"transforms.join02.os.index": '\"$DB_PREFIX_INDEX.public.res_partner\"',
"transforms.join02.os.url": '\"$OS_URL\"',
"transforms.join02.os.input.query.add.keyword": "false",
"transforms.join02.input.fields": "partner_id",
"transforms.join02.input.default.values": "null",
"transforms.join02.es.input.fields": "id",
"transforms.join02.es.output.field": "create_date",
"transforms.join02.os.input.fields": "id",
"transforms.join02.os.output.field": "create_date",
"transforms.join02.output.field": "registrant_create_datetime",

"transforms.join03.type": "io.mosip.kafka.connect.transforms.DynamicNewField$Value",
"transforms.join03.es.index": '\"$DB_PREFIX_INDEX.public.res_partner\"',
"transforms.join03.es.url": '\"$ES_URL\"',
"transforms.join03.type": "io.openg2p.kafka.connect.transforms.DynamicNewField$Value",
"transforms.join03.os.index": '\"$DB_PREFIX_INDEX.public.res_partner\"',
"transforms.join03.os.url": '\"$OS_URL\"',
"transforms.join03.input.fields": "partner_id",
"transforms.join03.input.default.values": "null",
"transforms.join03.es.input.fields": "id",
"transforms.join03.es.output.field": "gender",
"transforms.join03.os.input.fields": "id",
"transforms.join03.os.output.field": "gender",
"transforms.join03.output.field": "gender",

"transforms.tsSelect.type": "io.mosip.kafka.connect.transforms.TimestampSelector$Value",
"transforms.tsSelect.type": "io.openg2p.kafka.connect.transforms.TimestampSelector$Value",
"transforms.tsSelect.ts.order": "write_date,create_date",
"transforms.tsSelect.output.field": "@timestamp_gen"
}
Expand Down
Loading

0 comments on commit 8881a39

Please sign in to comment.