Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parser: sanitize timestamps to RFC3339 #1201

Merged
merged 6 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ license.workspace = true
name = "flow-parser"
path = "src/main.rs"

[[bench]]
name = "peoples_500"
harness = false

[dependencies]
json = { path = "../json" }
Expand All @@ -23,6 +26,8 @@ bytes = { workspace = true }
caseless = { workspace = true }
chardetng = { workspace = true }
chrono = { workspace = true }
time = { workspace = true }
criterion = { workspace = true }
csv = { workspace = true }
encoding_rs = { workspace = true }
flate2 = { workspace = true }
Expand Down
500 changes: 500 additions & 0 deletions crates/parser/benches/data/people-500.csv

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions crates/parser/benches/peoples_500.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#[path="../tests/testutil.rs"] mod testutil;

use criterion::{criterion_group, criterion_main, Criterion, black_box};

use parser::ParseConfig;
use testutil::{input_for_file, run_parser};

fn peoples_500(c: &mut Criterion) {
let path = "benches/data/people-500.csv";
let cfg = ParseConfig {
filename: Some(path.to_string()),
..Default::default()
};

c.bench_function("peoples_500", |b| b.iter(|| {
let input = input_for_file(path);
run_parser(&cfg, input, false);
}));
}

criterion_group!(benches, peoples_500);
criterion_main!(benches);
28 changes: 26 additions & 2 deletions crates/parser/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,15 @@ impl schemars::JsonSchema for ErrorThreshold {
}
}

fn default_offset_string() -> String {
"+00:00".to_string()
}

// Fields annotated with `schemars(skip)` will not appear in the JSON schema, and thus won't be
// shown in the UI. These are things that connectors set programatically when it generates the
// config. We could consider moving these fields to be CLI arguments if we want a clearer
// separation.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
#[schemars(
title = "Parser Configuration",
description = "Configures how files are parsed"
Expand All @@ -550,6 +554,11 @@ pub struct ParseConfig {
#[serde(default)]
pub compression: DefaultNullIsAutomatic<Compression>,

/// The default timezone to use when parsing timestamps that do not have a timezone. Timezones
/// must be specified as an +/-HH:MM offset, defaults to +00:00.
#[serde(default="default_offset_string")]
pub default_offset: String,

/// filename is used for format inference. It will be ignored if `format` is specified.
#[serde(default)]
#[schemars(skip)]
Expand Down Expand Up @@ -582,6 +591,21 @@ pub struct ParseConfig {
pub content_encoding: Option<String>,
}

impl Default for ParseConfig {
fn default() -> Self {
ParseConfig {
format: Default::default(),
compression: Default::default(),
default_offset: default_offset_string(),
filename: Default::default(),
add_record_offset: Default::default(),
add_values: Default::default(),
content_type: Default::default(),
content_encoding: Default::default(),
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("failed to read config: {0}")]
Expand Down Expand Up @@ -745,7 +769,7 @@ mod test {
}
},
"filename": "tha-file",
"compression": "zip",
"compression": "zip"
});

