From 4a1e97f0cd3fab22385a4b24b60034dcdb54c144 Mon Sep 17 00:00:00 2001 From: Phil Date: Fri, 10 Nov 2023 16:11:14 -0500 Subject: [PATCH] build: return all tables after build failure Introduces the `build::Output` type, which represents the results of a build. This new type includes all tables, even when there are build errors. It replaces the binary `Result<_,_>` type with one that is able to simultaneously represent both the errors that occurred during the build, as well as the outputs from the successful stages of the build. The immediate effect of this, and motivation for doing it, is that it fixes a bug that prevented the agent from notifying users about collections with incompatible schema changes. The agent relies on the presence of `built_materializations` in the output in order to populate the `incompatible_collections` array in the publication status. The prior `Result` representation didn't propagate `built_materializations` if there were any `errors`, so this feature could not work. The new, more complete representation allows that to work. --- crates/agent/src/publications.rs | 6 +- crates/agent/src/publications/builds.rs | 146 ++++++++++-------------- crates/agent/src/publications/specs.rs | 22 ++-- crates/build/src/lib.rs | 102 ++++++++++++----- crates/flowctl/src/local_specs.rs | 6 +- crates/flowctl/src/raw/mod.rs | 2 +- 6 files changed, 150 insertions(+), 134 deletions(-) diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs index e5063cd074..be17ed680b 100644 --- a/crates/agent/src/publications.rs +++ b/crates/agent/src/publications.rs @@ -336,11 +336,11 @@ impl PublishHandler { ) .await?; - let errors = build_output.draft_errors(); + let errors = builds::draft_errors(&build_output); if !errors.is_empty() { // If there's a build error, then it's possible that it's due to incompatible collection changes. // We'll report those in the status so that the UI can present a dialog allowing users to take action. 
- let incompatible_collections = build_output.get_incompatible_collections(); + let incompatible_collections = builds::get_incompatible_collections(&build_output); return stop_with_errors( errors, JobStatus::build_failed(incompatible_collections), @@ -431,7 +431,7 @@ impl PublishHandler { // so that this function can observe the `live_specs` that we just updated. let pub_ids = linked_materializations::create_linked_materialization_publications( &self.agent_user_email, - &build_output.built_captures, + build_output.built_captures(), maybe_source_captures, txn, ) diff --git a/crates/agent/src/publications/builds.rs b/crates/agent/src/publications/builds.rs index 8cc0506d15..1eb136699a 100644 --- a/crates/agent/src/publications/builds.rs +++ b/crates/agent/src/publications/builds.rs @@ -2,6 +2,7 @@ use crate::{draft::Error, jobs, logs, Id}; use agent_sql::publications::{ExpandedRow, SpecRow}; use agent_sql::CatalogType; use anyhow::Context; +use build::Output; use itertools::Itertools; use proto_flow::{ materialize::response::validated::constraint::Type as ConstraintType, @@ -13,15 +14,6 @@ use std::collections::{BTreeMap, HashSet}; use std::io::Write; use std::path; -#[derive(Default)] -pub struct BuildOutput { - pub errors: tables::Errors, - pub built_captures: tables::BuiltCaptures, - pub built_collections: tables::BuiltCollections, - pub built_materializations: tables::BuiltMaterializations, - pub built_tests: tables::BuiltTests, -} - /// Reasons why a draft collection spec would need to be published under a new name. #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Debug)] #[serde(rename_all = "camelCase")] @@ -56,64 +48,64 @@ pub struct RejectedField { pub reason: String, } -impl BuildOutput { - pub fn draft_errors(&self) -> Vec { - self.errors - .iter() - .map(|e| Error { - scope: Some(e.scope.to_string()), - // Use "alternate" form to include compact, chained error causes. 
- // See: https://docs.rs/anyhow/latest/anyhow/struct.Error.html#display-representations - detail: format!("{:#}", e.error), - ..Default::default() - }) - .collect() - } +pub fn draft_errors(output: &Output) -> Vec { + output + .errors() + .map(|e| Error { + scope: Some(e.scope.to_string()), + // Use "alternate" form to include compact, chained error causes. + // See: https://docs.rs/anyhow/latest/anyhow/struct.Error.html#display-representations + detail: format!("{:#}", e.error), + ..Default::default() + }) + .collect() +} - pub fn get_incompatible_collections(&self) -> Vec { - // We'll collect a map of collection names to lists of materializations that have rejected the proposed collection changes. - let mut naughty_collections = BTreeMap::new(); - - // Look at materialization validation responses for any collections that have been rejected due to unsatisfiable constraints. - for mat in self.built_materializations.iter() { - for (i, binding) in mat.validated.bindings.iter().enumerate() { - let Some(collection_name) = mat.spec.bindings[i].collection.as_ref().map(|c| c.name.as_str()) else { - continue; - }; - let naughty_fields: Vec = binding - .constraints - .iter() - .filter(|(_, constraint)| { - constraint.r#type == ConstraintType::Unsatisfiable as i32 - }) - .map(|(field, constraint)| RejectedField { - field: field.clone(), - reason: constraint.reason.clone(), - }) - .collect(); - if !naughty_fields.is_empty() { - let affected_consumers = naughty_collections - .entry(collection_name.to_owned()) - .or_insert_with(|| Vec::new()); - affected_consumers.push(AffectedConsumer { - name: mat.materialization.clone(), - fields: naughty_fields, - }); - } +pub fn get_incompatible_collections(output: &Output) -> Vec { + // We'll collect a map of collection names to lists of materializations that have rejected the proposed collection changes. 
+ let mut naughty_collections = BTreeMap::new(); + + // Look at materialization validation responses for any collections that have been rejected due to unsatisfiable constraints. + for mat in output.built_materializations().iter() { + for (i, binding) in mat.validated.bindings.iter().enumerate() { + let Some(collection_name) = mat.spec.bindings[i] + .collection + .as_ref() + .map(|c| c.name.as_str()) + else { + continue; + }; + let naughty_fields: Vec = binding + .constraints + .iter() + .filter(|(_, constraint)| constraint.r#type == ConstraintType::Unsatisfiable as i32) + .map(|(field, constraint)| RejectedField { + field: field.clone(), + reason: constraint.reason.clone(), + }) + .collect(); + if !naughty_fields.is_empty() { + let affected_consumers = naughty_collections + .entry(collection_name.to_owned()) + .or_insert_with(|| Vec::new()); + affected_consumers.push(AffectedConsumer { + name: mat.materialization.clone(), + fields: naughty_fields, + }); } } - - naughty_collections - .into_iter() - .map( - |(collection, affected_materializations)| IncompatibleCollection { - collection, - affected_materializations, - requires_recreation: Vec::new(), - }, - ) - .collect() } + + naughty_collections + .into_iter() + .map( + |(collection, affected_materializations)| IncompatibleCollection { + collection, + affected_materializations, + requires_recreation: Vec::new(), + }, + ) + .collect() } pub async fn build_catalog( @@ -126,7 +118,7 @@ pub async fn build_catalog( logs_tx: &logs::Tx, pub_id: Id, tmpdir: &path::Path, -) -> anyhow::Result { +) -> anyhow::Result { // We perform the build under a ./builds/ subdirectory, which is a // specific sub-path expected by temp-data-plane underneath its // working temporary directory. 
This lets temp-data-plane use the @@ -179,7 +171,7 @@ pub async fn build_catalog( ..Default::default() }, &db_path, - build_result.as_ref(), + &build_result, )?; let dest_url = builds_root.join(&pub_id.to_string())?; @@ -201,29 +193,7 @@ pub async fn build_catalog( if !persist_job.success() { anyhow::bail!("persist of {db_path:?} exited with an error"); } - - Ok(match build_result { - Ok(( - _sources, - tables::Validations { - built_captures, - built_collections, - built_materializations, - built_tests, - errors: _, - }, - )) => BuildOutput { - built_captures, - built_collections, - built_materializations, - built_tests, - ..Default::default() - }, - Err(errors) => BuildOutput { - errors, - ..Default::default() - }, - }) + Ok(build_result) } pub async fn data_plane( diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index b9521a1c6c..0879838fd6 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -1,4 +1,4 @@ -use super::builds::{self, IncompatibleCollection, ReCreateReason}; +use super::builds::{IncompatibleCollection, ReCreateReason}; use super::draft::Error; use agent_sql::publications::{ExpandedRow, SpecRow, Tenant}; use agent_sql::{Capability, CatalogType, Id}; @@ -475,7 +475,7 @@ pub async fn apply_updates_for_row( pub async fn add_build_output_to_live_specs( spec_rows: &[SpecRow], pruned_collections: &HashSet, - build_output: &builds::BuildOutput, + build_output: &build::Output, txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result<(), sqlx::Error> { // We use the `draft_spec.is_some()` check throughout in order to avoid @@ -484,7 +484,7 @@ pub async fn add_build_output_to_live_specs( // referenced by other tasks in the build. Technically, only collections are // ever added to the build output in this way, but we similarly filter the // others for consistency. 
- for collection in build_output.built_collections.iter() { + for collection in build_output.built_collections().iter() { // Note that only non-pruned collections must be updated as part of this function. // Pruned collections will already have had their live_specs rows deleted. if let Some(row) = spec_rows @@ -504,7 +504,7 @@ pub async fn add_build_output_to_live_specs( } } - for capture in build_output.built_captures.iter() { + for capture in build_output.built_captures().iter() { if let Some(row) = spec_rows .iter() .find(|r| r.catalog_name == capture.capture.as_str()) @@ -514,7 +514,7 @@ pub async fn add_build_output_to_live_specs( } } - for materialization in build_output.built_materializations.iter() { + for materialization in build_output.built_materializations().iter() { if let Some(row) = spec_rows .iter() .find(|r| r.catalog_name == materialization.materialization.as_str()) @@ -525,7 +525,7 @@ pub async fn add_build_output_to_live_specs( } } - for test in build_output.built_tests.iter() { + for test in build_output.built_tests().iter() { if let Some(row) = spec_rows .iter() .find(|r| r.catalog_name == test.test.as_str()) @@ -543,13 +543,13 @@ pub async fn add_build_output_to_live_specs( // changing in this publication per the list of spec_rows. pub async fn add_built_specs_to_draft_specs( spec_rows: &[SpecRow], - build_output: &builds::BuildOutput, + build_output: &build::Output, txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result<(), sqlx::Error> { // We use the `draft_spec.is_some()` check throughout in order to avoid // adding built specs to draft_specs that are being deleted by this // publication. 
See the comment in `add_build_output_to_live_specs` - for collection in build_output.built_collections.iter() { + for collection in build_output.built_collections().iter() { if let Some(row) = spec_rows .iter() .find(|r| r.catalog_name == collection.collection.as_str()) @@ -565,7 +565,7 @@ pub async fn add_built_specs_to_draft_specs( } } - for capture in build_output.built_captures.iter() { + for capture in build_output.built_captures().iter() { if let Some(row) = spec_rows .iter() .find(|r| r.catalog_name == capture.capture.as_str()) @@ -581,7 +581,7 @@ pub async fn add_built_specs_to_draft_specs( } } - for materialization in build_output.built_materializations.iter() { + for materialization in build_output.built_materializations().iter() { if let Some(row) = spec_rows .iter() .find(|r| r.catalog_name == materialization.materialization.as_str()) @@ -597,7 +597,7 @@ pub async fn add_built_specs_to_draft_specs( } } - for test in build_output.built_tests.iter() { + for test in build_output.built_tests().iter() { if let Some(row) = spec_rows .iter() .find(|r| r.catalog_name == test.test.as_str()) diff --git a/crates/build/src/lib.rs b/crates/build/src/lib.rs index 09e49b4071..08eef12dbe 100644 --- a/crates/build/src/lib.rs +++ b/crates/build/src/lib.rs @@ -161,6 +161,53 @@ where (sources, validations) } +/// The output of a build, which can be either successful, failed, or anything +/// in between. The "in between" may seem silly, but may be important for +/// some use cases. For example, you may be executing a build for the purpose +/// of getting the collection projections, in which case you may not want to +/// consider errors from materialization validations to be terminal. 
+pub struct Output { + sources: tables::Sources, + validations: tables::Validations, +} + +impl Output { + pub fn new(sources: tables::Sources, validations: tables::Validations) -> Self { + Output { + sources, + validations, + } + } + + pub fn into_parts(self) -> (tables::Sources, tables::Validations) { + (self.sources, self.validations) + } + + /// Returns an iterator of all errors that have occurred during any phase of the build. + pub fn errors(&self) -> impl Iterator { + self.sources + .errors + .iter() + .chain(self.validations.errors.iter()) + } + + pub fn built_materializations(&self) -> &tables::BuiltMaterializations { + &self.validations.built_materializations + } + + pub fn built_captures(&self) -> &tables::BuiltCaptures { + &self.validations.built_captures + } + + pub fn built_collections(&self) -> &tables::BuiltCollections { + &self.validations.built_collections + } + + pub fn built_tests(&self) -> &tables::BuiltTests { + &self.validations.built_tests + } +} + /// Perform a "managed" build, which is a convenience for: /// * Loading `source` and failing-fast on any load errors. /// * Then performing all validations and producing built specs. @@ -177,47 +224,46 @@ pub async fn managed_build( log_handler: L, project_root: url::Url, source: url::Url, -) -> Result<(tables::Sources, tables::Validations), tables::Errors> +) -> Output where L: Fn(&ops::Log) + Send + Sync + Clone + 'static, { - let (sources, validations) = validate( - allow_local, - &build_id, - &connector_network, - &*control_plane, - true, // Generate ops collections. - log_handler, - false, // Validate captures. - false, // Validate derivations. - false, // Validate materializations. 
- &project_root, - load(&source, &file_root).await.into_result()?, - ) - .await; + let in_sources = load(&source, &file_root).await; + + let (sources, validations) = if in_sources.errors.is_empty() { + validate( + allow_local, + &build_id, + &connector_network, + &*control_plane, + true, // Generate ops collections. + log_handler, + false, // Validate captures. + false, // Validate derivations. + false, // Validate materializations. + &project_root, + in_sources, + ) + .await + } else { + (in_sources, Default::default()) + }; - Ok((sources, validations.into_result()?)) + Output::new(sources, validations) } /// Persist a managed build Result into the SQLite tables commonly known as a "build DB". pub fn persist( build_config: proto_flow::flow::build_api::Config, db_path: &Path, - result: Result<&(tables::Sources, tables::Validations), &tables::Errors>, + result: &Output, ) -> anyhow::Result<()> { let db = rusqlite::Connection::open(db_path).context("failed to open catalog database")?; - match result { - Ok((sources, validations)) => { - tables::persist_tables(&db, &sources.as_tables()) - .context("failed to persist catalog sources")?; - tables::persist_tables(&db, &validations.as_tables()) - .context("failed to persist catalog validations")?; - } - Err(errors) => { - tables::persist_tables(&db, &[errors]).context("failed to persist catalog errors")?; - } - } + tables::persist_tables(&db, &result.sources.as_tables()) + .context("failed to persist catalog sources")?; + tables::persist_tables(&db, &result.validations.as_tables()) + .context("failed to persist catalog validations")?; // Legacy support: encode and persist a deprecated protobuf build Config. // At the moment, these are still covered by Go snapshot tests. 
diff --git a/crates/flowctl/src/local_specs.rs b/crates/flowctl/src/local_specs.rs index d2fd4e242d..376d44bfcd 100644 --- a/crates/flowctl/src/local_specs.rs +++ b/crates/flowctl/src/local_specs.rs @@ -108,7 +108,7 @@ async fn validate( }) .collect::(); - let out = (sources, validations); + let out = build::Output::new(sources, validations); // If DEBUG tracing is enabled, then write sources and validations to a // debugging database that can be inspected or shipped to Estuary for support. @@ -119,11 +119,11 @@ async fn validate( .as_secs(); let db_path = std::env::temp_dir().join(format!("flowctl_{seconds}.sqlite")); - build::persist(Default::default(), &db_path, Ok(&out)).expect("failed to write build DB"); + build::persist(Default::default(), &db_path, &out).expect("failed to write build DB"); tracing::debug!(db_path=%db_path.to_string_lossy(), "wrote debugging database"); } - out + out.into_parts() } pub(crate) fn surface_errors(result: Result) -> anyhow::Result { diff --git a/crates/flowctl/src/raw/mod.rs b/crates/flowctl/src/raw/mod.rs index bc140bdba3..4c7a9bdaf3 100644 --- a/crates/flowctl/src/raw/mod.rs +++ b/crates/flowctl/src/raw/mod.rs @@ -231,7 +231,7 @@ async fn do_build(ctx: &mut crate::CliContext, build: &Build) -> anyhow::Result< ..Default::default() }; - build::persist(build_config, &db_path, build_result.as_ref())?; + build::persist(build_config, &db_path, &build_result)?; Ok(()) }