Skip to content

Commit

Permalink
refactor: Unify logic for generating Redis keys (#807)
Browse files Browse the repository at this point in the history
Both the `IndexerState` and `IndexerConfig` structs are used
individually to access Redis, and therefore have duplicate code for
generating the required keys. This is problematic for the following
reasons:
1. Redis access is limited to these structs, anything else requiring
access needs to duplicate the logic again
2. Updates need to be reflected across both structs, creating more room
for error

This PR creates a common Redis `KeyProvider` interface/trait for
accessing Indexer Redis data/keys. Now the required structs, i.e.
`IndexerState` and `IndexerConfig`, can implement this trait, and
automatically get access to the methods which generate the keys.

Also did some minor clean up, removing the old Coordinator migration
code.
  • Loading branch information
morgsmccauley authored Jul 9, 2024
1 parent 8c40b2e commit 680c6c9
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 155 deletions.
1 change: 1 addition & 0 deletions coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tonic::transport::channel::Channel;
use tonic::Request;

use crate::indexer_config::IndexerConfig;
use crate::redis::KeyProvider;
use crate::utils::exponential_retry;

#[cfg(not(test))]
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/handlers/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tonic::transport::channel::Channel;
use tonic::Request;

use crate::indexer_config::IndexerConfig;
use crate::redis::KeyProvider;
use crate::utils::exponential_retry;

#[cfg(not(test))]
Expand Down
24 changes: 12 additions & 12 deletions coordinator/src/indexer_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use near_primitives::types::AccountId;
use registry_types::{Rule, StartBlock};

use crate::redis::KeyProvider;

#[derive(Debug, Clone, PartialEq)]
pub struct IndexerConfig {
pub account_id: AccountId,
Expand All @@ -13,6 +15,16 @@ pub struct IndexerConfig {
pub created_at_block_height: u64,
}

impl KeyProvider for IndexerConfig {
fn account_id(&self) -> String {
self.account_id.to_string()
}

fn function_name(&self) -> String {
self.function_name.clone()
}
}

#[cfg(test)]
impl Default for IndexerConfig {
fn default() -> Self {
Expand All @@ -37,18 +49,6 @@ impl IndexerConfig {
format!("{}/{}", self.account_id, self.function_name)
}

pub fn get_redis_stream_key(&self) -> String {
format!("{}:block_stream", self.get_full_name())
}

pub fn get_last_published_block_key(&self) -> String {
format!("{}:last_published_block", self.get_full_name())
}

pub fn get_state_key(&self) -> String {
format!("{}:state", self.get_full_name())
}

pub fn get_registry_version(&self) -> u64 {
self.updated_at_block_height
.unwrap_or(self.created_at_block_height)
Expand Down
80 changes: 7 additions & 73 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Context;
use near_primitives::types::AccountId;

use crate::indexer_config::IndexerConfig;
use crate::redis::RedisClient;
use crate::redis::{KeyProvider, RedisClient};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum ProvisionedState {
Expand All @@ -15,14 +15,6 @@ pub enum ProvisionedState {
Failed,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct OldIndexerState {
pub account_id: AccountId,
pub function_name: String,
pub block_stream_synced_at: Option<u64>,
pub enabled: bool,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct IndexerState {
pub account_id: AccountId,
Expand All @@ -32,16 +24,13 @@ pub struct IndexerState {
pub provisioned_state: ProvisionedState,
}

// FIX `IndexerConfig` does not exist after an Indexer is deleted, and we need a way to
// construct the state key without it. But, this isn't ideal as we now have two places which
// define this key - we need to consolidate these somehow.
impl IndexerState {
pub fn get_state_key(&self) -> String {
format!("{}/{}:state", self.account_id, self.function_name)
impl KeyProvider for IndexerState {
fn account_id(&self) -> String {
self.account_id.to_string()
}

pub fn get_redis_stream_key(&self) -> String {
format!("{}/{}:block_stream", self.account_id, self.function_name)
fn function_name(&self) -> String {
self.function_name.clone()
}
}

Expand All @@ -60,39 +49,6 @@ impl IndexerStateManagerImpl {
Self { redis_client }
}

pub async fn migrate(&self) -> anyhow::Result<()> {
let raw_states = self.redis_client.list_indexer_states().await?;

for raw_state in raw_states {
if let Ok(state) = serde_json::from_str::<IndexerState>(&raw_state) {
tracing::info!(
"{}/{} already migrated, skipping",
state.account_id,
state.function_name
);
continue;
}

tracing::info!("Migrating {}", raw_state);

let old_state: OldIndexerState = serde_json::from_str(&raw_state)?;

let state = IndexerState {
account_id: old_state.account_id,
function_name: old_state.function_name,
block_stream_synced_at: old_state.block_stream_synced_at,
enabled: old_state.enabled,
provisioned_state: ProvisionedState::Provisioned,
};

self.redis_client
.set(state.get_state_key(), serde_json::to_string(&state)?)
.await?;
}

Ok(())
}

fn get_default_state(&self, indexer_config: &IndexerConfig) -> IndexerState {
IndexerState {
account_id: indexer_config.account_id.clone(),
Expand Down Expand Up @@ -233,28 +189,6 @@ mod tests {
use mockall::predicate;
use registry_types::{Rule, StartBlock, Status};

#[tokio::test]
async fn migrate() {
let config = IndexerConfig::default();
let mut mock_redis_client = RedisClient::default();
mock_redis_client
.expect_list_indexer_states()
.returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true }).to_string()]))
.once();
mock_redis_client
.expect_set::<String, String>()
.with(
predicate::eq(config.get_state_key()),
predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":200,\"enabled\":true,\"provisioned_state\":\"Provisioned\"}".to_string()),
)
.returning(|_, _| Ok(()))
.once();

let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client);

indexer_manager.migrate().await.unwrap();
}

#[tokio::test]
async fn list_indexer_states() {
let mut mock_redis_client = RedisClient::default();
Expand Down Expand Up @@ -300,7 +234,7 @@ mod tests {
))
});
redis_client
.expect_set_indexer_state()
.expect_set_indexer_state::<IndexerConfig>()
.with(
predicate::always(),
predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"provisioned_state\":\"Provisioned\"}".to_string()),
Expand Down
2 changes: 0 additions & 2 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ async fn main() -> anyhow::Result<()> {
async move { server::init(grpc_port, indexer_state_manager, registry).await }
});

indexer_state_manager.migrate().await?;

loop {
tokio::try_join!(synchroniser.sync(), sleep(CONTROL_LOOP_THROTTLE_SECONDS))?;
}
Expand Down
124 changes: 68 additions & 56 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,26 @@ use std::fmt::Debug;
use anyhow::Context;
use redis::{aio::ConnectionManager, FromRedisValue, ToRedisArgs};

use crate::{indexer_config::IndexerConfig, indexer_state::IndexerState};
pub trait KeyProvider {
fn account_id(&self) -> String;
fn function_name(&self) -> String;

fn prefix(&self) -> String {
format!("{}/{}", self.account_id(), self.function_name())
}

fn get_redis_stream_key(&self) -> String {
format!("{}:block_stream", self.prefix())
}

fn get_last_published_block_key(&self) -> String {
format!("{}:last_published_block", self.prefix())
}

fn get_state_key(&self) -> String {
format!("{}:state", self.prefix())
}
}

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
Expand Down Expand Up @@ -119,60 +138,48 @@ impl RedisClientImpl {
.context(format!("SADD {set:?} {member:?}"))
}

pub async fn exists<K>(&self, key: K) -> anyhow::Result<bool>
pub async fn get_last_published_block<P>(&self, key_provider: &P) -> anyhow::Result<Option<u64>>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
P: KeyProvider + 'static,
{
tracing::debug!("EXISTS {key:?}");

redis::cmd("EXISTS")
.arg(&key)
.query_async(&mut self.connection.clone())
self.get::<_, u64>(key_provider.get_last_published_block_key())
.await
.context(format!("EXISTS {key:?}"))
}

pub async fn indexer_states_set_exists(&self) -> anyhow::Result<bool> {
self.exists(Self::INDEXER_STATES_SET).await
}

pub async fn get_last_published_block(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>> {
self.get::<_, u64>(indexer_config.get_last_published_block_key())
.await
}

pub async fn clear_block_stream(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
let stream_key = indexer_config.get_redis_stream_key();
pub async fn clear_block_stream<P>(&self, key_provider: &P) -> anyhow::Result<()>
where
P: KeyProvider + 'static,
{
let stream_key = key_provider.get_redis_stream_key();
self.del(stream_key.clone())
.await
.context(format!("Failed to clear Redis Stream: {}", stream_key))
}

pub async fn get_indexer_state(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<String>> {
self.get(indexer_config.get_state_key()).await
pub async fn get_indexer_state<P>(&self, key_provider: &P) -> anyhow::Result<Option<String>>
where
P: KeyProvider + 'static,
{
self.get(key_provider.get_state_key()).await
}

pub async fn set_indexer_state(
&self,
indexer_config: &IndexerConfig,
state: String,
) -> anyhow::Result<()> {
self.set(indexer_config.get_state_key(), state).await?;
pub async fn set_indexer_state<P>(&self, key_provider: &P, state: String) -> anyhow::Result<()>
where
P: KeyProvider + 'static,
{
self.set(key_provider.get_state_key(), state).await?;

self.sadd(Self::INDEXER_STATES_SET, indexer_config.get_state_key())
self.sadd(Self::INDEXER_STATES_SET, key_provider.get_state_key())
.await
}

pub async fn delete_indexer_state(&self, state: &IndexerState) -> anyhow::Result<()> {
self.del(state.get_state_key()).await?;
pub async fn delete_indexer_state<P>(&self, key_provider: &P) -> anyhow::Result<()>
where
P: KeyProvider + 'static,
{
self.del(key_provider.get_state_key()).await?;

self.srem(Self::INDEXER_STATES_SET, state.get_state_key())
self.srem(Self::INDEXER_STATES_SET, key_provider.get_state_key())
.await
}

Expand Down Expand Up @@ -202,30 +209,34 @@ mockall::mock! {
pub RedisClientImpl {
pub async fn connect(redis_url: &str) -> anyhow::Result<Self>;

pub async fn get_indexer_state(&self, indexer_config: &IndexerConfig) -> anyhow::Result<Option<String>>;
pub async fn get_indexer_state<P>(&self, key_provider: &P) -> anyhow::Result<Option<String>>
where P: KeyProvider + 'static;

pub async fn set_indexer_state(
pub async fn set_indexer_state<P>(
&self,
indexer_config: &IndexerConfig,
key_provider: &P,
state: String,
) -> anyhow::Result<()>;
) -> anyhow::Result<()>
where P: KeyProvider + 'static;

pub async fn get_last_published_block(
pub async fn get_last_published_block<P>(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>>;
key_provider: &P,
) -> anyhow::Result<Option<u64>>
where P: KeyProvider + 'static;

pub async fn clear_block_stream(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()>;
pub async fn clear_block_stream<P>(&self, key_provider: &P) -> anyhow::Result<()>
where P: KeyProvider + 'static;

pub async fn get<T, U>(&self, key: T) -> anyhow::Result<Option<U>>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: FromRedisValue + Debug + 'static;
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: FromRedisValue + Debug + 'static;

pub async fn set<K, V>(&self, key: K, value: V) -> anyhow::Result<()>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
V: ToRedisArgs + Debug + Send + Sync + 'static;
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
V: ToRedisArgs + Debug + Send + Sync + 'static;

pub async fn del<K>(&self, key: K) -> anyhow::Result<()>
where
Expand All @@ -234,13 +245,14 @@ mockall::mock! {
pub async fn indexer_states_set_exists(&self) -> anyhow::Result<bool>;

pub async fn sadd<S, V>(&self, set: S, value: V) -> anyhow::Result<()>
where
S: ToRedisArgs + Debug + Send + Sync + 'static,
V: ToRedisArgs + Debug + Send + Sync + 'static;
where
S: ToRedisArgs + Debug + Send + Sync + 'static,
V: ToRedisArgs + Debug + Send + Sync + 'static;

pub async fn list_indexer_states(&self) -> anyhow::Result<Vec<String>>;

pub async fn delete_indexer_state(&self, state: &IndexerState) -> anyhow::Result<()>;
pub async fn delete_indexer_state<P>(&self, key_provider: &P) -> anyhow::Result<()>
where P: KeyProvider + 'static;
}

impl Clone for RedisClientImpl {
Expand Down
2 changes: 2 additions & 0 deletions coordinator/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ impl IndexerRegistry {
Self(slice.iter().cloned().collect())
}

#[cfg(test)]
pub fn new() -> Self {
Self(HashMap::new())
}
Expand All @@ -34,6 +35,7 @@ impl IndexerRegistry {
}
}

#[cfg(test)]
pub fn get(&self, account_id: &AccountId, function_name: &str) -> Option<&IndexerConfig> {
self.0.get(account_id)?.get(function_name)
}
Expand Down
Loading

0 comments on commit 680c6c9

Please sign in to comment.