diff --git a/Cargo.lock b/Cargo.lock index ea7682e4..cece7d38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2372,6 +2372,7 @@ dependencies = [ "bb8-postgres", "chrono", "chronoutil", + "postgres-types", "rove", "thiserror", "tokio-postgres", diff --git a/rove_connector/Cargo.toml b/rove_connector/Cargo.toml index 3ef0bd29..e22187e9 100644 --- a/rove_connector/Cargo.toml +++ b/rove_connector/Cargo.toml @@ -9,6 +9,7 @@ bb8.workspace = true bb8-postgres.workspace = true chrono.workspace = true chronoutil.workspace = true +postgres-types.workspace = true rove.workspace = true thiserror.workspace = true tokio-postgres.workspace = true diff --git a/rove_connector/src/lib.rs b/rove_connector/src/lib.rs index 04307bb2..23f28377 100644 --- a/rove_connector/src/lib.rs +++ b/rove_connector/src/lib.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, TimeZone, Utc}; use chronoutil::RelativeDuration; use rove::data_switch::{self, DataCache, DataConnector, SpaceSpec, TimeSpec, Timeseries}; use thiserror::Error; -use tokio_postgres::NoTls; +use tokio_postgres::{types::FromSql, NoTls}; #[derive(Error, Debug)] #[non_exhaustive] @@ -20,26 +20,27 @@ pub struct Connector { pool: PgConnectionPool, } +#[derive(Debug, FromSql)] +struct Obs { + value: f32, + time: DateTime, +} + +// TODO: this should probably live somewhere else +#[derive(Debug, FromSql)] +#[postgres(name = "location")] +pub struct Location { + lat: Option, + lon: Option, + hamsl: Option, + _hag: Option, +} + fn extract_time_spec( time_spec: &TimeSpec, num_leading_points: u8, num_trailing_points: u8, -) -> Result<(DateTime, DateTime, &str), data_switch::Error> { - // TODO: matching intervals like this is a hack, but currently necessary to avoid - // SQL injection. Ideally we could pass an interval type as a query param, which would - // also save us the query_string allocation, but no ToSql implementations for intervals - // currently exist in tokio_postgres, so we need to implement it ourselves. - let interval = match time_spec.time_resolution { - x if x == RelativeDuration::minutes(1) => "1 minute", - x if x == RelativeDuration::hours(1) => "1 hour", - x if x == RelativeDuration::days(1) => "1 day", - _ => { - return Err(data_switch::Error::Other(Box::new( - Error::UnhandledTimeResolution(time_spec.time_resolution), - ))) - } - }; - +) -> Result<(DateTime, DateTime), data_switch::Error> { // TODO: should time_spec just use chrono timestamps instead of unix? // IIRC the reason for unix timestamps was easy compatibility with protobuf, but that's // less of a priority now @@ -50,7 +51,41 @@ fn extract_time_spec( let end_time = Utc.timestamp_opt(time_spec.timerange.start.0, 0).unwrap() + (time_spec.time_resolution * num_trailing_points.into()); - Ok((start_time, end_time, interval)) + Ok((start_time, end_time)) +} + +// TODO: does the input type match postgres-types? +fn regularize( + obses: Vec, + start_time: DateTime, + end_time: DateTime, + time_resolution: RelativeDuration, + expected_len: usize, +) -> Vec> { + let mut out = Vec::with_capacity(expected_len); + let mut curr_obs_time = start_time; + + for obs in obses { + while curr_obs_time < obs.time { + out.push(None); + curr_obs_time = curr_obs_time + time_resolution; + } + if curr_obs_time == obs.time { + out.push(Some(obs.value)); + curr_obs_time = curr_obs_time + time_resolution; + } else { + // In this case the observation is misaligned, so we should skip it. There's a case + // to be made for returning an error, but I think we ought to be more robust. + continue; + } + } + + while curr_obs_time <= end_time { + out.push(None); + curr_obs_time = curr_obs_time + time_resolution; + } + + out } impl Connector { @@ -61,9 +96,29 @@ impl Connector { num_leading_points: u8, num_trailing_points: u8, ) -> Result { - let (start_time, end_time, interval) = + // TODO: matching intervals like this is a hack, but currently necessary to avoid + // SQL injection. Ideally we could pass an interval type as a query param, which would + // also save us the query_string allocation, but no ToSql implementations for intervals + // currently exist in tokio_postgres, so we need to implement it ourselves. + let interval = match time_spec.time_resolution { + x if x == RelativeDuration::minutes(1) => "1 minute", + x if x == RelativeDuration::hours(1) => "1 hour", + x if x == RelativeDuration::days(1) => "1 day", + _ => { + return Err(data_switch::Error::Other(Box::new( + Error::UnhandledTimeResolution(time_spec.time_resolution), + ))) + } + }; + + let (start_time, end_time) = extract_time_spec(time_spec, num_leading_points, num_trailing_points)?; + // TODO: should this contain an ORDER BY? + // TODO: should we drop ts_rule.timestamp from the SELECT? we don't seem to use it + // TODO: should we make this like the fetch_all query and regularize outside the query? + // I think this query might perform badly because the join against the generated series + // doesn't use the index optimally. Doing this would also save us the "interval" mess let query_string = format!("SELECT data.obsvalue, ts_rule.timestamp \ FROM (SELECT data.obsvalue, data.obstime FROM data WHERE data.timeseries = $1) as data RIGHT JOIN generate_series($2::timestamptz, $3::timestamptz, interval '{}') AS ts_rule(timestamp) \ @@ -106,6 +161,98 @@ impl Connector { Ok(cache) } + + async fn fetch_all( + &self, + time_spec: &TimeSpec, + num_leading_points: u8, + num_trailing_points: u8, + ) -> Result { + let (start_time, end_time) = + extract_time_spec(time_spec, num_leading_points, num_trailing_points)?; + + let conn = self + .pool + .get() + .await + .map_err(|e| data_switch::Error::Other(Box::new(e)))?; + + let data_results = conn + .query( + " + SELECT timeseries.id, data.values, timeseries.loc \ + FROM ( \ + SELECT timeseries, ARRAY_AGG ((value, timestamp) ORDER BY timestamp ASC) as values \ + FROM data \ + WHERE obstime BETWEEN $1 AND $2 \ + GROUP BY timeseries \ + ) as data \ + JOIN timeseries \ + ON data.timeseries = timeseries.id \ + ", + &[&start_time, &end_time], + ) + .await + .map_err(|e| data_switch::Error::Other(Box::new(e)))?; + + let cache = { + let mut data = Vec::with_capacity(data_results.len()); + let mut lats = Vec::with_capacity(data_results.len()); + let mut lons = Vec::with_capacity(data_results.len()); + let mut elevs = Vec::with_capacity(data_results.len()); + + let ts_length = { + let mut ts_length = 0; + let mut curr_time = start_time; + while curr_time <= end_time { + ts_length += 1; + curr_time = curr_time + time_spec.time_resolution; + } + ts_length + }; + + for row in data_results { + let ts_id: i32 = row.get(0); + let raw_values: Vec = row.get(1); + let loc: Location = row.get(2); + + // TODO: is there a better way to handle this? If we insert with default latlon we + // risk corrupting spatial checks, if not we miss QCing data we probably should be + // QCing... Perhaps we can change the definition of DataCache to accommodate this + // better? + if loc.lat.is_none() || loc.lon.is_none() || loc.hamsl.is_none() { + continue; + } + + data.push(Timeseries { + tag: ts_id.to_string(), + values: regularize( + raw_values, + start_time, + end_time, + time_spec.time_resolution, + ts_length, + ), + }); + lats.push(loc.lat.unwrap()); + lons.push(loc.lon.unwrap()); + elevs.push(loc.hamsl.unwrap()); + } + + DataCache::new( + data, + lats, + lons, + elevs, + time_spec.timerange.start, + time_spec.time_resolution, + num_leading_points, + num_trailing_points, + ) + }; + + Ok(cache) + } } #[async_trait] @@ -130,9 +277,11 @@ impl DataConnector for Connector { ) .await } - // TODO: We should handle at least the All case, Polygon can be left unimplemented for - // now - _ => todo!(), + SpaceSpec::Polygon(_) => unimplemented!(), + SpaceSpec::All => { + self.fetch_all(time_spec, num_leading_points, num_trailing_points) + .await + } } } }