refactor: Remove SQS references from Coordinator (#365)

Doing some clean-up to make the development of #200 a bit easier. Now
that we have moved completely to Redis, this PR removes all SQS
code/references from Coordinator.

Additionally, I did some small refactoring to `QueryApiContext`, adding
both `s3_client` and `json_rpc_client`, allowing a single instance of
each to be passed around and used throughout the code. Previously, many
`s3_client`s were being instantiated in various parts of the code. This
removed the dependency on `Opts` from Historical Backfill, so that parameter has
also been removed.

Finally, I fixed all Clippy errors :)
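
As a rough sketch, the refactored context described above might look like the following. The field list is illustrative, with the `storage::ConnectionManager` and `ChainId` paths taken from the imports visible in the diff below, not from the exact definition in this PR:

```rust
use aws_sdk_s3::Client as S3Client;
use indexer_rules_engine::types::indexer_rule_match::ChainId;
use near_jsonrpc_client::JsonRpcClient;

// Illustrative shape only: one shared client of each kind is held in the
// context and passed by reference, instead of being rebuilt at call sites.
pub struct QueryApiContext<'a> {
    pub chain_id: &'a ChainId,
    pub redis_connection_manager: &'a storage::ConnectionManager,
    pub s3_client: &'a S3Client,
    pub json_rpc_client: &'a JsonRpcClient,
}
```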
morgsmccauley authored Nov 7, 2023
1 parent 35bc949 commit 6b53ca2
Showing 15 changed files with 167 additions and 400 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -8,7 +8,7 @@ With QueryApi you can
 ## 🧩 Components
 1. [QueryApi Coordinator](./indexer)
 An Indexer that tracks changes to the QueryApi registry contract. It triggers the execution of those IndexerFunctions
-when they match new blocks by placing messages on an SQS queue. Spawns historical processing threads when needed.
+when they match new blocks by placing messages on a Redis Stream. Spawns historical processing threads when needed.
 1.a. Subfolders provide crates for the different components of the Indexer: indexer_rule_type (shared with registry contract),
 indexer_rules_engine, storage.
 2. [Indexer Runner](.indexer-js-queue-handler)
@@ -70,7 +70,6 @@ docker compose up
 
 ### Local Configuration
 - Coordinator watches the dev registry contract by default (`dev-queryapi.dataplatform.near`). To use a different contract, you can update the `REGISTRY_CONTRACT_ID` environment variable.
-- Coodinator will log SQS messages rather than sending them. To use an actual Queue, you can update the `QUEUE_URL` and `START_FROM_BLOCK_QUEUE_URL` environment variables.
 
 ### Known Issues
 
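For context on the Redis hand-off referenced in the README change above: each matched block is appended to a stream via `XADD`. A minimal sketch using the `redis` crate, where the stream key and field name are assumptions rather than the coordinator's actual schema:

```rust
use redis::AsyncCommands;

// Appends one matched block height to a stream consumed downstream.
// "example:historical:stream" is an invented key for illustration.
async fn publish_matched_block(
    redis: &mut redis::aio::ConnectionManager,
    block_height: u64,
) -> redis::RedisResult<String> {
    redis
        .xadd(
            "example:historical:stream",
            "*", // "*" lets Redis assign the entry ID
            &[("block_height", block_height.to_string())],
        )
        .await
}
```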
28 changes: 0 additions & 28 deletions indexer/Cargo.lock

(Generated lockfile; diff not rendered.)

5 changes: 0 additions & 5 deletions indexer/README.md
@@ -30,8 +30,6 @@ This project is using `workspace` feature of Cargo.
 Some tests require blocks with matching data. To download the test block, run
 `./download_test_blocks.sh 93085141`. Some other useful blocks are 80854399 92476362 93085141 93659695.
 
-To log a message instead of sending SQS messages set your `QUEUE_URL` to `MOCK` in your `.env` file.
-
 ## Design concept
 
 Identified major types of the events on the network:
@@ -46,9 +44,6 @@ Identified major types of the events on the network:
 DATABASE_URL=postgres://user:pass@host/database
 LAKE_AWS_ACCESS_KEY=AKI_LAKE_ACCESS...
 LAKE_AWS_SECRET_ACCESS_KEY=LAKE_SECRET...
-QUEUE_AWS_ACCESS_KEY=AKI_SQS_ACCESS...
-QUEUE_AWS_SECRET_ACCESS_KEY=SQS_ACCESS_SECRET
-QUEUE_URL=https://sqs.eu-central-1.amazonaws.com/754641474505/alertexer-queue
 ```
 ## Running locally
4 changes: 2 additions & 2 deletions indexer/indexer_rules_engine/src/matcher.rs
@@ -115,8 +115,8 @@ fn match_account(
     outcome_with_receipt: &IndexerExecutionOutcomeWithReceipt,
 ) -> bool {
     match account_id {
-        x if x.contains(",") => x
-            .split(",")
+        x if x.contains(',') => x
+            .split(',')
             .any(|sub_account_id| match_account(sub_account_id.trim(), outcome_with_receipt)),
         _ => {
             wildmatch::WildMatch::new(account_id).matches(&outcome_with_receipt.receipt.receiver_id)
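The quoted-string-to-char fixes above are behavior-preserving; the arm still splits a comma-separated pattern list and wildcard-matches each entry against the receipt's receiver. A standalone illustration of that matching logic (the helper below is hypothetical, not an API of this crate):

```rust
// Mirrors match_account's comma handling: a pattern list like
// "*.near, aurora" matches if any entry wildcard-matches the receiver.
fn any_pattern_matches(account_id: &str, receiver_id: &str) -> bool {
    account_id
        .split(',')
        .any(|pattern| wildmatch::WildMatch::new(pattern.trim()).matches(receiver_id))
}

fn main() {
    assert!(any_pattern_matches("*.near, aurora", "app.near"));
    assert!(any_pattern_matches("*.near, aurora", "aurora"));
    assert!(!any_pattern_matches("*.near", "aurora"));
}
```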
1 change: 0 additions & 1 deletion indexer/queryapi_coordinator/Cargo.toml
@@ -41,4 +41,3 @@ unescape = "0.1.0"
 aws-types = "0.53.0"
 aws-credential-types = "0.53.0"
 aws-sdk-s3 = "0.23.0"
-aws-sdk-sqs = "0.23.0"
1 change: 0 additions & 1 deletion indexer/queryapi_coordinator/README.md
@@ -17,4 +17,3 @@ see terraform scripts https://github.com/near/near-ops/tree/master/provisioning/
 This app requires:
 * a connection to a database containing "alert" rules to match blocks against;
 * a redis server where identifiers of processed blocks are stored;
-* a SQS queue to write to.
129 changes: 39 additions & 90 deletions indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -1,15 +1,10 @@
-use crate::indexer_types::{IndexerFunction, IndexerQueueMessage};
-use crate::opts::{Opts, Parser};
-use crate::queue;
+use crate::indexer_types::IndexerFunction;
 use crate::s3;
 use anyhow::{bail, Context};
 use aws_sdk_s3::Client as S3Client;
-use aws_sdk_s3::Config;
-use aws_sdk_sqs::Client;
-use aws_types::SdkConfig;
 use chrono::{DateTime, LocalResult, TimeZone, Utc};
 use indexer_rule_type::indexer_rule::MatchingRule;
-use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload};
+use indexer_rules_engine::types::indexer_rule_match::ChainId;
 use near_jsonrpc_client::JsonRpcClient;
 use near_jsonrpc_primitives::types::blocks::RpcBlockRequest;
 use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId, BlockReference};
@@ -26,16 +21,25 @@ pub fn spawn_historical_message_thread(
     block_height: BlockHeight,
     new_indexer_function: &IndexerFunction,
     redis_connection_manager: &storage::ConnectionManager,
+    s3_client: &S3Client,
+    chain_id: &ChainId,
+    json_rpc_client: &JsonRpcClient,
 ) -> Option<JoinHandle<i64>> {
     let redis_connection_manager = redis_connection_manager.clone();
+    let s3_client = s3_client.clone();
+    let chain_id = chain_id.clone();
+    let json_rpc_client = json_rpc_client.clone();
 
     new_indexer_function.start_block_height.map(|_| {
         let new_indexer_function_copy = new_indexer_function.clone();
         tokio::spawn(async move {
             process_historical_messages_or_handle_error(
                 block_height,
                 new_indexer_function_copy,
-                Opts::parse(),
                 &redis_connection_manager,
+                &s3_client,
+                &chain_id,
+                &json_rpc_client,
             )
             .await
         })
@@ -45,14 +49,18 @@ pub(crate) async fn process_historical_messages_or_handle_error(
 pub(crate) async fn process_historical_messages_or_handle_error(
     block_height: BlockHeight,
     indexer_function: IndexerFunction,
-    opts: Opts,
     redis_connection_manager: &storage::ConnectionManager,
+    s3_client: &S3Client,
+    chain_id: &ChainId,
+    json_rpc_client: &JsonRpcClient,
 ) -> i64 {
     match process_historical_messages(
         block_height,
         indexer_function,
-        opts,
         redis_connection_manager,
+        s3_client,
+        chain_id,
+        json_rpc_client,
     )
     .await
     {
@@ -71,8 +79,10 @@ pub(crate) async fn process_historical_messages_or_handle_error(
 pub(crate) async fn process_historical_messages(
     block_height: BlockHeight,
     indexer_function: IndexerFunction,
-    opts: Opts,
     redis_connection_manager: &storage::ConnectionManager,
+    s3_client: &S3Client,
+    chain_id: &ChainId,
+    json_rpc_client: &JsonRpcClient,
 ) -> anyhow::Result<i64> {
     let start_block = indexer_function.start_block_height.unwrap();
     let block_difference: i64 = (block_height - start_block) as i64;
@@ -95,25 +105,15 @@ pub(crate) async fn process_historical_messages(
         indexer_function.function_name
     );
 
-    let chain_id = opts.chain_id().clone();
-    let aws_region = opts.aws_queue_region.clone();
-    let queue_client = queue::queue_client(aws_region, opts.queue_credentials());
-    let queue_url = opts.start_from_block_queue_url.clone();
-    let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
-
-    let json_rpc_client = JsonRpcClient::connect(opts.rpc_url());
     let start_date =
-        lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?;
+        lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?;
 
     let mut indexer_function = indexer_function.clone();
 
-    let last_indexed_block = last_indexed_block_from_metadata(aws_config).await?;
-    let last_indexed_block = last_indexed_block;
+    let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?;
 
     let mut blocks_from_index = filter_matching_blocks_from_index_files(
         start_block,
         &indexer_function,
-        aws_config,
+        s3_client,
         start_date,
     )
     .await?;
@@ -131,15 +131,13 @@ pub(crate) async fn process_historical_messages(
         last_indexed_block,
         block_height,
         &indexer_function,
-        aws_config,
-        chain_id.clone(),
+        s3_client,
+        chain_id,
     )
     .await?;
 
     blocks_from_index.append(&mut blocks_between_indexed_and_current_block);
 
-    let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block);
-
     if !blocks_from_index.is_empty() {
         storage::sadd(
             redis_connection_manager,
@@ -163,30 +161,16 @@ pub(crate) async fn process_historical_messages(
                 &[("block_height", current_block)],
             )
             .await?;
-
-                send_execution_message(
-                    block_height,
-                    first_block_in_index,
-                    chain_id.clone(),
-                    &queue_client,
-                    queue_url.clone(),
-                    &mut indexer_function,
-                    current_block,
-                    None,
-                )
-                .await;
             }
         }
     }
     Ok(block_difference)
 }
 
 pub(crate) async fn last_indexed_block_from_metadata(
-    aws_config: &SdkConfig,
+    s3_client: &S3Client,
 ) -> anyhow::Result<BlockHeight> {
     let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json");
-    let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
-    let s3_client: S3Client = S3Client::from_conf(s3_config);
     let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;
 
     let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap();
@@ -207,7 +191,7 @@ pub(crate) async fn last_indexed_block_from_metadata(
 pub(crate) async fn filter_matching_blocks_from_index_files(
     start_block_height: BlockHeight,
     indexer_function: &IndexerFunction,
-    aws_config: &SdkConfig,
+    s3_client: &S3Client,
     start_date: DateTime<Utc>,
 ) -> anyhow::Result<Vec<BlockHeight>> {
     let s3_bucket = INDEXED_DATA_FILES_BUCKET;
@@ -224,7 +208,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
         needs_dedupe_and_sort = true;
     }
     s3::fetch_contract_index_files(
-        aws_config,
+        s3_client,
         s3_bucket,
         INDEXED_ACTIONS_FILES_FOLDER,
         start_date,
@@ -304,12 +288,10 @@ async fn filter_matching_unindexed_blocks_from_lake(
     last_indexed_block: BlockHeight,
     ending_block_height: BlockHeight,
     indexer_function: &IndexerFunction,
-    aws_config: &SdkConfig,
-    chain_id: ChainId,
+    s3_client: &S3Client,
+    chain_id: &ChainId,
 ) -> anyhow::Result<Vec<u64>> {
-    let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
-    let s3_client: S3Client = S3Client::from_conf(s3_config);
-    let lake_bucket = lake_bucket_for_chain(chain_id.clone());
+    let lake_bucket = lake_bucket_for_chain(chain_id);
 
     let indexer_rule = &indexer_function.indexer_rule;
     let count = ending_block_height - last_indexed_block;
@@ -331,11 +313,14 @@ async fn filter_matching_unindexed_blocks_from_lake(
     for current_block in (last_indexed_block + 1)..ending_block_height {
         // fetch block file from S3
         let key = format!("{}/block.json", normalize_block_height(current_block));
-        let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await;
+        let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await;
 
         if s3_result.is_err() {
             let error = s3_result.err().unwrap();
-            if let Some(_) = error.downcast_ref::<aws_sdk_s3::error::NoSuchKey>() {
+            if error
+                .downcast_ref::<aws_sdk_s3::error::NoSuchKey>()
+                .is_some()
+            {
                 tracing::info!(
                     target: crate::INDEXER,
                     "In manual filtering, skipping block number {} which was not found. For function {:?} {:?}",
@@ -362,7 +347,7 @@ async fn filter_matching_unindexed_blocks_from_lake(
                 normalize_block_height(current_block),
                 shard_id
             );
-            let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?;
+            let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await?;
             match serde_json::from_slice::<near_lake_framework::near_indexer_primitives::IndexerShard>(
                 shard.as_ref(),
             ) {
Expand Down Expand Up @@ -401,50 +386,14 @@ async fn filter_matching_unindexed_blocks_from_lake(
Ok(blocks_to_process)
}

fn lake_bucket_for_chain(chain_id: ChainId) -> String {
fn lake_bucket_for_chain(chain_id: &ChainId) -> String {
format!("{}{}", LAKE_BUCKET_PREFIX, chain_id)
}

fn normalize_block_height(block_height: BlockHeight) -> String {
format!("{:0>12}", block_height)
}

async fn send_execution_message(
block_height: BlockHeight,
first_block: BlockHeight,
chain_id: ChainId,
queue_client: &Client,
queue_url: String,
indexer_function: &mut IndexerFunction,
current_block: u64,
payload: Option<IndexerRuleMatchPayload>,
) {
// only request provisioning on the first block
if current_block != first_block {
indexer_function.provisioned = true;
}

let msg = IndexerQueueMessage {
chain_id,
indexer_rule_id: 0,
indexer_rule_name: indexer_function.function_name.clone(),
payload,
block_height: current_block,
indexer_function: indexer_function.clone(),
is_historical: true,
};

match queue::send_to_indexer_queue(queue_client, queue_url, vec![msg]).await {
Ok(_) => {}
Err(err) => tracing::error!(
target: crate::INDEXER,
"#{} an error occurred when sending messages to the queue\n{:#?}",
block_height,
err
),
}
}

// if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks
pub async fn lookup_block_date_or_next_block_date(
block_height: u64,
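
The pattern replacing the deleted `Builder::from(aws_config)` blocks is to build one `S3Client` up front and hand out cheap clones, since AWS SDK clients are internally reference counted. A sketch, assuming the client is constructed from the lake `SdkConfig` at startup:

```rust
use aws_sdk_s3::Client as S3Client;
use aws_types::SdkConfig;

// Build the client once at startup; clones share the same connection pool,
// so spawned tasks can each take their own handle.
async fn run(aws_config: &SdkConfig) {
    let s3_client = S3Client::new(aws_config);

    let task_client = s3_client.clone(); // cheap handle, not a second client
    tokio::spawn(async move {
        // e.g. fetch index files with the shared client
        let _ = task_client.list_buckets().send().await;
    });
}
```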
(Diffs for the remaining 8 changed files are not rendered here.)
