Skip to content

Commit

Permalink
control-plane: track effective inferred schema md5
Browse files Browse the repository at this point in the history
Updates the agent to store the md5 sum of each inferred schema that was used
during the publication. The md5 is added to the new `inferred_schema_md5`
column on `live_specs`, which is used to determine whether a given collection
spec should be pruned during auto-discovers.

The md5 hashes are pulled from the control plane database as part of resolving
inferred schemas, threaded through the validation process, and output as part
of the `BuiltCollections` (in-memory) table.

The end result of this is that auto-discovers will now prune collection specs
from the draft if both the draft spec and the inferred schema are unchanged.
But a change to only the inferred schema will be enough to trigger a publication
as a result of auto-discover.
  • Loading branch information
psFried committed Oct 11, 2023
1 parent 41c6f26 commit f85c049
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 24 deletions.
23 changes: 22 additions & 1 deletion crates/agent-sql/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,25 @@ pub async fn insert_new_live_specs(
Ok(rows.rows_affected())
}

/// Records the md5 of the inferred schema on a collection's `live_specs` row.
///
/// Updates the `inferred_schema_md5` column of the row whose `id` equals
/// `live_spec_id`, restricted to rows with `spec_type = 'collection'`.
/// Passing `None` for `inferred_schema_md5` binds a SQL `NULL`.
///
/// # Errors
///
/// Returns any `sqlx` error from executing the statement. Because the
/// statement is run with `fetch_one`, it also fails if no row matched the
/// `where` clause (an unknown id, or a row that is not a collection).
pub async fn add_inferred_schema_md5(
live_spec_id: Id,
inferred_schema_md5: Option<String>,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<()> {
// The `returning 1 as "must_exist"` clause makes the update yield a row for
// each updated record, so that `fetch_one` below can assert one was touched.
sqlx::query!(
r#"
update live_specs set inferred_schema_md5 = $1
where id = $2 and spec_type = 'collection'
returning 1 as "must_exist"
"#,
inferred_schema_md5 as Option<String>,
live_spec_id as Id,
)
// `fetch_one` errors when the statement returns no rows; the returned row
// itself is discarded.
.fetch_one(txn)
.await?;
Ok(())
}

pub async fn add_built_specs<S>(
live_spec_id: Id,
built_spec: S,
Expand Down Expand Up @@ -644,6 +663,7 @@ pub async fn resolve_collections(
/// A single row selected from the `inferred_schemas` table by
/// `get_inferred_schemas`.
pub struct InferredSchemaRow {
/// Name of the collection that the inferred schema belongs to.
pub collection_name: String,
/// The `schema` column, kept as raw (unparsed) JSON.
pub schema: Json<Box<RawValue>>,
/// The `md5` column for this row; selected as non-null.
pub md5: String,
}

pub async fn get_inferred_schemas(
Expand All @@ -654,7 +674,8 @@ pub async fn get_inferred_schemas(
InferredSchemaRow,
r#"select
collection_name,
schema as "schema!: Json<Box<RawValue>>"
schema as "schema!: Json<Box<RawValue>>",
md5 as "md5!: String"
from inferred_schemas
where collection_name = ANY($1::text[])
"#,
Expand Down
2 changes: 1 addition & 1 deletion crates/agent/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ impl PublishHandler {
}

// Add built specs to the live spec when publishing a build.
specs::add_built_specs_to_live_specs(&spec_rows, &pruned_collections, &build_output, txn)
specs::add_build_output_to_live_specs(&spec_rows, &pruned_collections, &build_output, txn)
.await
.context("adding built specs to live specs")?;

Expand Down
16 changes: 12 additions & 4 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,12 @@ pub async fn apply_updates_for_row(
Ok(())
}

// add_built_specs_to_live_specs adds the built spec to the live_specs row for all tasks included in
// build_output if they are in the list of specifications which are changing in this publication per
// the list of spec_rows.
pub async fn add_built_specs_to_live_specs(
/// Adds the built spec to the live_specs row for all tasks included
/// in build_output if they are in the list of specifications which are
/// changing in this publication per the list of spec_rows. Also sets the
/// `inferred_schema_md5` for collections, which tracks the hash that was used
/// during the build.
pub async fn add_build_output_to_live_specs(
spec_rows: &[SpecRow],
pruned_collections: &HashSet<String>,
build_output: &builds::BuildOutput,
Expand All @@ -486,6 +488,12 @@ pub async fn add_built_specs_to_live_specs(
{
agent_sql::publications::add_built_specs(row.live_spec_id, &collection.spec, txn)
.await?;
agent_sql::publications::add_inferred_schema_md5(
row.live_spec_id,
collection.inferred_schema_md5.clone(),
txn,
)
.await?;
}
}

Expand Down
7 changes: 5 additions & 2 deletions crates/agent/src/publications/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl validation::ControlPlane for ControlPlane {
collections: Vec<models::Collection>,
) -> futures::future::BoxFuture<
'a,
anyhow::Result<std::collections::BTreeMap<models::Collection, models::Schema>>,
anyhow::Result<std::collections::BTreeMap<models::Collection, validation::InferredSchema>>,
> {
let Some(pool) = self.pool.clone() else {
return validation::NoOpControlPlane.get_inferred_schemas(collections)
Expand All @@ -52,7 +52,10 @@ impl validation::ControlPlane for ControlPlane {
.map(|row| {
(
models::Collection::new(row.collection_name),
models::Schema::new(row.schema.0.into()),
validation::InferredSchema {
schema: models::Schema::new(row.schema.0.into()),
md5: row.md5,
},
)
})
.collect()
Expand Down
14 changes: 10 additions & 4 deletions crates/flowctl/src/local_specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,15 @@ impl validation::ControlPlane for Resolver {
fn get_inferred_schemas<'a>(
&'a self,
collections: Vec<models::Collection>,
) -> BoxFuture<'a, anyhow::Result<std::collections::BTreeMap<models::Collection, models::Schema>>>
{
) -> BoxFuture<
'a,
anyhow::Result<std::collections::BTreeMap<models::Collection, validation::InferredSchema>>,
> {
#[derive(serde::Deserialize, Clone)]
struct Row {
pub collection_name: models::Collection,
pub schema: models::Schema,
pub md5: String,
}

let rows = collections
Expand All @@ -272,7 +275,7 @@ impl validation::ControlPlane for Resolver {
let builder = self
.client
.from("inferred_schemas")
.select("collection_name,schema")
.select("collection_name,schema,md5")
.in_("collection_name", names);

async move { crate::api_exec::<Vec<Row>>(builder).await }
Expand All @@ -290,7 +293,10 @@ impl validation::ControlPlane for Resolver {
|Row {
collection_name,
schema,
}| (collection_name, schema),
md5,
}| {
(collection_name, validation::InferredSchema { schema, md5 })
},
)
})
.flatten()
Expand Down
6 changes: 6 additions & 0 deletions crates/tables/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ tables!(
validated: Option<proto_flow::derive::response::Validated>,
// Built specification for this collection.
spec: proto_flow::flow::CollectionSpec,
// The md5 sum of the inferred schema at the time that this collection
// was built. Note that this may be present even if the collection does
// not actually use the inferred schema. And it may also be missing,
// even if the collection _does_ use schema inference, for "remote"
// collections that were resolved dynamically during the build.
inferred_schema_md5: Option<String>,
}

table BuiltMaterializations (row BuiltMaterialization, order_by [materialization], sql "built_materializations") {
Expand Down
24 changes: 19 additions & 5 deletions crates/validation/src/collection.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::{indexed, schema, storage_mapping, Error, Scope};
use super::{indexed, schema, storage_mapping, Error, InferredSchema, Scope};
use json::schema::types;
use proto_flow::flow;
use std::collections::BTreeMap;

pub fn walk_all_collections(
build_id: &str,
collections: &[tables::Collection],
inferred_schemas: &BTreeMap<models::Collection, models::Schema>,
inferred_schemas: &BTreeMap<models::Collection, InferredSchema>,
storage_mappings: &[tables::StorageMapping],
errors: &mut tables::Errors,
) -> tables::BuiltCollections {
Expand All @@ -20,7 +20,16 @@ pub fn walk_all_collections(
storage_mappings,
errors,
) {
built_collections.insert_row(&collection.scope, &collection.collection, None, spec);
let inferred_schema_md5 = inferred_schemas
.get(&collection.collection)
.map(|s| s.md5.clone());
built_collections.insert_row(
&collection.scope,
&collection.collection,
None,
spec,
inferred_schema_md5,
);
}
}
built_collections
Expand All @@ -29,7 +38,7 @@ pub fn walk_all_collections(
fn walk_collection(
build_id: &str,
collection: &tables::Collection,
inferred_schemas: &BTreeMap<models::Collection, models::Schema>,
inferred_schemas: &BTreeMap<models::Collection, InferredSchema>,
storage_mappings: &[tables::StorageMapping],
errors: &mut tables::Errors,
) -> Option<flow::CollectionSpec> {
Expand Down Expand Up @@ -64,6 +73,9 @@ fn walk_collection(
.push(scope.push_prop("key"), errors);
}

let inferred_schema = inferred_schemas.get(&collection.collection);
tracing::debug!(collection = %collection.collection, inferred_schema_md5 = ?inferred_schema.map(|s| s.md5.as_str()), "does collection have an inferred schema");

let (write_schema, write_bundle, read_schema_bundle) = match (schema, write_schema, read_schema)
{
// One schema used for both writes and reads.
Expand All @@ -82,7 +94,9 @@ fn walk_collection(
let read_bundle = models::Schema::extend_read_bundle(
read_bundle,
write_bundle,
inferred_schemas.get(&collection.collection),
inferred_schemas
.get(&collection.collection)
.map(|v| &v.schema),
);

let read_schema =
Expand Down
2 changes: 2 additions & 0 deletions crates/validation/src/derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub async fn walk_all_derivations(
collection: this_collection,
validated: _,
spec: flow::CollectionSpec { name, .. },
inferred_schema_md5: _,
} = &built_collections[built_index];
let scope = Scope::new(scope);

Expand Down Expand Up @@ -277,6 +278,7 @@ fn walk_derive_request<'a>(
collection: _,
validated: _,
spec,
inferred_schema_md5: _,
} = &built_collections[built_index];

let scope = Scope::new(scope);
Expand Down
14 changes: 13 additions & 1 deletion crates/validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ pub trait Connectors: Send + Sync {
) -> BoxFuture<'a, anyhow::Result<proto_flow::materialize::Response>>;
}

/// An inferred schema for a collection, together with its md5 sum.
///
/// This is the per-collection value returned by
/// `ControlPlane::get_inferred_schemas`.
pub struct InferredSchema {
/// The md5 sum of the inferred schema that was calculated by the control
/// plane database. We always use the value that was calculated by the
/// database in order to avoid spurious hash changes due to inconsequential
/// differences in JSON encoding.
pub md5: String,
/// The inferred schema document itself.
pub schema: models::Schema,
}

pub trait ControlPlane: Send + Sync {
// Resolve a set of collection names into pre-built CollectionSpecs from
// the control plane. Resolution may be fuzzy: if there is a spec that's
Expand All @@ -54,7 +63,7 @@ pub trait ControlPlane: Send + Sync {
fn get_inferred_schemas<'a>(
&'a self,
collections: Vec<models::Collection>,
) -> BoxFuture<'a, anyhow::Result<BTreeMap<models::Collection, models::Schema>>>;
) -> BoxFuture<'a, anyhow::Result<BTreeMap<models::Collection, InferredSchema>>>;
}

pub async fn validate(
Expand Down Expand Up @@ -123,6 +132,9 @@ pub async fn validate(
scope: url::Url::parse("flow://control-plane").unwrap(),
spec,
validated: None,
// Note that we don't currently fetch the inferred schema md5 for remote collections,
// so it won't appear in the build output for these collections.
inferred_schema_md5: None,
}
})
.collect::<tables::BuiltCollections>();
Expand Down
4 changes: 2 additions & 2 deletions crates/validation/src/noop.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Connectors, ControlPlane};
use super::{Connectors, ControlPlane, InferredSchema};
use futures::future::{BoxFuture, FutureExt};
use proto_flow::{capture, derive, flow, materialize};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -113,7 +113,7 @@ impl ControlPlane for NoOpControlPlane {
fn get_inferred_schemas<'a>(
&'a self,
_collections: Vec<models::Collection>,
) -> BoxFuture<'a, anyhow::Result<BTreeMap<models::Collection, models::Schema>>> {
) -> BoxFuture<'a, anyhow::Result<BTreeMap<models::Collection, InferredSchema>>> {
async move { Ok(BTreeMap::new()) }.boxed()
}
}
15 changes: 11 additions & 4 deletions crates/validation/tests/scenario_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1759,13 +1759,20 @@ impl validation::ControlPlane for MockDriverCalls {
fn get_inferred_schemas<'a>(
&'a self,
collections: Vec<models::Collection>,
) -> BoxFuture<'a, anyhow::Result<BTreeMap<models::Collection, models::Schema>>> {
) -> BoxFuture<'a, anyhow::Result<BTreeMap<models::Collection, validation::InferredSchema>>>
{
let out = collections
.iter()
.filter_map(|collection| {
self.inferred_schemas
.get(collection)
.map(|schema| (collection.clone(), schema.clone()))
self.inferred_schemas.get(collection).map(|schema| {
(
collection.clone(),
validation::InferredSchema {
schema: schema.clone(),
md5: String::from("mock md5"),
},
)
})
})
.collect();

Expand Down

0 comments on commit f85c049

Please sign in to comment.