forked from ovotech/castle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ssl-auth.ts
39 lines (32 loc) · 1.24 KB
/
ssl-auth.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import { createCastle, produce, consumeEachMessage, describeCastle } from '@ovotech/castle';
import { env } from 'process';
import { Event, EventSchema } from './avro';
/**
 * Producer for `my-topic-1`, declared as a pure function.
 * The `Event` type parameter and `EventSchema` statically pair the TypeScript
 * type of the message values with the Avro schema.
 */
const mySender = produce<Event>({
  schema: EventSchema,
  topic: 'my-topic-1',
});
/**
 * Consumer handler, declared as a pure function.
 * The `Event` type parameter statically sets which message type it accepts;
 * the value of every received message is written to the console.
 */
const eachEvent = consumeEachMessage<Event>(async (payload) => {
  const { value } = payload.message;
  console.log(value);
});
/**
 * Example entry point: creates a castle configured with schema-registry
 * credentials and Kafka SSL certificates, starts it, prints its description
 * and sends a single event to `my-topic-1`.
 *
 * The SSL material is read from the `KAFKA_SSL_CA`, `KAFKA_SSL_KEY` and
 * `KAFKA_SSL_CERT` environment variables.
 *
 * @returns A promise that resolves once the event has been sent; it rejects
 *   if starting the castle or sending the event fails.
 */
const main = async (): Promise<void> => {
  const castle = createCastle({
    // The username and password for the schema registry go in the userinfo
    // part of the uri string: `scheme://username:password@host:port`
    schemaRegistry: { uri: 'http://username:password@localhost:8081' },
    kafka: {
      brokers: ['localhost:29092'],
      // Pass ssl certs to kafkajs
      ssl: {
        ca: env.KAFKA_SSL_CA,
        key: env.KAFKA_SSL_KEY,
        cert: env.KAFKA_SSL_CERT,
      },
    },
    consumers: [{ topic: 'my-topic-1', groupId: 'my-group-1', eachMessage: eachEvent }],
  });

  // Start all consumers and producers
  await castle.start();

  console.log(describeCastle(castle));

  await mySender(castle.producer, [{ value: { field1: 'my-string' }, key: null }]);
};
// Run the example. Handle a rejection explicitly instead of leaving the
// promise floating: log the error and exit with a non-zero status.
main().catch((error: unknown) => {
  console.error(error);
  process.exit(1);
});