From be45584b0aa487e034ed89b8231d75525cb67312 Mon Sep 17 00:00:00 2001 From: Jorge Sanmiguel <8038323+jsanmigimeno@users.noreply.github.com> Date: Mon, 22 Jul 2024 11:47:53 +0000 Subject: [PATCH] feat: Allow recovery of passed LZ collector events --- src/collector/layer-zero/layer-zero.utils.ts | 2 +- src/collector/layer-zero/layer-zero.worker.ts | 161 ++++++++++++++++-- src/store/store.lib.ts | 16 +- 3 files changed, 159 insertions(+), 20 deletions(-) diff --git a/src/collector/layer-zero/layer-zero.utils.ts b/src/collector/layer-zero/layer-zero.utils.ts index 7253c43..c1543c7 100644 --- a/src/collector/layer-zero/layer-zero.utils.ts +++ b/src/collector/layer-zero/layer-zero.utils.ts @@ -52,5 +52,5 @@ export function decodeHeader(encodedHeader: string): LayerZeroHeader { export function calculatePayloadHash(guid: string, message: string): string { const payload = `${guid}${message.slice(2)}`; // 'slice(2)' used to remove the '0x' from the 'message' - return keccak256(payload); + return keccak256(payload).toLowerCase(); } diff --git a/src/collector/layer-zero/layer-zero.worker.ts b/src/collector/layer-zero/layer-zero.worker.ts index 611a36d..f4bb50a 100644 --- a/src/collector/layer-zero/layer-zero.worker.ts +++ b/src/collector/layer-zero/layer-zero.worker.ts @@ -42,12 +42,21 @@ import { LayerZeroEnpointV2Interface, PacketSentEvent } from 'src/contracts/Laye import { STATUS_LOG_INTERVAL } from 'src/logger/logger.service'; import { calculatePayloadHash, decodeHeader, decodePacket } from './layer-zero.utils'; +const ON_PACKET_SENT_PROCESSED_CHANNEL = 'packet_sent_processed'; +const ON_PACKET_SENT_PROCESSED_DELAY = 30 * 1000; +const MAX_PENDING_PAYLOAD_VERIFIED_EVENT_DURATION = 6 * 60 * 60 * 1000; interface LayerZeroPayloadData { messageIdentifier: string, payload: string, } +interface PayloadVerifiedEvent { + timestamp: number, + payloadHash: string, + log: Log, +} + class LayerZeroWorker { private readonly config: LayerZeroWorkerData; @@ -71,6 +80,11 @@ class LayerZeroWorker { private readonly layerZeroChainIdMap: Record; private readonly incentivesAddresses: Record; + // Keep track of unprocessed PayloadVerified events caused by not having processed yet the + // corresponding PacketSent event on the source chain. This is most relevant for when + // recovering past relays. + private readonly pendingPayloadVerifiedEvents: PayloadVerifiedEvent[] = []; + private currentStatus: MonitorStatus | null = null; private monitor: MonitorInterface; @@ -201,6 +215,8 @@ class LayerZeroWorker { `LayerZero collector worker started.`, ); + await this.listenForProcessedPackets(); + this.fromBlock = await this.getStartingBlock(); const stopBlock = this.config.stoppingBlock ?? Infinity; @@ -330,6 +346,65 @@ class LayerZeroWorker { return logs; } + private async listenForProcessedPackets(): Promise { + // Listen for whenever a packet is registered. + await this.store.on( + this.getOnPacketSentChannel(), + (payloadHash: string) => { + // Add a delay to prevent this handler from being executed at the exact same time + // as the `handlePayloadVerifiedEvent()` handler, which can cause this handler to + // search for a pending PayloadVerified event before it's registered. + setTimeout( + () => void this.onProcessedPacketHandler(payloadHash), + ON_PACKET_SENT_PROCESSED_DELAY + ); + } + ) + } + + private async onProcessedPacketHandler( + payloadHash: string + ): Promise { + + this.logger.debug( + { payloadHash }, + `On PacketSent event recovery handler triggered.` + ); + + const pendingPayloadVerifiedEventIndex = this.pendingPayloadVerifiedEvents.findIndex( + (event) => event.payloadHash === payloadHash, + ); + + if (pendingPayloadVerifiedEventIndex == -1) { + return; + } + + this.logger.info( + { payloadHash }, + `Recovering PayloadVerified event.` + ); + + const [pendingEvent] = this.pendingPayloadVerifiedEvents.splice(pendingPayloadVerifiedEventIndex, 1); + + const parsedLog = this.receiveULN302Interface.parseLog(pendingEvent!.log); + + try { + await this.handlePayloadVerifiedEvent( + pendingEvent!.log, + parsedLog! // The log has been previously parsed, `parsedLog` should never be null. + ); + } + catch (error) { + this.logger.error( + { + payloadHash, + error: tryErrorToString(error), + }, + `Error on PayloadVerified event recovery.` + ); + } + } + // Event handlers @@ -463,8 +538,8 @@ class LayerZeroWorker { messageIdentifier, amb: 'layer-zero', - fromChainId: fromChainId.toString(), - toChainId: toChainId.toString(), + fromChainId, + toChainId, fromIncentivesAddress: packet.sender, toIncentivesAddress, @@ -484,17 +559,31 @@ class LayerZeroWorker { await this.store.setAdditionalAMBData( 'layer-zero', - payloadHash.toLowerCase(), + payloadHash, { messageIdentifier, payload: encodedPayload }, ); + + // Broadcast that the PacketSent event has been processed to recover any pending logic + // resulting from PayloadVerified events. + await this.store.postMessage( + this.getOnPacketSentChannel(), + payloadHash + ); } /** * Handles PayloadVerified events. * + * ! A PayloadVerified event is emitted every time a specific packet is verified, but a single + * ! event may not be enough to indicate that the packet is valid. Thus, there may be multiple + * ! events for a single packet, which, depending at the time at which they are processed, can + * ! result in this function submitting the proof for the same packet multiple times. This + * ! undesired side effect is mitigated by the Store's `setAMBProof()` function, which will not + * ! register proofs for the same packet more than once. + * * @param log - The log data. * @param parsedLog - The parsed log description. */ @@ -544,24 +633,22 @@ class LayerZeroWorker { return; } - this.logger.info( - { - transactionHash: log.transactionHash, - payloadHash, - }, - 'PayloadVerified event decoded.', - ); - // Recover the encoded payload data from storage (saved on an earlier PacketSent event). const payloadData = await this.store.getAdditionalAMBData( 'layer-zero', payloadHash.toLowerCase() ); if (!payloadData) { - this.logger.warn( + this.logger.info( { payloadHash }, - 'No payload data found for the given payloadHash.', + 'No payload data found for the given payloadHash. Queueing for recovery for when the payload is available.', ); + + this.queuePendingPayloadVerifiedEvent( + payloadHash, + log, + ); + return; } @@ -581,8 +668,8 @@ class LayerZeroWorker { messageIdentifier: payloadData.messageIdentifier, amb: 'layer-zero', - fromChainId: fromChainId.toString(), - toChainId: toChainId.toString(), + fromChainId, + toChainId, message: payloadData.payload, messageCtx: '0x', @@ -610,6 +697,47 @@ class LayerZeroWorker { ); } } + + private queuePendingPayloadVerifiedEvent( + payloadHash: string, + log: Log, + ): void { + // Prune any old pending events (note that events are stored in chronological order). + const pruneTimestamp = Date.now() - MAX_PENDING_PAYLOAD_VERIFIED_EVENT_DURATION; + + let firstNonStaleIndex; + for ( + firstNonStaleIndex = 0; + firstNonStaleIndex < this.pendingPayloadVerifiedEvents.length; + firstNonStaleIndex++ + ) { + if (this.pendingPayloadVerifiedEvents[firstNonStaleIndex]!.timestamp > pruneTimestamp) { + break; + } + } + + if (firstNonStaleIndex != 0) { + this.pendingPayloadVerifiedEvents.splice(0, firstNonStaleIndex); + } + + + // Register the pending event if not already pending, otherwise update the pending's event + // 'timestamp'. + const alreadyPendingEvent = this.pendingPayloadVerifiedEvents.find((event) => { + event.payloadHash === payloadHash + }); + + if (alreadyPendingEvent != undefined) { + alreadyPendingEvent.timestamp = Date.now(); + } + else { + this.pendingPayloadVerifiedEvents.push({ + timestamp: Date.now(), + payloadHash, + log, + }); + } + } async checkIfVerifiable( @@ -684,6 +812,9 @@ class LayerZeroWorker { throw new Error(`Failed to query the ULN configuration. (dvn: ${dvn}, destination eid: ${dstEid}).`); } + private getOnPacketSentChannel(): string { + return Store.getChannel('layer-zero', ON_PACKET_SENT_PROCESSED_CHANNEL); + } // Misc Helpers diff --git a/src/store/store.lib.ts b/src/store/store.lib.ts index 4577e26..41b5459 100644 --- a/src/store/store.lib.ts +++ b/src/store/store.lib.ts @@ -172,9 +172,9 @@ export class Store { ); } - async on( + async on( channel: string, - callback: (payload: Record) => void, + callback: (payload: T) => void, ) { await this.redisSubscriptions.subscribe(channel); @@ -185,9 +185,9 @@ export class Store { }); } - async onPattern( + async onPattern( pattern: string, - callback: (payload: Record) => void, + callback: (payload: T) => void, ) { await this.redisSubscriptions.psubscribe(pattern); @@ -582,6 +582,14 @@ export class Store { chainId, ambProof.messageIdentifier ); + + const currentProof = await this.get(key); + if (currentProof != undefined) { + //TODO log + // Do not allow proofs to be set multiple times (prevent submitting the same relay more than once). + return; + } + await this.set(key, JSON.stringify(ambProof)); const channel = Store.getOnAMBProofChannel(