Update simd-json and halfbrown
Signed-off-by: Heinz N. Gies <[email protected]>
Licenser committed Jun 26, 2023
1 parent 35b0f69 commit b1d05e9
Showing 48 changed files with 1,009 additions and 872 deletions.
1,203 changes: 661 additions & 542 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
@@ -72,8 +72,8 @@ file-mode = "0.1"
futures = "0.3.28"
event-listener = "2.5"
glob = "0.3"
halfbrown = "0.1"
hashbrown = { version = "0.13", features = ["serde"] }
halfbrown = "0.2"
hashbrown = { version = "0.14", features = ["serde"] }
hex = "0.4"
hostname = "0.3"
http-types = "2.12"
@@ -90,8 +90,8 @@ regex = "1.7"
rmp-serde = "1.1"
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
simd-json = { version = "0.8", features = ["known-key"] }
simd-json-derive = "0.8"
simd-json = { version = "0.10", features = ["known-key"] }
simd-json-derive = "0.10"
snap = "1"
socket2 = { version = "0.5", features = ["all"] }
syslog_loose = "0.18"
@@ -101,7 +101,7 @@ tremor-pipeline = { path = "tremor-pipeline" }
tremor-script = { path = "tremor-script" }
tremor-value = { path = "tremor-value" }
url = "2.3"
value-trait = "0.5"
value-trait = "0.6"
zstd = "0.12"

# blaster / blackhole
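The version bumps above (halfbrown 0.1 → 0.2, hashbrown 0.13 → 0.14, simd-json 0.8 → 0.10, value-trait 0.5 → 0.6) drive most of the source changes in this commit: `Object` maps are now built with an explicit hasher, so `Object::with_capacity(n)` becomes `Object::with_capacity_and_hasher(n, ObjectHasher::default())`. A minimal sketch of the new construction pattern, assuming the `Object` and `Value` aliases exported by tremor-value (the codecs below pull them in via their prelude):

```rust
use simd_json::ObjectHasher;
use tremor_value::{Object, Value};

// Before (halfbrown 0.1 / simd-json 0.8):
//     let mut tags = Object::with_capacity(capacity);
// After (halfbrown 0.2 / simd-json 0.10): the hasher is passed explicitly.
fn tags_object<'v>(capacity: usize) -> Object<'v> {
    let mut tags = Object::with_capacity_and_hasher(capacity, ObjectHasher::default());
    tags.insert("host".into(), Value::from("example"));
    tags
}
```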
5 changes: 3 additions & 2 deletions src/codec/binflux.rs
@@ -47,6 +47,7 @@
use super::prelude::*;
use beef::Cow;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use simd_json::ObjectHasher;
use std::convert::TryFrom;
use std::io::{Cursor, Write};
use std::str;
@@ -154,14 +155,14 @@ impl BInflux {
let measurement = read_string(&mut c)?;
let timestamp = c.read_u64::<BigEndian>()?;
let tag_count = c.read_u16::<BigEndian>()? as usize;
let mut tags = Object::with_capacity(tag_count);
let mut tags = Object::with_capacity_and_hasher(tag_count, ObjectHasher::default());
for _i in 0..tag_count {
let key = read_string(&mut c)?;
let value = read_string(&mut c)?;
tags.insert(key, Value::from(value));
}
let field_count = c.read_u16::<BigEndian>()? as usize;
let mut fields = Object::with_capacity(field_count);
let mut fields = Object::with_capacity_and_hasher(field_count, ObjectHasher::default());
for _i in 0..field_count {
let key = read_string(&mut c)?;
let kind = c.read_u8()?;
14 changes: 8 additions & 6 deletions src/codec/dogstatsd.rs
@@ -102,6 +102,8 @@
//! }
//! ```

use simd_json::ObjectHasher;

use super::prelude::*;
use std::io::Write;

@@ -321,8 +323,8 @@ fn decode(data: &[u8], _ingest_ns: u64) -> Result<Value> {
}

fn decode_metric(data: &str) -> Result<Value> {
let mut map = Object::with_capacity(1);
let mut m = Object::with_capacity(6);
let mut map = Object::with_capacity_and_hasher(1, ObjectHasher::default());
let mut m = Object::with_capacity_and_hasher(6, ObjectHasher::default());

let (metric, data) = data.split_once(':').ok_or_else(invalid)?;
m.insert_nocheck("metric".into(), Value::from(metric));
@@ -372,8 +374,8 @@ fn decode_metric(data: &str) -> Result<Value> {

// _e{21,36}:An exception occurred|Cannot parse CSV file from 10.0.0.17|t:warning|#err_type:bad_file
fn decode_event(data: &str) -> Result<Value> {
let mut map = Object::with_capacity(1);
let mut m = Object::with_capacity(10);
let mut map = Object::with_capacity_and_hasher(1, ObjectHasher::default());
let mut m = Object::with_capacity_and_hasher(10, ObjectHasher::default());

let (titel_len, data) = data.split_once(',').ok_or_else(invalid)?;
let (text_len, data) = data.split_once("}:").ok_or_else(invalid)?;
@@ -419,8 +421,8 @@ fn decode_event(data: &str) -> Result<Value> {

//_sc|Redis connection|2|#env:dev|m:Redis connection timed out after 10s
fn decode_service_check(data: &str) -> Result<Value> {
let mut map = Object::with_capacity(1);
let mut m = Object::with_capacity(7);
let mut map = Object::with_capacity_and_hasher(1, ObjectHasher::default());
let mut m = Object::with_capacity_and_hasher(7, ObjectHasher::default());

let (name, data) = data.split_once('|').ok_or_else(invalid)?;
m.insert_nocheck("name".into(), Value::from(name));
4 changes: 3 additions & 1 deletion src/codec/statsd.rs
@@ -51,6 +51,8 @@

use std::io::Write;

use simd_json::ObjectHasher;

use super::prelude::*;

#[derive(Clone, Default, Debug)]
@@ -132,7 +134,7 @@ fn decode(data: &[u8], _ingest_ns: u64) -> Result<Value> {
}
let data = simdutf8::basic::from_utf8(data)?;

let mut m = Object::with_capacity(4);
let mut m = Object::with_capacity_and_hasher(4, ObjectHasher::default());

let (metric, data) = data.split_once(':').ok_or_else(invalid)?;
m.insert_nocheck("metric".into(), Value::from(metric));
15 changes: 8 additions & 7 deletions src/codec/tremor.rs
@@ -24,7 +24,6 @@ use std::{
use super::prelude::*;
use beef::Cow;
use byteorder::{ByteOrder, NetworkEndian, ReadBytesExt, WriteBytesExt};
use halfbrown::HashMap;
use simd_json::StaticNode;

/// Tremor to Tremor codec
@@ -122,7 +121,12 @@ impl Tremor {
let (len, mut total) = Self::read_len::<E>(t, data)?;
//ALLOW: `total` is the data we've already read so we know they exist
data = unsafe { data.get_unchecked(total..) };
let mut o = HashMap::with_capacity(len);
let mut v = Value::object_with_capacity(len);
let o = match v.as_object_mut() {
Some(o) => o,
// ALLOW: We know this is an object
None => unreachable!(),
};
for _ in 0..len {
let mut cursor = Cursor::new(data);
let len = cursor.read_u64::<E>()? as usize;
@@ -139,7 +143,7 @@ impl Tremor {
data = unsafe { data.get_unchecked(read..) };
o.insert_nocheck(Cow::from(k), v);
}
Ok((Value::Object(Box::new(o)), 1 + total))
Ok((v, 1 + total))
}
#[inline]
#[allow(clippy::cast_possible_truncation)]
@@ -221,10 +225,7 @@ impl Tremor {
Ok(())
}
#[inline]
fn write_object<E: ByteOrder>(
o: &HashMap<Cow<'_, str>, Value>,
w: &mut impl Write,
) -> Result<()> {
fn write_object<E: ByteOrder>(o: &Object, w: &mut impl Write) -> Result<()> {
Self::write_type_and_len::<E>(Self::OBJECT, o.len(), w)?;
for (k, v) in o.iter() {
w.write_u64::<E>(k.len() as u64)?;
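In the tremor codec above, `read_object` no longer assembles a `halfbrown::HashMap` and boxes it into `Value::Object`; it asks `Value` for an object of the right capacity and fills it through `as_object_mut`, keeping the concrete map type (and its new hasher) out of the codec. A rough sketch of that pattern, assuming tremor-value's prelude re-exports the value-trait helpers used here (`object_with_capacity`, `as_object_mut`):

```rust
use tremor_value::prelude::*;
use tremor_value::Value;

fn decode_pairs<'v>(pairs: &[(&'v str, i64)]) -> Value<'v> {
    let mut v = Value::object_with_capacity(pairs.len());
    // `v` was just created as an object, so this lookup cannot fail.
    let o = v.as_object_mut().expect("just created as an object");
    for (k, val) in pairs {
        o.insert((*k).into(), Value::from(*val));
    }
    v
}
```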
4 changes: 2 additions & 2 deletions src/config.rs
@@ -318,8 +318,8 @@ pub struct Binding {

#[cfg(test)]
mod tests {
use halfbrown::HashMap;
use serde::Deserialize;
use std::collections::HashMap;

use super::*;
use crate::{errors::Result, system::flow};
@@ -412,7 +412,7 @@ mod tests {
"application/yaml": {"name": "yaml"},
"*/*": codec,
});
let nac = HashMap::<String, NameWithConfig>::deserialize(data)
let nac = std::collections::HashMap::<String, NameWithConfig>::deserialize(data)
.expect("could structurize two element struct");

assert_eq!(nac.len(), 3);
3 changes: 1 addition & 2 deletions src/connectors/impls/elastic.rs
@@ -251,7 +251,6 @@ use elasticsearch::{
Bulk, BulkDeleteOperation, BulkOperation, BulkOperations, BulkParts, BulkUpdateOperation,
Elasticsearch,
};
use halfbrown::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt::Display, sync::atomic::AtomicBool};
@@ -280,7 +279,7 @@ pub(crate) struct Config {

#[serde(default = "Default::default")]
/// custom headers to add to each request to elastic
headers: HashMap<String, Header>,
headers: std::collections::HashMap<String, Header>,

/// means for authenticating towards elastic
#[serde(default = "Default::default")]
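With `halfbrown::HashMap` dropped from the elastic connector, the custom `headers` field deserializes into `std::collections::HashMap` instead, with the serde defaults otherwise unchanged. A hedged sketch of the config shape (a plain `String` stands in for the connector's real `Header` type):

```rust
use std::collections::HashMap;

#[derive(serde::Deserialize)]
pub(crate) struct Config {
    /// custom headers to add to each request to elastic
    #[serde(default)]
    headers: HashMap<String, String>, // `String` stands in for `Header` here
}
```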
45 changes: 22 additions & 23 deletions src/connectors/impls/gbq/writer/sink.rs
@@ -990,10 +990,9 @@ mod test {

#[test]
pub fn can_encode_a_struct() {
let mut values = halfbrown::HashMap::new();
values.insert("a".into(), Value::Static(StaticNode::I64(1)));
values.insert("b".into(), Value::Static(StaticNode::I64(1024)));
let input = Value::Object(Box::new(values));
let mut input = Value::object();
input.try_insert("a", 1);
input.try_insert("b", 1024);

let mut subfields = HashMap::new();
subfields.insert(
@@ -1075,7 +1074,7 @@

#[test]
pub fn can_encode_json() {
let value = Value::Object(Box::new(halfbrown::HashMap::new()));
let value = Value::object();
let field = Field {
table_type: TableType::Json,
tag: 1,
@@ -1206,10 +1205,10 @@
],
&ctx,
);
let mut fields = halfbrown::HashMap::new();
fields.insert("a".into(), Value::Static(StaticNode::I64(12)));
fields.insert("b".into(), Value::Static(StaticNode::I64(21)));
let result = mapping.map(&Value::Object(Box::new(fields)))?;
let mut fields = Value::object();
fields.try_insert("a", 12);
fields.try_insert("b", 21);
let result = mapping.map(&fields)?;

assert_eq!([8u8, 12u8, 16u8, 21u8], result[..]);
Ok(())
@@ -1251,11 +1250,11 @@
],
&ctx,
);
let mut fields = halfbrown::HashMap::new();
fields.insert("a".into(), Value::Static(StaticNode::I64(12)));
fields.insert("b".into(), Value::Static(StaticNode::I64(21)));
fields.insert("c".into(), Value::Static(StaticNode::I64(33)));
let result = mapping.map(&Value::Object(Box::new(fields)))?;
let mut fields = Value::object();
fields.try_insert("a", 12);
fields.try_insert("b", 21);
fields.try_insert("c", 33);
let result = mapping.map(&fields)?;

assert_eq!([8u8, 12u8, 16u8, 21u8], result[..]);
Ok(())
@@ -1294,12 +1293,12 @@
}],
&ctx,
);
let mut inner_fields = halfbrown::HashMap::new();
inner_fields.insert("x".into(), Value::Static(StaticNode::I64(10)));
inner_fields.insert("y".into(), Value::Static(StaticNode::I64(10)));
let mut fields = halfbrown::HashMap::new();
fields.insert("a".into(), Value::Object(Box::new(inner_fields)));
let result = mapping.map(&Value::Object(Box::new(fields)))?;
let mut inner_fields = Value::object();
inner_fields.try_insert("x", 10);
inner_fields.try_insert("y", 10);
let mut fields = Value::object();
fields.try_insert("a", inner_fields);
let result = mapping.map(&fields)?;

assert_eq!([10u8, 2u8, 8u8, 10u8], result[..]);
Ok(())
@@ -1329,9 +1328,9 @@
}],
&ctx,
);
let mut fields = halfbrown::HashMap::new();
fields.insert("a".into(), Value::Static(StaticNode::I64(12)));
let result = mapping.map(&Value::Object(Box::new(fields)));
let mut fields = Value::object();
fields.try_insert("a", 12);
let result = mapping.map(&fields);

if let Err(Error(ErrorKind::BigQueryTypeMismatch("bytes", x), _)) = result {
assert_eq!(x, ValueType::I64);
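The GBQ sink tests above switch from hand-assembled `halfbrown::HashMap`s to tremor-value's own constructors, `Value::object()` plus `try_insert`. A small sketch of that style, assuming tremor-value's prelude brings `try_insert` into scope:

```rust
use tremor_value::prelude::*;
use tremor_value::Value;

fn sample_row<'v>() -> Value<'v> {
    let mut input = Value::object();
    // `try_insert` is the helper the tests above use to add keys to the object.
    input.try_insert("a", 1);
    input.try_insert("b", 1024);
    input
}
```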
10 changes: 5 additions & 5 deletions src/connectors/impls/kafka.rs
@@ -290,9 +290,9 @@ pub(crate) mod producer;

use crate::connectors::prelude::*;
use beef::Cow;
use halfbrown::HashMap;
use rdkafka::{error::KafkaError, ClientContext, Statistics};
use rdkafka_sys::RDKafkaErrorCode;
use simd_json::ObjectHasher;
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -423,12 +423,12 @@
let metrics_payload = match stats.client_type.as_str() {
Self::PRODUCER => {
let timestamp = u64::try_from(stats.time)? * 1_000_000_000;
let mut fields = HashMap::with_capacity(3);
let mut fields = Object::with_capacity_and_hasher(3, ObjectHasher::default());
fields.insert(Self::TX_MSGS, Value::from(stats.txmsgs));
fields.insert(Self::TX_MSG_BYTES, Value::from(stats.txmsg_bytes));
fields.insert(Self::QUEUED_MSGS, Value::from(stats.msg_cnt));

let mut tags = HashMap::with_capacity(1);
let mut tags = Object::with_capacity_and_hasher(1, ObjectHasher::default());
tags.insert(Self::CONNECTOR, Value::from(self.ctx.alias().to_string()));

make_metrics_payload(Self::KAFKA_PRODUCER_STATS, fields, tags, timestamp)
@@ -437,7 +437,7 @@
let timestamp = u64::try_from(stats.time)? * 1_000_000_000;

// consumer stats
let mut fields = HashMap::with_capacity(4);
let mut fields = Object::with_capacity_and_hasher(4, ObjectHasher::default());
fields.insert(Self::RX_MSGS, Value::from(stats.rxmsgs));
fields.insert(Self::RX_MSG_BYTES, Value::from(stats.rxmsg_bytes));
if let Some(cg) = stats.cgrp {
@@ -452,7 +452,7 @@
}
}
fields.insert(Self::CONSUMER_LAG, Value::from(consumer_lag));
let mut tags = HashMap::with_capacity(1);
let mut tags = Object::with_capacity_and_hasher(1, ObjectHasher::default());
tags.insert(Self::CONNECTOR, Value::from(self.ctx.alias().to_string()));
make_metrics_payload(Self::KAFKA_CONSUMER_STATS, fields, tags, timestamp)
}
14 changes: 10 additions & 4 deletions src/connectors/sink/channel_sink.rs
@@ -21,7 +21,7 @@ use crate::{
};
use bimap::BiMap;
use either::Either;
use hashbrown::HashMap;
use std::collections::HashMap;
use std::{
hash::Hash,
marker::PhantomData,
@@ -208,9 +208,15 @@
}
// clean out closed streams
if clean_closed_streams {
for (stream_id, _) in self.streams.drain_filter(|_k, v| v.is_closed()) {
self.streams_meta.remove_by_right(&stream_id);
serializer.drop_stream(stream_id);
let closed_streams: Vec<_> = self
.streams
.iter()
.filter_map(|(k, v)| v.is_closed().then_some(*k))
.collect();
for id in closed_streams {
self.streams.remove(&id);
self.streams_meta.remove_by_right(&id);
serializer.drop_stream(id);
}
}
self.streams.is_empty()
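Because the channel sink's stream map is now a `std::collections::HashMap`, the `drain_filter` call (not available on std's map on stable Rust) gives way to a two-phase cleanup: collect the ids of closed streams, then remove them. A simplified sketch of that pattern, with illustrative stand-in types:

```rust
use std::collections::HashMap;

// Stand-ins for the sketch: `u64` stream ids and a `bool` "is closed" flag
// instead of the sink's real stream and sender types.
fn drop_closed(streams: &mut HashMap<u64, bool>) {
    let closed: Vec<u64> = streams
        .iter()
        .filter_map(|(id, is_closed)| is_closed.then_some(*id))
        .collect();
    for id in closed {
        streams.remove(&id);
        // ...also drop the id from any side tables and notify the serializer...
    }
}
```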
8 changes: 4 additions & 4 deletions src/connectors/utils/metrics.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use beef::Cow;
use halfbrown::HashMap;
use simd_json::ObjectHasher;
use tremor_common::ports::{Port, ERR, IN, OUT};
use tremor_pipeline::metrics::{value, value_count};
use tremor_pipeline::MetricsSender;
@@ -146,7 +146,7 @@ pub(crate) fn make_event_count_metrics_payload(
count: u64,
connector_id: &Alias,
) -> EventPayload {
let mut tags: HashMap<Cow<'static, str>, Value<'static>> = HashMap::with_capacity(2);
let mut tags = Object::with_capacity_and_hasher(2, ObjectHasher::default());
tags.insert_nocheck(FLOW, Value::from(connector_id.flow_alias().to_string()));
tags.insert_nocheck(CONNECTOR, connector_id.to_string().into());
tags.insert_nocheck(PORT, port.into());
@@ -160,8 +160,8 @@
#[must_use]
pub(crate) fn make_metrics_payload(
name: &'static str,
fields: HashMap<Cow<'static, str>, Value<'static>>,
tags: HashMap<Cow<'static, str>, Value<'static>>,
fields: Object<'static>,
tags: Object<'static>,
timestamp: u64,
) -> EventPayload {
let value = value(Cow::const_str(name), tags, fields, timestamp);
2 changes: 1 addition & 1 deletion tests/scripts/record_comprehension/out
@@ -1 +1 @@
[ [ "1", "one", "other" ], [ "2", "two", "other" ], [ "0", "zip", "other" ] ]
[ [ "0", "zip", "other" ], [ "1", "one", "other" ], [ "2", "two", "other" ] ]
5 changes: 3 additions & 2 deletions tests/scripts/record_comprehension/script.tremor
@@ -1,5 +1,6 @@
for event of
use std::array;
array::sort(for event of
case (k, v) when false => [k,v]
case (k, v) => [k,v, "other"]
case (k, v) => "should never get here"
end;
end)
2 changes: 1 addition & 1 deletion tests/scripts/record_comprehension_imut/out
@@ -1 +1 @@
[ [ "1", "one", "other" ], [ "2", "two", "other" ], [ "0", "zip", "other" ] ]
[ [ "0", "zip", "other" ], [ "1", "one", "other" ], [ "2", "two", "other" ] ]