Skip to content

Commit

Permalink
Merge pull request #11 from lalithkota/develop-G2P-2355
Browse files Browse the repository at this point in the history
Fixed Tombstone Records. OpenSearch Authentication and issues with connectors. Fixed Kafka Roles.
  • Loading branch information
lalithkota authored Jun 21, 2024
2 parents 369bf0a + 47fe175 commit 2c876e7
Show file tree
Hide file tree
Showing 21 changed files with 204 additions and 107 deletions.
5 changes: 3 additions & 2 deletions charts/debezium/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,9 @@ envVars:
BOOTSTRAP_SERVERS: '{{ tpl .Values.kafkaBootstrapServers $ }}'
GROUP_ID: 1

HOST_NAME: "0.0.0.0"
PORT: '{{ .Values.containerPort }}'
# HOST_NAME: "0.0.0.0"
# PORT: '{{ .Values.containerPort }}'
LISTENERS: 'HTTP://:{{ .Values.containerPort }}'

# TODO: Evaluate the ADVERTISED options
ADVERTISED_HOST_NAME: '{{ include "common.names.fullname" . }}'
Expand Down
13 changes: 13 additions & 0 deletions charts/opensearch-kafka-connector/templates/configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "common.names.fullname" . }}-properties
labels: {{ include "common.labels.standard" (dict "customLabels" .Values.commonLabels "context" $) | nindent 4 }}
{{- if .Values.commonAnnotations }}
annotations: {{ include "common.tplvalues.render" (dict "value" .Values.commonAnnotations "context" $) | nindent 4 }}
{{- end }}
data:
connect-distributed.properties: |-
{{- range $k, $v := .Values.propertiesOverride }}
{{ $k }}={{ include "common.tplvalues.render" (dict "value" $v "context" $) }}
{{- end }}
20 changes: 14 additions & 6 deletions charts/opensearch-kafka-connector/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,20 @@ spec:
{{- if .Values.readinessProbe.enabled }}
readinessProbe: {{- include "common.tplvalues.render" (dict "value" (omit .Values.readinessProbe "enabled") "context" $) | nindent 12 }}
{{- end }}
{{- if .Values.extraVolumeMounts }}
volumeMounts: {{- include "common.tplvalues.render" (dict "value" .Values.extraVolumeMounts "context" $) | nindent 12 }}
{{- end }}
volumeMounts:
- name: connect-properties
mountPath: /opt/kafka/config/connect-distributed.properties
subPath: connect-distributed.properties
{{- if .Values.extraVolumeMounts }}
{{- include "common.tplvalues.render" (dict "value" .Values.extraVolumeMounts "context" $) | nindent 12 }}
{{- end }}
{{- if .Values.sidecars }}
{{- include "common.tplvalues.render" ( dict "value" .Values.sidecars "context" $) | nindent 8 }}
{{- end }}
{{- if .Values.extraVolumes }}
volumes: {{- include "common.tplvalues.render" (dict "value" .Values.extraVolumes "context" $) | nindent 8 }}
{{- end }}
volumes:
- name: connect-properties
configMap:
name: {{ include "common.names.fullname" . }}-properties
{{- if .Values.extraVolumes }}
{{- include "common.tplvalues.render" (dict "value" .Values.extraVolumes "context" $) | nindent 8 }}
{{- end }}
40 changes: 29 additions & 11 deletions charts/opensearch-kafka-connector/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -334,18 +334,36 @@ kafkaBootstrapServers: '{{ .Release.Name }}-kafka:9092'
## Metrics based variables, like JMX settings, can be set from "metrics.envVars".
envVars:
KAFKA_HEAP_OPTS: '{{ tpl .Values.heapOpts $ }}'
CONNECT_BOOTSTRAP_SERVERS: '{{ tpl .Values.kafkaBootstrapServers $ }}'
CONNECT_GROUP_ID: 1

CONNECT_HOST_NAME: '0.0.0.0'
CONNECT_REST_PORT: '{{ .Values.containerPort }}'
envVarsFrom: {}

