Skip to content

Commit

Permalink
WLT-651 Better handling the messages channel for larges amount of mes…
Browse files Browse the repository at this point in the history
…sage with more support for various edge cases
  • Loading branch information
nirfireblocks authored and nadav-fireblocks committed Aug 8, 2024
1 parent 0b3c1f4 commit 30e60b8
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 15 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ Fireblocks Agent is an open-source on-prem service written in Typescript which i
- Configure and run Fireblocks agent:
- Copy `.env.prod` and name it `.env.{env-name}` (e.g. `.env.test`)
- Edit your newly created `.env.{env-name}` file with the right configuration
- Build the Fireblocks agent:
- `npm run build`
- Start the Fireblocks agent with your desired environment:
- `npm run start --env=env-name`

Expand Down
4 changes: 3 additions & 1 deletion examples/server/env/dev.env
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
KNOWN_API_KEYS=""
KNOWN_API_KEYS=""
HSM_MODE=COLD
SIGNING_ORDER=RANDOM
7 changes: 7 additions & 0 deletions examples/server/src/dao/messages.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ export const getMessages = async (requestsIds: string[]): Promise<DbMsg[]> => {
return res;
};

export const getAllPendingMessages = async (): Promise<DbMsg[]> => {
const txRef = await getMessagesCollection();
const cursor = await txRef.find({ status: "PENDING_SIGN" });
const res = await cursor.toArray();
return res;
};

