Skip to content

Commit

Permalink
flowctl: remove unchanged specs using the sql function
Browse files Browse the repository at this point in the history
Updates `flowctl catalog publish` to remove unchanged specs using the new
`prune_unchanged_draft_specs` RPC. This allows the pruning to account for
changes to inferred schemas, and ensures that the behavior is consistent
that of the `discovers` handler.
  • Loading branch information
psFried committed Oct 11, 2023
1 parent f85c049 commit e943e8f
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 95 deletions.
33 changes: 33 additions & 0 deletions crates/flowctl/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,39 @@ pub struct SpecSummaryItem {
pub catalog_name: String,
pub spec_type: Option<String>,
}

impl SpecSummaryItem {
fn summarize_catalog(catalog: models::Catalog) -> Vec<SpecSummaryItem> {
let mut summary = Vec::with_capacity(catalog.spec_count());
let models::Catalog {
captures,
collections,
materializations,
tests,
..
} = catalog;

summary.extend(captures.into_keys().map(|k| SpecSummaryItem {
catalog_name: k.into(),
spec_type: Some(String::from("capture")),
}));
summary.extend(collections.into_keys().map(|k| SpecSummaryItem {
catalog_name: k.into(),
spec_type: Some(String::from("collection")),
}));
summary.extend(materializations.into_keys().map(|k| SpecSummaryItem {
catalog_name: k.into(),
spec_type: Some(String::from("materialization")),
}));
summary.extend(tests.into_keys().map(|k| SpecSummaryItem {
catalog_name: k.into(),
spec_type: Some(String::from("test")),
}));

summary
}
}