let r1: ParseConfig = serde_json::from_value(c1).expect("deserialize config");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ expression: schema
}
]
},
"defaultOffset": {
"description": "The default timezone to use when parsing timestamps that do not have a timezone. Timezones must be specified as an +/-HH:MM offset, defaults to +00:00.",
"default": "+00:00",
"type": "string"
},
"format": {
"description": "Determines how to parse the contents. The default, 'Auto', will try to determine the format automatically based on the file extension or MIME type, if available.",
"default": {
Expand Down
7 changes: 6 additions & 1 deletion crates/parser/src/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod avro;
pub mod character_separated;
pub mod json;
pub mod protobuf;
pub mod sanitize;

use crate::config::ErrorThreshold;
use crate::decorate::{AddFieldError, Decorator};
Expand Down Expand Up @@ -46,6 +47,9 @@ pub enum ParseError {

#[error("error limit exceeded")]
ErrorLimitExceeded(ErrorThreshold),

#[error("failed to sanitize documents: {0}")]
SanitizeError(#[from] sanitize::SanitizeError),
}

/// Runs format inference if the config does not specify a `format`. The expectation is that more
Expand Down Expand Up @@ -162,7 +166,8 @@ fn parse_file(
starting_offset: u64,
) -> Result<u64, ParseError> {
let output = parser.parse(input)?;
format_output(&config, output, dest, starting_offset)
let sanitized_output = sanitize::sanitize_output(&config, output)?;
format_output(&config, sanitized_output, dest, starting_offset)
}

fn parser_for(format: Format) -> Box<dyn Parser> {
Expand Down
88 changes: 88 additions & 0 deletions crates/parser/src/format/sanitize/datetime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use crate::{ParseConfig, Output, format::ParseResult, ParseError};
use time::macros::format_description;
use serde_json::Value;

struct DatetimeSanitizer {
from: Output,
default_offset: time::UtcOffset,
}

// Here we are trying to parse non-RFC3339 dates
fn datetime_to_rfc3339(val: &mut Value, default_offset: time::UtcOffset) {
match val {
Value::String(s) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure exactly what we should do about this, but would like to point out that in the common case where a string isn't a timestamp, we're trying to parse it 6 times. If there's a way to cut down on that, it might be worth it.

One possibility might be to switch from chrono to the time crate, which has the ability to specify optional elements in the format specifier. The time crate is generally preferred over chrono anyway. We currently use both (chrono being used a bit more, actually), but I'd like us to gradually standardize on just using time if we can. So it might be worthwhile to switch to time now, if it seems like it could significantly cut down on the amount of work we have to do here.

All this is of course speculative without any sort of benchmarks. I just brought up the current lack of benchmarks after standup, and Johnny's suggestion was to just try a basic before and after tests against a big CSV, so we can at least ensure that this isn't regressing performance egregiously. I agree that seems like a good compromise to avoid blowing up the scope of this PR. And I think we can let that determine whether it's worth switching to the time crate. As long as performance hasn't gotten significantly worse, it's fine the way it is for now.

let primitive_format = format_description!(
mdibaiee marked this conversation as resolved.
Show resolved Hide resolved
version = 2,
"[year]-[month]-[day][optional [T]][optional [ ]][hour]:[minute]:[second][optional [.[subsecond]]][optional [Z]][optional [z]][optional [[offset_hour]:[offset_minute]]]"
);

let parsed_no_tz = time::PrimitiveDateTime::parse(&s, primitive_format).ok();

let parsed_with_tz = if parsed_no_tz.is_some() {
let offset_format = format_description!(
version = 2,
"[first
[[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]]Z]
[[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]]z]
[[year]-[month]-[day] [hour]:[minute]:[second][optional [.[subsecond]]][offset_hour]:[offset_minute]]
]"
);

time::OffsetDateTime::parse(&s, offset_format).ok()
} else { None };

if let Some(parsed) = parsed_with_tz {
*s = parsed.format(&time::format_description::well_known::Rfc3339).unwrap();
} else if let Some(parsed) = parsed_no_tz {
*s = parsed.assume_offset(default_offset).format(&time::format_description::well_known::Rfc3339).unwrap();
}
}

Value::Array(vec) => {
vec.iter_mut().for_each(|item| {
datetime_to_rfc3339(item, default_offset)
})
}

Value::Object(map) => {
map.iter_mut().for_each(|(_k, v)| {
datetime_to_rfc3339(v, default_offset)
})
}

_ => {}
}
}

impl Iterator for DatetimeSanitizer {
type Item = ParseResult;

fn next(&mut self) -> Option<Self::Item> {
let next = self.from.next()?;
Some(match next {
Ok(mut val) => {
datetime_to_rfc3339(&mut val, self.default_offset);
Ok(val)
}
Err(e) => {
mdibaiee marked this conversation as resolved.
Show resolved Hide resolved
Err(ParseError::Parse(Box::new(e)))
}
})
}
}

