Skip to content

Commit

Permalink
ingestion: get time_resolution for qc from type_id
Browse files Browse the repository at this point in the history
  • Loading branch information
intarga committed Dec 13, 2024
1 parent e365309 commit 389f56a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
25 changes: 18 additions & 7 deletions ingestion/src/kldata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,26 @@ pub fn parse_kldata(

Ok((
header.message_id,
parse_obs(csv_body, &columns, reference_params, header)?, // ObsinnChunk {
// observations: parse_obs(csv_body, &columns, reference_params)?,
// station_id: header.station_id,
// type_id: header.type_id,
// timestamp:,
// },
parse_obs(csv_body, &columns, reference_params, header)?,
))
}

// TODO: this is a messy hack, but it's the only way people at met currently have to determine
// time_resolution. Ultimately we intend to store time_resolution info in the database under
// public.timeseries or labels.met. This will be populated by a combination of a script that looks
// at a timeseries's history, and manual editing by content managers.
/// Look up the expected time resolution of a timeseries from its obsinn type_id.
///
/// Returns `None` for type_ids whose resolution is unknown, in which case the
/// caller should skip resolution-dependent processing (e.g. QC).
pub fn type_id_to_time_resolution(type_id: i32) -> Option<RelativeDuration> {
    // Source for these matches: PDF presented by PiM
    let resolution = match type_id {
        514 => RelativeDuration::minutes(1),
        506 | 509 | 510 => RelativeDuration::minutes(10),
        7 | 311 | 330 | 342 | 501 | 502 | 503 | 505 | 507 | 511 => RelativeDuration::hours(1),
        522 => RelativeDuration::days(1),
        399 => RelativeDuration::years(1),
        // Unmapped type_ids: resolution unknown.
        _ => return None,
    };
    Some(resolution)
}

// TODO: rewrite such that queries can be pipelined?
// not pipelining here hurts latency, but shouldn't matter for throughput
pub async fn filter_and_label_kldata<'a>(
Expand Down Expand Up @@ -378,7 +389,7 @@ pub async fn filter_and_label_kldata<'a>(
out_chunks.push(DataChunk {
timestamp: chunk.timestamp,
// TODO: real time_resolution (derive from type_id for now)
time_resolution: RelativeDuration::hours(1),
time_resolution: type_id_to_time_resolution(chunk.type_id),
data,
});
}
Expand Down
15 changes: 9 additions & 6 deletions ingestion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub struct Datum<'a> {
/// Generic container for a piece of data ready to be inserted into the DB
pub struct DataChunk<'a> {
timestamp: DateTime<Utc>,
time_resolution: chronoutil::RelativeDuration,
time_resolution: Option<chronoutil::RelativeDuration>,
data: Vec<Datum<'a>>,
}

Expand Down Expand Up @@ -235,13 +235,16 @@ pub async fn qc_data(

let mut qc_results: Vec<QcResult> = Vec::new();
for chunk in chunks {
let time_resolution = match chunk.time_resolution {
Some(time_resolution) => time_resolution,
// if there's no time_resolution, we can't QC
None => continue,
};
let timestamp = chunk.timestamp.timestamp();

for datum in chunk.data.iter() {
let time_spec = TimeSpec::new(
Timestamp(timestamp),
Timestamp(timestamp),
chunk.time_resolution,
);
let time_spec =
TimeSpec::new(Timestamp(timestamp), Timestamp(timestamp), time_resolution);
let space_spec = SpaceSpec::One(datum.timeseries_id.to_string());
// TODO: load and fetch real pipeline
let pipeline = "sample_pipeline";
Expand Down

0 comments on commit 389f56a

Please sign in to comment.