Skip to content

Commit

Permalink
add basic debezium-server plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
m-dwyer committed Aug 22, 2024
1 parent 9fbd2c3 commit 56ac056
Show file tree
Hide file tree
Showing 14 changed files with 536 additions and 0 deletions.
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
dynamodb_local = pkgs.callPackage ./packages/dynamodb_local.nix { };
adr-tools = pkgs.callPackage ./packages/adr-tools.nix { };
multi-gitter = pkgs.callPackage ./packages/multi-gitter.nix { };
debezium-server = pkgs.callPackage ./packages/debezium-server.nix { };
generate-netskope-combined-cert = pkgs.callPackage ./packages/generate-netskope-combined-cert.nix { };
};

Expand Down
36 changes: 36 additions & 0 deletions packages/debezium-server.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Packages the prebuilt Debezium Server distribution from Maven Central and
# installs a `run_debezium` launcher for it.
#
# When bumping `version`, the `hash` below must be updated as well.
# Command to convert a hex sha256 digest into the SRI format used by `hash`:
# nix-hash --to-sri --type sha256 hash-from-downloads-page
# NOTE(review): fetchzip hashes the unpacked archive contents rather than the
# tarball file itself — confirm the digest from the downloads page is usable as-is.

{
stdenv,
fetchzip,
makeWrapper,
jdk,
}: let
pname = "debezium-server";
version = "2.7.1.Final";
# File name of the release archive for this version.
tarballName = "debezium-server-dist-${version}.tar.gz";
in
stdenv.mkDerivation {
inherit pname version;

# Release archive downloaded from Maven Central.
src = fetchzip {
url = "https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/${version}/${tarballName}";
hash = "sha256-y7elU/9zBjGSpBFwAXHvhZvLb0QvvBazwDNP9yBQdJw=";
};

# Provides the makeWrapper shell function called in installPhase.
nativeBuildInputs = [makeWrapper];

# Copies the whole distribution into $out, locates the runner jar, and writes
# $out/bin/run_debezium, which invokes the packaged JDK's java with main class
# io.debezium.server.Main and the classpath
#   <runner jar>:<CONNECTOR_CONF_PATH, defaulting to "conf">:$out/lib/*
# The dollar signs in front of DEBEZIUM_OPTS, JAVA_OPTS and CONNECTOR_CONF_PATH
# are escaped so the names are written literally into the wrapper instead of
# being expanded at build time (intended to be expanded when the wrapper runs).
# The default "conf" is a relative path, so it is resolved against the
# directory the wrapper is started from.
installPhase = ''
runHook preInstall
cp -R . $out
RUNNER=$(ls $out/debezium-server-*runner.jar)
PATH_SEP=":"
LIB_PATH="$out/lib/*"
makeWrapper ${jdk}/bin/java \
$out/bin/run_debezium --add-flags "\
\''$DEBEZIUM_OPTS \''$JAVA_OPTS -cp \
$RUNNER$PATH_SEP\"\''${CONNECTOR_CONF_PATH:-conf}\"$PATH_SEP$LIB_PATH io.debezium.server.Main"
runHook postInstall
'';
}
47 changes: 47 additions & 0 deletions plugins/debezium-server/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# CA Debezium Server Devbox Plugin