impl CliOutput for SpecSummaryItem {
type TableAlt = ();
type CellValue = JsonCell;
Expand Down
78 changes: 22 additions & 56 deletions crates/flowctl/src/catalog/publish.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::{api_exec, controlplane, draft, local_specs, CliContext};
use crate::{catalog::SpecSummaryItem, controlplane, draft, local_specs, CliContext};
use anyhow::Context;
use itertools::Itertools;
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, clap::Args)]
pub struct Publish {
Expand All @@ -17,46 +14,6 @@ pub struct Publish {
auto_approve: bool,
}

// TODO(whb): This page size is pretty arbitrary, but seemed to work fine during my tests. It needs
// to be large enough to be reasonably efficient for large numbers of specs, but small enough to not
// exceed query length limitations.
const MD5_PAGE_SIZE: usize = 100;

pub async fn remove_unchanged(
client: &controlplane::Client,
mut input_catalog: models::Catalog,
) -> anyhow::Result<models::Catalog> {
let mut spec_checksums: HashMap<String, String> = HashMap::new();

#[derive(Deserialize, Debug)]
struct SpecChecksumRow {
catalog_name: String,
md5: Option<String>,
}

let spec_names = input_catalog.all_spec_names();
for names in &spec_names.into_iter().chunks(MD5_PAGE_SIZE) {
let builder = client
.from("live_specs_ext")
.select("catalog_name,md5")
.in_("catalog_name", names);

let rows: Vec<SpecChecksumRow> = api_exec(builder).await?;
let chunk_checksums = rows.iter().filter_map(|row| {
if let Some(md5) = row.md5.as_ref() {
Some((row.catalog_name.clone(), md5.clone()))
} else {
None
}
});
spec_checksums.extend(chunk_checksums);
}

sources::remove_unchanged_specs(&spec_checksums, &mut input_catalog);

Ok(input_catalog)
}

pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result<()> {
use crossterm::tty::IsTty;

Expand All @@ -70,24 +27,33 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result<

let (sources, _validations) =
local_specs::load_and_validate(client.clone(), &args.source).await?;
let mut catalog = remove_unchanged(&client, local_specs::into_catalog(sources)).await?;
let catalog = local_specs::into_catalog(sources);

// `remove_unchanged` used to sneakily remove any storage mappings from the
// catalog by copying everything except storage mappings into a new catalog.
// This retains that behavior while making it a bit more explicit.
catalog.storage_mappings.clear();
let draft = draft::create_draft(client.clone()).await?;
println!("Created draft: {}", &draft.id);
tracing::info!(draft_id = %draft.id, "created draft");
draft::upsert_draft_specs(client.clone(), &draft.id, &catalog).await?;

let removed = draft::remove_unchanged(&client, &draft.id).await?;
if !removed.is_empty() {
println!("The following specs are identical to the currently published specs, and have been pruned from the draft:");
for name in removed.iter() {
println!("{name}");
}
println!(""); // blank line to give a bit of spacing
}

if catalog.is_empty() {
let mut summary = SpecSummaryItem::summarize_catalog(catalog);
summary.retain(|s| !removed.contains(&s.catalog_name));

if summary.is_empty() {
println!("No specs would be changed by this publication, nothing to publish.");
try_delete_draft(client, &draft.id).await;
return Ok(());
}

let draft = draft::create_draft(client.clone()).await?;
println!("Created draft: {}", &draft.id);
tracing::info!(draft_id = %draft.id, "created draft");
let spec_rows = draft::upsert_draft_specs(client.clone(), &draft.id, &catalog).await?;
println!("Will publish the following {} specs", spec_rows.len());
ctx.write_all(spec_rows, ())?;
println!("Will publish the following {} specs", summary.len());
ctx.write_all(summary, ())?;

if !(args.auto_approve || prompt_to_continue().await) {
println!("\nCancelling");
Expand Down
13 changes: 13 additions & 0 deletions crates/flowctl/src/draft/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::collections::BTreeSet;

use crate::{
api_exec, api_exec_paginated,
controlplane::Client,
output::{to_table_row, CliOutput, JsonCell},
};
use anyhow::Context;
use serde::{Deserialize, Serialize};

mod author;
Expand Down Expand Up @@ -278,6 +281,16 @@ async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> {
ctx.write_all(rows, ())
}

/// Invokes the `prune_unchanged_draft_specs` RPC (SQL function), which removes any draft specs
/// that are identical to their live specs, accounting for changes to inferred schemas.
/// Returns the set of specs that were removed from the draft (as a `BTreeSet` so they're ordered).
pub async fn remove_unchanged(client: &Client, draft_id: &str) -> anyhow::Result<BTreeSet<String>> {
let params = serde_json::to_string(&serde_json::json!({ "draft_id": draft_id })).unwrap();
api_exec(client.rpc("prune_unchanged_draft_specs", params))
.await
.context("pruning unchanged specs")
}

async fn do_select(
ctx: &mut crate::CliContext,
Select { id: select_id }: &Select,
Expand Down
39 changes: 0 additions & 39 deletions crates/sources/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ pub mod merge;
pub mod scenarios;
mod scope;

use std::collections::HashMap;

pub use bundle_schema::bundle_schema;
pub use indirect::{indirect_large_files, rebuild_catalog_resources};
pub use inline::{inline_capture, inline_sources};
pub use loader::{Fetcher, LoadError, Loader};
pub use scope::Scope;
use serde::Serialize;

#[derive(Copy, Clone, Debug)]
pub enum Format {
Expand Down Expand Up @@ -56,39 +53,3 @@ impl Format {
buf
}
}

/// Computes an md5 hash of each spec in the catalog, and checks it against the
/// given set of `spec_checksums`. If `spec_checksums` contains an equivalent
/// hash for an item, then it is removed from the catalog. The input checksums
/// must be formatted as lowercase hex strings.
pub fn remove_unchanged_specs(
spec_checksums: &HashMap<String, String>,
catalog: &mut models::Catalog,
) {
catalog
.collections
.retain(|name, spec| is_spec_changed(&spec_checksums, name, spec));
catalog
.captures
.retain(|name, spec| is_spec_changed(&spec_checksums, name, spec));
catalog
.materializations
.retain(|name, spec| is_spec_changed(&spec_checksums, name, spec));
catalog
.tests
.retain(|name, spec| is_spec_changed(&spec_checksums, name, spec));
}

fn is_spec_changed(
existing_specs: &HashMap<String, String>,
new_catalog_name: &impl AsRef<str>,
new_catalog_spec: &impl Serialize,
) -> bool {
if let Some(existing_spec_md5) = existing_specs.get(&new_catalog_name.as_ref().to_string()) {
let buf = serde_json::to_vec(new_catalog_spec).expect("new spec must be serializable");
let new_spec_md5 = format!("{:x}", md5::compute(&buf));
return *existing_spec_md5 != new_spec_md5;
}
// If there's no existing md5, then the spec is new, which is considered a change.
true
}

0 comments on commit e943e8f

Please sign in to comment.