From 30e60b80a64dae2628ddce89043c0a79bdb29e2a Mon Sep 17 00:00:00 2001 From: Nir Berman Date: Thu, 8 Aug 2024 11:18:10 +0000 Subject: [PATCH] WLT-651 Better handling the messages channel for larges amount of message with more support for various edge cases --- README.md | 2 ++ examples/server/env/dev.env | 4 ++- examples/server/src/dao/messages.dao.ts | 7 +++++ examples/server/src/routes/messages.router.ts | 30 +++++++++++++++++-- src/constants.ts | 2 +- src/customer-server-client.test.ts | 4 +-- src/customer-server-client.ts | 2 +- src/services/messages.service.ts | 30 +++++++++++++++---- src/version.ts | 2 +- 9 files changed, 68 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index c9427ce..f82dd92 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,8 @@ Fireblocks Agent is an open-source on-prem service written in Typescript which i - Configure and run Fireblocks agent: - Copy `.env.prod` and name it `.env.{env-name}` (e.g. `.env.test`) - Edit your newly created `.env.{env-name}` file with the right configuration +- Build the Fireblocks agent: + - `npm run build` - Start the Fireblocks agent with your desired environment: - `npm run start --env=env-name` diff --git a/examples/server/env/dev.env b/examples/server/env/dev.env index b0e4b87..2c25998 100644 --- a/examples/server/env/dev.env +++ b/examples/server/env/dev.env @@ -1 +1,3 @@ -KNOWN_API_KEYS="" \ No newline at end of file +KNOWN_API_KEYS="" +HSM_MODE=COLD +SIGNING_ORDER=RANDOM diff --git a/examples/server/src/dao/messages.dao.ts b/examples/server/src/dao/messages.dao.ts index 5552582..3c85d4d 100644 --- a/examples/server/src/dao/messages.dao.ts +++ b/examples/server/src/dao/messages.dao.ts @@ -84,6 +84,13 @@ export const getMessages = async (requestsIds: string[]): Promise => { return res; }; +export const getAllPendingMessages = async (): Promise => { + const txRef = await getMessagesCollection(); + const cursor = await txRef.find({ status: "PENDING_SIGN" }); + const res = await cursor.toArray(); + return res; +}; + function toMsgStatus(dbMsgs: Partial[]): MessageStatus[] { dbMsgs.forEach((_) => { delete _._id; diff --git a/examples/server/src/routes/messages.router.ts b/examples/server/src/routes/messages.router.ts index 3748627..9d39898 100644 --- a/examples/server/src/routes/messages.router.ts +++ b/examples/server/src/routes/messages.router.ts @@ -2,6 +2,7 @@ import { Request, Response, Router } from 'express'; import { paths } from '../customer-server'; import * as messagesDao from '../dao/messages.dao'; import * as hsmSignService from '../services/hsm-sign-service'; +import logger from '../services/logger'; import { MessageStatus } from '../types'; const msgRouter = Router(); @@ -9,14 +10,39 @@ msgRouter.post('/messagesToSign', async (req: Request<{}, {}, MessagesRequest>, const { messages } = req.body; const messagesStatus: MessageStatus[] = await messagesDao.insertMessages(messages); - //the next line we simulate the hsm work and sign the messages. + //the next line we simulate the hsm work and sign the messages immediately if HSM_MODE is HOT, otherwise don't sign just save message in DB. const requestsIds = messagesStatus.map((_) => _.requestId); - await hsmSignService.signMessages(requestsIds); + const hsmMode = process.env.HSM_MODE || "HOT" + const isHotMode = hsmMode === "COLD" ? false : true; + logger.info(`HSM mode is ${isHotMode ? "HOT" : "COLD"}`) + if (isHotMode) { + await hsmSignService.signMessages(requestsIds); + } + const messagesStatusAfterSign = await messagesDao.getMessagesStatus(requestsIds); res.status(200).json({ statuses: messagesStatusAfterSign }); }); +msgRouter.post('/signAllPendingMessages', + async (req: Request<{}, {}, MessagesStatusRequest>, res: Response) => { + const messagesStatus = await messagesDao.getAllPendingMessages(); + const requestsIds = messagesStatus.map((_) => _.requestId); + logger.info(`found ${requestsIds.length} pending messages ${JSON.stringify(requestsIds)}`); + const signingOrder = process.env.SIGNING_ORDER || "NORMAL" + if (signingOrder === "RANDOM") { + // randomize requestsIds + requestsIds.sort(() => Math.random() - 0.5); + logger.info(`have ${requestsIds.length} pending messages after random: ${JSON.stringify(requestsIds)}`); + } + await hsmSignService.signMessages(requestsIds); + const messagesStatusAfterSign = await messagesDao.getMessagesStatus(requestsIds); + + res.status(200).json({ statuses: messagesStatusAfterSign }); + }, +); + + msgRouter.post( '/messagesStatus', async (req: Request<{}, {}, MessagesStatusRequest>, res: Response) => { diff --git a/src/constants.ts b/src/constants.ts index 65e9489..a39529f 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -5,7 +5,7 @@ require('dotenv').config({ export const MOBILE_GATEWAY_URL = process.env.MOBILE_GATEWAY_URL; export const CUSTOMER_SERVER_URL = process.env.CUSTOMER_SERVER_URL; export const CUSTOMER_SERVER_PULL_CADENCE_MS = Number(process.env.CUSTOMER_SERVER_PULL_CADENCE_MS ?? 30000); -export const AGENT_REQUESTS_CACHE_SIZE = Number(process.env.AGENT_REQUESTS_CACHE_SIZE ?? 1024); +export const AGENT_REQUESTS_CACHE_SIZE = Number(process.env.AGENT_REQUESTS_CACHE_SIZE ?? 2048); export const CUSTOMER_SERVER_AUTHORIZATION = process.env.CUSTOMER_SERVER_AUTHORIZATION; export const TOKEN_PATH = `${__dirname}/.fireblocks-refresh-token`; export const SSL_CERT_PATH = process.env.SSL_CERT_PATH; diff --git a/src/customer-server-client.test.ts b/src/customer-server-client.test.ts index 76ade82..2d17ad7 100644 --- a/src/customer-server-client.test.ts +++ b/src/customer-server-client.test.ts @@ -100,8 +100,6 @@ describe('Customer server client', () => { }); await service.pullMessagesStatus(httpsAgent); - expect(messagesService.updateStatus).toHaveBeenCalledWith([ - { msgId: expect.any(Number), request: msgEnvelop, messageStatus }, - ]); + expect(messagesService.updateStatus).toHaveBeenCalledWith([{ msgId: null, request: msgEnvelop, messageStatus }]); }); }); diff --git a/src/customer-server-client.ts b/src/customer-server-client.ts index 052f54a..18dbea6 100644 --- a/src/customer-server-client.ts +++ b/src/customer-server-client.ts @@ -32,7 +32,7 @@ class CustomerClient { return null; } return { - msgId: decodedMsg.msgId, + msgId: null, request: decodedMsg.request, messageStatus: messagesStatus, }; diff --git a/src/services/messages.service.ts b/src/services/messages.service.ts index 406ae75..5c390e1 100644 --- a/src/services/messages.service.ts +++ b/src/services/messages.service.ts @@ -1,10 +1,10 @@ +import https from 'https'; import { AGENT_REQUESTS_CACHE_SIZE } from '../constants'; import { DecodedMessage, ExtendedMessageStatusCache, FBMessageEnvelope, RequestType } from '../types'; import { decodeAndVerifyMessage } from '../utils/messages-utils'; import customerServerApi from './customer-server.api'; import fbServerApi from './fb-server.api'; import logger from './logger'; -import https from 'https'; interface IMessageService { getPendingMessages(): ExtendedMessageStatusCache[]; handleMessages(messages: FBMessageEnvelope[], httpsAgent: https.Agent): Promise; @@ -121,6 +121,7 @@ class MessageService implements IMessageService { } async updateStatus(messagesStatus: ExtendedMessageStatusCache[]) { + let promises = []; for (const msgStatus of messagesStatus) { try { const { msgId, request, messageStatus } = msgStatus; @@ -128,19 +129,35 @@ class MessageService implements IMessageService { const isInCache = this.msgCache[requestId]; if (!isInCache) { await this.addMessageToCache(msgStatus); - } else { + } else if (msgId) { + // If there was a change in the prefix of msgId, all cached msgIds are invalid + if ( + this.msgCache[messageStatus.requestId].msgId != null && + Math.floor(this.msgCache[messageStatus.requestId].msgId / 1000000) != Math.floor(msgId / 1000000) + ) { + for (const key in this.msgCache) { + this.msgCache[key].msgId = null; + } + } this.msgCache[messageStatus.requestId].msgId = msgId; } + let latestMsgId = this.msgCache[messageStatus.requestId].msgId; if (status === 'SIGNED' || status === 'FAILED') { logger.info( `Got ${ isInCache ? 'cached' : 'from customer server' - } message with final status: ${status}, msgId ${msgId}, cacheId: ${requestId}`, + } message with final status: ${status}, msgId ${latestMsgId}, cacheId: ${requestId}`, + ); + // broadcast always and ack only if we have a valid msgId + promises.push( + fbServerApi + .broadcastResponse(messageStatus, request) + .then(() => (this.msgCache[messageStatus.requestId].messageStatus = messageStatus)), ); - await fbServerApi.broadcastResponse(messageStatus, request); - await fbServerApi.ackMessage(msgId); - this.msgCache[messageStatus.requestId].messageStatus = messageStatus; + if (latestMsgId) { + promises.push(fbServerApi.ackMessage(latestMsgId)); + } } } catch (e) { logger.error( @@ -150,6 +167,7 @@ class MessageService implements IMessageService { ); } } + await Promise.all(promises); } async ackMessages(messagesIds: number[]) { diff --git a/src/version.ts b/src/version.ts index 42bcf39..5b4ee79 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1 +1 @@ -export const AGENT_VERSION = '2.0.1'; +export const AGENT_VERSION = '2.1.1';