diff --git a/Cargo.lock b/Cargo.lock index 0e3b917b22..4dba00d62c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -739,6 +739,28 @@ dependencies = [ "winapi", ] +[[package]] +name = "chrono-tz" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1369bc6b9e9a7dfdae2055f6ec151fe9c554a9d23d357c0237cee2e25eaabb7" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2f5ebdc942f57ed96d560a6d1a459bae5851102a25d5bf89dc04ae453e31ecf" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "clang-sys" version = "1.6.1" @@ -2969,6 +2991,15 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "parse-zoneinfo" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c705f256449c60da65e11ff6626e0c16a0a0b96aaa348de61376b249bc340f41" +dependencies = [ + "regex", +] + [[package]] name = "parser" version = "0.0.0" @@ -2981,6 +3012,7 @@ dependencies = [ "caseless", "chardetng", "chrono", + "chrono-tz", "clap 3.2.24", "csv", "doc", @@ -3161,6 +3193,44 @@ dependencies = [ "indexmap", ] +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +dependencies = [ + "phf_shared", + "rand 0.8.5", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.0.12" @@ -4451,6 +4521,12 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf" +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "size" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index 1583a7b06a..82944f0eb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ byteorder = "1.4" caseless = "0.2" chardetng = "0.1" chrono = { version = "0.4", features = ["serde"] } +chrono-tz = { version = "0.8" } clap = { version = "3.2", features = ["derive", "env"] } colored_json = "3" comfy-table = "6.1" diff --git a/crates/parser/Cargo.toml b/crates/parser/Cargo.toml index 981340c4b9..08c524a0e7 100644 --- a/crates/parser/Cargo.toml +++ b/crates/parser/Cargo.toml @@ -23,6 +23,7 @@ bytes = { workspace = true } caseless = { workspace = true } chardetng = { workspace = true } chrono = { workspace = true } +chrono-tz = { workspace = true } csv = { workspace = true } encoding_rs = { workspace = true } flate2 = { workspace = true } diff --git a/crates/parser/src/config/mod.rs b/crates/parser/src/config/mod.rs index aec6aced31..9abb2e6aca 100644 --- a/crates/parser/src/config/mod.rs +++ b/crates/parser/src/config/mod.rs @@ -528,11 +528,15 @@ impl schemars::JsonSchema for ErrorThreshold { } } +fn default_timezone_string() -> String { + "UTC".to_string() +} + // Fields annotated with `schemars(skip)` will not appear in the JSON schema, and thus won't be // shown in the UI. These are things that connectors set programatically when it generates the // config. We could consider moving these fields to be CLI arguments if we want a clearer // separation. -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, schemars::JsonSchema)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, schemars::JsonSchema)] #[schemars( title = "Parser Configuration", description = "Configures how files are parsed" @@ -550,6 +554,11 @@ pub struct ParseConfig { #[serde(default)] pub compression: DefaultNullIsAutomatic, + /// The default timezone to use when parsing timestamps that do not have a timezone. Timezones + /// must be specified as a valid IANA name. Defaults to UTC. + #[serde(default="default_timezone_string")] + pub default_timezone: String, + /// filename is used for format inference. It will be ignored if `format` is specified. #[serde(default)] #[schemars(skip)] @@ -582,6 +591,21 @@ pub struct ParseConfig { pub content_encoding: Option, } +impl Default for ParseConfig { + fn default() -> Self { + ParseConfig { + format: Default::default(), + compression: Default::default(), + default_timezone: default_timezone_string(), + filename: Default::default(), + add_record_offset: Default::default(), + add_values: Default::default(), + content_type: Default::default(), + content_encoding: Default::default(), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum ConfigError { #[error("failed to read config: {0}")] @@ -745,7 +769,7 @@ mod test { } }, "filename": "tha-file", - "compression": "zip", + "compression": "zip" }); let r1: ParseConfig = serde_json::from_value(c1).expect("deserialize config"); diff --git a/crates/parser/src/config/snapshots/parser__config__test__config_schema_is_generated.snap b/crates/parser/src/config/snapshots/parser__config__test__config_schema_is_generated.snap index ad64a08a54..c546871e2b 100644 --- a/crates/parser/src/config/snapshots/parser__config__test__config_schema_is_generated.snap +++ b/crates/parser/src/config/snapshots/parser__config__test__config_schema_is_generated.snap @@ -40,6 +40,11 @@ expression: schema } ] }, + "defaultTimezone": { + "description": "The default timezone to use when parsing timestamps that do not have a timezone. Timezones must be specified as a valid IANA name. Defaults to UTC.", + "default": "UTC", + "type": "string" + }, "format": { "description": "Determines how to parse the contents. The default, 'Auto', will try to determine the format automatically based on the file extension or MIME type, if available.", "default": { diff --git a/crates/parser/src/format/mod.rs b/crates/parser/src/format/mod.rs index a29593a269..fdd5945e58 100644 --- a/crates/parser/src/format/mod.rs +++ b/crates/parser/src/format/mod.rs @@ -2,6 +2,7 @@ pub mod avro; pub mod character_separated; pub mod json; pub mod protobuf; +pub mod sanitize; use crate::config::ErrorThreshold; use crate::decorate::{AddFieldError, Decorator}; @@ -46,6 +47,9 @@ pub enum ParseError { #[error("error limit exceeded")] ErrorLimitExceeded(ErrorThreshold), + + #[error("failed to sanitize documents: {0}")] + SanitizeError(#[from] sanitize::SanitizeError), } /// Runs format inference if the config does not specify a `format`. The expectation is that more @@ -162,7 +166,8 @@ fn parse_file( starting_offset: u64, ) -> Result { let output = parser.parse(input)?; - format_output(&config, output, dest, starting_offset) + let sanitized_output = sanitize::sanitize_output(&config, output)?; + format_output(&config, sanitized_output, dest, starting_offset) } fn parser_for(format: Format) -> Box { diff --git a/crates/parser/src/format/sanitize/datetime.rs b/crates/parser/src/format/sanitize/datetime.rs new file mode 100644 index 0000000000..181503ac18 --- /dev/null +++ b/crates/parser/src/format/sanitize/datetime.rs @@ -0,0 +1,99 @@ +use crate::{ParseConfig, Output, format::ParseResult, ParseError}; +use chrono::{DateTime, FixedOffset, SecondsFormat}; +use chrono_tz::Tz; +use serde_json::Value; + +struct DatetimeSanitizer { + from: Output, + default_timezone: Tz, +} + +const NAIVE_FORMATS: [&'static str; 4] = [ + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%dT%H:%M:%S%.3f", + "%Y-%m-%d %H:%M:%S%.3f", + "%Y-%m-%d %H:%M:%S", +]; + +const FORMATS: [&'static str; 2] = [ + "%Y-%m-%d %H:%M:%S%.3f%:z", + "%Y-%m-%d %H:%M:%S%:z", +]; + +fn datetime_to_rfc3339(val: &mut Value, default_timezone: Tz) { + match val { + Value::String(s) => { + let mut parsed: Option> = None; + + for f in FORMATS { + parsed = parsed.or_else(|| + chrono::DateTime::parse_from_str(&s, f).ok() + ) + } + + if let Some(ts) = parsed { + *s = ts.to_rfc3339_opts(SecondsFormat::AutoSi, true); + return + } + + let mut naive_parsed: Option> = None; + + for f in NAIVE_FORMATS { + naive_parsed = naive_parsed.or_else(|| + chrono::NaiveDateTime::parse_from_str(&s, f).map(|d| d.and_local_timezone(default_timezone).unwrap()).ok() + ) + } + + if let Some(ts) = naive_parsed { + *s = ts.to_rfc3339_opts(SecondsFormat::AutoSi, true); + } + } + + Value::Array(vec) => { + vec.iter_mut().for_each(|item| { + datetime_to_rfc3339(item, default_timezone) + }) + } + + Value::Object(map) => { + map.iter_mut().for_each(|(_k, v)| { + datetime_to_rfc3339(v, default_timezone) + }) + } + + _ => {} + } +} + +impl Iterator for DatetimeSanitizer { + type Item = ParseResult; + + fn next(&mut self) -> Option { + let next = self.from.next()?; + Some(match next { + Ok(mut val) => { + datetime_to_rfc3339(&mut val, self.default_timezone); + Ok(val) + } + Err(e) => { + Err(ParseError::Parse(Box::new(e))) + } + }) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DatetimeSanitizeError { + #[error("could not parse timezone as a valid IANA timezone")] + TimezoneParseError(String), +} + +pub fn sanitize_datetime(config: &ParseConfig, output: Output) -> Result { + let tz: Tz = config.default_timezone.parse().map_err(DatetimeSanitizeError::TimezoneParseError)?; + let sanitizer = DatetimeSanitizer { + from: output, + default_timezone: tz, + }; + + return Ok(Box::new(sanitizer)) +} diff --git a/crates/parser/src/format/sanitize/mod.rs b/crates/parser/src/format/sanitize/mod.rs new file mode 100644 index 0000000000..4002fa48dd --- /dev/null +++ b/crates/parser/src/format/sanitize/mod.rs @@ -0,0 +1,14 @@ +use crate::{ParseConfig, Output}; + +pub mod datetime; + +#[derive(Debug, thiserror::Error)] +pub enum SanitizeError { + #[error("sanitizing datetimes: {0}")] + DatetimeSanitizeError(#[from] datetime::DatetimeSanitizeError), +} + +pub fn sanitize_output(config: &ParseConfig, output: Output) -> Result { + datetime::sanitize_datetime(config, output).map_err(SanitizeError::DatetimeSanitizeError) +} + diff --git a/crates/parser/tests/examples/datetimes-naive.csv b/crates/parser/tests/examples/datetimes-naive.csv new file mode 100644 index 0000000000..f5406e8274 --- /dev/null +++ b/crates/parser/tests/examples/datetimes-naive.csv @@ -0,0 +1,2 @@ +"no_timezone", "no_timezone_fractional", "no_t", "no_t_fractional" +"2020-01-01T00:00:00","2020-01-01T00:00:00.000","2020-01-01 00:00:00","2020-01-01 00:00:00.000" diff --git a/crates/parser/tests/examples/datetimes-nested.json b/crates/parser/tests/examples/datetimes-nested.json new file mode 100644 index 0000000000..ac05035b7e --- /dev/null +++ b/crates/parser/tests/examples/datetimes-nested.json @@ -0,0 +1 @@ +{"x": ["2020-01-01 00:00:00"], "y": { "z": ["2020-01-01 00:00:00"], "k": "2020-01-01 00:00:00" } } diff --git a/crates/parser/tests/examples/datetimes.csv b/crates/parser/tests/examples/datetimes.csv new file mode 100644 index 0000000000..93a7fd4046 --- /dev/null +++ b/crates/parser/tests/examples/datetimes.csv @@ -0,0 +1,2 @@ +"no_timezone", "no_timezone_fractional", "rfc3339", "timezone_offset", "no_t", "no_t_fractional" +"2020-01-01T00:00:00","2020-01-01T00:00:00.000","2020-01-01T00:00:00Z","2020-01-01 00:00:00+00:00","2020-01-01 00:00:00","2020-01-01 00:00:00.000" diff --git a/crates/parser/tests/sanitize_test.rs b/crates/parser/tests/sanitize_test.rs new file mode 100644 index 0000000000..4550bf21e4 --- /dev/null +++ b/crates/parser/tests/sanitize_test.rs @@ -0,0 +1,62 @@ +mod testutil; + +use chrono::DateTime; + +use parser::ParseConfig; +use testutil::{input_for_file, run_test}; + +#[test] +fn sanitize_datetime_to_rfc3339() { + let path = "tests/examples/datetimes.csv"; + let cfg = ParseConfig { + filename: Some(path.to_string()), + ..Default::default() + }; + + let input = input_for_file(path); + let output = run_test(&cfg, input); + output.assert_success(1); + + let expected_first_row = DateTime::parse_from_rfc3339("2020-01-01T00:00:00Z").unwrap(); + for value in output.parsed[0].as_object().unwrap().values() { + assert_eq!(expected_first_row, DateTime::parse_from_rfc3339(value.as_str().unwrap()).unwrap()) + } +} + +#[test] +fn sanitize_datetime_to_rfc3339_iana_timezone() { + let path = "tests/examples/datetimes-naive.csv"; + let cfg = ParseConfig { + default_timezone: "America/New_York".to_string(), + filename: Some(path.to_string()), + ..Default::default() + }; + + let input = input_for_file(path); + let output = run_test(&cfg, input); + output.assert_success(1); + + let expected_first_row = DateTime::parse_from_rfc3339("2020-01-01T00:00:00-05:00").unwrap(); + for value in output.parsed[0].as_object().unwrap().values() { + assert_eq!(expected_first_row, DateTime::parse_from_rfc3339(value.as_str().unwrap()).unwrap()) + } +} + +#[test] +fn sanitize_datetime_to_rfc3339_nested() { + let path = "tests/examples/datetimes-nested.json"; + let cfg = ParseConfig { + filename: Some(path.to_string()), + ..Default::default() + }; + + let input = input_for_file(path); + let output = run_test(&cfg, input); + output.assert_success(1); + + let expected = "2020-01-01T00:00:00Z"; + let out = output.parsed[0].as_object().unwrap(); + assert_eq!(expected, out.get("x").unwrap().as_array().unwrap()[0].as_str().unwrap()); + assert_eq!(expected, out.get("y").unwrap().as_object().unwrap().get("z").unwrap().as_array().unwrap()[0].as_str().unwrap()); + assert_eq!(expected, out.get("y").unwrap().as_object().unwrap().get("k").unwrap().as_str().unwrap()); +} diff --git a/go/parser/config.go b/go/parser/config.go index 38a2d041a5..5395eba9b4 100644 --- a/go/parser/config.go +++ b/go/parser/config.go @@ -18,6 +18,7 @@ type Config struct { Compression string `json:"compression,omitempty"` ContentType string `json:"contentType,omitempty"` ContentEncoding string `json:"contentEncoding,omitempty"` + DefaultTimezone string `json:"defaultTimezone,omitempty"` } func (c *Config) Copy() Config { @@ -39,6 +40,7 @@ func (c *Config) Copy() Config { Compression: c.Compression, ContentType: c.ContentType, ContentEncoding: c.ContentEncoding, + DefaultTimezone: c.DefaultTimezone, } }