Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parser: sanitize timestamps to RFC3339 (again) #1245

Merged
merged 6 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ license.workspace = true
name = "flow-parser"
path = "src/main.rs"

[[bench]]
name = "peoples_500"
harness = false

[dependencies]
json = { path = "../json" }
Expand All @@ -23,6 +26,8 @@ bytes = { workspace = true }
caseless = { workspace = true }
chardetng = { workspace = true }
chrono = { workspace = true }
time = { workspace = true }
criterion = { workspace = true }
csv = { workspace = true }
encoding_rs = { workspace = true }
flate2 = { workspace = true }
Expand Down
500 changes: 500 additions & 0 deletions crates/parser/benches/data/people-500.csv

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions crates/parser/benches/peoples_500.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#[path="../tests/testutil.rs"] mod testutil;

use criterion::{criterion_group, criterion_main, Criterion, black_box};

use parser::ParseConfig;
use testutil::{input_for_file, run_parser};

fn peoples_500(c: &mut Criterion) {
let path = "benches/data/people-500.csv";
let cfg = ParseConfig {
filename: Some(path.to_string()),
..Default::default()
};

c.bench_function("peoples_500", |b| b.iter(|| {
let input = input_for_file(path);
run_parser(&cfg, input, false);
}));
}

criterion_group!(benches, peoples_500);
criterion_main!(benches);
28 changes: 26 additions & 2 deletions crates/parser/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,15 @@ impl schemars::JsonSchema for ErrorThreshold {
}
}

fn default_offset_string() -> String {
"+00:00".to_string()
}

// Fields annotated with `schemars(skip)` will not appear in the JSON schema, and thus won't be
// shown in the UI. These are things that connectors set programatically when it generates the
// config. We could consider moving these fields to be CLI arguments if we want a clearer
// separation.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
#[schemars(
title = "Parser Configuration",
description = "Configures how files are parsed"
Expand All @@ -550,6 +554,11 @@ pub struct ParseConfig {
#[serde(default)]
pub compression: DefaultNullIsAutomatic<Compression>,

/// The default timezone to use when parsing timestamps that do not have a timezone. Timezones
/// must be specified as an +/-HH:MM offset, defaults to +00:00.
#[serde(default="default_offset_string")]
pub default_offset: String,

/// filename is used for format inference. It will be ignored if `format` is specified.
#[serde(default)]
#[schemars(skip)]
Expand Down Expand Up @@ -582,6 +591,21 @@ pub struct ParseConfig {
pub content_encoding: Option<String>,
}

impl Default for ParseConfig {
fn default() -> Self {
ParseConfig {
format: Default::default(),
compression: Default::default(),
default_offset: default_offset_string(),
filename: Default::default(),
add_record_offset: Default::default(),
add_values: Default::default(),
content_type: Default::default(),
content_encoding: Default::default(),
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("failed to read config: {0}")]
Expand Down Expand Up @@ -745,7 +769,7 @@ mod test {
}
},
"filename": "tha-file",
"compression": "zip",
"compression": "zip"
});

