Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve message handling #16

Closed
wants to merge 13 commits into from
18 changes: 10 additions & 8 deletions api/customer-server.api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ components:
MessagesStatusRequest:
type: object
required:
- msgIds
- requestsIds
nirfireblocks marked this conversation as resolved.
Show resolved Hide resolved
properties:
msgIds:
requestsIds:
type: array
items:
type: number
example: 425878000014
type: string
format: uuid
example: 8c2b2b3d-fb83-497e-8138-72446b9184b6
MessagesStatusResponse:
type: object
required:
Expand All @@ -78,13 +79,18 @@ components:
type: object
required:
- msgId
- requestId
- type
- message
- payload
properties:
msgId:
type: number
example: 425878000014
requestId:
description: Unique request id, used to track the request
type: string
format: uuid
type:
$ref: "#/components/schemas/TxType"
message:
Expand All @@ -96,17 +102,13 @@ components:
MessageStatus:
type: object
required:
- msgId
- requestId
- status
- type
- payload
properties:
type:
$ref: "#/components/schemas/TxType"
msgId:
type: number
example: 425878000014
requestId:
type: string
status:
Expand Down
37 changes: 24 additions & 13 deletions examples/server/src/dao/messages.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,52 @@ const getMessagesCollection = async () => {
export const updateMessageStatus = async (msg: MessageStatus) => {
const msgRef = await getMessagesCollection();
const dbMsg = {
_id: msg.msgId,
_id: msg.requestId,
...msg,
};
return msgRef.updateOne({ _id: dbMsg._id }, { $set: dbMsg }, { upsert: true });
};

export const insertMessages = async (messages: MessageEnvelope[]): Promise<MessageStatus[]> => {
logger.info(`entering insertMessages ${JSON.stringify(messages.map((_) => _.requestId))}`);
const msgRef = await getMessagesCollection();
const dbMsgs = messages.map(({ msgId, type, message, payload }: MessageEnvelope) => {
const dbMsgs = messages.map(({ requestId, type, message, payload }: MessageEnvelope) => {
return {
_id: msgId,
msgId,
requestId: message.requestId,
_id: requestId,
requestId,
type,
message,
payload,
status: 'PENDING_SIGN',
} as DbMsg;
});
const insertRes = await msgRef.insertMany(dbMsgs);
const messagesRes = await getMessagesStatus(Object.values(insertRes.insertedIds));

const bulkOperations = dbMsgs.map((dbMsg) => ({
updateOne: {
filter: { _id: dbMsg._id },
update: {
$set: dbMsg,
},
upsert: true,
},
}));

await msgRef.bulkWrite(bulkOperations);
const messagesRes = await getMessagesStatus(dbMsgs.map((dbMsg) => dbMsg._id));
return messagesRes;
};

export const getMessagesStatus = async (msgIds: number[]): Promise<MessageStatus[]> => {
logger.info(`entering getMessagesStatus ${JSON.stringify(msgIds)}`);
export const getMessagesStatus = async (requestIds: string[]): Promise<MessageStatus[]> => {
logger.info(`entering getMessagesStatus ${JSON.stringify(requestIds)}`);
const txRef = await getMessagesCollection();
const cursor = await txRef.find({ _id: { $in: msgIds } });
const cursor = await txRef.find({ _id: { $in: requestIds } });
const res = await cursor.toArray();
return toMsgStatus(res);
};

export const getMessages = async (msgIds: number[]): Promise<DbMsg[]> => {
export const getMessages = async (requestsIds: string[]): Promise<DbMsg[]> => {
const txRef = await getMessagesCollection();
const cursor = await txRef.find({ _id: { $in: msgIds } });
const cursor = await txRef.find({ _id: { $in: requestsIds } });
const res = await cursor.toArray();
return res;
};
Expand All @@ -66,6 +77,6 @@ function toMsgStatus(dbMsgs: Partial<DbMsg>[]): MessageStatus[] {
}

interface DbMsg extends MessageStatus {
_id: number;
_id: string;
message: Message;
}
10 changes: 5 additions & 5 deletions examples/server/src/routes/messages.router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ msgRouter.post('/messagesToSign', async (req: Request<{}, {}, MessagesRequest>,
const messagesStatus: MessageStatus[] = await messagesDao.insertMessages(messages);

//the next line we simulate the hsm work and sign the messages.
const msgIds = messagesStatus.map((_) => _.msgId);
await hsmSignService.signMessages(msgIds);
const messagesStatusAfterSign = await messagesDao.getMessagesStatus(msgIds);
const requestIds = messagesStatus.map((_) => _.requestId);
await hsmSignService.signMessages(requestIds);
const messagesStatusAfterSign = await messagesDao.getMessagesStatus(requestIds);

res.status(200).json({ statuses: messagesStatusAfterSign });
});

msgRouter.post(
'/messagesStatus',
async (req: Request<{}, {}, MessagesStatusRequest>, res: Response<MessagesStatusResponse>) => {
const { msgIds } = req.body;
const messagesStatus = await messagesDao.getMessagesStatus(msgIds);
const { requestsIds } = req.body;
const messagesStatus = await messagesDao.getMessagesStatus(requestsIds);
res.status(200).json({ statuses: messagesStatus });
},
);
Expand Down
18 changes: 9 additions & 9 deletions examples/server/src/services/hsm-sign-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { SUPPORTED_ALGORITHMS } from './algorithm-info';
import hsmFacade from './hsm-facade';
import logger from './logger';

export async function randomlySignOrFailMessagesAsync(msgIds: number[]) {
const messages = await getMessages(msgIds);
export async function randomlySignOrFailMessagesAsync(requestsIds: string[]) {
const messages = await getMessages(requestsIds);
messages.forEach((msg) => {
const oneToFiveSeconds = Math.ceil(Math.random() * 5) * 1000;
const algorithm = msg.message.algorithm;
Expand All @@ -19,22 +19,22 @@ export async function randomlySignOrFailMessagesAsync(msgIds: number[]) {
const previousStatus = msg.status;
msg.status = Math.round(Math.random()) ? 'FAILED' : 'SIGNED';
if (msg.status === 'FAILED') {
msg.errorMessage = `Simulate error while signing this message ${msg.msgId}`;
msg.errorMessage = `Simulate error while signing this message ${msg.requestId}`;
}

const { signingDeviceKeyId, data } = msg.message;
if (msg.status === 'SIGNED') {
msg.signedPayload = await hsmFacade.sign(signingDeviceKeyId, data, algorithm);
}
await messagesDao.updateMessageStatus(msg);
logger.info(`Set ${msg.msgId} from status ${previousStatus} to ${msg.status}`);
logger.info(`Set ${msg.requestId} from status ${previousStatus} to ${msg.status}`);
}, oneToFiveSeconds);
});
}

export async function signMessages(msgIds: number[]) {
logger.info(`enter signing messages ${msgIds}`);
const messages = await getMessages(msgIds);
export async function signMessages(requestIds: string[]) {
logger.info(`enter signing messages ${requestIds}`);
const messages = await getMessages(requestIds);
for (const msg of messages) {
const algorithm = msg.message.algorithm;
if (typeof algorithm !== 'string' || !SUPPORTED_ALGORITHMS.includes(algorithm)) {
Expand All @@ -48,14 +48,14 @@ export async function signMessages(msgIds: number[]) {
const { signingDeviceKeyId, data } = msg.message;
msg.signedPayload = await hsmFacade.sign(signingDeviceKeyId, data, algorithm);
msg.status = 'SIGNED';
logger.info(`signed message ${msg.msgId}. signature: ${msg.signedPayload}`);
logger.info(`signed message ${msg.requestId}. signature: ${msg.signedPayload}`);
} catch (e) {
logger.error(e);
msg.status = 'FAILED';
msg.errorMessage = e.toString();
}

await messagesDao.updateMessageStatus(msg);
logger.info(`Set ${msg.msgId} to ${msg.status}`);
logger.info(`Set ${msg.requestId} to ${msg.status}`);
}
}
5 changes: 4 additions & 1 deletion src/customer-server-client.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { afterEach, beforeEach, describe, expect, it, jest } from '@jest/globals';
import Chance from 'chance';
import service from './customer-server-client';
import customerServerApi from './services/customer-server.api';
import { aProofOfOwnershipSignedMessageStatus } from './services/fb-server.api.test';
import messagesService from './services/messages.service';
const c = new Chance();

describe('Customer server client', () => {
beforeEach(() => {
Expand All @@ -22,7 +25,7 @@ describe('Customer server client', () => {
});

it('should fetch tx status every 30 sec', async () => {
jest.spyOn(messagesService, 'getPendingMessages').mockReturnValue([1]);
jest.spyOn(messagesService, 'getPendingMessages').mockReturnValue([{ msgId: c.natural(), messageStatus: aProofOfOwnershipSignedMessageStatus() }]);
//@ts-ignore
jest.spyOn(customerServerApi, 'messagesStatus').mockImplementation(() => {
return {
Expand Down
12 changes: 8 additions & 4 deletions src/customer-server-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import messagesService from './services/messages.service';
class CustomerClient {
pullMessagesStatus = async () => {
try {
const msgIds = messagesService.getPendingMessages();
if (!!msgIds.length) {
const status = await customerServerApi.messagesStatus({ msgIds });
await messagesService.updateStatus(status.statuses);
const statuses = messagesService.getPendingMessages();
const requestsIds = statuses.map((status) => status.messageStatus.requestId);
if (!!requestsIds.length) {
const { statuses: serverStatuses } = await customerServerApi.messagesStatus({ requestsIds });
await messagesService.updateStatus(serverStatuses.map((messageStatus) => ({
msgId: statuses.find((status) => status.messageStatus.requestId === messageStatus.requestId).msgId,
messageStatus,
})));
}
} catch (e) {
logger.error(`Got error from customer server ${e.message}`);
Expand Down
8 changes: 1 addition & 7 deletions src/services/customer-server.api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ describe('Customer Server API', () => {
const messagesToSign = customerServerApiDriver.given.aMessageRequest(msgId, aMessage);
const expectedRes: MessageStatus[] = [
{
msgId,
requestId: aMessage.requestId,
status: 'PENDING_SIGN',
payload: messagesToSign[0].payload,
Expand All @@ -33,12 +32,7 @@ describe('Customer Server API', () => {
export const customerServerApiDriver = {
given: {
aMessageRequest: (msgId: number, message: Message): MessageEnvelop[] => {
return [{ msgId, message, type: 'EXTERNAL_KEY_PROOF_OF_OWNERSHIP_REQUEST', payload: JSON.stringify(message) }];
},
aTxStatusRequest: (msgIds: number[] = []): MessagesStatusRequest => {
return {
msgIds,
};
return [{ msgId, requestId: message.requestId, message, type: 'EXTERNAL_KEY_PROOF_OF_OWNERSHIP_REQUEST', payload: JSON.stringify(message) }];
},
},
mock: {
Expand Down
5 changes: 2 additions & 3 deletions src/services/fb-server.api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ describe('Server API', () => {

export function aProofOfOwnershipSignedMessageStatus(): MessageStatus {
return {
msgId: c.natural(),
requestId: c.guid(),
status: 'SIGNED',
payload: JSON.stringify(messageBuilder.aMessage()),
Expand All @@ -184,7 +183,6 @@ export function aProofOfOwnershipSignedMessageStatus(): MessageStatus {
}
export function aProofOfOwnershipFailedMessageStatus(): MessageStatus {
return {
msgId: c.natural(),
requestId: c.guid(),
status: 'FAILED',
payload: JSON.stringify(messageBuilder.aMessage()),
Expand Down Expand Up @@ -222,9 +220,10 @@ export const messageBuilder = {
payload: fbMessagePayload,
};
},
anMessageEnvelope: (msgId: number, type: TxType, message: Message): MessageEnvelop => {
anMessageEnvelope: (msgId: number, requestId: string, type: TxType, message: Message): MessageEnvelop => {
return {
msgId,
requestId,
payload: JSON.stringify(message),
type,
message,
Expand Down
1 change: 1 addition & 0 deletions src/services/fb-server.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ const fbServerApi = {
ackMessage: async (msgId: number) => {
try {
const accessToken = await fbServerApi.getAccessToken(deviceService.getDeviceData());
logger.info(`Acking message ${msgId}`);
const res = await axios.put(`${MOBILE_GATEWAY_URL}/msg`, { msgId, nack: false }, buildHeaders(accessToken));
return res.data;
} catch (e) {
Expand Down
Loading
Loading