Skip to content

Commit

Permalink
parser: sanitize timestamps to RFC3339
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 26, 2023
1 parent 1cf02e0 commit fbe8857
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 3 deletions.
76 changes: 76 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ byteorder = "1.4"
caseless = "0.2"
chardetng = "0.1"
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = { version = "0.8" }
clap = { version = "3.2", features = ["derive", "env"] }
colored_json = "3"
comfy-table = "6.1"
Expand Down
1 change: 1 addition & 0 deletions crates/parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ bytes = { workspace = true }
caseless = { workspace = true }
chardetng = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
csv = { workspace = true }
encoding_rs = { workspace = true }
flate2 = { workspace = true }
Expand Down
28 changes: 26 additions & 2 deletions crates/parser/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,15 @@ impl schemars::JsonSchema for ErrorThreshold {
}
}

/// Returns the timezone name used when a config does not specify `default_timezone`.
///
/// Referenced by the `#[serde(default = ...)]` attribute on `ParseConfig::default_timezone`.
fn default_timezone_string() -> String {
    String::from("UTC")
}

// Fields annotated with `schemars(skip)` will not appear in the JSON schema, and thus won't be
// shown in the UI. These are things that connectors set programmatically when they generate the
// config. We could consider moving these fields to be CLI arguments if we want a clearer
// separation.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
#[schemars(
title = "Parser Configuration",
description = "Configures how files are parsed"
Expand All @@ -550,6 +554,11 @@ pub struct ParseConfig {
#[serde(default)]
pub compression: DefaultNullIsAutomatic<Compression>,

/// The default timezone to use when parsing timestamps that do not have a timezone. Timezones
/// must be specified as a valid IANA name. Defaults to UTC.
#[serde(default="default_timezone_string")]
pub default_timezone: String,

/// filename is used for format inference. It will be ignored if `format` is specified.
#[serde(default)]
#[schemars(skip)]
Expand Down Expand Up @@ -582,6 +591,21 @@ pub struct ParseConfig {
pub content_encoding: Option<String>,
}

// `Default` is written by hand rather than derived: a derived impl would leave
// `default_timezone` as an empty string, whereas deserialization falls back to
// `default_timezone_string()`. Implementing it manually keeps the two in agreement.
impl Default for ParseConfig {
    /// Builds a config where every field takes its type's default, except
    /// `default_timezone`, which is set via `default_timezone_string()`.
    fn default() -> Self {
        Self {
            default_timezone: default_timezone_string(),
            format: Default::default(),
            compression: Default::default(),
            filename: Default::default(),
            add_record_offset: Default::default(),
            add_values: Default::default(),
            content_type: Default::default(),
            content_encoding: Default::default(),
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("failed to read config: {0}")]
Expand Down Expand Up @@ -745,7 +769,7 @@ mod test {
}
},
"filename": "tha-file",
"compression": "zip",
"compression": "zip"
});

