From 10e2963f4d9986c67d3b920943a5d8f3c46b83c7 Mon Sep 17 00:00:00 2001
From: Darun Seethammagari
Date: Wed, 31 Jan 2024 17:54:27 -0800
Subject: [PATCH] fix: Resolve duplicate processing of messages in Runner
 (#545)

It was observed that Runner would execute on the same block multiple
times. This was verified again when @Kevin101Zhang saw that the
components indexer, which he had modified, incorrectly incremented its
star counter. I also noticed that some indexers, such as
sweat_blockheight, very rarely had duplicate runs.

To check whether Runner was the problem, I shut down Runner in dev and
triggered some social feed interactions. Each block appeared in Redis
only once, indicating Coordinator was not the problem. When I started
Runner again, the duplicates reappeared.

After that, I worked on reproducing the issue locally. I queued up 3
messages on a social feed indexer, modified Runner to write its
executions to a file, and ran it. Searching the file showed the 3
blocks repeating in sequence: block 1, block 2, block 3, block 1,
block 2, block 3, and so on. Interleaved duplicates would have pointed
at duplicate workers; a repeating sequence instead indicated that the
same messages were being read into the block array again after they
had all been read once.

With that, I found the root cause. When Runner fills its array of S3
promises, it reads a stream message and advances a stored message ID;
subsequent stream fetches request only messages after that ID. This is
necessary because stream messages can be deleted only after they are
SUCCESSFULLY processed, to avoid losing work if Runner is reset.
However, the code that handles the case where no messages are
available in the stream reset that counting stream ID to '0', as a
mechanism to ensure messages that were somehow skipped were definitely
read. This resetting of the ID turned out to be the cause.

To illustrate, here's a scenario:

1. The stream has 3 messages in it.
2. Runner starts from '0' and reads all 3 messages. The counting
   stream ID is now the ID of the last read message + 1.
3. The messages are added to the array as promises.
4. The first message begins to process.
5. The producer loop sees that no further messages are available.
6. **The producer waits 100ms and then resets the ID to '0'.**
7. While the first message is still processing, the producer fetches
   the same 3 messages again.
8. This repeats for as long as no new messages appear while messages
   sit in the promise array waiting to be processed.

(A simplified sketch of the pre-fix loop follows the diff below.)

The fix is simple: remove the resetting of the stream ID. It is no
longer necessary, as the problem of messages remaining in the stream
was fixed back in December.
---
 runner/src/stream-handler/worker.ts | 1 -
 1 file changed, 1 deletion(-)

diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts
index 116a6bfe6..311657112 100644
--- a/runner/src/stream-handler/worker.ts
+++ b/runner/src/stream-handler/worker.ts
@@ -69,7 +69,6 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri
     const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount);
     if (messages == null) {
       await sleep(100);
-      streamMessageStartId = '0';
       continue;
     }
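For illustration, below is a minimal, hypothetical sketch of the pre-fix
producer loop. The loop shape and the names `blockQueueProducer`,
`getStreamMessages`, `sleep(100)`, `preFetchCount`, and
`streamMessageStartId` come from the patch context above; the
`StreamMessage` and `WorkerContext` shapes, the `queue` field, and the
`preFetchCount` value are assumptions, not the real Runner source.

```ts
// Hypothetical sketch of the PRE-FIX producer loop; types are assumed.
interface StreamMessage {
  id: string;
  blockHeight: number;
}

interface WorkerContext {
  redisClient: {
    // Assumed contract: returns up to `count` messages with stream IDs
    // greater than `startId`, or null when none are available.
    getStreamMessages: (streamKey: string, startId: string, count: number) => Promise<StreamMessage[] | null>;
  };
  // Assumed: promises for fetched blocks awaiting processing.
  queue: Array<Promise<StreamMessage>>;
}

const sleep = async (ms: number): Promise<void> => {
  await new Promise((resolve) => setTimeout(resolve, ms));
};

async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
  const preFetchCount = 100; // assumed value
  let streamMessageStartId = '0';

  while (true) {
    const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount);
    if (messages == null) {
      await sleep(100);
      // BUG (the line this patch removes): resetting the cursor makes the
      // next fetch return the same still-unprocessed messages, so each one
      // is queued, and later executed, a second time.
      streamMessageStartId = '0';
      continue;
    }

    for (const message of messages) {
      // Stand-in for the real work of turning a message into an S3 fetch promise.
      workerContext.queue.push(Promise.resolve(message));
      // Advance the cursor past this message. Deletion from the stream only
      // happens after successful processing, so the cursor is what prevents
      // re-reading in the meantime.
      streamMessageStartId = message.id;
    }
  }
}
```

With the reset removed, as in the actual fix, the cursor only ever moves
forward, so an empty fetch simply sleeps and retries from the same
position instead of re-reading messages that are still being processed.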