Skip to content

Commit

Permalink
feat: Add GCP compatible logging format to Block Streamer (#655)
Browse files Browse the repository at this point in the history
This PR adds
[tracing-stackdriver](https://github.com/NAlexPear/tracing-stackdriver),
which outputs logs in a GCP-compatible JSON format. To enable this, the
`GCP_LOGGING_ENABLED` environment variable must be set.

Further, I've added additional context to errors to aid debugging.

near/near-ops#1695
  • Loading branch information
morgsmccauley authored Apr 10, 2024
1 parent a5fc5e9 commit 0cee6fa
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 9 deletions.
26 changes: 26 additions & 0 deletions block-streamer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-stackdriver = "0.10.0"
tokio = { version = "1.28.0", features = ["full"]}
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
Expand Down
8 changes: 5 additions & 3 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ pub(crate) async fn start_block_stream(
indexer,
redis_stream.clone(),
)
.await?;
.await
.context("Failed during Delta Lake processing")?;

let last_indexed_near_lake_block = process_near_lake_blocks(
last_indexed_delta_lake_block,
Expand All @@ -159,7 +160,8 @@ pub(crate) async fn start_block_stream(
redis_stream,
chain_id,
)
.await?;
.await
.context("Failed during Near Lake processing")?;

tracing::debug!(
last_indexed_block = last_indexed_near_lake_block,
Expand Down Expand Up @@ -192,7 +194,7 @@ async fn process_delta_lake_blocks(
..
} => {
if affected_account_id
.split(",")
.split(',')
.any(|account_id| DELTA_LAKE_SKIP_ACCOUNTS.contains(&account_id.trim()))
{
tracing::debug!(
Expand Down
1 change: 1 addition & 0 deletions block-streamer/src/delta_lake_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ impl DeltaLakeClientImpl {
.await
}
}
.context("Failed to list matching index files")
}

fn date_from_s3_path(&self, path: &str) -> Option<chrono::NaiveDate> {
Expand Down
14 changes: 10 additions & 4 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ mod test_utils;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
let subscriber = tracing_subscriber::registry()
.with(metrics::LogCounter)
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();
.with(tracing_subscriber::EnvFilter::from_default_env());

if std::env::var("GCP_LOGGING_ENABLED").is_ok() {
subscriber.with(tracing_stackdriver::layer()).init();
} else {
subscriber
.with(tracing_subscriber::fmt::layer().compact())
.init();
}

let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set");
let grpc_port = std::env::var("GRPC_PORT").expect("GRPC_PORT is not set");
Expand Down
10 changes: 8 additions & 2 deletions block-streamer/src/s3_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![cfg_attr(test, allow(dead_code))]

use anyhow::Context;

const MAX_S3_LIST_REQUESTS: usize = 1000;

#[cfg(test)]
Expand Down Expand Up @@ -60,7 +62,10 @@ impl S3ClientImpl {
}

pub async fn get_text_file(&self, bucket: &str, prefix: &str) -> anyhow::Result<String> {
let object = self.get_object(bucket, prefix).await?;
let object = self
.get_object(bucket, prefix)
.await
.context(format!("Failed to fetch {bucket}/{prefix}"))?;

let bytes = object.body.collect().await?;

Expand All @@ -83,7 +88,8 @@ impl S3ClientImpl {

let list = self
.list_objects(bucket, prefix, continuation_token)
.await?;
.await
.context(format!("Failed to list {bucket}/{prefix}"))?;

if let Some(common_prefixes) = list.common_prefixes {
let keys: Vec<String> = common_prefixes
Expand Down

0 comments on commit 0cee6fa

Please sign in to comment.