forked from ovotech/castle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
with-middlewares.ts
63 lines (54 loc) · 1.88 KB
/
with-middlewares.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import { createCastle, describeCastle, produce, consumeEachMessage } from '@ovotech/castle';
import { StartEvent, StartEventSchema, CompleteEvent, CompleteEventSchema } from './avro';
import {
createDb,
createLogging,
DbContext,
LoggingContext,
createErrorHandling,
} from './middlewares';
const start = produce<StartEvent>({ topic: 'my-start-3', schema: StartEventSchema });
const complete = produce<CompleteEvent>({ topic: 'my-complete-3', schema: CompleteEventSchema });
const eachStart = consumeEachMessage<StartEvent, DbContext & LoggingContext>(
async ({ message, db, logger, producer }) => {
logger.log('Started', message.value?.id);
const { rows } = await db.query('SELECT avatar FROM users WHERE id = $1', [message.value?.id]);
logger.log('Found', rows, 'Sending Complete');
complete(producer, [{ value: { id: message.value?.id ?? 0 }, key: null }]);
},
);
const eachComplete = consumeEachMessage<CompleteEvent, LoggingContext>(
async ({ message, logger }) => {
logger.log('Complete received for', message.value?.id);
},
);
const main = async () => {
const db = createDb({
user: 'boost-statements-api',
database: 'boost-statements-api',
password: 'dev-pass',
host: '127.0.0.1',
});
const logging = createLogging(console);
const errorHandling = createErrorHandling();
const castle = createCastle({
schemaRegistry: { uri: 'http://localhost:8081' },
kafka: { brokers: ['localhost:29092'] },
consumers: [
{
topic: 'my-start-3',
groupId: 'my-start-3',
eachMessage: logging(errorHandling(db(eachStart))),
},
{
topic: 'my-complete-3',
groupId: 'my-complete-3',
eachMessage: logging(errorHandling(eachComplete)),
},
],
});
await castle.start();
console.log(describeCastle(castle));
await start(castle.producer, [{ value: { id: 1 }, key: null }]);
};
main();