Skip to content

Commit

Permalink
parser: use time crate to parse, update benchmark to remove I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Oct 12, 2023
1 parent 75d2518 commit 7205e15
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 144 deletions.
77 changes: 1 addition & 76 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ bytes = { workspace = true }
caseless = { workspace = true }
chardetng = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
time = { workspace = true }
criterion = { workspace = true }
csv = { workspace = true }
encoding_rs = { workspace = true }
Expand Down
2 changes: 0 additions & 2 deletions crates/parser/benches/data/datetimes.csv

This file was deleted.

4 changes: 2 additions & 2 deletions crates/parser/benches/peoples_500.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use criterion::{criterion_group, criterion_main, Criterion, black_box};

use parser::ParseConfig;
use testutil::{input_for_file, run_test};
use testutil::{input_for_file, run_parser};

fn peoples_500(c: &mut Criterion) {
let path = "benches/data/people-500.csv";
Expand All @@ -14,7 +14,7 @@ fn peoples_500(c: &mut Criterion) {

c.bench_function("peoples_500", |b| b.iter(|| {
let input = input_for_file(path);
run_test(&cfg, input);
run_parser(&cfg, input, false);
}));
}

Expand Down
12 changes: 6 additions & 6 deletions crates/parser/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,8 @@ impl schemars::JsonSchema for ErrorThreshold {
}
}

