Skip to content

Commit

Permalink
fix: Various Control Plane migration fixes (#552)
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Feb 6, 2024
1 parent fd64a8c commit 305fc0d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 20 deletions.
6 changes: 3 additions & 3 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ async fn synchronise_block_streams(
indexer_config.start_block_height
{
start_block_height
} else if let Ok(last_published_block) = redis_client
} else if let Ok(Some(last_published_block)) = redis_client
.get::<String, u64>(format!(
"{}:last_published_block",
indexer_config.get_full_name()
Expand Down Expand Up @@ -476,7 +476,7 @@ mod tests {
let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| Ok(500));
.returning(|_| Ok(Some(500)));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler.expect_list().returning(|| Ok(vec![]));
Expand Down Expand Up @@ -530,7 +530,7 @@ mod tests {
let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| Ok(500));
.returning(|_| Ok(Some(500)));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler.expect_list().returning(|| {
Expand Down
39 changes: 35 additions & 4 deletions coordinator/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ pub struct AllowlistEntry {
v1_ack: bool,
migrated: bool,
failed: bool,
v2_control: bool,
}

pub type Allowlist = Vec<AllowlistEntry>;

pub async fn fetch_allowlist(redis_client: &RedisClient) -> anyhow::Result<Allowlist> {
let raw_allowlist: String = redis_client.get(RedisClient::ALLOWLIST).await?;
let raw_allowlist: String = redis_client
.get(RedisClient::ALLOWLIST)
.await?
.ok_or(anyhow::anyhow!("Allowlist doesn't exist"))?;

serde_json::from_str(&raw_allowlist).context("Failed to parse allowlist")
}

Expand All @@ -31,7 +36,11 @@ pub async fn filter_registry_by_allowlist(
.into_iter()
.filter(|(account_id, _)| {
allowlist.iter().any(|entry| {
entry.account_id == *account_id && entry.v1_ack && entry.migrated && !entry.failed
entry.account_id == *account_id
&& entry.v1_ack
&& entry.migrated
&& !entry.failed
&& entry.v2_control
})
})
.collect();
Expand Down Expand Up @@ -104,8 +113,7 @@ async fn migrate_account(
.context("Failed to merge streams")?;
}

// TODO Uncomment when V2 correctly continues from V1 stop point
// set_migrated_flag(redis_client, account_id)?;
set_migrated_flag(redis_client, account_id)?;

tracing::info!("Finished migrating {}", account_id);

Expand All @@ -125,6 +133,9 @@ async fn remove_from_streams_set(
)
.await?
.is_some()
&& redis_client
.exists(indexer_config.get_historical_redis_stream())
.await?
{
result.push(indexer_config.get_historical_redis_stream());
}
Expand All @@ -136,6 +147,9 @@ async fn remove_from_streams_set(
)
.await?
.is_some()
&& redis_client
.exists(indexer_config.get_real_time_redis_stream())
.await?
{
result.push(indexer_config.get_real_time_redis_stream());
};
Expand Down Expand Up @@ -303,6 +317,7 @@ mod tests {
v1_ack: true,
migrated: true,
failed: false,
v2_control: false,
}];

let redis_client = RedisClient::default();
Expand All @@ -327,6 +342,7 @@ mod tests {
v1_ack: true,
migrated: true,
failed: false,
v2_control: false,
}];

let redis_client = RedisClient::default();
Expand Down Expand Up @@ -374,6 +390,7 @@ mod tests {
v1_ack: true,
migrated: false,
failed: false,
v2_control: false,
}];

let mut redis_client = RedisClient::default();
Expand All @@ -393,6 +410,20 @@ mod tests {
)
.returning(|_, _| Ok(Some(())))
.once();
redis_client
.expect_exists::<String>()
.with(predicate::eq(String::from(
"morgs.near/test:historical:stream",
)))
.returning(|_| Ok(true))
.once();
redis_client
.expect_exists::<String>()
.with(predicate::eq(String::from(
"morgs.near/test:real_time:stream",
)))
.returning(|_| Ok(true))
.once();
redis_client
.expect_rename::<String, String>()
.with(
Expand Down
55 changes: 42 additions & 13 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ impl RedisClientImpl {
})
}

pub async fn get<T, U>(&self, key: T) -> anyhow::Result<U>
pub async fn get<T, U>(&self, key: T) -> anyhow::Result<Option<U>>
where
T: ToRedisArgs + Debug + 'static,
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: FromRedisValue + Debug + 'static,
{
let value = redis::cmd("GET")
.arg(&key)
.query_async(&mut self.connection.clone())
let value: Option<U> = self
.connection
.clone()
.get(&key)
.await
.map_err(|e| anyhow::format_err!(e))?;
.context(format!("GET: {key:?}"))?;

tracing::debug!("GET: {:?}={:?}", key, value);

Expand All @@ -57,7 +58,11 @@ impl RedisClientImpl {
{
tracing::debug!("RENAME: {:?} -> {:?}", old_key, new_key);

self.connection.clone().rename(old_key, new_key).await?;
self.connection
.clone()
.rename(&old_key, &new_key)
.await
.context(format!("RENAME: {old_key:?} {new_key:?}"))?;

Ok(())
}
Expand All @@ -69,11 +74,12 @@ impl RedisClientImpl {
{
tracing::debug!("SREM: {:?}={:?}", key, value);

match self.connection.clone().srem(key, value).await {
match self.connection.clone().srem(&key, &value).await {
Ok(1) => Ok(Some(())),
Ok(_) => Ok(None),
Err(e) => Err(anyhow::format_err!(e)),
}
.context(format!("SREM: {key:?} {value:?}"))
}

pub async fn xread<K, V>(
Expand All @@ -92,11 +98,12 @@ impl RedisClientImpl {
.connection
.clone()
.xread_options(
&[key],
&[start_id],
&[&key],
&[&start_id],
&streams::StreamReadOptions::default().count(count),
)
.await?;
.await
.context(format!("XREAD {key:?} {start_id:?} {count:?}"))?;

if results.keys.is_empty() {
return Ok([].to_vec());
Expand All @@ -112,7 +119,11 @@ impl RedisClientImpl {
{
tracing::debug!("XADD: {:?} {:?} {:?}", key, "*", fields);

self.connection.clone().xadd(key, "*", fields).await?;
self.connection
.clone()
.xadd(&key, "*", fields)
.await
.context(format!("XADD {key:?} {fields:?}"))?;

Ok(())
}
Expand All @@ -124,11 +135,29 @@ impl RedisClientImpl {
{
tracing::debug!("XDEL: {:?} {:?}", key, id);

self.connection.clone().xdel(key, &[id]).await?;
self.connection
.clone()
.xdel(&key, &[&id])
.await
.context(format!("XDEL {key:?} {id:?}"))?;

Ok(())
}

pub async fn exists<K>(&self, key: K) -> anyhow::Result<bool>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("EXISTS {key:?}");

self.connection
.clone()
.exists(&key)
.await
.map_err(|e| anyhow::format_err!(e))
.context(format!("EXISTS {key:?}"))
}

// `redis::transaction`s currently don't work with async connections, so we have to create a _new_
// blocking connection to atmoically update a value.
pub fn atomic_update<K, O, N, F>(&self, key: K, update_fn: F) -> anyhow::Result<()>
Expand Down
3 changes: 3 additions & 0 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub(crate) struct QueryApiContext<'a> {
struct DenylistEntry {
account_id: AccountId,
v1_ack: bool,
migrated: bool,
failed: bool,
v2_control: bool,
}

type Denylist = Vec<DenylistEntry>;
Expand Down

0 comments on commit 305fc0d

Please sign in to comment.