diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs
index e5063cd074..be17ed680b 100644
--- a/crates/agent/src/publications.rs
+++ b/crates/agent/src/publications.rs
@@ -336,11 +336,11 @@ impl PublishHandler {
         )
         .await?;
 
-        let errors = build_output.draft_errors();
+        let errors = builds::draft_errors(&build_output);
         if !errors.is_empty() {
             // If there's a build error, then it's possible that it's due to incompatible collection changes.
             // We'll report those in the status so that the UI can present a dialog allowing users to take action.
-            let incompatible_collections = build_output.get_incompatible_collections();
+            let incompatible_collections = builds::get_incompatible_collections(&build_output);
             return stop_with_errors(
                 errors,
                 JobStatus::build_failed(incompatible_collections),
@@ -431,7 +431,7 @@ impl PublishHandler {
         // so that this function can observe the `live_specs` that we just updated.
         let pub_ids = linked_materializations::create_linked_materialization_publications(
             &self.agent_user_email,
-            &build_output.built_captures,
+            build_output.built_captures(),
             maybe_source_captures,
             txn,
         )
diff --git a/crates/agent/src/publications/builds.rs b/crates/agent/src/publications/builds.rs
index 8cc0506d15..1eb136699a 100644
--- a/crates/agent/src/publications/builds.rs
+++ b/crates/agent/src/publications/builds.rs
@@ -2,6 +2,7 @@ use crate::{draft::Error, jobs, logs, Id};
 use agent_sql::publications::{ExpandedRow, SpecRow};
 use agent_sql::CatalogType;
 use anyhow::Context;
+use build::Output;
 use itertools::Itertools;
 use proto_flow::{
     materialize::response::validated::constraint::Type as ConstraintType,
@@ -13,15 +14,6 @@ use std::collections::{BTreeMap, HashSet};
 use std::io::Write;
 use std::path;
 
-#[derive(Default)]
-pub struct BuildOutput {
-    pub errors: tables::Errors,
-    pub built_captures: tables::BuiltCaptures,
-    pub built_collections: tables::BuiltCollections,
-    pub built_materializations: tables::BuiltMaterializations,
-    pub built_tests: tables::BuiltTests,
-}
-
 /// Reasons why a draft collection spec would need to be published under a new name.
 #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Debug)]
 #[serde(rename_all = "camelCase")]
@@ -56,64 +48,64 @@ pub struct RejectedField {
     pub reason: String,
 }
 
-impl BuildOutput {
-    pub fn draft_errors(&self) -> Vec<Error> {
-        self.errors
-            .iter()
-            .map(|e| Error {
-                scope: Some(e.scope.to_string()),
-                // Use "alternate" form to include compact, chained error causes.
-                // See: https://docs.rs/anyhow/latest/anyhow/struct.Error.html#display-representations
-                detail: format!("{:#}", e.error),
-                ..Default::default()
-            })
-            .collect()
-    }
+pub fn draft_errors(output: &Output) -> Vec<Error> {
+    output
+        .errors()
+        .map(|e| Error {
+            scope: Some(e.scope.to_string()),
+            // Use "alternate" form to include compact, chained error causes.
+            // See: https://docs.rs/anyhow/latest/anyhow/struct.Error.html#display-representations
+            detail: format!("{:#}", e.error),
+            ..Default::default()
+        })
+        .collect()
+}
 
-    pub fn get_incompatible_collections(&self) -> Vec<IncompatibleCollection> {
-        // We'll collect a map of collection names to lists of materializations that have rejected the proposed collection changes.
-        let mut naughty_collections = BTreeMap::new();
-
-        // Look at materialization validation responses for any collections that have been rejected due to unsatisfiable constraints.
-        for mat in self.built_materializations.iter() {
-            for (i, binding) in mat.validated.bindings.iter().enumerate() {
-                let Some(collection_name) = mat.spec.bindings[i].collection.as_ref().map(|c| c.name.as_str()) else {
-                    continue;
-                };
-                let naughty_fields: Vec<RejectedField> = binding
-                    .constraints
-                    .iter()
-                    .filter(|(_, constraint)| {
-                        constraint.r#type == ConstraintType::Unsatisfiable as i32
-                    })
-                    .map(|(field, constraint)| RejectedField {
-                        field: field.clone(),
-                        reason: constraint.reason.clone(),
-                    })
-                    .collect();
-                if !naughty_fields.is_empty() {
-                    let affected_consumers = naughty_collections
-                        .entry(collection_name.to_owned())
-                        .or_insert_with(|| Vec::new());
-                    affected_consumers.push(AffectedConsumer {
-                        name: mat.materialization.clone(),
-                        fields: naughty_fields,
-                    });
-                }
+pub fn get_incompatible_collections(output: &Output) -> Vec<IncompatibleCollection> {
+    // We'll collect a map of collection names to lists of materializations that have rejected the proposed collection changes.
+    let mut naughty_collections = BTreeMap::new();
+
+    // Look at materialization validation responses for any collections that have been rejected due to unsatisfiable constraints.
+    for mat in output.built_materializations().iter() {
+        for (i, binding) in mat.validated.bindings.iter().enumerate() {
+            let Some(collection_name) = mat.spec.bindings[i]
+                .collection
+                .as_ref()
+                .map(|c| c.name.as_str())
+            else {
+                continue;
+            };
+            let naughty_fields: Vec<RejectedField> = binding
+                .constraints
+                .iter()
+                .filter(|(_, constraint)| constraint.r#type == ConstraintType::Unsatisfiable as i32)
+                .map(|(field, constraint)| RejectedField {
+                    field: field.clone(),
+                    reason: constraint.reason.clone(),
+                })
+                .collect();
+            if !naughty_fields.is_empty() {
+                let affected_consumers = naughty_collections
+                    .entry(collection_name.to_owned())
+                    .or_insert_with(|| Vec::new());
+                affected_consumers.push(AffectedConsumer {
+                    name: mat.materialization.clone(),
+                    fields: naughty_fields,
+                });
             }
         }
-
-        naughty_collections
-            .into_iter()
-            .map(
-                |(collection, affected_materializations)| IncompatibleCollection {
-                    collection,
-                    affected_materializations,
-                    requires_recreation: Vec::new(),
-                },
-            )
-            .collect()
     }
+
+    naughty_collections
+        .into_iter()
+        .map(
+            |(collection, affected_materializations)| IncompatibleCollection {
+                collection,
+                affected_materializations,
+                requires_recreation: Vec::new(),
+            },
+        )
+        .collect()
 }
 
 pub async fn build_catalog(
@@ -126,7 +118,7 @@ pub async fn build_catalog(
     logs_tx: &logs::Tx,
     pub_id: Id,
     tmpdir: &path::Path,
-) -> anyhow::Result<BuildOutput> {
+) -> anyhow::Result<Output> {
     // We perform the build under a ./builds/ subdirectory, which is a
     // specific sub-path expected by temp-data-plane underneath its
     // working temporary directory. This lets temp-data-plane use the
@@ -179,7 +171,7 @@ pub async fn build_catalog(
             ..Default::default()
         },
         &db_path,
-        build_result.as_ref(),
+        &build_result,
     )?;
 
     let dest_url = builds_root.join(&pub_id.to_string())?;
@@ -201,29 +193,7 @@ pub async fn build_catalog(
     if !persist_job.success() {
         anyhow::bail!("persist of {db_path:?} exited with an error");
     }
-
-    Ok(match build_result {
-        Ok((
-            _sources,
-            tables::Validations {
-                built_captures,
-                built_collections,
-                built_materializations,
-                built_tests,
-                errors: _,
-            },
-        )) => BuildOutput {
-            built_captures,
-            built_collections,
-            built_materializations,
-            built_tests,
-            ..Default::default()
-        },
-        Err(errors) => BuildOutput {
-            errors,
-            ..Default::default()
-        },
-    })
+    Ok(build_result)
 }
 
 pub async fn data_plane(
diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs
index b9521a1c6c..0879838fd6 100644
--- a/crates/agent/src/publications/specs.rs
+++ b/crates/agent/src/publications/specs.rs
@@ -1,4 +1,4 @@
-use super::builds::{self, IncompatibleCollection, ReCreateReason};
+use super::builds::{IncompatibleCollection, ReCreateReason};
 use super::draft::Error;
 use agent_sql::publications::{ExpandedRow, SpecRow, Tenant};
 use agent_sql::{Capability, CatalogType, Id};
@@ -475,7 +475,7 @@ pub async fn apply_updates_for_row(
 pub async fn add_build_output_to_live_specs(
     spec_rows: &[SpecRow],
     pruned_collections: &HashSet<String>,
-    build_output: &builds::BuildOutput,
+    build_output: &build::Output,
     txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
 ) -> Result<(), sqlx::Error> {
     // We use the `draft_spec.is_some()` check throughout in order to avoid
@@ -484,7 +484,7 @@ pub async fn add_build_output_to_live_specs(
     // referenced by other tasks in the build. Technically, only collections are
     // ever added to the build output in this way, but we similarly filter the
     // others for consistency.
-    for collection in build_output.built_collections.iter() {
+    for collection in build_output.built_collections().iter() {
         // Note that only non-pruned collections must be updated as part of this function.
         // Pruned collections will already have had their live_specs rows deleted.
         if let Some(row) = spec_rows
@@ -504,7 +504,7 @@ pub async fn add_build_output_to_live_specs(
         }
     }
 
-    for capture in build_output.built_captures.iter() {
+    for capture in build_output.built_captures().iter() {
         if let Some(row) = spec_rows
             .iter()
             .find(|r| r.catalog_name == capture.capture.as_str())
@@ -514,7 +514,7 @@ pub async fn add_build_output_to_live_specs(
         }
     }
 
-    for materialization in build_output.built_materializations.iter() {
+    for materialization in build_output.built_materializations().iter() {
         if let Some(row) = spec_rows
             .iter()
             .find(|r| r.catalog_name == materialization.materialization.as_str())
@@ -525,7 +525,7 @@ pub async fn add_build_output_to_live_specs(
         }
     }
 
-    for test in build_output.built_tests.iter() {
+    for test in build_output.built_tests().iter() {
         if let Some(row) = spec_rows
             .iter()
             .find(|r| r.catalog_name == test.test.as_str())
@@ -543,13 +543,13 @@ pub async fn add_build_output_to_live_specs(
 // changing in this publication per the list of spec_rows.
 pub async fn add_built_specs_to_draft_specs(
     spec_rows: &[SpecRow],
-    build_output: &builds::BuildOutput,
+    build_output: &build::Output,
     txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
 ) -> Result<(), sqlx::Error> {
     // We use the `draft_spec.is_some()` check throughout in order to avoid
     // adding built specs to draft_specs that are being deleted by this
     // publication. See the comment in `add_build_output_to_live_specs`
-    for collection in build_output.built_collections.iter() {
+    for collection in build_output.built_collections().iter() {
         if let Some(row) = spec_rows
             .iter()
             .find(|r| r.catalog_name == collection.collection.as_str())
@@ -565,7 +565,7 @@ pub async fn add_built_specs_to_draft_specs(
         }
     }
 
-    for capture in build_output.built_captures.iter() {
+    for capture in build_output.built_captures().iter() {
         if let Some(row) = spec_rows
             .iter()
             .find(|r| r.catalog_name == capture.capture.as_str())
@@ -581,7 +581,7 @@ pub async fn add_built_specs_to_draft_specs(
         }
     }
 
-    for materialization in build_output.built_materializations.iter() {
+    for materialization in build_output.built_materializations().iter() {
         if let Some(row) = spec_rows
             .iter()
             .find(|r| r.catalog_name == materialization.materialization.as_str())
@@ -597,7 +597,7 @@ pub async fn add_built_specs_to_draft_specs(
         }
     }
 
-    for test in build_output.built_tests.iter() {
+    for test in build_output.built_tests().iter() {
         if let Some(row) = spec_rows
             .iter()
             .find(|r| r.catalog_name == test.test.as_str())
diff --git a/crates/build/src/lib.rs b/crates/build/src/lib.rs
index 09e49b4071..08eef12dbe 100644
--- a/crates/build/src/lib.rs
+++ b/crates/build/src/lib.rs
@@ -161,6 +161,53 @@ where
     (sources, validations)
 }
 
+/// The output of a build, which can be either successful, failed, or anything
+/// in between. The "in between" may seem silly, but may be important for
+/// some use cases. For example, you may be executing a build for the purpose
+/// of getting the collection projections, in which case you may not want to
+/// consider errors from materialization validations to be terminal.
+pub struct Output {
+    sources: tables::Sources,
+    validations: tables::Validations,
+}
+
+impl Output {
+    pub fn new(sources: tables::Sources, validations: tables::Validations) -> Self {
+        Output {
+            sources,
+            validations,
+        }
+    }
+
+    pub fn into_parts(self) -> (tables::Sources, tables::Validations) {
+        (self.sources, self.validations)
+    }
+
+    /// Returns an iterator of all errors that have occurred during any phase of the build.
+    pub fn errors(&self) -> impl Iterator<Item = &tables::Error> {
+        self.sources
+            .errors
+            .iter()
+            .chain(self.validations.errors.iter())
+    }
+
+    pub fn built_materializations(&self) -> &tables::BuiltMaterializations {
+        &self.validations.built_materializations
+    }
+
+    pub fn built_captures(&self) -> &tables::BuiltCaptures {
+        &self.validations.built_captures
+    }
+
+    pub fn built_collections(&self) -> &tables::BuiltCollections {
+        &self.validations.built_collections
+    }
+
+    pub fn built_tests(&self) -> &tables::BuiltTests {
+        &self.validations.built_tests
+    }
+}
+
 /// Perform a "managed" build, which is a convenience for:
 /// * Loading `source` and failing-fast on any load errors.
 /// * Then performing all validations and producing built specs.
@@ -177,47 +224,46 @@ pub async fn managed_build(
     log_handler: L,
     project_root: url::Url,
     source: url::Url,
-) -> Result<(tables::Sources, tables::Validations), tables::Errors>
+) -> Output
 where
     L: Fn(&ops::Log) + Send + Sync + Clone + 'static,
 {
-    let (sources, validations) = validate(
-        allow_local,
-        &build_id,
-        &connector_network,
-        &*control_plane,
-        true, // Generate ops collections.
-        log_handler,
-        false, // Validate captures.
-        false, // Validate derivations.
-        false, // Validate materializations.
-        &project_root,
-        load(&source, &file_root).await.into_result()?,
-    )
-    .await;
+    let in_sources = load(&source, &file_root).await;
+
+    let (sources, validations) = if in_sources.errors.is_empty() {
+        validate(
+            allow_local,
+            &build_id,
+            &connector_network,
+            &*control_plane,
+            true, // Generate ops collections.
+            log_handler,
+            false, // Validate captures.
+            false, // Validate derivations.
+            false, // Validate materializations.
+            &project_root,
+            in_sources,
+        )
+        .await
+    } else {
+        (in_sources, Default::default())
+    };
 
-    Ok((sources, validations.into_result()?))
+    Output::new(sources, validations)
 }
 
 /// Persist a managed build Result into the SQLite tables commonly known as a "build DB".
 pub fn persist(
     build_config: proto_flow::flow::build_api::Config,
     db_path: &Path,
-    result: Result<&(tables::Sources, tables::Validations), &tables::Errors>,
+    result: &Output,
 ) -> anyhow::Result<()> {
     let db = rusqlite::Connection::open(db_path).context("failed to open catalog database")?;
 
-    match result {
-        Ok((sources, validations)) => {
-            tables::persist_tables(&db, &sources.as_tables())
-                .context("failed to persist catalog sources")?;
-            tables::persist_tables(&db, &validations.as_tables())
-                .context("failed to persist catalog validations")?;
-        }
-        Err(errors) => {
-            tables::persist_tables(&db, &[errors]).context("failed to persist catalog errors")?;
-        }
-    }
+    tables::persist_tables(&db, &result.sources.as_tables())
+        .context("failed to persist catalog sources")?;
+    tables::persist_tables(&db, &result.validations.as_tables())
+        .context("failed to persist catalog validations")?;
 
     // Legacy support: encode and persist a deprecated protobuf build Config.
     // At the moment, these are still covered by Go snapshot tests.
diff --git a/crates/flowctl/src/local_specs.rs b/crates/flowctl/src/local_specs.rs
index d2fd4e242d..376d44bfcd 100644
--- a/crates/flowctl/src/local_specs.rs
+++ b/crates/flowctl/src/local_specs.rs
@@ -108,7 +108,7 @@ async fn validate(
         })
         .collect::<tables::Errors>();
 
-    let out = (sources, validations);
+    let out = build::Output::new(sources, validations);
 
     // If DEBUG tracing is enabled, then write sources and validations to a
     // debugging database that can be inspected or shipped to Estuary for support.
@@ -119,11 +119,11 @@ async fn validate(
             .as_secs();
         let db_path = std::env::temp_dir().join(format!("flowctl_{seconds}.sqlite"));
 
-        build::persist(Default::default(), &db_path, Ok(&out)).expect("failed to write build DB");
+        build::persist(Default::default(), &db_path, &out).expect("failed to write build DB");
         tracing::debug!(db_path=%db_path.to_string_lossy(), "wrote debugging database");
     }
 
-    out
+    out.into_parts()
 }
 
 pub(crate) fn surface_errors<T>(result: Result<T, tables::Errors>) -> anyhow::Result<T> {
diff --git a/crates/flowctl/src/raw/mod.rs b/crates/flowctl/src/raw/mod.rs
index bc140bdba3..4c7a9bdaf3 100644
--- a/crates/flowctl/src/raw/mod.rs
+++ b/crates/flowctl/src/raw/mod.rs
@@ -231,7 +231,7 @@ async fn do_build(ctx: &mut crate::CliContext, build: &Build) -> anyhow::Result<
         ..Default::default()
     };
 
-    build::persist(build_config, &db_path, build_result.as_ref())?;
+    build::persist(build_config, &db_path, &build_result)?;
 
     Ok(())
 }
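
Two usage sketches follow for review convenience; they are not part of the patch. First, a minimal sketch of how a caller might consume the new `build::Output` API. The `check_build` helper is hypothetical, but it uses only methods introduced in this diff (`errors()`, `into_parts()`):

```rust
/// Hypothetical caller: log every build error and bail, or unwrap the tables.
fn check_build(output: build::Output) -> anyhow::Result<(tables::Sources, tables::Validations)> {
    // `Output::errors()` chains load-phase and validation-phase errors,
    // so a single pass covers every stage of the build.
    let mut failed = false;
    for err in output.errors() {
        failed = true;
        // "{:#}" is anyhow's compact, chained cause representation,
        // matching how `draft_errors` renders details above.
        tracing::error!(scope = %err.scope, "{:#}", err.error);
    }
    if failed {
        anyhow::bail!("build failed");
    }
    // Callers needing the tables by value unwrap them here.
    Ok(output.into_parts())
}
```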
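Second, since `persist` now takes the `Output` by reference and always writes both the sources and validations tables, a failed build can be persisted for debugging exactly like a successful one. A sketch under the same assumptions (the helper name and path are illustrative):

```rust
/// Hypothetical helper: write any build Output, successful or not, to a build DB.
fn persist_for_debugging(output: &build::Output) -> anyhow::Result<()> {
    let db_path = std::env::temp_dir().join("example_build.sqlite"); // illustrative path
    // Errors ride along inside the persisted tables rather than in a separate Result arm.
    build::persist(Default::default(), &db_path, output)?;
    Ok(())
}
```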