Skip to content

Commit

Permalink
DPLT-1076 create node worker to process real time streams (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley authored Aug 4, 2023
1 parent 4f66ec3 commit c64e862
Show file tree
Hide file tree
Showing 23 changed files with 10,538 additions and 20 deletions.
13 changes: 9 additions & 4 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,21 @@ async fn handle_streamer_message(
set_provisioned_flag(&mut indexer_registry_locked, &indexer_function);
}

storage::sadd(
context.redis_connection_manager,
storage::INDEXER_SET_KEY,
indexer_function.get_full_name(),
)
.await?;
storage::set(
context.redis_connection_manager,
&format!("{}:storage", indexer_function.get_full_name()),
storage::generate_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(indexer_function)?,
)
.await?;

storage::add_to_registered_stream(
storage::xadd(
context.redis_connection_manager,
&format!("{}:stream", indexer_function.get_full_name()),
storage::generate_stream_key(&indexer_function.get_full_name()),
&[("block_height", block_height)],
)
.await?;
Expand Down
30 changes: 14 additions & 16 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};

const STORAGE: &str = "storage_alertexer";

const STREAMS_SET_KEY: &str = "streams";
pub const INDEXER_SET_KEY: &str = "indexers";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
}

pub fn generate_storage_key(name: &str) -> String {
format!("{}:storage", name)
}

pub fn generate_stream_key(name: &str) -> String {
format!("{}:stream", name)
}

pub async fn connect(redis_connection_str: &str) -> anyhow::Result<ConnectionManager> {
Ok(get_redis_client(redis_connection_str)
.await
Expand Down Expand Up @@ -53,7 +61,7 @@ pub async fn get<V: FromRedisValue + std::fmt::Debug>(
Ok(value)
}

async fn sadd(
pub async fn sadd(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
value: impl ToRedisArgs + std::fmt::Debug,
Expand All @@ -69,16 +77,16 @@ async fn sadd(
Ok(())
}

async fn xadd(
pub async fn xadd(
redis_connection_manager: &ConnectionManager,
stream_key: &str,
stream_key: impl ToRedisArgs + std::fmt::Debug,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields);
tracing::debug!(target: STORAGE, "XADD: {:?}, {:?}", stream_key, fields);

// TODO: Remove stream cap when we finally start processing it
redis::cmd("XTRIM")
.arg(stream_key)
.arg(&stream_key)
.arg("MAXLEN")
.arg(100)
.query_async(&mut redis_connection_manager.clone())
Expand All @@ -97,16 +105,6 @@ async fn xadd(
Ok(())
}

pub async fn add_to_registered_stream(
redis_connection_manager: &ConnectionManager,
key: &str,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?;
xadd(redis_connection_manager, key, fields).await?;

Ok(())
}
/// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage.
/// Increments the counter `receipts_{transaction_hash}` by one.
/// The counter holds how many Receipts related to the Transaction are in watching list
Expand Down
32 changes: 32 additions & 0 deletions runner/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module.exports = {
parser: '@typescript-eslint/parser',
env: {
es2021: true,
node: true,
},
overrides: [
{
files: ['.eslintrc.js', 'jest.config.js'],
parser: 'espree',
extends: ['standard'],
rules: {
semi: ['error', 'always'],
'comma-dangle': ['error', 'only-multiline'],
},
},
{
files: ['./src/**/*'],
parserOptions: {
project: './tsconfig.json',
},
extends: [
'standard-with-typescript',
],
rules: {
'@typescript-eslint/semi': ['error', 'always'],
'@typescript-eslint/comma-dangle': ['error', 'only-multiline'],
'@typescript-eslint/strict-boolean-expressions': 'off',
},
},
],
};
2 changes: 2 additions & 0 deletions runner/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
**/dist
/node_modules
4 changes: 4 additions & 0 deletions runner/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node'
};
Loading

0 comments on commit c64e862

Please sign in to comment.