diff --git a/src/connectors/impls/gbq/writer.rs b/src/connectors/impls/gbq/writer.rs index 5460ce0d82..e15cf074ef 100644 --- a/src/connectors/impls/gbq/writer.rs +++ b/src/connectors/impls/gbq/writer.rs @@ -93,7 +93,7 @@ mod tests { table_id: "test".into(), connect_timeout: 1, request_timeout: 1, -// request_size_limit: 10 * 1024 * 1024, + // request_size_limit: 10 * 1024 * 1024, }, }; diff --git a/src/connectors/impls/gbq/writer/sink.rs b/src/connectors/impls/gbq/writer/sink.rs index b649fb3968..a4274d0410 100644 --- a/src/connectors/impls/gbq/writer/sink.rs +++ b/src/connectors/impls/gbq/writer/sink.rs @@ -1088,4 +1088,4 @@ mod test { assert!(result.is_err()); Ok(()) } -} \ No newline at end of file +} diff --git a/src/connectors/impls/gcl/writer/sink.rs b/src/connectors/impls/gcl/writer/sink.rs index dc83733131..e6233f0b51 100644 --- a/src/connectors/impls/gcl/writer/sink.rs +++ b/src/connectors/impls/gcl/writer/sink.rs @@ -205,10 +205,10 @@ impl Sink for GclSink { async fn connect(&mut self, ctx: &SinkContext, _attempt: &Attempt) -> Result { if let Some(logic) = self.mock_logic { info!("{} Mocking connection to Google Cloud Logging", ctx); - self.client = Some(LoggingServiceV2Client::new( - TremorGoogleAuthz::new_mock(logic), - )); - } else { + self.client = Some(LoggingServiceV2Client::new(TremorGoogleAuthz::new_mock( + logic, + ))); + } else { info!("{} Connecting to Google Cloud Logging", ctx); let channel = make_tonic_channel(Duration::from_nanos(self.config.connect_timeout)).await?; @@ -237,18 +237,16 @@ mod test { use super::*; use crate::connectors::impls::gcl; use crate::connectors::tests::ConnectorHarness; + use crate::connectors::utils::quiescence::QuiescenceBeacon; use crate::connectors::ConnectionLostNotifier; - use crate::connectors::{ - utils::quiescence::QuiescenceBeacon, - }; use async_std::channel::bounded; use futures::executor::block_on; use google_api_proto::google::logging::{r#type::LogSeverity, v2::WriteLogEntriesResponse}; use http::{HeaderMap, HeaderValue}; + use tremor_common::ids::SinkId; use tremor_pipeline::CbAction::Trigger; use tremor_pipeline::EventId; use tremor_value::{literal, structurize}; - use tremor_common::ids::SinkId; #[async_std::test] async fn on_event_can_send_an_event() -> Result<()> { diff --git a/src/connectors/impls/gpubsub/consumer.rs b/src/connectors/impls/gpubsub/consumer.rs index 769089867f..c0305c38cd 100644 --- a/src/connectors/impls/gpubsub/consumer.rs +++ b/src/connectors/impls/gpubsub/consumer.rs @@ -31,7 +31,7 @@ use std::fmt::Debug; use std::sync::Arc; use std::time::{Duration, SystemTime}; use tonic::transport::{Channel, ClientTlsConfig}; -use tonic::{Code}; +use tonic::Code; use tremor_common::blue_green_hashmap::BlueGreenHashMap; use tremor_pipeline::ConfigImpl; @@ -80,7 +80,7 @@ impl ConnectorBuilder for Builder { ) -> Result> { let config = Config::new(raw)?; let url = Url::::parse(config.url.as_str())?; -// let client_id = format!("tremor-{}-{alias}-{:?}", hostname(), task::current().id()); + // let client_id = format!("tremor-{}-{alias}-{:?}", hostname(), task::current().id()); let client_id = "snot".to_string(); Ok(Box::new(GSub { config, @@ -102,7 +102,7 @@ type AsyncTaskMessage = Result<(u64, PubsubMessage)>; struct GSubSource { config: Config, hostname: String, -// client: Option, + // client: Option, receiver: Option>, ack_sender: Option>, task_handle: Option>, @@ -120,7 +120,8 @@ impl GSubSource { "gpubsub-consumer".to_string(), "Missing hostname".to_string(), ) - })?.to_string(); + })? + .to_string(); Ok(GSubSource { config, hostname, @@ -286,7 +287,6 @@ impl Source for GSubSource { task_handle.cancel().await; } - let mut client = SubscriberClient::new(TremorGoogleAuthz::new(channel.clone()).await?); // check that the subscription exists let res = client @@ -301,9 +301,10 @@ impl Source for GSubSource { ); debug!("{ctx} Subscription details {res:?}"); -// let client_background = client.clone(); -// let client_background = SubscriberClient::new(auth_channel.clone()); - let client_background = SubscriberClient::new(TremorGoogleAuthz::new(channel.clone()).await?); + // let client_background = client.clone(); + // let client_background = SubscriberClient::new(auth_channel.clone()); + let client_background = + SubscriberClient::new(TremorGoogleAuthz::new(channel.clone()).await?); let (tx, rx) = async_std::channel::bounded(QSIZE.load(Ordering::Relaxed)); let (ack_tx, ack_rx) = async_std::channel::bounded(QSIZE.load(Ordering::Relaxed)); @@ -323,7 +324,7 @@ impl Source for GSubSource { self.receiver = Some(rx); self.ack_sender = Some(ack_tx); -// self.client = Some(client); + // self.client = Some(client); self.task_handle = Some(join_handle); Ok(true) diff --git a/src/connectors/impls/otel/common.rs b/src/connectors/impls/otel/common.rs index de8376f796..d11d84e885 100644 --- a/src/connectors/impls/otel/common.rs +++ b/src/connectors/impls/otel/common.rs @@ -22,7 +22,7 @@ use simd_json::Builder; use tremor_otelapis::opentelemetry::proto::common::v1::{ any_value, AnyValue, ArrayValue, InstrumentationScope, KeyValue, KeyValueList, }; -use tremor_value::{StaticNode, Value, literal}; +use tremor_value::{literal, StaticNode, Value}; use value_trait::ValueAccess; pub(crate) struct OtelDefaults; diff --git a/src/connectors/tests/gpubsub/gpub.rs b/src/connectors/tests/gpubsub/gpub.rs index 4dd3d9f216..6a96a1747d 100644 --- a/src/connectors/tests/gpubsub/gpub.rs +++ b/src/connectors/tests/gpubsub/gpub.rs @@ -17,6 +17,11 @@ use crate::connectors::tests::ConnectorHarness; use crate::errors::Result; use crate::instance::State; use async_std::prelude::FutureExt; +use google_api_proto::google::pubsub::v1::publisher_client::PublisherClient; +use google_api_proto::google::pubsub::v1::subscriber_client::SubscriberClient; +use google_api_proto::google::pubsub::v1::PullRequest; +use google_api_proto::google::pubsub::v1::Subscription; +use google_api_proto::google::pubsub::v1::Topic; use serial_test::serial; use std::collections::HashSet; use std::time::Duration; @@ -26,11 +31,6 @@ use tonic::transport::Channel; use tremor_common::ports::IN; use tremor_pipeline::{Event, EventId}; use tremor_value::{literal, Value}; -use google_api_proto::google::pubsub::v1::Subscription; -use google_api_proto::google::pubsub::v1::publisher_client::PublisherClient; -use google_api_proto::google::pubsub::v1::subscriber_client::SubscriberClient; -use google_api_proto::google::pubsub::v1::Topic; -use google_api_proto::google::pubsub::v1::PullRequest; // use tremor_common::ids::SinkId; #[async_std::test] diff --git a/src/connectors/tests/gpubsub/gsub.rs b/src/connectors/tests/gpubsub/gsub.rs index 19db314205..e6fba6293c 100644 --- a/src/connectors/tests/gpubsub/gsub.rs +++ b/src/connectors/tests/gpubsub/gsub.rs @@ -15,21 +15,21 @@ use crate::connectors::impls::gpubsub::consumer::Builder; use crate::connectors::tests::ConnectorHarness; use crate::errors::Result; -use std::collections::BTreeMap; +use google_api_proto::google::pubsub::v1::publisher_client::PublisherClient; +use google_api_proto::google::pubsub::v1::subscriber_client::SubscriberClient; +use google_api_proto::google::pubsub::v1::GetSubscriptionRequest; +use google_api_proto::google::pubsub::v1::PublishRequest; +use google_api_proto::google::pubsub::v1::PubsubMessage; +use google_api_proto::google::pubsub::v1::Subscription; +use google_api_proto::google::pubsub::v1::Topic; use serial_test::serial; +use std::collections::BTreeMap; use testcontainers::clients::Cli; use testcontainers::RunnableImage; use tonic::transport::Channel; use tremor_pipeline::CbAction; use tremor_value::{literal, Value}; use value_trait::ValueAccess; -use google_api_proto::google::pubsub::v1::PubsubMessage; -use google_api_proto::google::pubsub::v1::Subscription; -use google_api_proto::google::pubsub::v1::publisher_client::PublisherClient; -use google_api_proto::google::pubsub::v1::subscriber_client::SubscriberClient; -use google_api_proto::google::pubsub::v1::GetSubscriptionRequest; -use google_api_proto::google::pubsub::v1::Topic; -use google_api_proto::google::pubsub::v1::PublishRequest; // use tremor_common::ids::SinkId; #[async_std::test]