function toMsgStatus(dbMsgs: Partial<DbMsg>[]): MessageStatus[] {
dbMsgs.forEach((_) => {
delete _._id;
Expand Down
30 changes: 28 additions & 2 deletions examples/server/src/routes/messages.router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,47 @@ import { Request, Response, Router } from 'express';
import { paths } from '../customer-server';
import * as messagesDao from '../dao/messages.dao';
import * as hsmSignService from '../services/hsm-sign-service';
import logger from '../services/logger';
import { MessageStatus } from '../types';
const msgRouter = Router();

msgRouter.post('/messagesToSign', async (req: Request<{}, {}, MessagesRequest>, res: Response<MessagesResponse>) => {
const { messages } = req.body;
const messagesStatus: MessageStatus[] = await messagesDao.insertMessages(messages);

//the next line we simulate the hsm work and sign the messages.
//the next line we simulate the hsm work and sign the messages immediately if HSM_MODE is HOT, otherwise don't sign just save message in DB.
const requestsIds = messagesStatus.map((_) => _.requestId);
await hsmSignService.signMessages(requestsIds);
const hsmMode = process.env.HSM_MODE || "HOT"
const isHotMode = hsmMode === "COLD" ? false : true;
logger.info(`HSM mode is ${isHotMode ? "HOT" : "COLD"}`)
if (isHotMode) {
await hsmSignService.signMessages(requestsIds);
}

const messagesStatusAfterSign = await messagesDao.getMessagesStatus(requestsIds);

res.status(200).json({ statuses: messagesStatusAfterSign });
});

msgRouter.post('/signAllPendingMessages',
async (req: Request<{}, {}, MessagesStatusRequest>, res: Response<MessagesStatusResponse>) => {
const messagesStatus = await messagesDao.getAllPendingMessages();
const requestsIds = messagesStatus.map((_) => _.requestId);
logger.info(`found ${requestsIds.length} pending messages ${JSON.stringify(requestsIds)}`);
const signingOrder = process.env.SIGNING_ORDER || "NORMAL"
if (signingOrder === "RANDOM") {
// randomize requestsIds
requestsIds.sort(() => Math.random() - 0.5);
logger.info(`have ${requestsIds.length} pending messages after random: ${JSON.stringify(requestsIds)}`);
}
await hsmSignService.signMessages(requestsIds);
const messagesStatusAfterSign = await messagesDao.getMessagesStatus(requestsIds);

res.status(200).json({ statuses: messagesStatusAfterSign });
},
);


msgRouter.post(
'/messagesStatus',
async (req: Request<{}, {}, MessagesStatusRequest>, res: Response<MessagesStatusResponse>) => {
Expand Down
2 changes: 1 addition & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require('dotenv').config({
export const MOBILE_GATEWAY_URL = process.env.MOBILE_GATEWAY_URL;
export const CUSTOMER_SERVER_URL = process.env.CUSTOMER_SERVER_URL;
export const CUSTOMER_SERVER_PULL_CADENCE_MS = Number(process.env.CUSTOMER_SERVER_PULL_CADENCE_MS ?? 30000);
export const AGENT_REQUESTS_CACHE_SIZE = Number(process.env.AGENT_REQUESTS_CACHE_SIZE ?? 1024);
export const AGENT_REQUESTS_CACHE_SIZE = Number(process.env.AGENT_REQUESTS_CACHE_SIZE ?? 2048);
export const CUSTOMER_SERVER_AUTHORIZATION = process.env.CUSTOMER_SERVER_AUTHORIZATION;
export const TOKEN_PATH = `${__dirname}/.fireblocks-refresh-token`;
export const SSL_CERT_PATH = process.env.SSL_CERT_PATH;
4 changes: 1 addition & 3 deletions src/customer-server-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ describe('Customer server client', () => {
});

await service.pullMessagesStatus(httpsAgent);
expect(messagesService.updateStatus).toHaveBeenCalledWith([
{ msgId: expect.any(Number), request: msgEnvelop, messageStatus },
]);
expect(messagesService.updateStatus).toHaveBeenCalledWith([{ msgId: null, request: msgEnvelop, messageStatus }]);
});
});
2 changes: 1 addition & 1 deletion src/customer-server-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class CustomerClient {
return null;
}
return {
msgId: decodedMsg.msgId,
msgId: null,
request: decodedMsg.request,
messageStatus: messagesStatus,
};
Expand Down
30 changes: 24 additions & 6 deletions src/services/messages.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import https from 'https';
import { AGENT_REQUESTS_CACHE_SIZE } from '../constants';
import { DecodedMessage, ExtendedMessageStatusCache, FBMessageEnvelope, RequestType } from '../types';
import { decodeAndVerifyMessage } from '../utils/messages-utils';
import customerServerApi from './customer-server.api';
import fbServerApi from './fb-server.api';
import logger from './logger';
import https from 'https';
interface IMessageService {
getPendingMessages(): ExtendedMessageStatusCache[];
handleMessages(messages: FBMessageEnvelope[], httpsAgent: https.Agent): Promise<void>;
Expand Down Expand Up @@ -121,26 +121,43 @@ class MessageService implements IMessageService {
}

async updateStatus(messagesStatus: ExtendedMessageStatusCache[]) {
let promises = [];
for (const msgStatus of messagesStatus) {
try {
const { msgId, request, messageStatus } = msgStatus;
const { requestId, status } = messageStatus;
const isInCache = this.msgCache[requestId];
if (!isInCache) {
await this.addMessageToCache(msgStatus);
} else {
} else if (msgId) {
// If there was a change in the prefix of msgId, all cached msgIds are invalid
if (
this.msgCache[messageStatus.requestId].msgId != null &&
Math.floor(this.msgCache[messageStatus.requestId].msgId / 1000000) != Math.floor(msgId / 1000000)
) {
for (const key in this.msgCache) {
this.msgCache[key].msgId = null;
}
}
this.msgCache[messageStatus.requestId].msgId = msgId;
}

let latestMsgId = this.msgCache[messageStatus.requestId].msgId;
if (status === 'SIGNED' || status === 'FAILED') {
logger.info(
`Got ${
isInCache ? 'cached' : 'from customer server'
} message with final status: ${status}, msgId ${msgId}, cacheId: ${requestId}`,
} message with final status: ${status}, msgId ${latestMsgId}, cacheId: ${requestId}`,
);
// broadcast always and ack only if we have a valid msgId
promises.push(
fbServerApi
.broadcastResponse(messageStatus, request)
.then(() => (this.msgCache[messageStatus.requestId].messageStatus = messageStatus)),
);
await fbServerApi.broadcastResponse(messageStatus, request);
await fbServerApi.ackMessage(msgId);
this.msgCache[messageStatus.requestId].messageStatus = messageStatus;
if (latestMsgId) {
promises.push(fbServerApi.ackMessage(latestMsgId));
}
}
} catch (e) {
logger.error(
Expand All @@ -150,6 +167,7 @@ class MessageService implements IMessageService {
);
}
}
await Promise.all(promises);
}

async ackMessages(messagesIds: number[]) {
Expand Down
2 changes: 1 addition & 1 deletion src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const AGENT_VERSION = '2.0.1';
export const AGENT_VERSION = '2.1.1';

0 comments on commit 30e60b8

Please sign in to comment.