diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index 70b4a03ef..297af810a 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -2,6 +2,16 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" +dependencies = [ + "lazy_static", + "regex", +] + [[package]] name = "actix-codec" version = "0.5.2" @@ -872,6 +882,7 @@ dependencies = [ "tonic", "tonic-build", "tracing", + "tracing-stackdriver", "tracing-subscriber", "wildmatch", ] @@ -3758,6 +3769,21 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-stackdriver" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80048836e000e1f058562f01d69cc46f476955bf389c0dc2d2d7edb98ca63ac1" +dependencies = [ + "Inflector", + "serde", + "serde_json", + "thiserror", + "time", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 9805907a5..84c8daf31 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -21,6 +21,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1.0.55" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +tracing-stackdriver = "0.10.0" tokio = { version = "1.28.0", features = ["full"]} tokio-util = "0.7.10" tokio-stream = "0.1.14" diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 6d79a11a2..e00e9a9a0 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -148,7 +148,8 @@ pub(crate) async fn start_block_stream( indexer, redis_stream.clone(), ) - .await?; + .await + .context("Failed during Delta Lake processing")?; let last_indexed_near_lake_block = process_near_lake_blocks( last_indexed_delta_lake_block, @@ -159,7 +160,8 @@ pub(crate) async fn start_block_stream( redis_stream, chain_id, ) - .await?; + .await + .context("Failed during Near Lake processing")?; tracing::debug!( last_indexed_block = last_indexed_near_lake_block, @@ -192,7 +194,7 @@ async fn process_delta_lake_blocks( .. } => { if affected_account_id - .split(",") + .split(',') .any(|account_id| DELTA_LAKE_SKIP_ACCOUNTS.contains(&account_id.trim())) { tracing::debug!( diff --git a/block-streamer/src/delta_lake_client.rs b/block-streamer/src/delta_lake_client.rs index ed0a0d542..d45a7391c 100644 --- a/block-streamer/src/delta_lake_client.rs +++ b/block-streamer/src/delta_lake_client.rs @@ -194,6 +194,7 @@ impl DeltaLakeClientImpl { .await } } + .context("Failed to list matching index files") } fn date_from_s3_path(&self, path: &str) -> Option { diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 9ee08ad74..cf86a0c1e 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -15,11 +15,17 @@ mod test_utils; #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer()) + let subscriber = tracing_subscriber::registry() .with(metrics::LogCounter) - .with(tracing_subscriber::EnvFilter::from_default_env()) - .init(); + .with(tracing_subscriber::EnvFilter::from_default_env()); + + if std::env::var("GCP_LOGGING_ENABLED").is_ok() { + subscriber.with(tracing_stackdriver::layer()).init(); + } else { + subscriber + .with(tracing_subscriber::fmt::layer().compact()) + .init(); + } let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set"); let grpc_port = std::env::var("GRPC_PORT").expect("GRPC_PORT is not set"); diff --git a/block-streamer/src/s3_client.rs b/block-streamer/src/s3_client.rs index 412fd4ee0..0599aa18c 100644 --- a/block-streamer/src/s3_client.rs +++ b/block-streamer/src/s3_client.rs @@ -1,5 +1,7 @@ #![cfg_attr(test, allow(dead_code))] +use anyhow::Context; + const MAX_S3_LIST_REQUESTS: usize = 1000; #[cfg(test)] @@ -60,7 +62,10 @@ impl S3ClientImpl { } pub async fn get_text_file(&self, bucket: &str, prefix: &str) -> anyhow::Result { - let object = self.get_object(bucket, prefix).await?; + let object = self + .get_object(bucket, prefix) + .await + .context(format!("Failed to fetch {bucket}/{prefix}"))?; let bytes = object.body.collect().await?; @@ -83,7 +88,8 @@ impl S3ClientImpl { let list = self .list_objects(bucket, prefix, continuation_token) - .await?; + .await + .context(format!("Failed to list {bucket}/{prefix}"))?; if let Some(common_prefixes) = list.common_prefixes { let keys: Vec = common_prefixes