Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPLT-1076 create node worker to process real time streams #159

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
75d9e6f
feat: Bootstrap `runner` repo
morgsmccauley Aug 1, 2023
f26b675
chore: Add `indexer.js`
morgsmccauley Aug 1, 2023
8f00884
refactor: Migrate `indexer.js` to TS
morgsmccauley Aug 1, 2023
183aa52
chore: Remove AWSXray
morgsmccauley Aug 1, 2023
92d9c65
chore: Remove AWS `Metrics`
morgsmccauley Aug 1, 2023
21af461
chore: Add `hasura-client.js`
morgsmccauley Aug 1, 2023
e9cff96
refactor: Migrate `hasura-client.js` to TS
morgsmccauley Aug 1, 2023
a760f0a
chore: Add `provisioner.js`
morgsmccauley Aug 1, 2023
75dd6f2
refactor: Migrate `provisioner.js` to TS
morgsmccauley Aug 1, 2023
751beb3
test: Provisioner
morgsmccauley Aug 1, 2023
acd212e
chore: Add `pg-client.ts`
morgsmccauley Aug 1, 2023
1eac6c4
test: HasuraClient
morgsmccauley Aug 1, 2023
54a918f
test: Indexer
morgsmccauley Aug 1, 2023
43200a4
fix: Update incorrect track foreign key req payload
morgsmccauley Aug 1, 2023
d25c963
fix: Incorrect condition conversions after TS migration
morgsmccauley Aug 1, 2023
36ca4bf
refactor: Remove unnecessary condition conversions
morgsmccauley Aug 1, 2023
096c53c
refactor: Move classes to their own directories
morgsmccauley Aug 1, 2023
40ebfce
feat: Add node app to process redis stream
morgsmccauley Aug 1, 2023
346bb26
chore: Remove logs
morgsmccauley Aug 1, 2023
013f4dd
chore: Separate JS/TS linting config
morgsmccauley Aug 1, 2023
aeaeaef
refactor: Remove unused code in `Indexer`
morgsmccauley Aug 1, 2023
df6673b
feat: Change redis keys to make consumption easier
morgsmccauley Aug 2, 2023
b822642
feat: Update redis keys in runner
morgsmccauley Aug 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,21 @@ async fn handle_streamer_message(
set_provisioned_flag(&mut indexer_registry_locked, &indexer_function);
}

storage::sadd(
context.redis_connection_manager,
storage::INDEXER_SET_KEY,
indexer_function.get_full_name(),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storing the indexer name alone to make it easier to build the many other keys related to this indexer, i.e. :storage and :stream

)
.await?;
storage::set(
context.redis_connection_manager,
&format!("{}:storage", indexer_function.get_full_name()),
storage::generate_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(indexer_function)?,
)
.await?;

storage::add_to_registered_stream(
storage::xadd(
context.redis_connection_manager,
&format!("{}:stream", indexer_function.get_full_name()),
storage::generate_stream_key(&indexer_function.get_full_name()),
&[("block_height", block_height)],
)
.await?;
Expand Down
30 changes: 14 additions & 16 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};

const STORAGE: &str = "storage_alertexer";

const STREAMS_SET_KEY: &str = "streams";
pub const INDEXER_SET_KEY: &str = "indexers";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
}

pub fn generate_storage_key(name: &str) -> String {
format!("{}:storage", name)
}

pub fn generate_stream_key(name: &str) -> String {
format!("{}:stream", name)
}

pub async fn connect(redis_connection_str: &str) -> anyhow::Result<ConnectionManager> {
Ok(get_redis_client(redis_connection_str)
.await
Expand Down Expand Up @@ -53,7 +61,7 @@ pub async fn get<V: FromRedisValue + std::fmt::Debug>(
Ok(value)
}

async fn sadd(
pub async fn sadd(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
value: impl ToRedisArgs + std::fmt::Debug,
Expand All @@ -69,16 +77,16 @@ async fn sadd(
Ok(())
}

async fn xadd(
pub async fn xadd(
redis_connection_manager: &ConnectionManager,
stream_key: &str,
stream_key: impl ToRedisArgs + std::fmt::Debug,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields);
tracing::debug!(target: STORAGE, "XADD: {:?}, {:?}", stream_key, fields);

// TODO: Remove stream cap when we finally start processing it
redis::cmd("XTRIM")
.arg(stream_key)
.arg(&stream_key)
.arg("MAXLEN")
.arg(100)
.query_async(&mut redis_connection_manager.clone())
Expand All @@ -97,16 +105,6 @@ async fn xadd(
Ok(())
}

pub async fn add_to_registered_stream(
redis_connection_manager: &ConnectionManager,
key: &str,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?;
xadd(redis_connection_manager, key, fields).await?;

Ok(())
}
/// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage.
/// Increments the counter `receipts_{transaction_hash}` by one.
/// The counter holds how many Receipts related to the Transaction are in watching list
Expand Down
32 changes: 32 additions & 0 deletions runner/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module.exports = {
parser: '@typescript-eslint/parser',
env: {
es2021: true,
node: true,
},
overrides: [
{
files: ['.eslintrc.js', 'jest.config.js'],
parser: 'espree',
extends: ['standard'],
rules: {
semi: ['error', 'always'],
'comma-dangle': ['error', 'only-multiline'],
},
},
{
files: ['./src/**/*'],
parserOptions: {
project: './tsconfig.json',
},
extends: [
'standard-with-typescript',
],
rules: {
'@typescript-eslint/semi': ['error', 'always'],
'@typescript-eslint/comma-dangle': ['error', 'only-multiline'],
'@typescript-eslint/strict-boolean-expressions': 'off',
},
},
],
};
2 changes: 2 additions & 0 deletions runner/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
**/dist
/node_modules
4 changes: 4 additions & 0 deletions runner/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node'
};
Loading
Loading