let r1: ParseConfig = serde_json::from_value(c1).expect("deserialize config");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ expression: schema
}
]
},
"defaultOffset": {
"description": "The default timezone to use when parsing timestamps that do not have a timezone. Timezones must be specified as an +/-HH:MM offset, defaults to +00:00.",
"default": "+00:00",
"type": "string"
},
"format": {
"description": "Determines how to parse the contents. The default, 'Auto', will try to determine the format automatically based on the file extension or MIME type, if available.",
"default": {
Expand Down
7 changes: 6 additions & 1 deletion crates/parser/src/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod avro;
pub mod character_separated;
pub mod json;
pub mod protobuf;
pub mod sanitize;

use crate::config::ErrorThreshold;
use crate::decorate::{AddFieldError, Decorator};
Expand Down Expand Up @@ -46,6 +47,9 @@ pub enum ParseError {

#[error("error limit exceeded")]
ErrorLimitExceeded(ErrorThreshold),

#[error("failed to sanitize documents: {0}")]
SanitizeError(#[from] sanitize::SanitizeError),
}

/// Runs format inference if the config does not specify a `format`. The expectation is that more
Expand Down Expand Up @@ -162,7 +166,8 @@ fn parse_file(
starting_offset: u64,
) -> Result<u64, ParseError> {
let output = parser.parse(input)?;
format_output(&config, output, dest, starting_offset)
let sanitized_output = sanitize::sanitize_output(&config, output)?;
format_output(&config, sanitized_output, dest, starting_offset)
}

fn parser_for(format: Format) -> Box<dyn Parser> {
Expand Down
91 changes: 91 additions & 0 deletions crates/parser/src/format/sanitize/datetime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::{ParseConfig, Output, format::ParseResult};
use time::macros::format_description;
use serde_json::Value;

struct DatetimeSanitizer {
from: Output,
default_offset: time::UtcOffset,
}

// Here we are trying to parse non-RFC3339 dates
fn datetime_to_rfc3339(val: &mut Value, default_offset: time::UtcOffset) {
match val {
Value::String(s) => {
// We first try to parse a more relaxed format that allows all the different formats we
// support. At this stage we are trying to see if the value we see is a timestamp that
// we can parse at all. If we are successful at parsing this value, then we try to
// parse a more specific format for timestamps *with timezone*. If we are successful,
// we use the parsed timezone, otherwise we use the default offset provided.
let primitive_format = format_description!(
version = 2,
"[year]-[month]-[day][optional [T]][optional [ ]][hour]:[minute]:[second][optional [.[subsecond]]][optional [Z]][optional [z]][optional [[offset_hour]:[offset_minute]]]"
);

let parsed_no_tz = time::PrimitiveDateTime::parse(&s, primitive_format).ok();

let parsed_with_tz = if parsed_no_tz.is_some() {
let offset_format = format_description!(
version = 2,
"[first
[[year]-[month]-[day][optional [T]][optional [ ]][hour]:[minute]:[second][optional [.[subsecond]]]Z]
[[year]-[month]-[day][optional [T]][optional [ ]][hour]:[minute]:[second][optional [.[subsecond]]]z]
[[year]-[month]-[day][optional [T]][optional [ ]][hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]:[offset_minute]]
]"
);

time::OffsetDateTime::parse(&s, offset_format).ok()
} else { None };

if let Some(parsed) = parsed_with_tz {
*s = parsed.format(&time::format_description::well_known::Rfc3339).unwrap();
} else if let Some(parsed) = parsed_no_tz {
*s = parsed.assume_offset(default_offset).format(&time::format_description::well_known::Rfc3339).unwrap();
}
}

Value::Array(vec) => {
vec.iter_mut().for_each(|item| {
datetime_to_rfc3339(item, default_offset)
})
}

Value::Object(map) => {
map.iter_mut().for_each(|(_k, v)| {
datetime_to_rfc3339(v, default_offset)
})
}

_ => {}
}
}

impl Iterator for DatetimeSanitizer {
type Item = ParseResult;

fn next(&mut self) -> Option<Self::Item> {
let next = self.from.next()?;
Some(match next {
Ok(mut val) => {
datetime_to_rfc3339(&mut val, self.default_offset);
Ok(val)
}
e => e
})
}
}

#[derive(Debug, thiserror::Error)]
pub enum DatetimeSanitizeError {
#[error("could not parse offset: {0}")]
OffsetParseError(#[from] time::error::Parse),
}

pub fn sanitize_datetime(config: &ParseConfig, output: Output) -> Result<Output, DatetimeSanitizeError> {
let offset = time::UtcOffset::parse(&config.default_offset, format_description!("[offset_hour]:[offset_minute]")).map_err(DatetimeSanitizeError::OffsetParseError)?;
let sanitizer = DatetimeSanitizer {
from: output,
default_offset: offset,
};

return Ok(Box::new(sanitizer))
}
14 changes: 14 additions & 0 deletions crates/parser/src/format/sanitize/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::{ParseConfig, Output};

pub mod datetime;

#[derive(Debug, thiserror::Error)]
pub enum SanitizeError {
#[error("sanitizing datetimes: {0}")]
DatetimeSanitizeError(#[from] datetime::DatetimeSanitizeError),
}

pub fn sanitize_output(config: &ParseConfig, output: Output) -> Result<Output, SanitizeError> {
datetime::sanitize_datetime(config, output).map_err(SanitizeError::DatetimeSanitizeError)
}

1 change: 1 addition & 0 deletions crates/parser/tests/examples/datetimes-nested.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"x": ["2020-01-01 00:00:00"], "y": { "z": ["2020-01-01 00:00:00"], "k": "2020-01-01 00:00:00" } }
70 changes: 70 additions & 0 deletions crates/parser/tests/sanitize_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
mod testutil;

use std::fs::File;
use std::io::Write;

use parser::ParseConfig;
use testutil::{input_for_file, run_test};
use tempfile::tempdir;

fn test_sanitize(description: &str, input: &str, expected: &str, default_offset: &str) {
let dir = tempdir().unwrap();
let path = dir.path().join("sanitize-test.csv");
let mut f = File::create(path.clone()).unwrap();
writeln!(f, "header").unwrap();
writeln!(f, "\"{}\"", input).unwrap();

let cfg = ParseConfig {
filename: Some(path.to_string_lossy().to_string()),
default_offset: default_offset.to_string(),
..Default::default()
};

let input = input_for_file(path);
let output = run_test(&cfg, input);
output.assert_success(1);

for value in output.parsed[0].as_object().unwrap().values() {
assert_eq!(expected, value.as_str().unwrap(), "{}", description)
}
}

#[test]
fn sanitize_datetime_to_rfc3339() {
// With Timezone
test_sanitize("tz rfc3339 utc" , "2020-01-01T12:34:56Z" , "2020-01-01T12:34:56Z" , "+00:00");
test_sanitize("tz rfc3339 offset" , "2020-01-01T12:34:56-04:00" , "2020-01-01T12:34:56-04:00" , "+00:00");
test_sanitize("tz rfc3339 fractional" , "2020-01-01T12:34:56.999999999Z" , "2020-01-01T12:34:56.999999999Z" , "+00:00");
test_sanitize("tz rfc3339 fractional + offset" , "2020-01-01T12:34:56.999999999-04:00" , "2020-01-01T12:34:56.999999999-04:00" , "+00:00");
test_sanitize("tz spaced fractional + offset" , "2020-01-01 12:34:56.999999999-04:00" , "2020-01-01T12:34:56.999999999-04:00" , "+00:00");
test_sanitize("tz spaced fractional + utc" , "2020-01-01 12:34:56.999999999Z" , "2020-01-01T12:34:56.999999999Z" , "+00:00");
test_sanitize("tz spaced offset" , "2020-01-01 12:34:56-04:00" , "2020-01-01T12:34:56-04:00" , "+00:00");
test_sanitize("tz spaced utc" , "2020-01-01 12:34:56Z" , "2020-01-01T12:34:56Z" , "+00:00");

// Without Timezone
test_sanitize("naive t" , "2020-01-01T12:34:56" , "2020-01-01T12:34:56Z" , "+00:00");
test_sanitize("naive t + fractional" , "2020-01-01T12:34:56.999999999" , "2020-01-01T12:34:56.999999999Z" , "+00:00");
test_sanitize("naive t + fractional 2" , "2020-01-01T12:34:56.999999999" , "2020-01-01T12:34:56.999999999+04:00" , "+04:00");
test_sanitize("naive space" , "2020-01-01 12:34:56" , "2020-01-01T12:34:56Z" , "+00:00");
test_sanitize("naive space + fractional" , "2020-01-01 12:34:56.999999999" , "2020-01-01T12:34:56.999999999Z" , "+00:00");
test_sanitize("naive space + fractional 2" , "2020-01-01 12:34:56.999999999" , "2020-01-01T12:34:56.999999999+04:00" , "+04:00");
}

#[test]
fn sanitize_datetime_to_rfc3339_nested() {
let path = "tests/examples/datetimes-nested.json";
let cfg = ParseConfig {
filename: Some(path.to_string()),
..Default::default()
};

let input = input_for_file(path);
let output = run_test(&cfg, input);
output.assert_success(1);

let expected = "2020-01-01T00:00:00Z";
let out = output.parsed[0].as_object().unwrap();
assert_eq!(expected, out.get("x").unwrap().as_array().unwrap()[0].as_str().unwrap());
assert_eq!(expected, out.get("y").unwrap().as_object().unwrap().get("z").unwrap().as_array().unwrap()[0].as_str().unwrap());
assert_eq!(expected, out.get("y").unwrap().as_object().unwrap().get("k").unwrap().as_str().unwrap());
}
16 changes: 13 additions & 3 deletions crates/parser/tests/testutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ impl CommandResult {
}

pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {
return run_parser(config, input, true)
}

pub fn run_parser(config: &ParseConfig, input: Input, debug: bool) -> CommandResult {
use std::io::BufRead;
use std::process::{Command, Stdio};

Expand All @@ -57,13 +61,15 @@ pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {
serde_json::to_writer_pretty(&mut cfg_file, config).expect("failed to write config");
std::mem::drop(cfg_file);

let debug_env = if debug { vec![("PARSER_LOG", "parser=debug")] } else { vec![] };

let mut process = Command::cargo_bin("flow-parser")
.expect("to find flow-parser binary")
.args(&["parse", "--config-file", cfg_path.to_str().unwrap()])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env("PARSER_LOG", "parser=debug")
.envs(debug_env)
.spawn()
.expect("failed to spawn parser process");

Expand All @@ -83,7 +89,9 @@ pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {
});
let mut parsed = Vec::new();
for line in output.stdout.lines() {
println!("parser output line: {:?}", line);
if debug {
println!("parser output line: {:?}", line);
}
parsed.push(
serde_json::from_str(&line.unwrap()).expect("failed to deserialize parser output"),
);
Expand All @@ -92,7 +100,9 @@ pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {

// Print stderr so that it will show up in the output if the test fails.
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
println!("parser stderr:\n{}", stderr);
if debug {
println!("parser stderr:\n{}", stderr);
}

CommandResult {
parsed,
Expand Down
Loading