Skip to content

Commit

Permalink
feat: Auto migrate indexers to Control Plane (#527)
Browse files Browse the repository at this point in the history
This PR builds on the Redis `allowlist` to auto migrate indexers from
the existing infrastructure to the Control Plane. An account migration
requires coordination between both the V1 and V2 architecture - the
indexer must be removed/ignored from V1, and then correctly configured
within V2.

## Allowlist shape
Each `allowlist` entry now contain the following properties:
```rs
pub struct AllowlistEntry {
    account_id: AccountId, // The account which should be migrated
    v1_ack: bool,          // True if Coordinator V1 has acknowledged the entry
    migrated: bool,        // True if the migration was successful
    failed: bool,          // True if the migration failed
}
```

## Coordinator V1
For Coordinator V1, the `allowlist` is really a Denylist, the code/types
have therefore been named as such. Accounts within the "denylist" should
be ignored completely by V1. Because we cannot guarantee the timing of
when this "ignore" actually happens, a flag (`v1_ack`) will be set from
V1. V2 will only take over once this flag has been set.

Accounts within the "denylist" will be filtered out of the in-memory
registry. Any new indexer "registrations" will also be ignored.
In-progress historical backfills haven't been considered as we'll
disable this functionality anyway.

## Coordinator V2
Once acknowledged by V1, Coordinator V2 will attempt to migrate all
functions under the relevant account. The steps for migration are:
1. Remove the streams from the Redis `streams` set - preventing Runner
from starting these indexers implicitly
2. Stop the existing V1 executors which have already been started via
the `streams` set
3. Merge the existing historical (if it exists) and real-time streams

Once migrated, accounts which have `v1_ack && migrated && !failed` will
be exposed to the control loop, prompting V2 to act on these indexers.

### `migrated` flag
For now, the `migrated` flag will not be set on success, preventing V2
from running the indexer on the new architecture. There are some issues
around V2 continuing from the right block correctly, so it's best to not
run them for now. This allows us to test the migration in isolation, not
worrying about what V2 does after that. I'll add this logic back in once
#536 is complete.

### `failed` flag
If any part of the migration fails, the `failed` flag will be set for
that account. It would take a significant amount of time to cover all
the edge cases in code so it would be faster to just set this flag to
ignore the account, fix the migration manually and then reset the
`failed` flag.
  • Loading branch information
morgsmccauley committed Feb 6, 2024
1 parent 2c28dc3 commit 9ef1f29
Show file tree
Hide file tree
Showing 7 changed files with 754 additions and 43 deletions.
43 changes: 12 additions & 31 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::time::Duration;

use anyhow::Context;
use near_primitives::types::AccountId;
use tokio::time::sleep;
use tracing_subscriber::prelude::*;
Expand All @@ -12,6 +11,7 @@ use crate::registry::{IndexerRegistry, Registry};

mod block_streams_handler;
mod executors_handler;
mod migration;
mod redis;
mod registry;
mod utils;
Expand Down Expand Up @@ -53,8 +53,18 @@ async fn main() -> anyhow::Result<()> {
loop {
let indexer_registry = registry.fetch().await?;

let allowlist = migration::fetch_allowlist(&redis_client).await?;

migration::migrate_pending_accounts(
&indexer_registry,
&allowlist,
&redis_client,
&executors_handler,
)
.await?;

let indexer_registry =
filter_registry_by_allowlist(indexer_registry, &redis_client).await?;
migration::filter_registry_by_allowlist(indexer_registry, &allowlist).await?;

tokio::try_join!(
synchronise_executors(&indexer_registry, &executors_handler),
Expand All @@ -67,35 +77,6 @@ async fn main() -> anyhow::Result<()> {
}
}

#[derive(serde::Deserialize, Debug)]
struct AllowListEntry {
account_id: AccountId,
}

type AllowList = Vec<AllowListEntry>;

async fn filter_registry_by_allowlist(
indexer_registry: IndexerRegistry,
redis_client: &RedisClient,
) -> anyhow::Result<IndexerRegistry> {
let raw_allowlist: String = redis_client.get(String::from("allowlist")).await?;
let allowlist: AllowList =
serde_json::from_str(&raw_allowlist).context("Failed to parse allowlist")?;

let filtered_registry = indexer_registry
.into_iter()
.filter(|(account_id, _)| {
allowlist
.iter()
.any(|entry| entry.account_id == *account_id)
})
.collect();

tracing::debug!("Using filtered registry: {:#?}", filtered_registry);

Ok(filtered_registry)
}

async fn synchronise_executors(
indexer_registry: &IndexerRegistry,
executors_handler: &ExecutorsHandler,
Expand Down
Loading

0 comments on commit 9ef1f29

Please sign in to comment.