fn default_timezone_string() -> String {
"UTC".to_string()
fn default_offset_string() -> String {
"+00:00".to_string()
}

// Fields annotated with `schemars(skip)` will not appear in the JSON schema, and thus won't be
Expand All @@ -555,9 +555,9 @@ pub struct ParseConfig {
pub compression: DefaultNullIsAutomatic<Compression>,

/// The default timezone to use when parsing timestamps that do not have a timezone. Timezones
/// must be specified as a valid IANA name. Defaults to UTC.
#[serde(default="default_timezone_string")]
pub default_timezone: String,
/// must be specified as an +/-HH:MM offset, defaults to +00:00.
#[serde(default="default_offset_string")]
pub default_offset: String,

/// filename is used for format inference. It will be ignored if `format` is specified.
#[serde(default)]
Expand Down Expand Up @@ -596,7 +596,7 @@ impl Default for ParseConfig {
ParseConfig {
format: Default::default(),
compression: Default::default(),
default_timezone: default_timezone_string(),
default_offset: default_offset_string(),
filename: Default::default(),
add_record_offset: Default::default(),
add_values: Default::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ expression: schema
}
]
},
"defaultTimezone": {
"description": "The default timezone to use when parsing timestamps that do not have a timezone. Timezones must be specified as a valid IANA name. Defaults to UTC.",
"default": "UTC",
"defaultOffset": {
"description": "The default timezone to use when parsing timestamps that do not have a timezone. Timezones must be specified as an +/-HH:MM offset, defaults to +00:00.",
"default": "+00:00",
"type": "string"
},
"format": {
Expand Down
81 changes: 35 additions & 46 deletions crates/parser/src/format/sanitize/datetime.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,50 @@
use crate::{ParseConfig, Output, format::ParseResult, ParseError};
use chrono::{DateTime, FixedOffset, SecondsFormat};
use chrono_tz::Tz;
use time::macros::format_description;
use serde_json::Value;

struct DatetimeSanitizer {
from: Output,
default_timezone: Tz,
default_offset: time::UtcOffset,
}

const NAIVE_FORMATS: [&'static str; 4] = [
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%S%.f",
"%Y-%m-%d %H:%M:%S%.f",
"%Y-%m-%d %H:%M:%S",
];

const FORMATS: [&'static str; 2] = [
"%Y-%m-%d %H:%M:%S%.f%:z",
"%Y-%m-%d %H:%M:%S%:z",
];

fn datetime_to_rfc3339(val: &mut Value, default_timezone: Tz) {
// Here we are trying to parse non-ambiguous, non-RFC3339 dates and formatting them as RFC3339
// So we skip any valid RFC3339 in our processing and pass it as-is
fn datetime_to_rfc3339(val: &mut Value, default_offset: time::UtcOffset) {
match val {
Value::String(s) => {
let mut parsed: Option<DateTime<FixedOffset>> = None;

for f in FORMATS {
parsed = parsed.or_else(||
chrono::DateTime::parse_from_str(&s, f).ok()
)
}

if let Some(ts) = parsed {
*s = ts.to_rfc3339_opts(SecondsFormat::AutoSi, true);
return
}

let mut naive_parsed: Option<DateTime<Tz>> = None;

for f in NAIVE_FORMATS {
naive_parsed = naive_parsed.or_else(||
chrono::NaiveDateTime::parse_from_str(&s, f).map(|d| d.and_local_timezone(default_timezone).unwrap()).ok()
)
}

if let Some(ts) = naive_parsed {
*s = ts.to_rfc3339_opts(SecondsFormat::AutoSi, true);
let offset_format = format_description!(
version = 2,
"[first
[[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]]Z]
[[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]]z]
[[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]:[offset_minute]]
]"
);

let primitive_format = format_description!(
version = 2,
"[year]-[month]-[day][optional [T]][optional [ ]][hour]:[minute]:[second][optional [.[subsecond]]]"
);

let parsed_with_tz = time::OffsetDateTime::parse(&s, offset_format);
let parsed_no_tz = time::PrimitiveDateTime::parse(&s, primitive_format);

if let Ok(parsed) = parsed_with_tz {
*s = parsed.format(&time::format_description::well_known::Rfc3339).unwrap();
} else if let Ok(parsed) = parsed_no_tz {
*s = parsed.assume_offset(default_offset).format(&time::format_description::well_known::Rfc3339).unwrap();
}
}

Value::Array(vec) => {
vec.iter_mut().for_each(|item| {
datetime_to_rfc3339(item, default_timezone)
datetime_to_rfc3339(item, default_offset)
})
}

Value::Object(map) => {
map.iter_mut().for_each(|(_k, v)| {
datetime_to_rfc3339(v, default_timezone)
datetime_to_rfc3339(v, default_offset)
})
}

Expand All @@ -72,7 +59,7 @@ impl Iterator for DatetimeSanitizer {
let next = self.from.next()?;
Some(match next {
Ok(mut val) => {
datetime_to_rfc3339(&mut val, self.default_timezone);
datetime_to_rfc3339(&mut val, self.default_offset);
Ok(val)
}
Err(e) => {
Expand All @@ -84,15 +71,17 @@ impl Iterator for DatetimeSanitizer {

#[derive(Debug, thiserror::Error)]
pub enum DatetimeSanitizeError {
#[error("could not parse timezone as a valid IANA timezone")]
TimezoneParseError(String),
#[error("could not parse offset: {0}")]
OffsetParseError(#[from] time::error::Parse),
}

pub fn sanitize_datetime(config: &ParseConfig, output: Output) -> Result<Output, DatetimeSanitizeError> {
let tz: Tz = config.default_timezone.parse().map_err(DatetimeSanitizeError::TimezoneParseError)?;
eprintln!("sanitize_datetime");
let offset = time::UtcOffset::parse(&config.default_offset, format_description!("[offset_hour]:[offset_minute]")).map_err(DatetimeSanitizeError::OffsetParseError)?;
eprintln!("offset: {:?}", offset);
let sanitizer = DatetimeSanitizer {
from: output,
default_timezone: tz,
default_offset: offset,
};

return Ok(Box::new(sanitizer))
Expand Down
10 changes: 5 additions & 5 deletions crates/parser/tests/sanitize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ fn sanitize_datetime_to_rfc3339() {
let output = run_test(&cfg, input);
output.assert_success(1);

let expected_first_row = "2020-01-01T00:00:00+00:00";
let expected_first_row = "2020-01-01T00:00:00Z";
for value in output.parsed[0].as_object().unwrap().values() {
assert_eq!(expected_first_row, DateTime::parse_from_rfc3339(value.as_str().unwrap()).unwrap().to_rfc3339())
assert_eq!(expected_first_row, value.as_str().unwrap())
}
}

#[test]
fn sanitize_datetime_to_rfc3339_iana_timezone() {
fn sanitize_datetime_to_rfc3339_offset() {
let path = "tests/examples/datetimes-naive.csv";
let cfg = ParseConfig {
default_timezone: "America/New_York".to_string(),
default_offset: "-05:00".to_string(),
filename: Some(path.to_string()),
..Default::default()
};
Expand All @@ -38,7 +38,7 @@ fn sanitize_datetime_to_rfc3339_iana_timezone() {

let expected_first_row = "2020-01-01T00:00:00-05:00";
for value in output.parsed[0].as_object().unwrap().values() {
assert_eq!(expected_first_row, DateTime::parse_from_rfc3339(value.as_str().unwrap()).unwrap().to_rfc3339())
assert_eq!(expected_first_row, value.as_str().unwrap())
}
}

Expand Down
16 changes: 13 additions & 3 deletions crates/parser/tests/testutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ impl CommandResult {
}

pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {
return run_parser(config, input, true)
}

pub fn run_parser(config: &ParseConfig, input: Input, debug: bool) -> CommandResult {
use std::io::BufRead;
use std::process::{Command, Stdio};

Expand All @@ -57,13 +61,15 @@ pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {
serde_json::to_writer_pretty(&mut cfg_file, config).expect("failed to write config");
std::mem::drop(cfg_file);

let debug_env = if debug { vec![("PARSER_LOG", "parser=debug")] } else { vec![] };

let mut process = Command::cargo_bin("flow-parser")
.expect("to find flow-parser binary")
.args(&["parse", "--config-file", cfg_path.to_str().unwrap()])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env("PARSER_LOG", "parser=debug")
.envs(debug_env)
.spawn()
.expect("failed to spawn parser process");

Expand All @@ -83,7 +89,9 @@ pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {
});
let mut parsed = Vec::new();
for line in output.stdout.lines() {
println!("parser output line: {:?}", line);
if debug {
println!("parser output line: {:?}", line);
}
parsed.push(
serde_json::from_str(&line.unwrap()).expect("failed to deserialize parser output"),
);
Expand All @@ -92,7 +100,9 @@ pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {

// Print stderr so that it will show up in the output if the test fails.
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
println!("parser stderr:\n{}", stderr);
if debug {
println!("parser stderr:\n{}", stderr);
}

CommandResult {
parsed,
Expand Down

0 comments on commit 7205e15

Please sign in to comment.