let r1: ParseConfig = serde_json::from_value(c1).expect("deserialize config");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ expression: schema
}
]
},
"defaultTimezone": {
"description": "The default timezone to use when parsing timestamps that do not have a timezone. Timezones must be specified as a valid IANA name. Defaults to UTC.",
"default": "UTC",
"type": "string"
},
"format": {
"description": "Determines how to parse the contents. The default, 'Auto', will try to determine the format automatically based on the file extension or MIME type, if available.",
"default": {
Expand Down
7 changes: 6 additions & 1 deletion crates/parser/src/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod avro;
pub mod character_separated;
pub mod json;
pub mod protobuf;
pub mod sanitize;

use crate::config::ErrorThreshold;
use crate::decorate::{AddFieldError, Decorator};
Expand Down Expand Up @@ -46,6 +47,9 @@ pub enum ParseError {

#[error("error limit exceeded")]
ErrorLimitExceeded(ErrorThreshold),

#[error("failed to sanitize documents: {0}")]
SanitizeError(#[from] sanitize::SanitizeError),
}

/// Runs format inference if the config does not specify a `format`. The expectation is that more
Expand Down Expand Up @@ -162,7 +166,8 @@ fn parse_file(
starting_offset: u64,
) -> Result<u64, ParseError> {
let output = parser.parse(input)?;
format_output(&config, output, dest, starting_offset)
let sanitized_output = sanitize::sanitize_output(&config, output)?;
format_output(&config, sanitized_output, dest, starting_offset)
}

fn parser_for(format: Format) -> Box<dyn Parser> {
Expand Down
98 changes: 98 additions & 0 deletions crates/parser/src/format/sanitize/datetime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::{ParseConfig, Output, format::ParseResult, ParseError};
use chrono::{DateTime, FixedOffset, SecondsFormat};
use chrono_tz::Tz;
use serde_json::Value;

/// Iterator adapter that wraps a parser `Output` and rewrites the string values of each
/// successfully parsed document via `datetime_to_rfc3339` before yielding it.
struct DatetimeSanitizer {
/// The upstream parser output whose items are sanitized.
from: Output,
/// Timezone passed to `datetime_to_rfc3339` for timestamps that carry no offset of their own.
default_timezone: Tz,
}

/// `strftime`-style patterns for timestamps that carry no UTC offset. They are tried in
/// order, with either a `T` or a space between date and time and with or without a
/// fractional-seconds component.
const NAIVE_FORMATS: [&str; 4] = [
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M:%S%.3f",
    "%Y-%m-%d %H:%M:%S%.3f",
    "%Y-%m-%d %H:%M:%S",
];

/// `strftime`-style patterns for space-separated timestamps that include an explicit
/// `+hh:mm` style offset, with or without a fractional-seconds component. Tried in order.
const FORMATS: [&str; 2] = [
    "%Y-%m-%d %H:%M:%S%.3f%:z",
    "%Y-%m-%d %H:%M:%S%:z",
];

/// Recursively rewrites timestamp-like strings inside `val` into RFC3339 form, in place.
///
/// * Strings matching one of `FORMATS` (explicit offset) are re-rendered as RFC3339.
/// * Otherwise, strings matching one of `NAIVE_FORMATS` (no offset) are interpreted as
///   local time in `default_timezone` and then rendered as RFC3339.
/// * Strings matching neither set are left untouched.
/// * Arrays and objects are traversed so that nested strings are handled as well; all
///   other JSON values are ignored.
///
/// A naive timestamp can be ambiguous (clocks moved back) or nonexistent (clocks moved
/// forward) in `default_timezone`. Ambiguous values resolve to the earliest candidate;
/// nonexistent values are left unchanged rather than panicking on untrusted input.
fn datetime_to_rfc3339(val: &mut Value, default_timezone: Tz) {
    match val {
        Value::String(s) => {
            let text = s.as_str();

            // First preference: the string already carries its own offset.
            let with_offset: Option<DateTime<FixedOffset>> = FORMATS
                .iter()
                .find_map(|f| chrono::DateTime::parse_from_str(text, f).ok());

            let sanitized = match with_offset {
                Some(ts) => Some(ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)),
                None => NAIVE_FORMATS
                    .iter()
                    .find_map(|f| chrono::NaiveDateTime::parse_from_str(text, f).ok())
                    // `earliest()` yields `None` only when the local time does not exist
                    // in the timezone; the original value is kept in that case.
                    .and_then(|naive| naive.and_local_timezone(default_timezone).earliest())
                    .map(|ts| ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)),
            };

            if let Some(formatted) = sanitized {
                *s = formatted;
            }
        }

        Value::Array(items) => {
            for item in items.iter_mut() {
                datetime_to_rfc3339(item, default_timezone);
            }
        }

        Value::Object(map) => {
            for (_key, item) in map.iter_mut() {
                datetime_to_rfc3339(item, default_timezone);
            }
        }

        _ => {}
    }
}

impl Iterator for DatetimeSanitizer {
    type Item = ParseResult;

    /// Pulls the next item from the wrapped output. Successful documents have their
    /// timestamp strings rewritten via `datetime_to_rfc3339`; errors are boxed and
    /// re-wrapped as `ParseError::Parse`. Returns `None` once the wrapped output ends.
    fn next(&mut self) -> Option<Self::Item> {
        let tz = self.default_timezone;
        self.from.next().map(|result| match result {
            Err(e) => Err(ParseError::Parse(Box::new(e))),
            Ok(mut doc) => {
                datetime_to_rfc3339(&mut doc, tz);
                Ok(doc)
            }
        })
    }
}

#[derive(Debug, thiserror::Error)]
pub enum DatetimeSanitizeError {
#[error("could not parse timezone as a valid IANA timezone")]
TimezoneParseError(String),
}

/// Wraps `output` in a `DatetimeSanitizer` that uses `config.default_timezone` for
/// timestamps lacking an offset.
///
/// # Errors
///
/// Returns `DatetimeSanitizeError::TimezoneParseError` when `config.default_timezone`
/// cannot be parsed into a `Tz`.
pub fn sanitize_datetime(
    config: &ParseConfig,
    output: Output,
) -> Result<Output, DatetimeSanitizeError> {
    let default_timezone = config
        .default_timezone
        .parse::<Tz>()
        .map_err(DatetimeSanitizeError::TimezoneParseError)?;

    Ok(Box::new(DatetimeSanitizer {
        from: output,
        default_timezone,
    }))
}
14 changes: 14 additions & 0 deletions crates/parser/src/format/sanitize/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::{ParseConfig, Output};

pub mod datetime;

/// Errors that can occur while setting up sanitization of parser output.
#[derive(Debug, thiserror::Error)]
pub enum SanitizeError {
/// The datetime sanitizer could not be constructed; wraps the underlying error.
#[error("sanitizing datetimes: {0}")]
DatetimeSanitizeError(#[from] datetime::DatetimeSanitizeError),
}

/// Applies the output sanitizers to `output`; currently this is only the datetime
/// sanitizer from the `datetime` module.
///
/// # Errors
///
/// Returns `SanitizeError::DatetimeSanitizeError` when the datetime sanitizer cannot be
/// constructed from `config`.
pub fn sanitize_output(config: &ParseConfig, output: Output) -> Result<Output, SanitizeError> {
    // `?` converts the error through the `#[from]` impl on `SanitizeError`.
    let sanitized = datetime::sanitize_datetime(config, output)?;
    Ok(sanitized)
}

2 changes: 2 additions & 0 deletions crates/parser/tests/examples/datetimes-naive.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"no_timezone", "no_timezone_fractional", "no_t", "no_t_fractional"
"2020-01-01T00:00:00","2020-01-01T00:00:00.000","2020-01-01 00:00:00","2020-01-01 00:00:00.000"
2 changes: 2 additions & 0 deletions crates/parser/tests/examples/datetimes.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"no_timezone", "no_timezone_fractional", "rfc3339", "timezone_offset", "no_t", "no_t_fractional"
"2020-01-01T00:00:00","2020-01-01T00:00:00.000","2020-01-01T00:00:00Z","2020-01-01 00:00:00+00:00","2020-01-01 00:00:00","2020-01-01 00:00:00.000"
44 changes: 44 additions & 0 deletions crates/parser/tests/sanitize_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
mod testutil;

use chrono::DateTime;

use parser::ParseConfig;
use testutil::{input_for_file, run_test};

/// Every column of the fixture row, whatever its input format, must come out as an
/// RFC3339 timestamp equal to midnight UTC on 2020-01-01 when the default timezone is UTC.
#[test]
fn sanitize_datetime_to_rfc3339() {
    let path = "tests/examples/datetimes.csv";
    let cfg = ParseConfig {
        filename: Some(path.to_string()),
        default_timezone: "UTC".to_string(),
        ..Default::default()
    };

    let output = run_test(&cfg, input_for_file(path));
    output.assert_success(1);

    let want = DateTime::parse_from_rfc3339("2020-01-01T00:00:00Z").unwrap();
    let row = output.parsed[0].as_object().unwrap();
    for column in row.values() {
        let got = DateTime::parse_from_rfc3339(column.as_str().unwrap()).unwrap();
        assert_eq!(want, got)
    }
}

/// Offset-less timestamps must be interpreted in the configured IANA timezone: with
/// `America/New_York`, midnight on 2020-01-01 is expected to carry a `-05:00` offset.
#[test]
fn sanitize_datetime_to_rfc3339_iana_timezone() {
    let path = "tests/examples/datetimes-naive.csv";
    let cfg = ParseConfig {
        filename: Some(path.to_string()),
        default_timezone: "America/New_York".to_string(),
        ..Default::default()
    };

    let output = run_test(&cfg, input_for_file(path));
    output.assert_success(1);

    let want = DateTime::parse_from_rfc3339("2020-01-01T00:00:00-05:00").unwrap();
    let row = output.parsed[0].as_object().unwrap();
    for column in row.values() {
        let got = DateTime::parse_from_rfc3339(column.as_str().unwrap()).unwrap();
        assert_eq!(want, got)
    }
}
Loading

0 comments on commit fbe8857

Please sign in to comment.