# TODO: Evaluate the ADVERTISED options
CONNECT_REST_ADVERTISED_HOST_NAME: '{{ include "common.names.fullname" . }}'
CONNECT_REST_ADVERTISED_PORT: '{{ .Values.service.port }}'
propertiesOverride:
bootstrap.servers: '{{ tpl .Values.kafkaBootstrapServers $ }}'
group.id: 2

CONNECT_CONFIG_STORAGE_TOPIC: '{{ include "common.names.name" . }}-configs'
CONNECT_OFFSET_STORAGE_TOPIC: '{{ include "common.names.name" . }}-offsets'
CONNECT_STATUS_STORAGE_TOPIC: '{{ include "common.names.name" . }}-status'
# host.name: '0.0.0.0'
# rest.port: '{{ .Values.containerPort }}'
listeners: 'HTTP://:{{ .Values.containerPort }}'

envVarsFrom: {}
# TODO: Evaluate the ADVERTISED options
rest.advertised.host.name: '{{ include "common.names.fullname" . }}'
rest.advertised.port: '{{ .Values.service.port }}'
#rest.advertised.listener:

config.storage.topic: '{{ include "common.names.name" . }}-configs'
config.storage.replication.factor: 1
offset.storage.topic: '{{ include "common.names.name" . }}-offsets'
offset.storage.replication.factor: 1
#offset.storage.partitions: 25
status.storage.topic: '{{ include "common.names.name" . }}-status'
status.storage.replication.factor: 1
#status.storage.partitions: 5

key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter.schemas.enable: true

offset.flush.interval.ms: 10000

plugin.path: /opt/kafka/connectors
54 changes: 37 additions & 17 deletions charts/reporting-init/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ podLabels: {}
## ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/
##
podAnnotations:
sidecar.istio.io/inject: False
sidecar.istio.io/inject: "false"

## Add init containers to the pods.
## Example:
Expand Down Expand Up @@ -142,7 +142,7 @@ gitConfig:
key: subDir

envVars:
GIT_CONFIG_ENABLED: '{{ .Values.gitConfig.enabled | quote }}'
GIT_CONFIG_ENABLED: '{{ .Values.gitConfig.enabled }}'

DEBEZIUM_URL: '{{ tpl .Values.global.reportingDebeziumInstallationName $ }}'
OS_KAFKA_CONNECTOR_URL: '{{ tpl .Values.global.reportingOsKafkaConnectorInstallationName $ }}'
Expand All @@ -151,33 +151,43 @@ envVars:
DB_PORT: 5432
DB_USER: postgres
DB_NAME: '' # To be filled
DB_PREFIX_INDEX: '{{ .Release.Namespace }}'
DB_PREFIX_INDEX: '{{ .Release.Namespace | replace "-" "_" }}_{{ .Release.Name | replace "-" "_" }}' # To be modified according to app
DEFAULT_DEBEZIUM_CONNECTOR_HEARTBEAT_MS: 5000

OPENSEARCH_URL: 'https://{{ tpl .Values.global.reportingOpensearchInstallationName $ }}:9200'

OPENSEARCH_SECURITY_ENABLED: true
OPENSEARCH_USERNAME: 'logstash'
OPENSEARCH_USERNAME: admin

dollar: '$'

envVarsFrom:
DB_PASS:
secretKeyRef:
name: '{{ tpl .Values.global.reportingPostgresqlInstallationName $ }}'
key: postgres-passsword
key: postgres-password
OPENSEARCH_PASSWORD:
secretKeyRef:
name: '{{ tpl .Values.global.reportingOpensearchInstallationName $ }}'
key: 'logstash-password'
key: opensearch-password

