Skip to content

Commit

Permalink
Obey fmt
Browse files Browse the repository at this point in the history
Signed-off-by: Darach Ennis <[email protected]>
  • Loading branch information
darach committed Jan 12, 2023
1 parent 3a92a43 commit 165a999
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/connectors/impls/gbq/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ mod tests {
table_id: "test".into(),
connect_timeout: 1,
request_timeout: 1,
// request_size_limit: 10 * 1024 * 1024,
// request_size_limit: 10 * 1024 * 1024,
},
};

Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/gbq/writer/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,4 +1088,4 @@ mod test {
assert!(result.is_err());
Ok(())
}
}
}
14 changes: 6 additions & 8 deletions src/connectors/impls/gcl/writer/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ impl Sink for GclSink {
async fn connect(&mut self, ctx: &SinkContext, _attempt: &Attempt) -> Result<bool> {
if let Some(logic) = self.mock_logic {
info!("{} Mocking connection to Google Cloud Logging", ctx);
self.client = Some(LoggingServiceV2Client::new(
TremorGoogleAuthz::new_mock(logic),
));
} else {
self.client = Some(LoggingServiceV2Client::new(TremorGoogleAuthz::new_mock(
logic,
)));
} else {
info!("{} Connecting to Google Cloud Logging", ctx);
let channel =
make_tonic_channel(Duration::from_nanos(self.config.connect_timeout)).await?;
Expand Down Expand Up @@ -237,18 +237,16 @@ mod test {
use super::*;
use crate::connectors::impls::gcl;
use crate::connectors::tests::ConnectorHarness;
use crate::connectors::utils::quiescence::QuiescenceBeacon;
use crate::connectors::ConnectionLostNotifier;
use crate::connectors::{
utils::quiescence::QuiescenceBeacon,
};
use async_std::channel::bounded;
use futures::executor::block_on;
use google_api_proto::google::logging::{r#type::LogSeverity, v2::WriteLogEntriesResponse};
use http::{HeaderMap, HeaderValue};
use tremor_common::ids::SinkId;
use tremor_pipeline::CbAction::Trigger;
use tremor_pipeline::EventId;
use tremor_value::{literal, structurize};
use tremor_common::ids::SinkId;

#[async_std::test]
async fn on_event_can_send_an_event() -> Result<()> {
Expand Down
19 changes: 10 additions & 9 deletions src/connectors/impls/gpubsub/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tonic::transport::{Channel, ClientTlsConfig};
use tonic::{Code};
use tonic::Code;
use tremor_common::blue_green_hashmap::BlueGreenHashMap;
use tremor_pipeline::ConfigImpl;

Expand Down Expand Up @@ -80,7 +80,7 @@ impl ConnectorBuilder for Builder {
) -> Result<Box<dyn Connector>> {
let config = Config::new(raw)?;
let url = Url::<HttpsDefaults>::parse(config.url.as_str())?;
// let client_id = format!("tremor-{}-{alias}-{:?}", hostname(), task::current().id());
// let client_id = format!("tremor-{}-{alias}-{:?}", hostname(), task::current().id());
let client_id = "snot".to_string();
Ok(Box::new(GSub {
config,
Expand All @@ -102,7 +102,7 @@ type AsyncTaskMessage = Result<(u64, PubsubMessage)>;
struct GSubSource {
config: Config,
hostname: String,
// client: Option<PubSubClient>,
// client: Option<PubSubClient>,
receiver: Option<Receiver<AsyncTaskMessage>>,
ack_sender: Option<Sender<u64>>,
task_handle: Option<JoinHandle<()>>,
Expand All @@ -120,7 +120,8 @@ impl GSubSource {
"gpubsub-consumer".to_string(),
"Missing hostname".to_string(),
)
})?.to_string();
})?
.to_string();
Ok(GSubSource {
config,
hostname,
Expand Down Expand Up @@ -286,7 +287,6 @@ impl Source for GSubSource {
task_handle.cancel().await;
}


let mut client = SubscriberClient::new(TremorGoogleAuthz::new(channel.clone()).await?);
// check that the subscription exists
let res = client
Expand All @@ -301,9 +301,10 @@ impl Source for GSubSource {
);
debug!("{ctx} Subscription details {res:?}");

// let client_background = client.clone();
// let client_background = SubscriberClient::new(auth_channel.clone());
let client_background = SubscriberClient::new(TremorGoogleAuthz::new(channel.clone()).await?);
// let client_background = client.clone();
// let client_background = SubscriberClient::new(auth_channel.clone());
let client_background =
SubscriberClient::new(TremorGoogleAuthz::new(channel.clone()).await?);

let (tx, rx) = async_std::channel::bounded(QSIZE.load(Ordering::Relaxed));
let (ack_tx, ack_rx) = async_std::channel::bounded(QSIZE.load(Ordering::Relaxed));
Expand All @@ -323,7 +324,7 @@ impl Source for GSubSource {

self.receiver = Some(rx);
self.ack_sender = Some(ack_tx);
// self.client = Some(client);
// self.client = Some(client);
self.task_handle = Some(join_handle);

Ok(true)
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/otel/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use simd_json::Builder;
use tremor_otelapis::opentelemetry::proto::common::v1::{
any_value, AnyValue, ArrayValue, InstrumentationScope, KeyValue, KeyValueList,
};
use tremor_value::{StaticNode, Value, literal};
use tremor_value::{literal, StaticNode, Value};
use value_trait::ValueAccess;

pub(crate) struct OtelDefaults;
Expand Down
10 changes: 5 additions & 5 deletions src/connectors/tests/gpubsub/gpub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ use crate::connectors::tests::ConnectorHarness;
use crate::errors::Result;
use crate::instance::State;
use async_std::prelude::FutureExt;
use google_api_proto::google::pubsub::v1::publisher_client::PublisherClient;
use google_api_proto::google::pubsub::v1::subscriber_client::SubscriberClient;
use google_api_proto::google::pubsub::v1::PullRequest;
use google_api_proto::google::pubsub::v1::Subscription;
use google_api_proto::google::pubsub::v1::Topic;
use serial_test::serial;
use std::collections::HashSet;
use std::time::Duration;
Expand All @@ -26,11 +31,6 @@ use tonic::transport::Channel;
use tremor_common::ports::IN;
use tremor_pipeline::{Event, EventId};
use tremor_value::{literal, Value};
use google_api_proto::google::pubsub::v1::Subscription;
use google_api_proto::google::pubsub::v1::publisher_client::PublisherClient;
use google_api_proto::google::pubsub::v1::subscriber_client::SubscriberClient;
use google_api_proto::google::pubsub::v1::Topic;
use google_api_proto::google::pubsub::v1::PullRequest;
// use tremor_common::ids::SinkId;

#[async_std::test]
Expand Down
16 changes: 8 additions & 8 deletions src/connectors/tests/gpubsub/gsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@
use crate::connectors::impls::gpubsub::consumer::Builder;
use crate::connectors::tests::ConnectorHarness;
use crate::errors::Result;
use std::collections::BTreeMap;
use google_api_proto::google::pubsub::v1::publisher_client::PublisherClient;
use google_api_proto::google::pubsub::v1::subscriber_client::SubscriberClient;
use google_api_proto::google::pubsub::v1::GetSubscriptionRequest;
use google_api_proto::google::pubsub::v1::PublishRequest;
use google_api_proto::google::pubsub::v1::PubsubMessage;
use google_api_proto::google::pubsub::v1::Subscription;
use google_api_proto::google::pubsub::v1::Topic;
use serial_test::serial;
use std::collections::BTreeMap;
use testcontainers::clients::Cli;
use testcontainers::RunnableImage;
use tonic::transport::Channel;
use tremor_pipeline::CbAction;
use tremor_value::{literal, Value};
use value_trait::ValueAccess;
use google_api_proto::google::pubsub::v1::PubsubMessage;
use google_api_proto::google::pubsub::v1::Subscription;
use google_api_proto::google::pubsub::v1::publisher_client::PublisherClient;
use google_api_proto::google::pubsub::v1::subscriber_client::SubscriberClient;
use google_api_proto::google::pubsub::v1::GetSubscriptionRequest;
use google_api_proto::google::pubsub::v1::Topic;
use google_api_proto::google::pubsub::v1::PublishRequest;
// use tremor_common::ids::SinkId;

#[async_std::test]
Expand Down

0 comments on commit 165a999

Please sign in to comment.