fix: Resolve duplicate processing of messages in Runner (#545)
It was observed that Runner would execute on the same block multiple
times. This was verified again when @Kevin101Zhang saw that the
components indexer, which he modified, incorrectly incremented the star
counter.

I noticed some indexers such as sweat_blockheight very rarely had
duplicate runs. To validate if Runner was the problem, I shut down
Runner in dev and triggered some social feed interactions. I verified
that the block only appeared in Redis once, indicating Coordinator was
not the problem. When I started Runner again, the problem appeared
again.

After that, I worked on replicating the issue locally. I built up 3
messages on a social feed indexer and modified Runner to write its
executions to a file. Afterward, I ran Runner. I searched the file and
found that the 3 blocks actually appeared in sequence like so: block 1,
block 2, block 3, block 1, block 2, block 3... and so on. This indicated
that duplicate workers were not the problem, but rather that the same
messages were being read into the block array again after they had all
been read.

Finally, with that I found the problem. When Runner fills its array of
S3 promises, it reads a stream message and increments a stored message
ID. Subsequent stream fetches request only messages after that stream
ID. This is needed because a stream message can only be deleted after it
is SUCCESSFULLY processed, to avoid problems if Runner is reset.
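The ID tracking described above can be sketched as follows. This is an illustrative stand-in, not the actual Runner/Redis code: `FakeStream`, `fetchBatch`, and the plain numeric IDs are assumptions (real Redis stream IDs look like `1-0`), but the key behavior matches: each fetch asks only for messages after the last read ID.

```typescript
interface StreamMessage { id: string; data: string }

// Hypothetical in-memory stand-in for the Redis stream client.
class FakeStream {
  constructor (private readonly messages: StreamMessage[]) {}

  // Return up to `count` messages with an ID greater than `afterId`,
  // or null when none remain (mirroring getStreamMessages returning null).
  getMessagesAfter (afterId: string, count: number): StreamMessage[] | null {
    const next = this.messages
      .filter((m) => Number(m.id) > Number(afterId))
      .slice(0, count);
    return next.length > 0 ? next : null;
  }
}

// One producer fetch: read a batch, then advance the stored ID past the
// last read message so the next fetch skips everything already queued.
function fetchBatch (
  stream: FakeStream,
  startId: string,
  count: number
): { batch: StreamMessage[], nextStartId: string } {
  const batch = stream.getMessagesAfter(startId, count) ?? [];
  const nextStartId = batch.length > 0 ? batch[batch.length - 1].id : startId;
  return { batch, nextStartId };
}
```

Because `nextStartId` only ever moves forward, a second fetch with it returns nothing until new messages arrive, which is exactly the invariant the reset-to-'0' code violated.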

However, the code that handles the case where no messages are available
in the stream reset that counting stream ID to '0', as a mechanism to
ensure that any messages somehow skipped would definitely be read. This
resetting of the ID turned out to be the cause. To illustrate, here's a
scenario:

1. The stream has 3 messages in it. 
2. Runner starts with 0 and then reads all 3 messages. The counting
stream ID is the ID of the last read message + 1.
3. The messages are added to the array as promises.
4. The first message begins to process. 
5. The producer loop sees that no messages are found anymore. 
6. **The producer waits 100ms and then resets the ID.**
7. The producer, while the first message was processing, fetches the
same 3 messages again.
8. This repeats for as long as no new messages arrive and unprocessed
messages remain in the promise array.
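The scenario above can be reproduced with a toy simulation. The names and numeric IDs here are hypothetical stand-ins for the real producer loop; `resetOnEmpty` mirrors the buggy `streamMessageStartId = '0'` line:

```typescript
// Simulate the producer loop polling a fixed stream of message IDs.
function produce (
  messageIds: number[],
  resetOnEmpty: boolean,
  iterations: number
): number[] {
  const queued: number[] = []; // stand-in for the promise array
  let startId = 0;             // stand-in for streamMessageStartId
  for (let i = 0; i < iterations; i++) {
    const batch = messageIds.filter((id) => id > startId);
    if (batch.length === 0) {
      if (resetOnEmpty) startId = 0; // the buggy reset
      continue;                      // (the real code also sleeps 100ms here)
    }
    queued.push(...batch);
    startId = batch[batch.length - 1]; // advance past the last read message
  }
  return queued;
}
```

With `produce([1, 2, 3], true, 4)` the three messages are queued twice (1, 2, 3, 1, 2, 3) because the empty poll between reads resets the ID; with `resetOnEmpty` set to false they are queued exactly once, which is the behavior the fix restores.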

The fix is simple: Remove the resetting of the stream ID. It is no
longer necessary as the problem of remaining messages in the stream was
fixed back in December.
darunrs authored Feb 1, 2024
1 parent 593f305 commit 10e2963
Showing 1 changed file with 0 additions and 1 deletion.
1 change: 0 additions & 1 deletion runner/src/stream-handler/worker.ts
@@ -69,7 +69,6 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri
   const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount);
   if (messages == null) {
     await sleep(100);
-    streamMessageStartId = '0';
     continue;
   }

