Skip to content

Commit

Permalink
Update tests and context passing
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed May 8, 2023
1 parent aea52d4 commit 9b82ae4
Show file tree
Hide file tree
Showing 17 changed files with 108 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ mod tests {
let id = Alias::new("my_id");
let res = Connector::from_config(&id, "fancy_schmancy".into(), &config);
assert!(res.is_err());
assert_eq!(String::from("Invalid Definition for connector \"app/flow::my_id\": Expected type I64 for key metrics_interval_s but got String"), res.err().map(|e| e.to_string()).unwrap_or_default());
assert_eq!(String::from("Invalid Definition for connector \"my_id\": Expected type I64 for key metrics_interval_s but got String"), res.err().map(|e| e.to_string()).unwrap_or_default());
}

#[test]
Expand Down
3 changes: 1 addition & 2 deletions src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use crate::{
log_error, pipeline, qsize, raft,
system::{flow::AppContext, KillSwitch, Runtime},
};
use beef::Cow;
use futures::Future;
use halfbrown::HashMap;
use simd_json::{Builder, Mutable, ValueAccess};
Expand Down Expand Up @@ -315,7 +314,7 @@ pub(crate) trait Context: Display + Clone {
'ct: 'event,
{
let t: &str = self.connector_type().into();
event_meta.get(&Cow::borrowed(t))
event_meta.get(t)
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/connectors/impls/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,9 +971,7 @@ mod tests {
let connector_config =
ConnectorConfig::from_config(&alias, builder.connector_type(), &config)?;
assert_eq!(
String::from(
"Invalid Definition for connector \"app/flow::my_elastic\": empty nodes provided"
),
String::from("Invalid Definition for connector \"my_elastic\": empty nodes provided"),
builder
.build(&alias, &connector_config,)
.await
Expand Down
31 changes: 15 additions & 16 deletions src/connectors/impls/gbq/writer/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ mod test {
precision: 0,
scale: 0,
}],
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
);

assert_eq!(result.0.field.len(), 0);
Expand All @@ -802,7 +802,7 @@ mod test {
precision: 0,
scale: 0,
}],
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
);

assert_eq!(result.0.field.len(), 0);
Expand Down Expand Up @@ -832,7 +832,7 @@ mod test {
precision: 0,
scale: 0,
}],
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
);

assert_eq!(result.1.len(), 1);
Expand Down Expand Up @@ -864,7 +864,7 @@ mod test {
precision: 0,
scale: 0,
}],
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
);

assert_eq!(result.1.len(), 1);
Expand Down Expand Up @@ -1040,9 +1040,8 @@ mod test {

let mut result = Vec::new();
assert!(encode_field(&value, &field, &mut result).is_ok());

// json is currently not supported, so we expect the field to be skipped
assert_eq!([] as [u8; 0], result[..]);
assert_eq!([10, 2, 123, 125] as [u8; 4], result[..]);
}

#[test]
Expand Down Expand Up @@ -1079,7 +1078,7 @@ mod test {

#[test]
pub fn mapping_generates_a_correct_descriptor() {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![
TableFieldSchema {
Expand Down Expand Up @@ -1120,7 +1119,7 @@ mod test {

#[test]
pub fn can_map_json_to_protobuf() -> Result<()> {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![
TableFieldSchema {
Expand Down Expand Up @@ -1157,7 +1156,7 @@ mod test {

#[test]
fn map_field_ignores_fields_that_are_not_in_definition() -> Result<()> {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![
TableFieldSchema {
Expand Down Expand Up @@ -1195,7 +1194,7 @@ mod test {

#[test]
fn map_field_ignores_struct_fields_that_are_not_in_definition() -> Result<()> {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![TableFieldSchema {
name: "a".to_string(),
Expand Down Expand Up @@ -1231,7 +1230,7 @@ mod test {

#[test]
fn fails_on_bytes_type_mismatch() {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![TableFieldSchema {
name: "a".to_string(),
Expand All @@ -1258,7 +1257,7 @@ mod test {

#[test]
fn fails_if_the_event_is_not_an_object() {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![TableFieldSchema {
name: "a".to_string(),
Expand Down Expand Up @@ -1311,7 +1310,7 @@ mod test {
.on_event(
"",
Event::signal_tick(),
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
&mut EventSerializer::dummy(None)?,
0,
)
Expand Down Expand Up @@ -1340,7 +1339,7 @@ mod test {
.on_event(
"",
Event::signal_tick(),
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
&mut EventSerializer::dummy(None)?,
0,
)
Expand Down Expand Up @@ -1401,7 +1400,7 @@ mod test {
}),
);

let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");

sink.connect(&ctx, &Attempt::default()).await?;

Expand Down Expand Up @@ -1471,7 +1470,7 @@ mod test {
}),
);

let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");

sink.connect(&ctx, &Attempt::default()).await?;

Expand Down
4 changes: 2 additions & 2 deletions src/connectors/impls/gcl/writer/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ mod test {
tx,
MockChannelFactory,
);
let sink_context = SinkContext::dummy();
let sink_context = SinkContext::dummy("gcl_writer");

sink.connect(&sink_context, &Attempt::default()).await?;

Expand Down Expand Up @@ -439,7 +439,7 @@ mod test {
.on_event(
"",
Event::signal_tick(),
&SinkContext::dummy(),
&SinkContext::dummy("gcl_writer"),
&mut EventSerializer::dummy(None)?,
0,
)
Expand Down
12 changes: 6 additions & 6 deletions src/connectors/impls/gcs/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = YoloSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate sink lifecycle
Expand Down Expand Up @@ -730,7 +730,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = YoloSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate sink lifecycle
Expand Down Expand Up @@ -888,7 +888,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = YoloSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate sink lifecycle
Expand Down Expand Up @@ -967,7 +967,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = YoloSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");

// simulate sink lifecycle
sink.on_start(&context).await?;
Expand Down Expand Up @@ -1011,7 +1011,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = ConsistentSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate standard sink lifecycle
Expand Down Expand Up @@ -1220,7 +1220,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = ConsistentSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate standard sink lifecycle
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ mod tests {
&literal!({
"measurement": "kafka_consumer_stats",
"tags": {
"connector": "app/fake::fake"
"connector": "fake"
},
"fields": {
"rx_msgs": 42,
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub(crate) trait ObjectStorageCommon {
}

fn get_object_id(&self, meta: Option<&Value<'_>>) -> Result<ObjectId> {
let name = meta
let name = &meta
.get(NAME)
.ok_or_else(|| {
err_object_storage(format!(
Expand Down
6 changes: 3 additions & 3 deletions src/connectors/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,12 @@ pub(crate) struct SinkContextInner {
pub(crate) struct SinkContext(Arc<SinkContextInner>);
impl SinkContext {
#[cfg(test)]
pub(crate) fn dummy() -> Self {
pub(crate) fn dummy(ct: &str) -> Self {
let (tx, _rx) = bounded(1024);
Self::new(
SinkUId::default(),
Alias::from("dummy"),
ConnectorType::from("dummy"),
Alias::from(ct),
ConnectorType::from(ct),
QuiescenceBeacon::default(),
ConnectionLostNotifier::new(tx),
AppContext::default(),
Expand Down
Loading

0 comments on commit 9b82ae4

Please sign in to comment.