#[derive(Debug, thiserror::Error)]
pub enum DatetimeSanitizeError {
#[error("could not parse offset: {0}")]
OffsetParseError(#[from] time::error::Parse),
}

pub fn sanitize_datetime(config: &ParseConfig, output: Output) -> Result<Output, DatetimeSanitizeError> {
let offset = time::UtcOffset::parse(&config.default_offset, format_description!("[offset_hour]:[offset_minute]")).map_err(DatetimeSanitizeError::OffsetParseError)?;
let sanitizer = DatetimeSanitizer {
from: output,
default_offset: offset,
};

return Ok(Box::new(sanitizer))
}
14 changes: 14 additions & 0 deletions crates/parser/src/format/sanitize/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::{ParseConfig, Output};

pub mod datetime;

#[derive(Debug, thiserror::Error)]
pub enum SanitizeError {
#[error("sanitizing datetimes: {0}")]
DatetimeSanitizeError(#[from] datetime::DatetimeSanitizeError),
}

pub fn sanitize_output(config: &ParseConfig, output: Output) -> Result<Output, SanitizeError> {
datetime::sanitize_datetime(config, output).map_err(SanitizeError::DatetimeSanitizeError)
}

2 changes: 2 additions & 0 deletions crates/parser/tests/examples/datetimes-naive.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"no_timezone", "no_timezone_fractional", "no_t", "no_t_fractional", "no_t_large_fractional"
"2020-01-01T00:00:00","2020-01-01T00:00:00.000","2020-01-01 00:00:00","2020-01-01 00:00:00.000","2020-01-01 00:00:00.000000000"
1 change: 1 addition & 0 deletions crates/parser/tests/examples/datetimes-nested.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"x": ["2020-01-01 00:00:00"], "y": { "z": ["2020-01-01 00:00:00"], "k": "2020-01-01 00:00:00" } }
2 changes: 2 additions & 0 deletions crates/parser/tests/examples/datetimes.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"no_timezone", "no_timezone_fractional", "rfc3339", "timezone_offset", "no_t", "no_t_fractional", "no_t_fractional_large"
"2020-01-01T00:00:00","2020-01-01T00:00:00.000","2020-01-01T00:00:00Z","2020-01-01 00:00:00+00:00","2020-01-01 00:00:00","2020-01-01 00:00:00.000","2020-01-01 00:00:00.000000000"
60 changes: 60 additions & 0 deletions crates/parser/tests/sanitize_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
mod testutil;

use parser::ParseConfig;
use testutil::{input_for_file, run_test};

#[test]
fn sanitize_datetime_to_rfc3339() {
let path = "tests/examples/datetimes.csv";
let cfg = ParseConfig {
filename: Some(path.to_string()),
..Default::default()
};

let input = input_for_file(path);
let output = run_test(&cfg, input);
output.assert_success(1);

let expected_first_row = "2020-01-01T00:00:00Z";
for value in output.parsed[0].as_object().unwrap().values() {
assert_eq!(expected_first_row, value.as_str().unwrap())
}
}

#[test]
fn sanitize_datetime_to_rfc3339_offset() {
let path = "tests/examples/datetimes-naive.csv";
let cfg = ParseConfig {
default_offset: "-05:00".to_string(),
filename: Some(path.to_string()),
..Default::default()
};

let input = input_for_file(path);
let output = run_test(&cfg, input);
output.assert_success(1);

let expected_first_row = "2020-01-01T00:00:00-05:00";
for value in output.parsed[0].as_object().unwrap().values() {
assert_eq!(expected_first_row, value.as_str().unwrap())
}
}

#[test]
fn sanitize_datetime_to_rfc3339_nested() {
let path = "tests/examples/datetimes-nested.json";
let cfg = ParseConfig {
filename: Some(path.to_string()),
..Default::default()
};

let input = input_for_file(path);
let output = run_test(&cfg, input);
output.assert_success(1);

let expected = "2020-01-01T00:00:00Z";
let out = output.parsed[0].as_object().unwrap();
assert_eq!(expected, out.get("x").unwrap().as_array().unwrap()[0].as_str().unwrap());
assert_eq!(expected, out.get("y").unwrap().as_object().unwrap().get("z").unwrap().as_array().unwrap()[0].as_str().unwrap());
assert_eq!(expected, out.get("y").unwrap().as_object().unwrap().get("k").unwrap().as_str().unwrap());
}
16 changes: 13 additions & 3 deletions crates/parser/tests/testutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ impl CommandResult {
}

pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {
return run_parser(config, input, true)
}

pub fn run_parser(config: &ParseConfig, input: Input, debug: bool) -> CommandResult {
use std::io::BufRead;
use std::process::{Command, Stdio};

Expand All @@ -57,13 +61,15 @@ pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {
serde_json::to_writer_pretty(&mut cfg_file, config).expect("failed to write config");
std::mem::drop(cfg_file);

let debug_env = if debug { vec![("PARSER_LOG", "parser=debug")] } else { vec![] };

let mut process = Command::cargo_bin("flow-parser")
.expect("to find flow-parser binary")
.args(&["parse", "--config-file", cfg_path.to_str().unwrap()])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env("PARSER_LOG", "parser=debug")
.envs(debug_env)
.spawn()
.expect("failed to spawn parser process");

Expand All @@ -83,7 +89,9 @@ pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {
});
let mut parsed = Vec::new();
for line in output.stdout.lines() {
println!("parser output line: {:?}", line);
if debug {
println!("parser output line: {:?}", line);
}
parsed.push(
serde_json::from_str(&line.unwrap()).expect("failed to deserialize parser output"),
);
Expand All @@ -92,7 +100,9 @@ pub fn run_test(config: &ParseConfig, input: Input) -> CommandResult {

// Print stderr so that it will show up in the output if the test fails.
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
println!("parser stderr:\n{}", stderr);
if debug {
println!("parser stderr:\n{}", stderr);
}

CommandResult {
parsed,
Expand Down
2 changes: 2 additions & 0 deletions go/parser/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
Compression string `json:"compression,omitempty"`
ContentType string `json:"contentType,omitempty"`
ContentEncoding string `json:"contentEncoding,omitempty"`
DefaultOffset string `json:"defaultOffset,omitempty"`
}

func (c *Config) Copy() Config {
Expand All @@ -39,6 +40,7 @@ func (c *Config) Copy() Config {
Compression: c.Compression,
ContentType: c.ContentType,
ContentEncoding: c.ContentEncoding,
DefaultOffset: c.DefaultOffset,
}
}

Expand Down
Loading