
Prod Release 09/11/23 #376

Merged
merged 6 commits into stable
Nov 9, 2023

Conversation

morgsmccauley
Collaborator

@morgsmccauley morgsmccauley commented Nov 9, 2023

morgsmccauley and others added 5 commits November 8, 2023 07:13
Doing some clean-up to make the development of #200 a bit easier. Now
that we have moved completely to Redis, this PR removes all SQS
code/references from Coordinator.

Additionally, I did some small refactoring to `QueryApiContext`, adding
both `s3_client` and `json_rpc_client`, allowing a single instance of
each to be passed around and used throughout the code. Previously, many
`s3_client`s were being instantiated in various parts of the code. This
removed the dependency on `Opts` from Historical Backfill, so that has
also been removed.

Finally, I fixed all Clippy errors :)
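
For illustration, a minimal sketch of what such a shared context could look like, assuming the AWS SDK and NEAR JSON-RPC clients (the exact field types and constructor in Coordinator may differ):

```rust
// Hedged sketch only: the real struct likely carries more fields, and the
// client types here are assumptions based on the description above.
pub struct QueryApiContext {
    pub s3_client: aws_sdk_s3::Client,
    pub json_rpc_client: near_jsonrpc_client::JsonRpcClient,
}

impl QueryApiContext {
    // Build each client once at startup and share the context, instead of
    // instantiating a new `s3_client` at every call site.
    pub async fn new(rpc_url: &str) -> Self {
        let aws_config = aws_config::load_from_env().await;

        Self {
            s3_client: aws_sdk_s3::Client::new(&aws_config),
            json_rpc_client: near_jsonrpc_client::JsonRpcClient::connect(rpc_url),
        }
    }
}
```
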
This PR updates Coordinator such that **only one** Historical Backfill
process can exist for a given Indexer Function. Currently, these
processes are not aware of each other, so a developer can trigger
multiple simultaneous backfills, all writing to the same Redis Stream.
The major changes are detailed below.

## Streamer/Historical backfill tasks
In this PR, I've introduced the `historical_block_processing::Streamer`
struct. This is used to encapsulate the async task which is spawned for
handling the Historical Backfill. I went with "Streamer", as I envision
this becoming the single process which handles both historical/real-time
messages for a given Indexer, as described in
#216 (comment).

`Streamer` is responsible for starting the `async` task, and provides a
basic API for interacting with it. Right now, that only includes
cancelling it. Cancelling a `Streamer` drops the existing future,
preventing any new messages from being added, and then deletes the Redis
Stream, removing all existing messages.
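
A rough sketch of how `Streamer` might look, using `tokio` and `tokio_util`'s `CancellationToken`; the names and fields are assumptions rather than the exact Coordinator implementation:

```rust
use tokio_util::sync::CancellationToken;

pub struct Streamer {
    cancellation_token: CancellationToken,
    handle: tokio::task::JoinHandle<()>,
    redis_stream: String,
}

impl Streamer {
    pub fn start(redis_stream: String) -> Self {
        let cancellation_token = CancellationToken::new();
        let child_token = cancellation_token.clone();

        // The spawned task runs the backfill until it either completes or is
        // cancelled via the token.
        let handle = tokio::spawn(async move {
            tokio::select! {
                _ = child_token.cancelled() => { /* stop producing messages */ }
                _ = run_historical_backfill() => { /* backfill finished */ }
            }
        });

        Self { cancellation_token, handle, redis_stream }
    }

    /// Drop the running future, then delete the Redis Stream so no stale
    /// messages are left behind for the next backfill.
    pub async fn cancel(self, redis: &redis::Client) -> redis::RedisResult<()> {
        self.cancellation_token.cancel();
        let _ = self.handle.await;

        let mut connection = redis.get_async_connection().await?;
        redis::cmd("DEL")
            .arg(&self.redis_stream)
            .query_async(&mut connection)
            .await
    }
}

// Placeholder for the real backfill loop.
async fn run_historical_backfill() {}
```
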

## Indexer Registry Mutex/Async
`indexer_registry` is the in-memory representation of the given Indexer
Registry. Currently, we pass around the `MutexGuard` to mutate this
data, resulting in the lock being held for much longer than is needed,
blocking all other consumers from taking the lock. There isn't much
contention for this lock, so it isn't a problem right now, but it could
become one in the future.

Adding in the `Streamer` `Mutex` made mitigating this problem trivial,
so I went ahead and made some changes. I've updated the code so that we
pass around the `Mutex` itself, rather than the guard, allowing us to
hold the lock for the least amount of time possible. This required
updating most methods within `indexer_registry` to `async`, so that we
could take the `async` `lock`.

With the `Mutex` being used rather than the `MutexGuard`, it seemed
sensible to add `indexer_registry` to `QueryAPIContext`, rather than
passing it around individually.
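
As a hedged before/after illustration of this pattern (placeholder types, not the real registry definitions):

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

// Stand-in types so the sketch compiles on its own; the real registry holds
// the parsed Indexer Function configs.
type AccountId = String;
type IndexerConfig = String;
type IndexerRegistry = HashMap<AccountId, IndexerConfig>;

// Previously, callers received a `MutexGuard` and held the lock for the whole
// call. Passing the shared `Mutex` instead lets each function take the async
// lock only around the actual mutation.
async fn set_indexer(
    indexer_registry: &Arc<Mutex<IndexerRegistry>>,
    account_id: AccountId,
    config: IndexerConfig,
) {
    let mut registry = indexer_registry.lock().await;
    registry.insert(account_id, config);
    // The guard is dropped here, releasing the lock immediately.
}
```
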
Historical streamer messages are not fetched in coordinator prior to the
IndexerRunner call. As a result, we cannot apply the latency-saving
benefits of having coordinator cache the streamer message for use by
runner. Instead, we want to prefetch from S3 so that runner invocations
don't have to wait for the streamer message to be retrieved from S3.

In addition, it's possible for real-time messages to back up temporarily,
preventing the cached message from being used. So, we also want to
prefetch any messages which aren't found in the cache.

The new workflow works by having two loops for each worker thread: a
producer and a consumer. The producer loads a promise for fetching the
block (either from cache or S3) into an array. The consumer then removes
the first element from the array and processes it, deleting the streamer
message upon success. While one block is being processed, the other
blocks are being fetched. This ensures that wait time is minimal. The
producer loop attempts to keep the array as close to full as possible.
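
A Rust sketch of this producer/consumer shape, using a bounded channel in place of the "array" of promises; the real runner code may be structured quite differently, and the helpers here are hypothetical stand-ins:

```rust
use tokio::sync::mpsc;

const PREFETCH_DEPTH: usize = 10;

async fn process_stream(block_heights: Vec<u64>) {
    // The channel capacity bounds how far ahead the producer fetches; once it
    // is full, the producer waits, keeping the queue as full as possible.
    let (sender, mut receiver) = mpsc::channel(PREFETCH_DEPTH);

    // Producer: start a fetch for each block (cache first, falling back to S3
    // in the real code) and queue the in-flight task.
    let producer = tokio::spawn(async move {
        for height in block_heights {
            let fetch = tokio::spawn(fetch_block(height));
            if sender.send(fetch).await.is_err() {
                break; // consumer has stopped
            }
        }
    });

    // Consumer: take the oldest in-flight fetch, wait for it, run the Indexer
    // Function, then delete the streamer message. While this block is being
    // processed, the next ones are already being fetched.
    while let Some(fetch) = receiver.recv().await {
        if let Ok(block) = fetch.await {
            execute_indexer_function(&block).await;
            delete_streamer_message(block.height).await;
        }
    }

    let _ = producer.await;
}

// Hypothetical stand-ins for the real cache/S3 fetch and Redis calls.
struct Block { height: u64 }
async fn fetch_block(height: u64) -> Block { Block { height } }
async fn execute_indexer_function(_block: &Block) {}
async fn delete_streamer_message(_height: u64) {}
```
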
…375)

On cancelling an existing Historical Process, the Redis Stream is
cleared/deleted, but only if the async task is still active and the
cancellation token is consumed. This leads to the following potential
problems:
- When restarting Coordinator, if any Historical Processes are currently
active, the task and cancellation token will be lost, meaning the new
process will start without clearing the existing Stream
- When any branch within `tokio::select!` completes, all others are
cancelled. Therefore, if the Historical Process finishes successfully,
the cancellation branch will never be executed, meaning the Stream won't
be cleared. This becomes a problem if the Redis Stream still contains
messages when the next process is kicked off.

This PR updates cancellation so that the Stream is _always_ cleared when
a new Historical Process is kicked off.
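
Building on the `Streamer` sketch above, the new ordering might look roughly like this (the function and its plumbing are assumptions, not the actual Coordinator code):

```rust
use std::collections::HashMap;

async fn start_historical_backfill(
    streamers: &mut HashMap<String, Streamer>,
    redis: &redis::Client,
    indexer_function: String,
    redis_stream: String,
) -> redis::RedisResult<()> {
    // If a previous task is still tracked, cancel it; if it already finished,
    // or Coordinator restarted and lost track of it, there is nothing to do.
    if let Some(existing) = streamers.remove(&indexer_function) {
        let _ = existing.cancel(redis).await;
    }

    // Clear the Stream unconditionally, so neither a lost cancellation token
    // nor a successfully completed backfill can leave stale messages behind.
    let mut connection = redis.get_async_connection().await?;
    redis::cmd("DEL")
        .arg(&redis_stream)
        .query_async::<_, ()>(&mut connection)
        .await?;

    streamers.insert(indexer_function, Streamer::start(redis_stream));

    Ok(())
}
```
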
@morgsmccauley morgsmccauley requested a review from a team as a code owner November 9, 2023 02:18
@morgsmccauley morgsmccauley merged commit 46a91ba into stable Nov 9, 2023
8 checks passed