startUpCommand: |-
#!/usr/bin/env bash
set -o pipefail
set -e
until pg_isready -h ${DB_HOSTNAME} -p ${DB_PORT}; do sleep 3; done && echo "Connection with PostgreSQL successful."
curl -k -I -s -o /dev/null -m 10 --retry 100 --retry-delay 10 --retry-all-errors "${OPENSEARCH_URL}" && echo "Connection with OpenSearch successful."
curl -k -I -s -o /dev/null -m 10 --retry 100 --retry-delay 10 --retry-all-errors "${DEBEZIUM_URL}" && echo "Connection with Debezium successful"
curl -k -I -s -o /dev/null -m 10 --retry 100 --retry-delay 10 --retry-all-errors "${OS_KAFKA_CONNECTOR_URL}" && echo "Connection with OpenSearch Kafka Connector successful"
# TODO: wait for kafka (?)
if [ "$GIT_CONFIG_ENABLED" = "true" ]; then
cd /tmp
git clone -d "$GIT_CONFIG_BRANCH" --depth 1 "$GIT_CONFIG_REPO_URL" config
git clone -b "$GIT_CONFIG_BRANCH" --depth 1 "$GIT_CONFIG_REPO_URL" config
config_dir=config/${GIT_CONFIG_SUB_DIR}
sudo mv $config_dir /reporting-init/config
mv $config_dir /reporting-init/config
fi
cd /reporting-init/config
Expand All @@ -188,8 +198,9 @@ startUpCommand: |-
for conn in $arr ; do if [ "$conn" = "$value_to_check" ]; then echo "yes"; fi ; done
}
echo "==> DEBEZIUM CONNECTORS"
echo "====> DEBEZIUM CONNECTORS"
if [ -d debezium-connectors ]; then
echo "==> Starting Debezium Connector Initialization"
debezium_existing_connectors_list=$(curl -s $DEBEZIUM_URL/connectors | jq -cr '.[]')
debezium_new_connectors_list=()
for debezium_conn in debezium-connectors/* ; do
Expand All @@ -199,22 +210,27 @@ startUpCommand: |-
if_exists=$(contains $debezium_existing_connectors_list $debezium_connector_name)
if [ -z "$if_exists" ]; then
curl -s -XPOST -H 'Content-Type: application/json' $DEBEZIUM_URL/connectors -d "$debezium_connector_config"
echo "==> Creating new Connector - $debezium_connector_name."
curl -s -XPOST -H 'Content-Type: application/json' $DEBEZIUM_URL/connectors -d "$debezium_connector_config" | jq
else
curl -s -XPUT -H 'Content-Type: application/json' $DEBEZIUM_URL/connectors/${debezium_connector_name}/config -d "$(echo $debezium_connector_config | jq -cr '.config')"
echo "==> Connector - $debezium_connector_name - already exists. Updating config."
curl -s -XPUT -H 'Content-Type: application/json' $DEBEZIUM_URL/connectors/${debezium_connector_name}/config -d "$(echo $debezium_connector_config | jq -cr '.config')" | jq
fi
done
echo "==> Starting deletion process for old debezium connectors."
debezium_new_connectors_list=${debezium_new_connectors_list[@]}
for connector_to_delete in $debezium_existing_connectors_list; do
if_exists=$(contains $debezium_new_connectors_list $connector_to_delete)
if [ -z "$if_exists" ]; then
curl -s -XDELETE $DEBEZIUM_URL/connectors/${connector_to_delete}
echo "==> Deleting old connector - $connector_to_delete."
curl -s -XDELETE $DEBEZIUM_URL/connectors/${connector_to_delete} | jq
fi
done
fi
echo "==> OPENSEARCH CONNECTORS"
echo "====> OPENSEARCH CONNECTORS"
if [ -d opensearch-connectors ]; then
echo "==> Starting Opensearch Connector Initialization"
os_existing_connectors_list=$(curl -s $OS_KAFKA_CONNECTOR_URL/connectors | jq -cr '.[]')
os_new_connectors_list=()
for os_conn in opensearch-connectors/* ; do
Expand All @@ -224,19 +240,23 @@ startUpCommand: |-
if_exists=$(contains $os_existing_connectors_list $os_connector_name)
if [ -z "$if_exists" ]; then
curl -s -XPOST -H 'Content-Type: application/json' $OS_KAFKA_CONNECTOR_URL/connectors -d "$os_connector_config"
echo "==> Creating new Connector - $os_connector_name."
curl -s -XPOST -H 'Content-Type: application/json' $OS_KAFKA_CONNECTOR_URL/connectors -d "$os_connector_config" | jq
else
curl -s -XPUT -H 'Content-Type: application/json' $OS_KAFKA_CONNECTOR_URL/connectors/${os_connector_name}/config -d "$(echo $os_connector_config | jq -cr '.config')"
echo "==> Connector - $os_connector_name - already exists. Updating config."
curl -s -XPUT -H 'Content-Type: application/json' $OS_KAFKA_CONNECTOR_URL/connectors/${os_connector_name}/config -d "$(echo $os_connector_config | jq -cr '.config')" | jq
fi
done
echo "==> Starting deletion process for old opensearch-connectors."
os_new_connectors_list=${os_new_connectors_list[@]}
for connector_to_delete in $os_existing_connectors_list; do
if_exists=$(contains $os_new_connectors_list $connector_to_delete)
if [ -z "$if_exists" ]; then
curl -s -XDELETE $OS_KAFKA_CONNECTOR_URL/connectors/${connector_to_delete}
echo "==> Deleting old connector - $connector_to_delete."
curl -s -XDELETE $OS_KAFKA_CONNECTOR_URL/connectors/${connector_to_delete} | jq
fi
done
fi
echo "==> OPENSEARCH DASHBOARDS"
echo "====> OPENSEARCH DASHBOARDS"
## TODO: Load dashboards also from this script
6 changes: 3 additions & 3 deletions charts/reporting/templates/opensearch/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "common.names.fullname" .Subcharts.opensearch }}-custom-config
labels: {{ include "common.labels.standard" (dict "customLabels" .Values.opensearch.commonLabels "context" $) | nindent 4 }}
{{- if .Values.opensearch.commonAnnotations }}
annotations: {{ include "common.tplvalues.render" (dict "value" .Values.opensearch.commonAnnotations "context" $) | nindent 4 }}
labels: {{ include "common.labels.standard" (dict "customLabels" .Values.commonLabels "context" $) | nindent 4 }}
{{- if .Values.commonAnnotations }}
annotations: {{ include "common.tplvalues.render" (dict "value" .Values.commonAnnotations "context" $) | nindent 4 }}
{{- end }}
data:
{{- if .Values.opensearch.security.enabled }}
Expand Down
6 changes: 5 additions & 1 deletion charts/reporting/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ kafka:

kafkaUi:
enabled: true
# kafka.nameOverride will apply here too.

basePath: ""

kafkaBootstrapServers: '{{ tpl .Values.global.reportingKafkaInstallationName $ }}:9092'
Expand Down Expand Up @@ -203,7 +205,9 @@ kafkaUi:
- name: "admins"
clusters: ["main"]
subjects:
# FILL THIS
- provider: oauth
type: role
value: "Admin"
permissions:
- resource: applicationconfig
actions: all
Expand Down
2 changes: 1 addition & 1 deletion reporting-init-docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ARG container_group_id=1001
ARG container_user=openg2p
ARG container_group=openg2p

RUN apk add bash vim git curl jq yq gettext kubectl helm
RUN apk add bash vim git curl jq yq gettext kubectl helm postgresql-client

RUN addgroup -S ${container_group} -g ${container_group_id} && \
adduser -S ${container_user} -G ${container_group} -u ${container_user_id}
Expand Down
8 changes: 4 additions & 4 deletions scripts/pbms/debezium-connectors/default.json
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
{
"name": "pbms_${DB_PREFIX_INDEX//-/_}",
"name": "${DB_PREFIX_INDEX}_${DB_NAME}",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"publication.autocreate.mode": "filtered",
"slot.name": "dbz_pbms_${DB_NAME}_${DB_PREFIX_INDEX//-/_}",
"publication.name": "dbz_pub_pbms_${DB_NAME}_${DB_PREFIX_INDEX//-/_}",
"slot.name": "dbz_${DB_PREFIX_INDEX}_${DB_NAME}",
"publication.name": "dbz_pub_${DB_PREFIX_INDEX}_${DB_NAME}",
"database.hostname": "${DB_HOSTNAME}",
"database.port": "${DB_PORT}",
"database.user": "${DB_USER}",
"database.password": "${DB_PASS}",
"database.dbname": "${DB_NAME}",
"database.server.name": "${DB_PREFIX_INDEX//-/_}",
"topic.prefix": "${DB_PREFIX_INDEX}",
"table.include.list": "public.res_partner,public.g2p_program,public.g2p_program_fund,public.g2p_program_membership,public.g2p_program_membership_duplicate,public.g2p_program_registrant_info,public.g2p_program_assessment,public.g2p_entitlement,public.g2p_payment_batch,public.g2p_payment",
"column.exclude.list": "",
"heartbeat.interval.ms": "${DEFAULT_DEBEZIUM_CONNECTOR_HEARTBEAT_MS}",
Expand Down
11 changes: 7 additions & 4 deletions scripts/pbms/opensearch-connectors/10.g2p_program.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
{
"name": "pbms_g2p_program_${DB_PREFIX_INDEX//-/_}",
"name": "g2p_program_${DB_PREFIX_INDEX}",
"config": {
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"connection.url": "${OPENSEARCH_URL}",
"connection.username": "${OPENSEARCH_USERNAME}",
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX//-/_}.public.g2p_program",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -13,15 +15,16 @@
"value.converter.schemas.enable": "true",

"behavior.on.null.values": "delete",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

"transforms": "keyExtId,debezExtract,tsconvert01,tsconvert02,tsconvert03,tsconvert04,tsSelect",

"transforms.keyExtId.type": "org.apache.kafka.connect.transforms.ExtractField${dollar}Key",
"transforms.keyExtId.field": "id",

"transforms.debezExtract.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.debezExtract.delete.handling.mode": "drop",
"transforms.debezExtract.drop.tombstones": "false",
"transforms.debezExtract.delete.tombstone.handling.mode": "tombstone",
"transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn",
"transforms.debezExtract.add.fields.prefix": "source_",

Expand Down
13 changes: 8 additions & 5 deletions scripts/pbms/opensearch-connectors/20.res_partner.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
{
"name": "pbms_res_partner_${DB_PREFIX_INDEX//-/_}",
"name": "res_partner_${DB_PREFIX_INDEX}",
"config": {
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"connection.url": "$OPENSEARCH_URL",
"connection.url": "${OPENSEARCH_URL}",
"connection.username": "${OPENSEARCH_USERNAME}",
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX//-/_}.public.res_partner",
"topics": "${DB_PREFIX_INDEX}.public.res_partner",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
Expand All @@ -13,15 +15,16 @@
"value.converter.schemas.enable": "true",

"behavior.on.null.values": "delete",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

"transforms": "keyExtId,debezExtract,tsconvert01,tsconvert02,tsconvert03,tsconvert04,tsconvert05,tsSelect",

"transforms.keyExtId.type": "org.apache.kafka.connect.transforms.ExtractField${dollar}Key",
"transforms.keyExtId.field": "id",

"transforms.debezExtract.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.debezExtract.delete.handling.mode": "drop",
"transforms.debezExtract.drop.tombstones": "false",
"transforms.debezExtract.delete.tombstone.handling.mode": "tombstone",
"transforms.debezExtract.add.fields": "source.ts_ms:ts_ms,table,lsn",
"transforms.debezExtract.add.fields.prefix": "source_",

Expand Down
Loading

0 comments on commit 2c876e7

Please sign in to comment.