From f85c049202e94212a215c35405c23515afa187bc Mon Sep 17 00:00:00 2001 From: Phil Date: Wed, 11 Oct 2023 12:37:10 -0400 Subject: [PATCH] control-plane: track effective inferred schema md5 Updates the agent to store the md5 sum of each inferred schema that was used during the publication. The md5 is added to the new `inferred_schema_md5` column on `live_specs`, which is used to determine whether a given collection spec should be pruned during auto-discovers. The md5 hashes are pulled from the control plane database as part of resolving inferred schemas, threaded through the validation process, and output as part of the `BuiltCollections` (in-memory) table. The end result of this is that auto-discovers will now prune collection specs from the draft if both the draft spec and the inferred schema are unchanged. But a change to only the inferred schema will be enough to trigger a publication as a result of auto-discover. --- crates/agent-sql/src/publications.rs | 23 +++++++++++++++++++- crates/agent/src/publications.rs | 2 +- crates/agent/src/publications/specs.rs | 16 ++++++++++---- crates/agent/src/publications/validation.rs | 7 ++++-- crates/flowctl/src/local_specs.rs | 14 ++++++++---- crates/tables/src/lib.rs | 6 ++++++ crates/validation/src/collection.rs | 24 ++++++++++++++++----- crates/validation/src/derivation.rs | 2 ++ crates/validation/src/lib.rs | 14 +++++++++++- crates/validation/src/noop.rs | 4 ++-- crates/validation/tests/scenario_tests.rs | 15 +++++++++---- 11 files changed, 103 insertions(+), 24 deletions(-) diff --git a/crates/agent-sql/src/publications.rs b/crates/agent-sql/src/publications.rs index e917b5f955..57059e1896 100644 --- a/crates/agent-sql/src/publications.rs +++ b/crates/agent-sql/src/publications.rs @@ -152,6 +152,25 @@ pub async fn insert_new_live_specs( Ok(rows.rows_affected()) } +pub async fn add_inferred_schema_md5( + live_spec_id: Id, + inferred_schema_md5: Option, + txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, +) -> 
sqlx::Result<()> { + sqlx::query!( + r#" + update live_specs set inferred_schema_md5 = $1 + where id = $2 and spec_type = 'collection' + returning 1 as "must_exist" + "#, + inferred_schema_md5 as Option, + live_spec_id as Id, + ) + .fetch_one(txn) + .await?; + Ok(()) +} + pub async fn add_built_specs( live_spec_id: Id, built_spec: S, @@ -644,6 +663,7 @@ pub async fn resolve_collections( pub struct InferredSchemaRow { pub collection_name: String, pub schema: Json>, + pub md5: String, } pub async fn get_inferred_schemas( @@ -654,7 +674,8 @@ pub async fn get_inferred_schemas( InferredSchemaRow, r#"select collection_name, - schema as "schema!: Json>" + schema as "schema!: Json>", + md5 as "md5!: String" from inferred_schemas where collection_name = ANY($1::text[]) "#, diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs index 8a1571a8ae..e5063cd074 100644 --- a/crates/agent/src/publications.rs +++ b/crates/agent/src/publications.rs @@ -397,7 +397,7 @@ impl PublishHandler { } // Add built specs to the live spec when publishing a build. - specs::add_built_specs_to_live_specs(&spec_rows, &pruned_collections, &build_output, txn) + specs::add_build_output_to_live_specs(&spec_rows, &pruned_collections, &build_output, txn) .await .context("adding built specs to live specs")?; diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index 8c76edf1de..604f36e99b 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -467,10 +467,12 @@ pub async fn apply_updates_for_row( Ok(()) } -// add_built_specs_to_live_specs adds the built spec to the live_specs row for all tasks included in -// build_output if they are in the list of specifications which are changing in this publication per -// the list of spec_rows. 
-pub async fn add_built_specs_to_live_specs( +/// Adds the built spec to the live_specs row for all tasks included +/// in build_output if they are in the list of specifications which are +/// changing in this publication per the list of spec_rows. Also sets the +/// `inferred_schema_md5` for collections, which tracks the hash that was used +/// during the build. +pub async fn add_build_output_to_live_specs( spec_rows: &[SpecRow], pruned_collections: &HashSet, build_output: &builds::BuildOutput, @@ -486,6 +488,12 @@ pub async fn add_built_specs_to_live_specs( { agent_sql::publications::add_built_specs(row.live_spec_id, &collection.spec, txn) .await?; + agent_sql::publications::add_inferred_schema_md5( + row.live_spec_id, + collection.inferred_schema_md5.clone(), + txn, + ) + .await?; } } diff --git a/crates/agent/src/publications/validation.rs b/crates/agent/src/publications/validation.rs index 732b73e9c2..6f1e908223 100644 --- a/crates/agent/src/publications/validation.rs +++ b/crates/agent/src/publications/validation.rs @@ -38,7 +38,7 @@ impl validation::ControlPlane for ControlPlane { collections: Vec, ) -> futures::future::BoxFuture< 'a, - anyhow::Result>, + anyhow::Result>, > { let Some(pool) = self.pool.clone() else { return validation::NoOpControlPlane.get_inferred_schemas(collections) @@ -52,7 +52,10 @@ impl validation::ControlPlane for ControlPlane { .map(|row| { ( models::Collection::new(row.collection_name), - models::Schema::new(row.schema.0.into()), + validation::InferredSchema { + schema: models::Schema::new(row.schema.0.into()), + md5: row.md5, + }, ) }) .collect() diff --git a/crates/flowctl/src/local_specs.rs b/crates/flowctl/src/local_specs.rs index e6b7f6585a..d2fd4e242d 100644 --- a/crates/flowctl/src/local_specs.rs +++ b/crates/flowctl/src/local_specs.rs @@ -256,12 +256,15 @@ impl validation::ControlPlane for Resolver { fn get_inferred_schemas<'a>( &'a self, collections: Vec, - ) -> BoxFuture<'a, anyhow::Result>> - { + ) -> BoxFuture< + 'a, +
anyhow::Result>, + > { #[derive(serde::Deserialize, Clone)] struct Row { pub collection_name: models::Collection, pub schema: models::Schema, + pub md5: String, } let rows = collections @@ -272,7 +275,7 @@ impl validation::ControlPlane for Resolver { let builder = self .client .from("inferred_schemas") - .select("collection_name,schema") + .select("collection_name,schema,md5") .in_("collection_name", names); async move { crate::api_exec::>(builder).await } @@ -290,7 +293,10 @@ impl validation::ControlPlane for Resolver { |Row { collection_name, schema, - }| (collection_name, schema), + md5, + }| { + (collection_name, validation::InferredSchema { schema, md5 }) + }, ) }) .flatten() diff --git a/crates/tables/src/lib.rs b/crates/tables/src/lib.rs index fa390787a1..d0d1095e44 100644 --- a/crates/tables/src/lib.rs +++ b/crates/tables/src/lib.rs @@ -93,6 +93,12 @@ tables!( validated: Option, // Built specification for this collection. spec: proto_flow::flow::CollectionSpec, + // The md5 sum of the inferred schema at the time that this collection + // was built. Note that this may be present even if the collection does + // not actually use the inferred schema. And it may also be missing, + // even if the collection _does_ use schema inference, for "remote" + // collections that were resolved dynamically during the build.
+ inferred_schema_md5: Option, } table BuiltMaterializations (row BuiltMaterialization, order_by [materialization], sql "built_materializations") { diff --git a/crates/validation/src/collection.rs b/crates/validation/src/collection.rs index 540a95a101..0d21e372cf 100644 --- a/crates/validation/src/collection.rs +++ b/crates/validation/src/collection.rs @@ -1,4 +1,4 @@ -use super::{indexed, schema, storage_mapping, Error, Scope}; +use super::{indexed, schema, storage_mapping, Error, InferredSchema, Scope}; use json::schema::types; use proto_flow::flow; use std::collections::BTreeMap; @@ -6,7 +6,7 @@ use std::collections::BTreeMap; pub fn walk_all_collections( build_id: &str, collections: &[tables::Collection], - inferred_schemas: &BTreeMap, + inferred_schemas: &BTreeMap, storage_mappings: &[tables::StorageMapping], errors: &mut tables::Errors, ) -> tables::BuiltCollections { @@ -20,7 +20,16 @@ pub fn walk_all_collections( storage_mappings, errors, ) { - built_collections.insert_row(&collection.scope, &collection.collection, None, spec); + let inferred_schema_md5 = inferred_schemas + .get(&collection.collection) + .map(|s| s.md5.clone()); + built_collections.insert_row( + &collection.scope, + &collection.collection, + None, + spec, + inferred_schema_md5, + ); } } built_collections @@ -29,7 +38,7 @@ pub fn walk_all_collections( fn walk_collection( build_id: &str, collection: &tables::Collection, - inferred_schemas: &BTreeMap, + inferred_schemas: &BTreeMap, storage_mappings: &[tables::StorageMapping], errors: &mut tables::Errors, ) -> Option { @@ -64,6 +73,9 @@ fn walk_collection( .push(scope.push_prop("key"), errors); } + let inferred_schema = inferred_schemas.get(&collection.collection); + tracing::debug!(collection = %collection.collection, inferred_schema_md5 = ?inferred_schema.map(|s| s.md5.as_str()), "does collection have an inferred schema"); + let (write_schema, write_bundle, read_schema_bundle) = match (schema, write_schema, read_schema) { // One schema used 
for both writes and reads. @@ -82,7 +94,9 @@ fn walk_collection( let read_bundle = models::Schema::extend_read_bundle( read_bundle, write_bundle, - inferred_schemas.get(&collection.collection), + inferred_schemas + .get(&collection.collection) + .map(|v| &v.schema), ); let read_schema = diff --git a/crates/validation/src/derivation.rs b/crates/validation/src/derivation.rs index caf0fdb104..e3fd2f882f 100644 --- a/crates/validation/src/derivation.rs +++ b/crates/validation/src/derivation.rs @@ -92,6 +92,7 @@ pub async fn walk_all_derivations( collection: this_collection, validated: _, spec: flow::CollectionSpec { name, .. }, + inferred_schema_md5: _, } = &built_collections[built_index]; let scope = Scope::new(scope); @@ -277,6 +278,7 @@ fn walk_derive_request<'a>( collection: _, validated: _, spec, + inferred_schema_md5: _, } = &built_collections[built_index]; let scope = Scope::new(scope); diff --git a/crates/validation/src/lib.rs b/crates/validation/src/lib.rs index 43ade15315..f9b8abb724 100644 --- a/crates/validation/src/lib.rs +++ b/crates/validation/src/lib.rs @@ -38,6 +38,15 @@ pub trait Connectors: Send + Sync { ) -> BoxFuture<'a, anyhow::Result>; } +pub struct InferredSchema { + /// The md5 sum of the inferred schema that was calculated by the control + /// plane database. We always use the value that was calculated by the + /// database in order to avoid spurious hash changes due to inconsequential + /// differences in JSON encoding. + pub md5: String, + pub schema: models::Schema, +} + pub trait ControlPlane: Send + Sync { // Resolve a set of collection names into pre-built CollectionSpecs from // the control plane. 
Resolution may be fuzzy: if there is a spec that's @@ -54,7 +63,7 @@ pub trait ControlPlane: Send + Sync { fn get_inferred_schemas<'a>( &'a self, collections: Vec, - ) -> BoxFuture<'a, anyhow::Result>>; + ) -> BoxFuture<'a, anyhow::Result>>; } pub async fn validate( @@ -123,6 +132,9 @@ pub async fn validate( scope: url::Url::parse("flow://control-plane").unwrap(), spec, validated: None, + // Note that we don't currently fetch the inferred schema md5 for remote collections, + // so they won't appear in the build output for these collections. + inferred_schema_md5: None, } }) .collect::(); diff --git a/crates/validation/src/noop.rs b/crates/validation/src/noop.rs index 7c38177576..5048575a4c 100644 --- a/crates/validation/src/noop.rs +++ b/crates/validation/src/noop.rs @@ -1,4 +1,4 @@ -use super::{Connectors, ControlPlane}; +use super::{Connectors, ControlPlane, InferredSchema}; use futures::future::{BoxFuture, FutureExt}; use proto_flow::{capture, derive, flow, materialize}; use std::collections::BTreeMap; @@ -113,7 +113,7 @@ impl ControlPlane for NoOpControlPlane { fn get_inferred_schemas<'a>( &'a self, _collections: Vec, - ) -> BoxFuture<'a, anyhow::Result>> { + ) -> BoxFuture<'a, anyhow::Result>> { async move { Ok(BTreeMap::new()) }.boxed() } } diff --git a/crates/validation/tests/scenario_tests.rs b/crates/validation/tests/scenario_tests.rs index a15ccba4cc..16198dea12 100644 --- a/crates/validation/tests/scenario_tests.rs +++ b/crates/validation/tests/scenario_tests.rs @@ -1759,13 +1759,20 @@ impl validation::ControlPlane for MockDriverCalls { fn get_inferred_schemas<'a>( &'a self, collections: Vec, - ) -> BoxFuture<'a, anyhow::Result>> + ) -> BoxFuture<'a, anyhow::Result>> + { let out = collections .iter() .filter_map(|collection| { - self.inferred_schemas - .get(collection) - .map(|schema| (collection.clone(), schema.clone())) + self.inferred_schemas.get(collection).map(|schema| { + ( + collection.clone(), + validation::InferredSchema { + schema:
schema.clone(), + md5: String::from("mock md5"), + }, + ) + }) }) .collect();