Update simd-json and halfbrown #2336

Merged · 1 commit · Jun 26, 2023
1,203 changes: 661 additions & 542 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
@@ -72,8 +72,8 @@ file-mode = "0.1"
futures = "0.3.28"
event-listener = "2.5"
glob = "0.3"
halfbrown = "0.1"
hashbrown = { version = "0.13", features = ["serde"] }
halfbrown = "0.2"
hashbrown = { version = "0.14", features = ["serde"] }
hex = "0.4"
hostname = "0.3"
http-types = "2.12"
@@ -90,8 +90,8 @@ regex = "1.7"
rmp-serde = "1.1"
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
simd-json = { version = "0.8", features = ["known-key"] }
simd-json-derive = "0.8"
simd-json = { version = "0.10", features = ["known-key"] }
simd-json-derive = "0.10"
snap = "1"
socket2 = { version = "0.5", features = ["all"] }
syslog_loose = "0.18"
@@ -101,7 +101,7 @@ tremor-pipeline = { path = "tremor-pipeline" }
tremor-script = { path = "tremor-script" }
tremor-value = { path = "tremor-value" }
url = "2.3"
value-trait = "0.5"
value-trait = "0.6"
zstd = "0.12"

# blaster / blackhole
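
Everything below follows from these bumps: in simd-json 0.10 (backed by halfbrown 0.2), the object type carries its hasher in the type signature, so the one-argument `Object::with_capacity(n)` is gone and hand-built `halfbrown::HashMap`s no longer line up with `Value::Object`. A minimal sketch of the constructor migration as it recurs through the diffs below (import paths are assumptions; tremor re-exports `Object` via its preludes):

```rust
// Sketch only: assumes simd-json 0.10's borrowed value API.
use simd_json::{borrowed::Object, borrowed::Value, ObjectHasher};

fn build_tags<'v>() -> Object<'v> {
    // 0.8: Object::with_capacity(2)
    // 0.10: the hasher is part of the type and must be supplied explicitly.
    let mut tags = Object::with_capacity_and_hasher(2, ObjectHasher::default());
    tags.insert("host".into(), Value::from("localhost"));
    tags.insert("connector".into(), Value::from("kafka"));
    tags
}
```
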
5 changes: 3 additions & 2 deletions src/codec/binflux.rs
@@ -47,6 +47,7 @@
use super::prelude::*;
use beef::Cow;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
+use simd_json::ObjectHasher;
use std::convert::TryFrom;
use std::io::{Cursor, Write};
use std::str;
@@ -154,14 +155,14 @@ impl BInflux {
let measurement = read_string(&mut c)?;
let timestamp = c.read_u64::<BigEndian>()?;
let tag_count = c.read_u16::<BigEndian>()? as usize;
-let mut tags = Object::with_capacity(tag_count);
+let mut tags = Object::with_capacity_and_hasher(tag_count, ObjectHasher::default());
for _i in 0..tag_count {
let key = read_string(&mut c)?;
let value = read_string(&mut c)?;
tags.insert(key, Value::from(value));
}
let field_count = c.read_u16::<BigEndian>()? as usize;
-let mut fields = Object::with_capacity(field_count);
+let mut fields = Object::with_capacity_and_hasher(field_count, ObjectHasher::default());
for _i in 0..field_count {
let key = read_string(&mut c)?;
let kind = c.read_u8()?;
14 changes: 8 additions & 6 deletions src/codec/dogstatsd.rs
@@ -102,6 +102,8 @@
//! }
//! ```

+use simd_json::ObjectHasher;
+
use super::prelude::*;
use std::io::Write;

@@ -321,8 +323,8 @@ fn decode(data: &[u8], _ingest_ns: u64) -> Result<Value> {
}

fn decode_metric(data: &str) -> Result<Value> {
-let mut map = Object::with_capacity(1);
-let mut m = Object::with_capacity(6);
+let mut map = Object::with_capacity_and_hasher(1, ObjectHasher::default());
+let mut m = Object::with_capacity_and_hasher(6, ObjectHasher::default());

let (metric, data) = data.split_once(':').ok_or_else(invalid)?;
m.insert_nocheck("metric".into(), Value::from(metric));
@@ -372,8 +374,8 @@

// _e{21,36}:An exception occurred|Cannot parse CSV file from 10.0.0.17|t:warning|#err_type:bad_file
fn decode_event(data: &str) -> Result<Value> {
-let mut map = Object::with_capacity(1);
-let mut m = Object::with_capacity(10);
+let mut map = Object::with_capacity_and_hasher(1, ObjectHasher::default());
+let mut m = Object::with_capacity_and_hasher(10, ObjectHasher::default());

let (titel_len, data) = data.split_once(',').ok_or_else(invalid)?;
let (text_len, data) = data.split_once("}:").ok_or_else(invalid)?;
@@ -419,8 +421,8 @@

//_sc|Redis connection|2|#env:dev|m:Redis connection timed out after 10s
fn decode_service_check(data: &str) -> Result<Value> {
-let mut map = Object::with_capacity(1);
-let mut m = Object::with_capacity(7);
+let mut map = Object::with_capacity_and_hasher(1, ObjectHasher::default());
+let mut m = Object::with_capacity_and_hasher(7, ObjectHasher::default());

let (name, data) = data.split_once('|').ok_or_else(invalid)?;
m.insert_nocheck("name".into(), Value::from(name));
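
These decoders pair the sized constructor with `insert_nocheck`, which skips the key-existence check for speed and so leans on the capacity reserved up front. A hedged illustration of that contract (assuming halfbrown 0.2 keeps the 0.1 `insert_nocheck` semantics):

```rust
use simd_json::{borrowed::Object, borrowed::Value, ObjectHasher};

fn metric_skeleton<'v>() -> Object<'v> {
    // Two slots reserved, two distinct keys inserted unchecked: staying
    // within the reserved capacity and never repeating a key is the
    // caller's obligation when using insert_nocheck.
    let mut m = Object::with_capacity_and_hasher(2, ObjectHasher::default());
    m.insert_nocheck("metric".into(), Value::from("cpu"));
    m.insert_nocheck("type".into(), Value::from("gauge"));
    m
}
```
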
4 changes: 3 additions & 1 deletion src/codec/statsd.rs
@@ -51,6 +51,8 @@

use std::io::Write;

+use simd_json::ObjectHasher;
+
use super::prelude::*;

#[derive(Clone, Default, Debug)]
@@ -132,7 +134,7 @@ fn decode(data: &[u8], _ingest_ns: u64) -> Result<Value> {
}
let data = simdutf8::basic::from_utf8(data)?;

-let mut m = Object::with_capacity(4);
+let mut m = Object::with_capacity_and_hasher(4, ObjectHasher::default());

let (metric, data) = data.split_once(':').ok_or_else(invalid)?;
m.insert_nocheck("metric".into(), Value::from(metric));
15 changes: 8 additions & 7 deletions src/codec/tremor.rs
@@ -24,7 +24,6 @@
use super::prelude::*;
use beef::Cow;
use byteorder::{ByteOrder, NetworkEndian, ReadBytesExt, WriteBytesExt};
-use halfbrown::HashMap;
use simd_json::StaticNode;

/// Tremor to Tremor codec
@@ -122,7 +121,12 @@
let (len, mut total) = Self::read_len::<E>(t, data)?;
//ALLOW: `total` is the data we've already read so we know they exist
data = unsafe { data.get_unchecked(total..) };
-let mut o = HashMap::with_capacity(len);
+let mut v = Value::object_with_capacity(len);
+let o = match v.as_object_mut() {
+    Some(o) => o,
+    // ALLOW: We know this is an object
+    None => unreachable!(),
+};
for _ in 0..len {
let mut cursor = Cursor::new(data);
let len = cursor.read_u64::<E>()? as usize;
@@ -139,7 +143,7 @@
data = unsafe { data.get_unchecked(read..) };
o.insert_nocheck(Cow::from(k), v);
}
-Ok((Value::Object(Box::new(o)), 1 + total))
+Ok((v, 1 + total))
}
#[inline]
#[allow(clippy::cast_possible_truncation)]
@@ -221,10 +225,7 @@
Ok(())
}
#[inline]
-fn write_object<E: ByteOrder>(
-    o: &HashMap<Cow<'_, str>, Value>,
-    w: &mut impl Write,
-) -> Result<()> {
+fn write_object<E: ByteOrder>(o: &Object, w: &mut impl Write) -> Result<()> {
Self::write_type_and_len::<E>(Self::OBJECT, o.len(), w)?;
for (k, v) in o.iter() {
w.write_u64::<E>(k.len() as u64)?;
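
Instead of naming the concrete map type, the decoder above now allocates through the `Value` API and fills the object via `as_object_mut`, leaving the hasher an implementation detail of simd-json; `write_object` likewise takes the `Object` alias rather than spelling out `HashMap<Cow<str>, Value>`. A rough sketch of the decode-side pattern (trait paths are assumptions based on simd-json 0.10's prelude):

```rust
use simd_json::borrowed::Value;
use simd_json::prelude::*; // assumed home of `as_object_mut`

fn decode_pairs<'v>(pairs: &[(&'v str, u64)]) -> Value<'v> {
    let mut v = Value::object_with_capacity(pairs.len());
    if let Some(o) = v.as_object_mut() {
        for (k, n) in pairs {
            o.insert((*k).into(), Value::from(*n));
        }
    }
    // The Value is returned as-is; no Box::new(Object) wrapping step remains.
    v
}
```
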
4 changes: 2 additions & 2 deletions src/config.rs
@@ -318,8 +318,8 @@ pub struct Binding {

#[cfg(test)]
mod tests {
-use halfbrown::HashMap;
use serde::Deserialize;
+use std::collections::HashMap;

use super::*;
use crate::{errors::Result, system::flow};
@@ -412,7 +412,7 @@ mod tests {
"application/yaml": {"name": "yaml"},
"*/*": codec,
});
-let nac = HashMap::<String, NameWithConfig>::deserialize(data)
+let nac = std::collections::HashMap::<String, NameWithConfig>::deserialize(data)
.expect("could structurize two element struct");

assert_eq!(nac.len(), 3);
3 changes: 1 addition & 2 deletions src/connectors/impls/elastic.rs
@@ -251,7 +251,6 @@ use elasticsearch::{
Bulk, BulkDeleteOperation, BulkOperation, BulkOperations, BulkParts, BulkUpdateOperation,
Elasticsearch,
};
-use halfbrown::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt::Display, sync::atomic::AtomicBool};
@@ -280,7 +279,7 @@ pub(crate) struct Config {

#[serde(default = "Default::default")]
/// custom headers to add to each request to elastic
-headers: HashMap<String, Header>,
+headers: std::collections::HashMap<String, Header>,

/// means for authenticating towards elastic
#[serde(default = "Default::default")]
45 changes: 22 additions & 23 deletions src/connectors/impls/gbq/writer/sink.rs
@@ -990,10 +990,9 @@ mod test {

#[test]
pub fn can_encode_a_struct() {
-let mut values = halfbrown::HashMap::new();
-values.insert("a".into(), Value::Static(StaticNode::I64(1)));
-values.insert("b".into(), Value::Static(StaticNode::I64(1024)));
-let input = Value::Object(Box::new(values));
+let mut input = Value::object();
+input.try_insert("a", 1);
+input.try_insert("b", 1024);

let mut subfields = HashMap::new();
subfields.insert(
@@ -1075,7 +1074,7 @@

#[test]
pub fn can_encode_json() {
-let value = Value::Object(Box::new(halfbrown::HashMap::new()));
+let value = Value::object();
let field = Field {
table_type: TableType::Json,
tag: 1,
@@ -1206,10 +1205,10 @@
],
&ctx,
);
-let mut fields = halfbrown::HashMap::new();
-fields.insert("a".into(), Value::Static(StaticNode::I64(12)));
-fields.insert("b".into(), Value::Static(StaticNode::I64(21)));
-let result = mapping.map(&Value::Object(Box::new(fields)))?;
+let mut fields = Value::object();
+fields.try_insert("a", 12);
+fields.try_insert("b", 21);
+let result = mapping.map(&fields)?;

assert_eq!([8u8, 12u8, 16u8, 21u8], result[..]);
Ok(())
@@ -1251,11 +1250,11 @@
],
&ctx,
);
-let mut fields = halfbrown::HashMap::new();
-fields.insert("a".into(), Value::Static(StaticNode::I64(12)));
-fields.insert("b".into(), Value::Static(StaticNode::I64(21)));
-fields.insert("c".into(), Value::Static(StaticNode::I64(33)));
-let result = mapping.map(&Value::Object(Box::new(fields)))?;
+let mut fields = Value::object();
+fields.try_insert("a", 12);
+fields.try_insert("b", 21);
+fields.try_insert("c", 33);
+let result = mapping.map(&fields)?;

assert_eq!([8u8, 12u8, 16u8, 21u8], result[..]);
Ok(())
@@ -1294,12 +1293,12 @@
}],
&ctx,
);
-let mut inner_fields = halfbrown::HashMap::new();
-inner_fields.insert("x".into(), Value::Static(StaticNode::I64(10)));
-inner_fields.insert("y".into(), Value::Static(StaticNode::I64(10)));
-let mut fields = halfbrown::HashMap::new();
-fields.insert("a".into(), Value::Object(Box::new(inner_fields)));
-let result = mapping.map(&Value::Object(Box::new(fields)))?;
+let mut inner_fields = Value::object();
+inner_fields.try_insert("x", 10);
+inner_fields.try_insert("y", 10);
+let mut fields = Value::object();
+fields.try_insert("a", inner_fields);
+let result = mapping.map(&fields)?;

assert_eq!([10u8, 2u8, 8u8, 10u8], result[..]);
Ok(())
@@ -1329,9 +1328,9 @@
}],
&ctx,
);
-let mut fields = halfbrown::HashMap::new();
-fields.insert("a".into(), Value::Static(StaticNode::I64(12)));
-let result = mapping.map(&Value::Object(Box::new(fields)));
+let mut fields = Value::object();
+fields.try_insert("a", 12);
+let result = mapping.map(&fields);

if let Err(Error(ErrorKind::BigQueryTypeMismatch("bytes", x), _)) = result {
assert_eq!(x, ValueType::I64);
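
All of the test rewrites above follow one recipe: `Value::object()` plus `try_insert` replaces building a `halfbrown::HashMap`, boxing it, and wrapping it in `Value::Object`, while plain integers replace `Value::Static(StaticNode::I64(..))`. A small sketch, assuming `try_insert` comes from value-trait's `Mutable` trait as re-exported in tremor's preludes:

```rust
use tremor_value::Value;
use value_trait::Mutable; // assumption: provides try_insert

fn sample<'v>() -> Value<'v> {
    let mut input = Value::object();
    // try_insert converts key and value via Into, and silently does nothing
    // (returning None) if the receiver is not an object, so no unwrap is needed.
    input.try_insert("a", 1);
    input.try_insert("b", 1024);
    input
}
```
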
10 changes: 5 additions & 5 deletions src/connectors/impls/kafka.rs
@@ -290,9 +290,9 @@ pub(crate) mod producer;

use crate::connectors::prelude::*;
use beef::Cow;
-use halfbrown::HashMap;
use rdkafka::{error::KafkaError, ClientContext, Statistics};
use rdkafka_sys::RDKafkaErrorCode;
+use simd_json::ObjectHasher;
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -423,12 +423,12 @@ where
let metrics_payload = match stats.client_type.as_str() {
Self::PRODUCER => {
let timestamp = u64::try_from(stats.time)? * 1_000_000_000;
-let mut fields = HashMap::with_capacity(3);
+let mut fields = Object::with_capacity_and_hasher(3, ObjectHasher::default());
fields.insert(Self::TX_MSGS, Value::from(stats.txmsgs));
fields.insert(Self::TX_MSG_BYTES, Value::from(stats.txmsg_bytes));
fields.insert(Self::QUEUED_MSGS, Value::from(stats.msg_cnt));

-let mut tags = HashMap::with_capacity(1);
+let mut tags = Object::with_capacity_and_hasher(1, ObjectHasher::default());
tags.insert(Self::CONNECTOR, Value::from(self.ctx.alias().to_string()));

make_metrics_payload(Self::KAFKA_PRODUCER_STATS, fields, tags, timestamp)
@@ -437,7 +437,7 @@
let timestamp = u64::try_from(stats.time)? * 1_000_000_000;

// consumer stats
-let mut fields = HashMap::with_capacity(4);
+let mut fields = Object::with_capacity_and_hasher(4, ObjectHasher::default());
fields.insert(Self::RX_MSGS, Value::from(stats.rxmsgs));
fields.insert(Self::RX_MSG_BYTES, Value::from(stats.rxmsg_bytes));
if let Some(cg) = stats.cgrp {
@@ -452,7 +452,7 @@
}
}
fields.insert(Self::CONSUMER_LAG, Value::from(consumer_lag));
-let mut tags = HashMap::with_capacity(1);
+let mut tags = Object::with_capacity_and_hasher(1, ObjectHasher::default());
tags.insert(Self::CONNECTOR, Value::from(self.ctx.alias().to_string()));
make_metrics_payload(Self::KAFKA_CONSUMER_STATS, fields, tags, timestamp)
}
14 changes: 10 additions & 4 deletions src/connectors/sink/channel_sink.rs
@@ -21,7 +21,7 @@
};
use bimap::BiMap;
use either::Either;
-use hashbrown::HashMap;
+use std::collections::HashMap;
use std::{
hash::Hash,
marker::PhantomData,
@@ -208,9 +208,15 @@
}
// clean out closed streams
if clean_closed_streams {
-for (stream_id, _) in self.streams.drain_filter(|_k, v| v.is_closed()) {
-    self.streams_meta.remove_by_right(&stream_id);
-    serializer.drop_stream(stream_id);
+let closed_streams: Vec<_> = self
+    .streams
+    .iter()
+    .filter_map(|(k, v)| v.is_closed().then_some(*k))
+    .collect();
+for id in closed_streams {
+    self.streams.remove(&id);
+    self.streams_meta.remove_by_right(&id);
+    serializer.drop_stream(id);
}
}
self.streams.is_empty()
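
Swapping `hashbrown::HashMap` for `std::collections::HashMap` gives up the unstable `drain_filter`, so the cleanup becomes two phases: collect the ids of closed streams under a shared borrow, then remove them. A standalone sketch of the pattern on stable Rust (types here are illustrative, not the sink's real ones):

```rust
use std::collections::HashMap;

struct Stream { closed: bool }
impl Stream { fn is_closed(&self) -> bool { self.closed } }

fn clean_closed(streams: &mut HashMap<u64, Stream>) {
    // Phase 1: immutable pass to find what to drop.
    let closed: Vec<u64> = streams
        .iter()
        .filter_map(|(id, s)| s.is_closed().then_some(*id))
        .collect();
    // Phase 2: mutate; per-stream teardown (meta map, serializer) goes here.
    for id in closed {
        streams.remove(&id);
    }
}
```
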
14 changes: 7 additions & 7 deletions src/connectors/tests/http/server.rs
@@ -426,9 +426,10 @@ async fn https_server_test() -> Result<()> {
data.extend_from_slice(&chunk);
}
let body = String::from_utf8(data)?;
-assert_eq!(
-    format!(
-        r#"meta:
+let body = serde_yaml::from_str::<serde_yaml::Value>(&body)?;
+let expected = serde_yaml::from_str::<serde_yaml::Value>(&format!(
+    r#"
+meta:
uri: /
version: HTTP/1.1
protocol: http
@@ -437,10 +438,9 @@ async fn https_server_test() -> Result<()> {
- localhost:{port}
method: DELETE
value: null
"#
),
body
);
"#
))?;
assert_eq!(expected, body);
assert_eq!(StatusCode::OK, response.status());
assert_eq!(
Some("application/yaml; charset=UTF-8"),
Expand Down
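
The final test change swaps a byte-for-byte string comparison for a structural one: both sides are parsed into `serde_yaml::Value`, so incidental formatting differences (the leading newline in the expected literal, key order, indentation) can no longer fail the assertion. A small sketch of the idea:

```rust
// Structural YAML equality, as in the rewritten assertion above.
fn yaml_eq(expected: &str, actual: &str) -> Result<bool, serde_yaml::Error> {
    let e: serde_yaml::Value = serde_yaml::from_str(expected)?;
    let a: serde_yaml::Value = serde_yaml::from_str(actual)?;
    Ok(e == a)
}

// yaml_eq("a: 1\nb: 2\n", "b: 2\na: 1\n") should return Ok(true),
// since mapping equality is key-based rather than order-based.
```
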