From 5c8b639d88aa91151e71e9adb205b2bd0a50f976 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 12 Dec 2024 14:42:31 +0000 Subject: [PATCH] Add support for new stream name Signed-off-by: John Gomersall --- src/domain/services/messages.service.spec.ts | 83 +++++++++++++++++++- src/domain/services/messages.service.ts | 15 +++- src/domain/services/redis.listener.spec.ts | 75 +++++++++++++++++- src/domain/services/redis.listener.ts | 10 +++ 4 files changed, 177 insertions(+), 6 deletions(-) diff --git a/src/domain/services/messages.service.spec.ts b/src/domain/services/messages.service.spec.ts index 935e578..b33090a 100644 --- a/src/domain/services/messages.service.spec.ts +++ b/src/domain/services/messages.service.spec.ts @@ -269,7 +269,7 @@ describe('create', () => { }); }); - it('should not call importwithfilter for initialImport', async () => { + it('should not call importWithFilter for initialImport', async () => { await createTestingModule([DomainModule], async (app) => { const importService = app.get(ImportService); const importSpy = jest @@ -311,7 +311,7 @@ describe('create', () => { }); }); - it('should call importwithfilter for normal imports', async () => { + it('should not include non-food products in call to importWithFilter', async () => { await createTestingModule([DomainModule], async (app) => { const importService = app.get(ImportService); const importSpy = jest @@ -327,12 +327,91 @@ describe('create', () => { id: nextId(), message: { code: code1, + product_type: 'food', }, }, { id: nextId(), message: { code: code2, + product_type: 'beauty', + }, + }, + ]; + + const messagesService = app.get(MessagesService); + await messagesService.create(messages); + + // Then the import is called + expect(importSpy).toHaveBeenCalledTimes(1); + + // Update events are created for all codes + const events = + await sql`SELECT * FROM product_update_event WHERE message->>'code' IN ${sql( + [code1, code2], + )}`; + + expect(events).toHaveLength(2); + + // Import with filter only called for the food product + const importWithFilterIn = importSpy.mock.calls[0][0].code.$in; + expect(importWithFilterIn).toHaveLength(1); + expect(importWithFilterIn[0]).toBe(code1); + }); + }); + + it('should not call importWithFilter for updates to only non-food products', async () => { + await createTestingModule([DomainModule], async (app) => { + const importService = app.get(ImportService); + const importSpy = jest + .spyOn(importService, 'importWithFilter') + .mockImplementation(); + + const code1 = randomCode(); + let idCount = 0; + const nextId = () => `${Date.now()}-${idCount++}`; + const messages = [ + { + id: nextId(), + message: { + code: code1, + product_type: 'beauty', + }, + }, + ]; + + const messagesService = app.get(MessagesService); + await messagesService.create(messages); + + // Then the import is not called + expect(importSpy).toHaveBeenCalledTimes(0); + }); + }); + + it('should call importWithFilter for normal imports', async () => { + await createTestingModule([DomainModule], async (app) => { + const importService = app.get(ImportService); + const importSpy = jest + .spyOn(importService, 'importWithFilter') + .mockImplementation(); + + const code1 = randomCode(); + const code2 = randomCode(); + let idCount = 0; + const nextId = () => `${Date.now()}-${idCount++}`; + const messages = [ + { + id: nextId(), + message: { + code: code1, + product_type: 'food', + }, + }, + { + id: nextId(), + message: { + code: code2, + product_type: 'food', }, }, ]; diff --git a/src/domain/services/messages.service.ts b/src/domain/services/messages.service.ts index 8f38921..c66cf72 100644 --- a/src/domain/services/messages.service.ts +++ b/src/domain/services/messages.service.ts @@ -58,9 +58,18 @@ export class MessagesService { do nothing`; if (!initialImport) { - const productCodes = [...new Set(messages.map((m) => m.message.code))]; - const filter = { code: { $in: productCodes } }; - await this.importService.importWithFilter(filter, ProductSource.EVENT); + const productCodes = [ + ...new Set( + messages + // At the moment we only import food products. This can be removed when we import all flavours + .filter((m) => m.message.product_type === 'food') + .map((m) => m.message.code), + ), + ]; + if (productCodes.length) { + const filter = { code: { $in: productCodes } }; + await this.importService.importWithFilter(filter, ProductSource.EVENT); + } } // Update counts on product_update after products have been imported diff --git a/src/domain/services/redis.listener.spec.ts b/src/domain/services/redis.listener.spec.ts index eb256af..4b33f90 100644 --- a/src/domain/services/redis.listener.spec.ts +++ b/src/domain/services/redis.listener.spec.ts @@ -13,7 +13,8 @@ import { MessagesService } from './messages.service'; jest.setTimeout(300000); describe('receiveMessages', () => { - it('should call importWithFilter when a message is received', async () => { + // Note following code can be removed when the new event is being published + it('should call importWithFilter when a message is received on the old stream', async () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: Redis is running const redis = await new GenericContainer('redis') @@ -43,6 +44,77 @@ describe('receiveMessages', () => { const messageId = await client.xAdd('product_updates_off', '*', { code: code1, rev: '1', + product_type: 'food', + }); + + // Wait for message to be delivered + await setTimeout(100); + + // Then the import is called + expect(importSpy).toHaveBeenCalledTimes(1); + expect(await settings.getLastMessageId()).toBe(messageId); + + // If a new message is added + importSpy.mockClear(); + await client.xAdd('product_updates_off', '*', { + code: code2, + product_type: 'food', + }); + + // Wait for message to be delivered + await setTimeout(100); + + // Then import is called again but only with the new code + expect(importSpy).toHaveBeenCalledTimes(1); + const codes = importSpy.mock.calls[0][0].code.$in; + expect(codes).toHaveLength(1); + expect(codes[0]).toBe(code2); + + // Update events are created + const events = + await sql`SELECT * FROM product_update_event WHERE message->>'code' = ${code1}`; + + expect(events).toHaveLength(1); + expect(events[0].message_id).toBe(messageId); + } finally { + await client.quit(); + await redisListener.stopRedisConsumer(); + await redis.stop(); + } + }); + }); + + it('should call importWithFilter when a message is received', async () => { + await createTestingModule([DomainModule], async (app) => { + // GIVEN: Redis is running + const redis = await new GenericContainer('redis') + .withExposedPorts(6379) + .start(); + const redisUrl = `redis://localhost:${redis.getMappedPort(6379)}`; + const settings = app.get(SettingsService); + jest.spyOn(settings, 'getRedisUrl').mockImplementation(() => redisUrl); + + // And lastmessageid is zero + await settings.setLastMessageId('0'); + const importService = app.get(ImportService); + const importSpy = jest + .spyOn(importService, 'importWithFilter') + .mockImplementation(); + + const redisListener = app.get(RedisListener); + await redisListener.startRedisConsumer(); + + const client = createClient({ url: redisUrl }); + await client.connect(); + try { + const code1 = randomCode(); + const code2 = randomCode(); + + // When: A message is sent + const messageId = await client.xAdd('product_updates', '*', { + code: code1, + product_type: 'food', + rev: '1', }); // Wait for message to be delivered @@ -56,6 +128,7 @@ describe('receiveMessages', () => { importSpy.mockClear(); await client.xAdd('product_updates_off', '*', { code: code2, + product_type: 'food', }); // Wait for message to be delivered diff --git a/src/domain/services/redis.listener.ts b/src/domain/services/redis.listener.ts index 8d40e97..969f1f0 100644 --- a/src/domain/services/redis.listener.ts +++ b/src/domain/services/redis.listener.ts @@ -37,6 +37,13 @@ export class RedisListener { [ // XREAD can read from multiple streams, starting at a // different ID for each... + { + key: 'product_updates', + id: lastMessageId, + }, + // Following can be deleted after PO is updated to use generic stream name + // Note should strictly have a different message id but PO will take more than one millisecond to switch + // so shouldn't be an issue { key: 'product_updates_off', id: lastMessageId, @@ -53,8 +60,11 @@ export class RedisListener { if (messages?.length) { /** Message looks like this: { + timestamp: 123456789, code: "0850026029062", + rev: 2, flavor: "off", + product_type: "food", user_id: "stephane", action: "updated", comment: "Modification : Remove changes",