ingestion: implement qc_data
intarga committed Dec 12, 2024
1 parent 9d00a5a commit ed70cc7
Showing 6 changed files with 135 additions and 11 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -27,7 +27,8 @@ quick-xml = { version = "0.35.0", features = [ "serialize", "overlapped-lists" ]
rand = "0.8.5"
rand_distr = "0.4.3"
regex = "1.11.1"
rove = { git = "https://github.com/metno/rove.git" }
rove = { git = "https://github.com/metno/rove.git", branch = "lard_fixes" }
# rove = { git = "https://github.com/metno/rove.git" }
serde = { version = "1.0.215", features = ["derive"] }
test-case = "3.3.1"
thiserror = "1.0.69"
2 changes: 2 additions & 0 deletions db/flags.sql
@@ -11,12 +11,14 @@ CREATE TABLE IF NOT EXISTS flags.confident (
CREATE INDEX IF NOT EXISTS confident_timestamp_index ON flags.confident (obstime);
CREATE INDEX IF NOT EXISTS confident_timeseries_index ON flags.confident USING HASH (timeseries);

-- TODO: should this also have a column for qc_time or some such?
CREATE TABLE IF NOT EXISTS flags.confident_provenance (
timeseries INT4 NOT NULL,
obstime TIMESTAMPTZ NOT NULL,
pipeline TEXT NOT NULL,
-- TODO: should this be an enum?
flag INT4 NOT NULL,
-- TODO: better name? since this might be applied to flags that aren't fail but also aren't pass?
fail_condition TEXT NULL,
CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline UNIQUE (timeseries, obstime, pipeline),
CONSTRAINT fk_confident_providence_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries
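
A note on reading these flags back: fail_condition is nullable, carrying the name of the failing check only when a pipeline run fails. As a rough sketch of how the provenance table might be queried (editor's illustration, not part of this commit; the helper name and query text are assumptions based on the schema above):

use tokio_postgres::Client;

// Hypothetical helper: list the pipeline and fail_condition of every
// failing QC run recorded for one timeseries, newest first.
async fn failing_runs(
    client: &Client,
    timeseries: i32,
) -> Result<Vec<(String, Option<String>)>, tokio_postgres::Error> {
    let rows = client
        .query(
            "SELECT pipeline, fail_condition \
             FROM flags.confident_provenance \
             WHERE timeseries = $1 AND flag <> 0 \
             ORDER BY obstime DESC",
            &[&timeseries],
        )
        .await?;
    // fail_condition is nullable, hence Option<String>
    Ok(rows
        .iter()
        .map(|row| (row.get("pipeline"), row.get("fail_condition")))
        .collect())
}
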
1 change: 1 addition & 0 deletions ingestion/Cargo.toml
@@ -15,6 +15,7 @@ bb8.workspace = true
bb8-postgres.workspace = true
bytes.workspace = true
chrono.workspace = true
chronoutil.workspace = true
csv.workspace = true
futures.workspace = true
kafka.workspace = true
135 changes: 127 additions & 8 deletions ingestion/src/lib.rs
@@ -7,8 +7,10 @@ use axum::{
use bb8::PooledConnection;
use bb8_postgres::PostgresConnectionManager;
use chrono::{DateTime, Utc};
use chronoutil::RelativeDuration;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use rove::data_switch::{SpaceSpec, TimeSpec, Timestamp};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
@@ -30,6 +32,8 @@ pub enum Error {
Pool(#[from] bb8::RunError<tokio_postgres::Error>),
#[error("parse error: {0}")]
Parse(String),
#[error("qc system returned an error: {0}")]
Qc(#[from] rove::scheduler::Error),
#[error("RwLock was poisoned: {0}")]
Lock(String),
#[error("Could not read environment variable: {0}")]
@@ -73,7 +77,7 @@ struct IngestorState {
db_pool: PgConnectionPool,
param_conversions: ParamConversions, // converts param codes to element ids
permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
qc_scheduler: Arc<rove::Scheduler<'static>>,
qc_scheduler: Arc<rove::Scheduler>,
}

impl FromRef<IngestorState> for PgConnectionPool {
@@ -94,8 +98,8 @@ impl FromRef<IngestorState> for Arc<RwLock<(ParamPermitTable, StationPermitTable
}
}

impl FromRef<IngestorState> for Arc<rove::Scheduler<'static>> {
fn from_ref(state: &IngestorState) -> Arc<rove::Scheduler<'static>> {
impl FromRef<IngestorState> for Arc<rove::Scheduler> {
fn from_ref(state: &IngestorState) -> Arc<rove::Scheduler> {
state.qc_scheduler.clone()
}
}
@@ -118,6 +122,16 @@ pub struct Datum<'a> {

pub type Data<'a> = Vec<Datum<'a>>;

pub struct QcResult {
timeseries_id: i32,
timestamp: DateTime<Utc>,
// TODO: possible to avoid heap-allocating this?
pipeline: String,
// TODO: correct type?
flag: i32,
fail_condition: Option<String>,
}

// TODO: benchmark insertion of scalar and non-scalar together vs separately?
pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> {
// TODO: the conflict resolution on this query is an imperfect solution, and needs improvement
@@ -179,8 +193,113 @@ pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result
Ok(())
}

pub async fn qc_data(data: &Data<'_>, scheduler: &rove::Scheduler<'static>) -> Result<(), Error> {
todo!()
pub async fn qc_data(
data: &Data<'_>,
scheduler: &rove::Scheduler,
conn: &mut PooledPgConn<'_>,
) -> Result<(), Error> {
// TODO: see conflict resolution issues on queries in `insert_data`
// On periodic or consistency QC pipelines, we should be checking the provenance table to
// decide how to update usable on a conflict, but here it should be fine not to since this is
// fresh data.
// The `AND` in the `DO UPDATE SET` subexpression better handles the case of resent data, where
// periodic checks might already have been run, by defaulting to false. If the existing data was
// only fresh-checked, and the replacement is different, this could result in a false positive.
// I think this is OK though, since it should be a rare occurrence and will be quickly cleared up
// by a periodic run regardless.
let query = conn
.prepare(
"INSERT INTO flags.confident (timeseries, obstime, usable) \
VALUES ($1, $2, $3) \
ON CONFLICT ON CONSTRAINT unique_confident_timeseries_obstime \
DO UPDATE SET usable = usable AND EXCLUDED.usable",
)
.await?;
let query_provenance = conn
.prepare(
"INSERT INTO flags.confident_provenance (timeseries, obstime, pipeline, flag, fail_condition) \
VALUES ($1, $2, $3, $4, $5) \
ON CONFLICT ON CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline \
DO UPDATE SET flag = EXCLUDED.flag, fail_condition = EXCLUDED.fail_condition",
)
.await?;

let mut qc_results: Vec<QcResult> = Vec::with_capacity(data.len());
for datum in data {
let time_spec = TimeSpec::new(
Timestamp(datum.timestamp.timestamp()),
Timestamp(datum.timestamp.timestamp()),
// TODO: real time resolution here. For now derive from type_id?
RelativeDuration::hours(1),
);
let space_spec = SpaceSpec::One(datum.timeseries_id.to_string());
// TODO: load and fetch real pipeline
let pipeline = "sample_pipeline";
let rove_output = scheduler
.validate_direct(
"lard",
&[] as &[&str],
&time_spec,
&space_spec,
pipeline,
None,
)
.await?;

let first_fail = rove_output.iter().find(|check_result| {
if let Some(result) = check_result.results.first() {
if let Some(flag) = result.values.first() {
return *flag == rove::Flag::Fail;
}
}
false
});

let (flag, fail_condition) = match first_fail {
Some(check_result) => (1, Some(check_result.check.clone())),
None => (0, None),
};

qc_results.push(QcResult {
timeseries_id: datum.timeseries_id,
timestamp: datum.timestamp,
pipeline: pipeline.to_string(),
flag,
fail_condition,
});
}

let mut futures = qc_results
.iter()
.map(|qc_result| async {
conn.execute(
&query,
&[
&qc_result.timeseries_id,
&qc_result.timestamp,
&(qc_result.flag == 0),
],
)
.await?;
conn.execute(
&query_provenance,
&[
&qc_result.timeseries_id,
&qc_result.timestamp,
&qc_result.pipeline,
&qc_result.flag,
&qc_result.fail_condition,
],
)
.await
})
.collect::<FuturesUnordered<_>>();

while let Some(res) = futures.next().await {
res?;
}

Ok(())
}
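
// -------------------------------------------------------------------------
// Editor's note, not part of this commit: the conflict rule in the first
// upsert above, `usable = usable AND EXCLUDED.usable`, as a truth table.
// A resend can only downgrade a stored flag, never upgrade it; a wrongly
// downgraded row is later restored by a periodic QC run.
//
//   existing usable | incoming usable | stored usable
//   ----------------+-----------------+--------------
//   true            | true            | true
//   true            | false           | false  (resent data fails)
//   false           | true            | false  (earlier fail kept)
// -------------------------------------------------------------------------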

pub mod kldata;
@@ -206,7 +325,7 @@ async fn handle_kldata(
State(pool): State<PgConnectionPool>,
State(param_conversions): State<ParamConversions>,
State(permit_table): State<Arc<RwLock<(ParamPermitTable, StationPermitTable)>>>,
State(qc_scheduler): State<Arc<rove::Scheduler<'static>>>,
State(qc_scheduler): State<Arc<rove::Scheduler>>,
body: String,
) -> Json<KldataResp> {
let result: Result<usize, Error> = async {
@@ -220,7 +339,7 @@

insert_data(&data, &mut conn).await?;

qc_data(&data, &qc_scheduler).await?;
qc_data(&data, &qc_scheduler, &mut conn).await?;

Ok(message_id)
}
@@ -271,7 +390,7 @@ pub async fn run(
db_pool: PgConnectionPool,
param_conversion_path: &str,
permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
qc_scheduler: rove::Scheduler<'static>,
qc_scheduler: rove::Scheduler,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// set up param conversion map
let param_conversions = get_conversions(param_conversion_path)?;
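
The core of qc_data is the collapse from rove's per-check output down to a single (flag, fail_condition) pair per observation. The sketch below restates that logic over local stand-in types; the real types come from rove, and their shape here (check, results, values) is an assumption inferred from the field accesses in the diff:

// Stand-in types; the real definitions live in rove.
#[derive(PartialEq)]
enum Flag {
    Pass,
    Fail,
}

struct SeriesResult {
    values: Vec<Flag>,
}

struct CheckResult {
    check: String,
    results: Vec<SeriesResult>,
}

// (0, None) if every check passed; (1, Some(check name)) for the first
// check whose first value failed, mirroring the logic in qc_data.
fn collapse(output: &[CheckResult]) -> (i32, Option<String>) {
    let first_fail = output.iter().find(|cr| {
        cr.results
            .first()
            .and_then(|r| r.values.first())
            .map_or(false, |f| *f == Flag::Fail)
    });
    match first_fail {
        Some(cr) => (1, Some(cr.check.clone())),
        None => (0, None),
    }
}

fn main() {
    // "range_check" and "step_check" are made-up check names.
    let output = vec![
        CheckResult {
            check: "range_check".to_string(),
            results: vec![SeriesResult { values: vec![Flag::Pass] }],
        },
        CheckResult {
            check: "step_check".to_string(),
            results: vec![SeriesResult { values: vec![Flag::Fail] }],
        },
    ];
    assert_eq!(collapse(&output), (1, Some("step_check".to_string())));
}

Inspecting only the first value of the first series is sufficient here because each validate_direct call covers a single timeseries (SpaceSpec::One) over a single timestamp.
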
2 changes: 1 addition & 1 deletion ingestion/src/main.rs
@@ -40,7 +40,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let scheduler = rove::Scheduler::new(
load_pipelines("").unwrap(),
DataSwitch::new(HashMap::from([(
"lard",
String::from("lard"),
Box::new(Connector {
pool: db_pool.clone(),
}) as Box<dyn DataConnector + Send>,
