Skip to content

Commit

Permalink
build: return all tables after build failure
Browse files Browse the repository at this point in the history
Introduces the `build::BuildOutput` type, which represents the results of a
build.  This new type includes all tables, even when there are build errors.
It replaces the binary `Result<_,_>` type with one that is able to
simultaneously represent both the errors that occurred during the build, as
well as the outputs from the successful stages of the build.

The immediate effect of this, and motivation for doing it, is that if fixes a
bug that prevented the agent from notifying users about collections with
incompatible schema changes.  The agent relies on the presence of
`built_materializations` in the output in order to populate the
`incompatible_collections` array in the publication status.  The prior `Result`
representation didn't propagate `built_materializations` if there were any
`errors`, so this feature could not work.  The new, more complete
representation allows that to work.
  • Loading branch information
psFried committed Nov 10, 2023
1 parent 5f7ea6c commit 77e482e
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 133 deletions.
6 changes: 3 additions & 3 deletions crates/agent/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ impl PublishHandler {
)
.await?;

let errors = build_output.draft_errors();
let errors = builds::draft_errors(&build_output);
if !errors.is_empty() {
// If there's a build error, then it's possible that it's due to incompatible collection changes.
// We'll report those in the status so that the UI can present a dialog allowing users to take action.
let incompatible_collections = build_output.get_incompatible_collections();
let incompatible_collections = builds::get_incompatible_collections(&build_output);
return stop_with_errors(
errors,
JobStatus::build_failed(incompatible_collections),
Expand Down Expand Up @@ -431,7 +431,7 @@ impl PublishHandler {
// so that this function can observe the `live_specs` that we just updated.
let pub_ids = linked_materializations::create_linked_materialization_publications(
&self.agent_user_email,
&build_output.built_captures,
build_output.built_captures(),
maybe_source_captures,
txn,
)
Expand Down
144 changes: 57 additions & 87 deletions crates/agent/src/publications/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{draft::Error, jobs, logs, Id};
use agent_sql::publications::{ExpandedRow, SpecRow};
use agent_sql::CatalogType;
use anyhow::Context;
use build::BuildOutput;
use itertools::Itertools;
use proto_flow::{
materialize::response::validated::constraint::Type as ConstraintType,
Expand All @@ -13,15 +14,6 @@ use std::collections::{BTreeMap, HashSet};
use std::io::Write;
use std::path;

#[derive(Default)]
pub struct BuildOutput {
pub errors: tables::Errors,
pub built_captures: tables::BuiltCaptures,
pub built_collections: tables::BuiltCollections,
pub built_materializations: tables::BuiltMaterializations,
pub built_tests: tables::BuiltTests,
}

/// Reasons why a draft collection spec would need to be published under a new name.
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Debug)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -56,64 +48,64 @@ pub struct RejectedField {
pub reason: String,
}

impl BuildOutput {
pub fn draft_errors(&self) -> Vec<Error> {
self.errors
.iter()
.map(|e| Error {
scope: Some(e.scope.to_string()),
// Use "alternate" form to include compact, chained error causes.
// See: https://docs.rs/anyhow/latest/anyhow/struct.Error.html#display-representations
detail: format!("{:#}", e.error),
..Default::default()
})
.collect()
}
pub fn draft_errors(output: &BuildOutput) -> Vec<Error> {
output
.errors()
.map(|e| Error {
scope: Some(e.scope.to_string()),
// Use "alternate" form to include compact, chained error causes.
// See: https://docs.rs/anyhow/latest/anyhow/struct.Error.html#display-representations
detail: format!("{:#}", e.error),
..Default::default()
})
.collect()
}

pub fn get_incompatible_collections(&self) -> Vec<IncompatibleCollection> {
// We'll collect a map of collection names to lists of materializations that have rejected the proposed collection changes.
let mut naughty_collections = BTreeMap::new();

// Look at materialization validation responses for any collections that have been rejected due to unsatisfiable constraints.
for mat in self.built_materializations.iter() {
for (i, binding) in mat.validated.bindings.iter().enumerate() {
let Some(collection_name) = mat.spec.bindings[i].collection.as_ref().map(|c| c.name.as_str()) else {
continue;
};
let naughty_fields: Vec<RejectedField> = binding
.constraints
.iter()
.filter(|(_, constraint)| {
constraint.r#type == ConstraintType::Unsatisfiable as i32
})
.map(|(field, constraint)| RejectedField {
field: field.clone(),
reason: constraint.reason.clone(),
})
.collect();
if !naughty_fields.is_empty() {
let affected_consumers = naughty_collections
.entry(collection_name.to_owned())
.or_insert_with(|| Vec::new());
affected_consumers.push(AffectedConsumer {
name: mat.materialization.clone(),
fields: naughty_fields,
});
}
pub fn get_incompatible_collections(output: &BuildOutput) -> Vec<IncompatibleCollection> {
// We'll collect a map of collection names to lists of materializations that have rejected the proposed collection changes.
let mut naughty_collections = BTreeMap::new();

// Look at materialization validation responses for any collections that have been rejected due to unsatisfiable constraints.
for mat in output.built_materializations().iter() {
for (i, binding) in mat.validated.bindings.iter().enumerate() {
let Some(collection_name) = mat.spec.bindings[i]
.collection
.as_ref()
.map(|c| c.name.as_str())
else {
continue;
};
let naughty_fields: Vec<RejectedField> = binding
.constraints
.iter()
.filter(|(_, constraint)| constraint.r#type == ConstraintType::Unsatisfiable as i32)
.map(|(field, constraint)| RejectedField {
field: field.clone(),
reason: constraint.reason.clone(),
})
.collect();
if !naughty_fields.is_empty() {
let affected_consumers = naughty_collections
.entry(collection_name.to_owned())
.or_insert_with(|| Vec::new());
affected_consumers.push(AffectedConsumer {
name: mat.materialization.clone(),
fields: naughty_fields,
});
}
}

naughty_collections
.into_iter()
.map(
|(collection, affected_materializations)| IncompatibleCollection {
collection,
affected_materializations,
requires_recreation: Vec::new(),
},
)
.collect()
}

naughty_collections
.into_iter()
.map(
|(collection, affected_materializations)| IncompatibleCollection {
collection,
affected_materializations,
requires_recreation: Vec::new(),
},
)
.collect()
}

pub async fn build_catalog(
Expand Down Expand Up @@ -179,7 +171,7 @@ pub async fn build_catalog(
..Default::default()
},
&db_path,
build_result.as_ref(),
&build_result,
)?;
let dest_url = builds_root.join(&pub_id.to_string())?;

Expand All @@ -201,29 +193,7 @@ pub async fn build_catalog(
if !persist_job.success() {
anyhow::bail!("persist of {db_path:?} exited with an error");
}

Ok(match build_result {
Ok((
_sources,
tables::Validations {
built_captures,
built_collections,
built_materializations,
built_tests,
errors: _,
},
)) => BuildOutput {
built_captures,
built_collections,
built_materializations,
built_tests,
..Default::default()
},
Err(errors) => BuildOutput {
errors,
..Default::default()
},
})
Ok(build_result)
}

pub async fn data_plane(
Expand Down
22 changes: 11 additions & 11 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::builds::{self, IncompatibleCollection, ReCreateReason};
use super::builds::{IncompatibleCollection, ReCreateReason};
use super::draft::Error;
use agent_sql::publications::{ExpandedRow, SpecRow, Tenant};
use agent_sql::{Capability, CatalogType, Id};
Expand Down Expand Up @@ -475,7 +475,7 @@ pub async fn apply_updates_for_row(
pub async fn add_build_output_to_live_specs(
spec_rows: &[SpecRow],
pruned_collections: &HashSet<String>,
build_output: &builds::BuildOutput,
build_output: &build::BuildOutput,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), sqlx::Error> {
// We use the `draft_spec.is_some()` check throughout in order to avoid
Expand All @@ -484,7 +484,7 @@ pub async fn add_build_output_to_live_specs(
// referenced by other tasks in the build. Technically, only collections are
// ever added to the build output in this way, but we similarly filter the
// others for consistency.
for collection in build_output.built_collections.iter() {
for collection in build_output.built_collections().iter() {
// Note that only non-pruned collections must be updated as part of this function.
// Pruned collections will already have had their live_specs rows deleted.
if let Some(row) = spec_rows
Expand All @@ -504,7 +504,7 @@ pub async fn add_build_output_to_live_specs(
}
}

for capture in build_output.built_captures.iter() {
for capture in build_output.built_captures().iter() {
if let Some(row) = spec_rows
.iter()
.find(|r| r.catalog_name == capture.capture.as_str())
Expand All @@ -514,7 +514,7 @@ pub async fn add_build_output_to_live_specs(
}
}

for materialization in build_output.built_materializations.iter() {
for materialization in build_output.built_materializations().iter() {
if let Some(row) = spec_rows
.iter()
.find(|r| r.catalog_name == materialization.materialization.as_str())
Expand All @@ -525,7 +525,7 @@ pub async fn add_build_output_to_live_specs(
}
}

for test in build_output.built_tests.iter() {
for test in build_output.built_tests().iter() {
if let Some(row) = spec_rows
.iter()
.find(|r| r.catalog_name == test.test.as_str())
Expand All @@ -543,13 +543,13 @@ pub async fn add_build_output_to_live_specs(
// changing in this publication per the list of spec_rows.
pub async fn add_built_specs_to_draft_specs(
spec_rows: &[SpecRow],
build_output: &builds::BuildOutput,
build_output: &build::BuildOutput,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), sqlx::Error> {
// We use the `draft_spec.is_some()` check throughout in order to avoid
// adding built specs to draft_specs that are being deleted by this
// publication. See the comment in `add_build_output_to_live_specs`
for collection in build_output.built_collections.iter() {
for collection in build_output.built_collections().iter() {
if let Some(row) = spec_rows
.iter()
.find(|r| r.catalog_name == collection.collection.as_str())
Expand All @@ -565,7 +565,7 @@ pub async fn add_built_specs_to_draft_specs(
}
}

for capture in build_output.built_captures.iter() {
for capture in build_output.built_captures().iter() {
if let Some(row) = spec_rows
.iter()
.find(|r| r.catalog_name == capture.capture.as_str())
Expand All @@ -581,7 +581,7 @@ pub async fn add_built_specs_to_draft_specs(
}
}

for materialization in build_output.built_materializations.iter() {
for materialization in build_output.built_materializations().iter() {
if let Some(row) = spec_rows
.iter()
.find(|r| r.catalog_name == materialization.materialization.as_str())
Expand All @@ -597,7 +597,7 @@ pub async fn add_built_specs_to_draft_specs(
}
}

for test in build_output.built_tests.iter() {
for test in build_output.built_tests().iter() {
if let Some(row) = spec_rows
.iter()
.find(|r| r.catalog_name == test.test.as_str())
Expand Down
Loading

0 comments on commit 77e482e

Please sign in to comment.