At Culture Amp, we use [Debezium Server](https://debezium.io/documentation/reference/stable/operations/debezium-server.html) to publish data to a Kafka sink via the Outbox Event Router. The debezium-server plugin allows us to run this locally.

Use this plugin to run Debezium Server locally: it captures changes from an outbox table in a PostgreSQL instance and publishes them to a local Kafka cluster, such as the one provided by the `ca-kafka-local` plugin in conjunction with `kafka-local`.

What it provides:

- Environment variables. See [plugin.json](./plugin.json) for which variables are supplied and their values. These variables control configuration for the source Postgresql instance and Kafka sink. By default, the Kafka sink configuration assumes the environment variables imported by `ca-kafka-local`. Configuration is also provided for the seeding of data in the outbox table.
- Process compose job
- Debezium Server instance pre-configured with the outbox event router
- readme detailing environment variables and basic usage
- init_outbox service that assumes an existing Postgresql setup and configures the required outbox table, publication and heartbeat table for Debezium to use.
- init_topic service that creates the heartbeat topic up front. Kafka's automatic topic creation would normally suffice, but Debezium Server fails with an error if the heartbeat topic does not already exist.
- Various CLI tools
- Kafka CLI tools are included for creating the heartbeat topic
- psql is provided and used for setting up Postgresql
- A nodejs based populate script. This uses kafkajs to Avro-encode sample data provided according to the defined schema, before inserting this data into your outbox table

## Usage

To start Debezium Server, first include the plugin in your `devbox.json`:

```json
{
  "$schema": "https://raw.githubusercontent.com/jetify-com/devbox/main/.schema/devbox.schema.json",
  "include": [
    "github:cultureamp/devbox-extras?dir=plugins/debezium-server"
  ]
}
```

You will need to add the `ca-kafka-local` plugin and follow that plugin's [README.md](../ca-kafka-local/README.md).
You will also need to add the `postgresql` package to your project and define the postgresql service with the WAL level set to `logical`:

```yaml
postgresql:
  command: 'pg_ctl start -o "-k $PGHOST -c wal_level=logical"'
  is_daemon: true
  shutdown:
    command: pg_ctl stop -m fast
  readiness_probe:
    period_seconds: 1
    exec:
      command: pg_isready -U postgres
  availability:
    restart: always
```
27 changes: 27 additions & 0 deletions plugins/debezium-server/bin/debezium-server-readme
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/sh
#
# Prints a short usage note for the debezium-server plugin, followed by the
# current values of the environment variables the plugin is configured with.
#
# A single here-document is used instead of one `echo` per line: what `echo`
# prints is implementation-defined when an argument contains a backslash
# (some shells expand sequences such as "\n"), which could mangle values like
# passwords. Inside the here-document the values are emitted verbatim.
# Unset variables are printed as empty strings.
# NOTE: DB_AUTH_PASSWORD and DB_PASSWORD are printed in clear text.

set -e

cat <<EOF

== DEBEZIUM SERVER ==
To start Debezium Server (https://github.com/cultureamp/debezium-server):
 devbox services up

You will need to first ensure both postgres and kafka-local (https://github.com/cultureamp/kafka-local) are running

== Environment variables ==
DB_HOSTNAME: ${DB_HOSTNAME}
DB_PORT: ${DB_PORT}
DB_AUTH_USERNAME: ${DB_AUTH_USERNAME}
DB_AUTH_PASSWORD: ${DB_AUTH_PASSWORD}
DB_USERNAME: ${DB_USERNAME}
DB_PASSWORD: ${DB_PASSWORD}
DB_NAME: ${DB_NAME}
DB_SCHEMA: ${DB_SCHEMA}
FARM: ${FARM}
INTERNAL_TOPIC_PREFIX: ${INTERNAL_TOPIC_PREFIX}
SLOT_NAME: ${SLOT_NAME}
OFFSET_TOPIC: ${OFFSET_TOPIC}
PUBLICATION_NAME: ${PUBLICATION_NAME}
OUTBOX_TABLE: ${OUTBOX_TABLE}
HEARTBEAT_TABLE: ${HEARTBEAT_TABLE}
EOF
99 changes: 99 additions & 0 deletions plugins/debezium-server/bin/populate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import pgPromise from "pg-promise";
import { SchemaRegistry, SchemaType } from "@kafkajs/confluent-schema-registry";
import { readFileSync } from "fs";
import { v4 as uuid } from "uuid";

// Schema and sample data configuration.
// NOTE(review): "http://" is prepended to SCHEMA_REGISTRY_URL, so the variable
// is expected to hold host[:port] rather than a full URL — confirm.
const registry = new SchemaRegistry({
  host: `http://${process.env.SCHEMA_REGISTRY_URL}`,
});
// Path of the Avro schema file that gets registered.
const schemaPath = process.env.SCHEMA_PATH;
// Path of the JSON file holding the sample records to encode and insert.
const sampleDataPath = process.env.SAMPLE_DATA_PATH;

// Kafka sink configuration: the outbox table rows are written to, and the
// value stored in each row's `topic` column.
const outboxTable = process.env.OUTBOX_TABLE;
const targetTopic = process.env.TARGET_TOPIC;

// Postgres source configuration
const dbSchema = process.env.DB_SCHEMA;
// DB_HOST keeps priority for backward compatibility; DB_HOSTNAME is the name
// used by the plugin's readme script and Debezium configuration, so fall back
// to it when DB_HOST is not set.
const dbHost = process.env.DB_HOST || process.env.DB_HOSTNAME;
// Explicit radix; an unset DB_PORT still yields NaN, as before.
const dbPort = parseInt(process.env.DB_PORT || "", 10);
const dbName = process.env.DB_NAME;
const dbUsername = process.env.DB_USERNAME;
const dbPassword = process.env.DB_PASSWORD;

// Database handle; assigned by connectDatabase().
let db: pgPromise.IDatabase<{}>;

/**
 * Builds the pg-promise database object from the Postgres settings read at
 * module load and stores it in the module-level `db` variable.
 */
const connectDatabase = () => {
  const initialise = pgPromise();
  const connectionSettings = {
    host: dbHost,
    port: dbPort,
    database: dbName,
    user: dbUsername,
    password: dbPassword,
  };

  db = initialise(connectionSettings);
};

/**
 * Ends the connection pool behind `db`; the returned promise settles once
 * the pool's `end()` call has completed.
 */
const disconnectDatabase = async () => {
  const pool = db.$pool;
  await pool.end();
};

/**
 * Reads the Avro schema file at `schemaPath`, registers it with the schema
 * registry and logs the resulting id.
 *
 * @returns the id the registry reports for the schema
 */
const registerSchema = async () => {
  const avroSchema = readFileSync(schemaPath, "utf-8");

  const registration = await registry.register({
    type: SchemaType.AVRO,
    schema: avroSchema,
  });
  const schemaId = registration.id;

  console.log(`Auto-registered schema with id ${schemaId}`);
  return schemaId;
};

/**
 * Encodes one sample record with the schema registered under `schemaId`.
 *
 * @param schemaId id returned by the schema registry
 * @param payload  record to encode
 * @returns the encoded record as produced by `registry.encode`
 */
const encodePayload = async (schemaId: number, payload: any) => {
  return await registry.encode(schemaId, payload);
};

/**
 * Inserts one encoded payload into the outbox table as a new row.
 *
 * Fresh UUIDs are generated for id, message_key, partition_key and
 * account_id; created_at is the current time as an ISO-8601 string and the
 * topic column is set to `targetTopic`.
 *
 * @param payload encoded record to store in the `payload` column
 */
const addToOutboxTable = async (payload: Buffer) => {
  // Keys match the named placeholders of the INSERT statement below.
  const row = {
    id: uuid(),
    topic: targetTopic,
    message_key: uuid(),
    partition_key: uuid(),
    payload,
    account_id: uuid(),
    created_at: new Date().toISOString(),
  };

  console.log(
    `Publishing encoded payload to table ${dbSchema}.${outboxTable} with target topic ${targetTopic}`
  );

  // The escaped \${...} placeholders are passed through literally and filled
  // in by pg-promise from `row`; only schema and table are interpolated here.
  const insertStatement = `INSERT INTO ${dbSchema}.${outboxTable} (id, topic, message_key, partition_key, payload, account_id, created_at) VALUES (\${id}, \${topic}, \${message_key}, \${partition_key}, \${payload}, \${account_id}, \${created_at})`;

  await db.none(insertStatement, row);
};

/**
 * Entry point: reads the sample data file (expected to contain a JSON array),
 * registers the schema, then encodes each record and inserts it into the
 * outbox table.
 *
 * Every insert is awaited before the pool is shut down, so no query is still
 * in flight when the connection pool ends and insert failures are caught
 * here instead of surfacing as unhandled rejections. On any failure the
 * error is logged and the process exit code is set to 1.
 */
(async () => {
  // Only shut the pool down if connectDatabase() actually ran.
  let connected = false;

  try {
    const sampleData = readFileSync(sampleDataPath, "utf8");
    const sampleJson = JSON.parse(sampleData);

    connectDatabase();
    connected = true;

    const schemaId = await registerSchema();
    for (const payload of sampleJson) {
      const encodedPayload = await encodePayload(schemaId, payload);
      await addToOutboxTable(encodedPayload);
    }
  } catch (error) {
    console.error("Error: ", error);
    process.exitCode = 1;
  } finally {
    if (connected) {
      try {
        await disconnectDatabase();
      } catch (error) {
        console.error("Error: ", error);
        process.exitCode = 1;
      }
    }
  }
})();
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Debezium Server configuration: Postgres outbox table -> Kafka.
# ${NAME} placeholders refer to the plugin's environment variables
# (NOTE(review): presumably expanded by Debezium Server's configuration
# loader at startup — confirm).

# --- Sink: Kafka producer ---
debezium.sink.type=kafka
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
debezium.sink.kafka.producer.max.request.size=50000000
debezium.sink.kafka.producer.delivery.timeout.ms=1500000
# NOTE(review): 300000 ms is a five-minute retry backoff — confirm this is intended for local use.
debezium.sink.kafka.producer.retry.backoff.ms=300000

# --- Key / value formats ---
# Values use the binary format with BinaryDataConverter and are serialized
# with ByteArraySerializer (NOTE(review): presumably so the already-encoded
# outbox payload bytes are forwarded unchanged — confirm).
debezium.format.key=json
debezium.format.key.converter=org.apache.kafka.connect.storage.StringConverter
debezium.format.key.schemas.enable=false
debezium.format.value=binary
debezium.format.value.schemas.enable=false
debezium.format.value.converter=io.debezium.converters.BinaryDataConverter
debezium.format.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
debezium.format.delegate.converter.type.schemas.enable=false

# --- Source: Postgres connector ---
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.tasks.max=1
debezium.source.plugin.name=pgoutput
debezium.source.topic.prefix=${FARM}.${INTERNAL_TOPIC_PREFIX}
debezium.source.publication.autocreate.mode=filtered
debezium.source.slot.name=${SLOT_NAME}
debezium.source.slot.drop.on.stop=false
# Only the outbox table and the heartbeat table are captured.
debezium.source.table.include.list=${DB_SCHEMA}.${OUTBOX_TABLE}, ${DB_SCHEMA}.${HEARTBEAT_TABLE}
# NOTE(review): init_outbox.sql creates a publication named dbz_publication
# and a heartbeat table named debezium_heartbeat; PUBLICATION_NAME and
# HEARTBEAT_TABLE need to match those names.
debezium.source.publication.name=${PUBLICATION_NAME}

# --- Offset storage (kept in a Kafka topic) ---
debezium.source.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
debezium.source.offset.storage.topic=${OFFSET_TOPIC}
debezium.source.offset.storage.partitions=1
debezium.source.offset.storage.replication.factor=1

# --- Transforms ---
# router:     outbox EventRouter; it carries the `outbox` predicate, which
#             matches the outbox table's topic name. The destination topic is
#             built from the row's `topic` column, prefixed with ${FARM}.
# heartbeat1: RegexRouter renaming the __debezium-heartbeat.* topic.
# heartbeat2: RegexRouter renaming the heartbeat table's change topic.
debezium.transforms=router, heartbeat1, heartbeat2
debezium.transforms.router.type=io.debezium.transforms.outbox.EventRouter
debezium.transforms.router.route.topic.replacement=${FARM}.$1
debezium.transforms.router.route.by.field=topic
debezium.transforms.router.table.field.event.payload=payload
debezium.transforms.router.table.field.event.key=partition_key
debezium.transforms.router.table.field.event.id=id
debezium.transforms.router.predicate=outbox
debezium.predicates=outbox
debezium.predicates.outbox.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
debezium.predicates.outbox.pattern=${FARM}.${INTERNAL_TOPIC_PREFIX}.${DB_SCHEMA}.${OUTBOX_TABLE}
debezium.transforms.heartbeat1.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.transforms.heartbeat1.regex=__debezium-heartbeat.${FARM}.${INTERNAL_TOPIC_PREFIX}
debezium.transforms.heartbeat1.replacement=${FARM}.${INTERNAL_TOPIC_PREFIX}.debezium-heartbeat-interval
debezium.transforms.heartbeat2.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.transforms.heartbeat2.regex=${FARM}.${INTERNAL_TOPIC_PREFIX}.${DB_SCHEMA}.${HEARTBEAT_TABLE}(.*?)
# NOTE(review): unlike heartbeat1, this replacement starts with "_" — confirm that is intentional.
debezium.transforms.heartbeat2.replacement=_${FARM}.${INTERNAL_TOPIC_PREFIX}.debezium-heartbeat-table

# --- Heartbeat ---
# Both statements upsert the row with id 1 in the heartbeat table, setting ts to NOW().
debezium.source.database.initial.statements=INSERT INTO ${DB_SCHEMA}.${HEARTBEAT_TABLE} (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts;
debezium.source.heartbeat.action.query=INSERT INTO ${DB_SCHEMA}.${HEARTBEAT_TABLE} (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts;
debezium.source.heartbeat.interval.ms=10000

# --- Connection settings ---
debezium.source.database.hostname=${DB_HOSTNAME}
debezium.source.database.port=${DB_PORT}
debezium.source.database.user=${DB_USERNAME}
debezium.source.database.password=${DB_PASSWORD}
debezium.source.database.dbname=${DB_NAME}
debezium.source.bootstrap.servers=${BOOTSTRAP_SERVERS}
debezium.sink.kafka.producer.bootstrap.servers=${BOOTSTRAP_SERVERS}
102 changes: 102 additions & 0 deletions plugins/debezium-server/config/debezium-server/init_outbox.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
-- Prepares a Postgres database for Debezium: replication role and user,
-- outbox table, heartbeat table and publication.
-- Expects the psql variables debezium_user, debezium_password, schema_name
-- and table_name to be supplied by the caller.
\echo Creating debezium user: :debezium_user
\echo Using schema: :schema_name
\echo Creating outbox table: :table_name

-- Copy the psql variables into custom session settings so the DO blocks
-- below can read them with current_setting() (presumably because psql does
-- not substitute :variables inside dollar-quoted bodies).
-- NOTE(review): the values are interpolated unquoted, so a value that is not
-- a plain identifier (for example a password with punctuation) would need to
-- arrive already quoted — confirm how the caller passes them.
SET vars.debezium_user TO :debezium_user;
SET vars.debezium_password TO :debezium_password;
SET vars.schema_name TO :schema_name;
SET vars.table_name TO :table_name;

-- Basic permissions
-- Creates the dbz_replication_role role and the Debezium user when missing,
-- gives the user REPLICATION, and grants the role to the user and to postgres.
-- The user name is formatted with %I and the password with %L so that both
-- are quoted correctly: a password containing a single quote, or a user name
-- that needs identifier quoting, no longer breaks the generated statements,
-- and the created name matches the exact-name lookup in pg_user.
DO $$
DECLARE
    debezium_user text := current_setting('vars.debezium_user');
    debezium_password text := current_setting('vars.debezium_password');
BEGIN
    -- Create role if it does not exist
    IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dbz_replication_role') THEN
        CREATE ROLE dbz_replication_role REPLICATION LOGIN;
    END IF;

    -- Create user if it does not exist (an existing user keeps its password)
    IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_user WHERE usename = debezium_user) THEN
        EXECUTE format('CREATE USER %I WITH PASSWORD %L', debezium_user, debezium_password);
    END IF;

    -- Alter user to add REPLICATION permission
    EXECUTE format('ALTER USER %I WITH REPLICATION', debezium_user);

    -- Grant role to user and postgres
    EXECUTE format('GRANT dbz_replication_role TO %I', debezium_user);
    GRANT dbz_replication_role TO postgres;
END $$;

-- Outbox table
-- Creates <schema_name>.<table_name> when it does not exist and makes
-- dbz_replication_role its owner.
-- Schema and table names are formatted with %I so they are quoted as
-- identifiers when necessary; this keeps the created table's name identical
-- to the value compared against pg_tables in the existence and owner checks.
DO $$
DECLARE
    schema_name text := current_setting('vars.schema_name');
    table_name text := current_setting('vars.table_name');
BEGIN
    -- Create table if it does not exist
    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = table_name AND schemaname = schema_name) THEN
        EXECUTE format('
            CREATE TABLE %I.%I (
                id UUID NOT NULL PRIMARY KEY,
                topic TEXT NOT NULL,
                message_key UUID NOT NULL,
                partition_key UUID NOT NULL,
                payload BYTEA NOT NULL,
                account_id UUID NOT NULL,
                created_at TIMESTAMP NOT NULL
            )', schema_name, table_name);
    END IF;

    -- Set table owner if not already set
    IF (SELECT tableowner FROM pg_tables WHERE tablename = table_name AND schemaname = schema_name) != 'dbz_replication_role' THEN
        EXECUTE format('ALTER TABLE %I.%I OWNER TO dbz_replication_role', schema_name, table_name);
    END IF;
END $$;

-- Create heartbeat table
-- Creates the debezium_heartbeat_id_seq sequence and the debezium_heartbeat
-- table in <schema_name> when missing, and makes dbz_replication_role the
-- table owner.
-- The schema name is formatted with %I (identifier quoting); the sequence
-- reference inside the column default is built as a properly quoted literal
-- with %L instead of being pasted between hand-written quotes.
-- NOTE: the table name is fixed to debezium_heartbeat here, so the
-- HEARTBEAT_TABLE value used in the Debezium configuration must match it.
DO $$
DECLARE
    schema_name text := current_setting('vars.schema_name');
BEGIN
    -- Create the sequence if it doesn't exist
    EXECUTE format('
        CREATE SEQUENCE IF NOT EXISTS %I.debezium_heartbeat_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1;', schema_name);

    -- Create table if it does not exist
    EXECUTE format('
        CREATE TABLE IF NOT EXISTS %I.debezium_heartbeat (
            id bigint NOT NULL DEFAULT nextval(%L::regclass),
            ts timestamp without time zone NOT NULL,
            CONSTRAINT debezium_heartbeat_id_key UNIQUE (id)
        );', schema_name, format('%I.debezium_heartbeat_id_seq', schema_name));

    EXECUTE format('ALTER TABLE %I.debezium_heartbeat OWNER TO dbz_replication_role;', schema_name);
END $$;

-- Create publication
-- Ensures the dbz_publication publication covers the outbox table: when the
-- outbox table is not listed in it, the publication is dropped (if present)
-- and recreated for the outbox and heartbeat tables. The publication is then
-- assigned to dbz_replication_role.
-- Schema and table names are formatted with %I so they are quoted as
-- identifiers when necessary, matching the exact-name comparison against
-- pg_publication_tables.
-- NOTE: the publication name is fixed to dbz_publication here, so the
-- PUBLICATION_NAME value used in the Debezium configuration must match it.
DO $$
DECLARE
    schema_name text := current_setting('vars.schema_name');
    table_name text := current_setting('vars.table_name');
BEGIN
    -- Create publication if it does not exist
    IF NOT EXISTS (SELECT 1 FROM pg_publication_tables WHERE pubname = 'dbz_publication' AND schemaname = schema_name AND tablename = table_name) THEN
        DROP PUBLICATION IF EXISTS dbz_publication;
        EXECUTE format('CREATE PUBLICATION dbz_publication FOR TABLE %I.%I, %I.debezium_heartbeat;', schema_name, table_name, schema_name);
    END IF;

    -- Set publication owner
    ALTER PUBLICATION dbz_publication OWNER TO dbz_replication_role;
END $$;

-- Install the uuid-ossp extension if it is not already present
-- (presumably so UUIDs can be generated in SQL when inserting into the
-- outbox table by hand — TODO confirm; the populate script generates its
-- UUIDs client-side).
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
Loading

0 comments on commit 56ac056

Please sign in to comment.