Skip to content

Commit

Permalink
Merge branch 'master' into emily/move_sops_info
Browse files Browse the repository at this point in the history
  • Loading branch information
aeluce authored Jan 6, 2025
2 parents 4619f37 + 44dc0db commit 6776f37
Show file tree
Hide file tree
Showing 29 changed files with 315 additions and 165 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/dekaf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ on:
paths:
- "crates/dekaf/**"
- "crates/proto-**"
- "crates/allocator"
- "crates/avro"
- "crates/doc"
- "crates/flow-client"
- "crates/gazette"
- "crates/json"
- "crates/labels"
- "crates/models"
- "crates/ops"
- "crates/simd-doc"
- "Cargo.lock"
pull_request:
branches: [master]
Expand Down
2 changes: 1 addition & 1 deletion crates/agent/src/api/public/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn fetch_status(
ls.controller_next_run,
ls.updated_at as "live_spec_updated_at: DateTime<Utc>",
cj.updated_at as "controller_updated_at: DateTime<Utc>",
cj.status as "status: status::Status",
cj.status as "controller_status: status::ControllerStatus",
cj.error as "controller_error: String",
cj.failures as "controller_failures: i32"
from live_specs ls
Expand Down
32 changes: 15 additions & 17 deletions crates/agent/src/controllers/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,19 @@ pub async fn update<C: ControlPlane>(
return Ok(Some(NextRun::immediately()));
}

if model.auto_discover.is_some() {
publication_status::update_activation(&mut status.activation, state, control_plane)
.await
.with_retry(backoff_data_plane_activate(state.failures))?;

publication_status::update_notify_dependents(&mut status.publications, state, control_plane)
.await
.context("failed to notify dependents")?;

if periodic::update_periodic_publish(state, &mut status.publications, control_plane).await? {
return Ok(Some(NextRun::immediately()));
}

let ad_next = if model.auto_discover.is_some() {
let ad_status = status
.auto_discover
.get_or_insert_with(AutoDiscoverStatus::default);
Expand All @@ -56,29 +68,15 @@ pub async fn update<C: ControlPlane>(
if published {
return Ok(Some(NextRun::immediately()));
}
auto_discover::next_run(ad_status)
} else {
// Clear auto-discover status to avoid confusion, but only if
// auto-discover is disabled. We leave the auto-discover status if
// shards are disabled, since it's still useful for debugging.
status.auto_discover = None;
None
};

if periodic::update_periodic_publish(state, &mut status.publications, control_plane).await? {
return Ok(Some(NextRun::immediately()));
}

publication_status::update_activation(&mut status.activation, state, control_plane)
.await
.with_retry(backoff_data_plane_activate(state.failures))?;

publication_status::update_notify_dependents(&mut status.publications, state, control_plane)
.await
.context("failed to notify dependents")?;

let ad_next = status
.auto_discover
.as_ref()
.and_then(|ad| auto_discover::next_run(ad));
let periodic_next = periodic::next_periodic_publish(state);
Ok(NextRun::earliest([ad_next, periodic_next]))
}
4 changes: 2 additions & 2 deletions crates/agent/src/controllers/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
DataPlaneConnectors, HandleResult, Handler,
};

use super::{controller_update, ControllerState, NextRun, RetryableError, Status};
use super::{controller_update, ControllerState, ControllerStatus, NextRun, RetryableError};

use crate::controllers::CONTROLLER_VERSION;

Expand Down Expand Up @@ -132,7 +132,7 @@ impl Handler for ControllerHandler {
))]
async fn run_controller<C: ControlPlane>(
state: &ControllerState,
next_status: &mut Status,
next_status: &mut ControllerStatus,
control_plane: &mut C,
) -> anyhow::Result<Option<NextRun>> {
controller_update(next_status, &state, control_plane).await
Expand Down
10 changes: 5 additions & 5 deletions crates/agent/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub(crate) mod publication_status;
use crate::controlplane::ControlPlane;
use anyhow::Context;
use chrono::{DateTime, Utc};
use models::{status::Status, AnySpec, CatalogType, Id};
use models::{status::ControllerStatus, AnySpec, CatalogType, Id};
use proto_flow::{flow, AnyBuiltSpec};
use serde::Serialize;
use sqlx::types::Uuid;
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct ControllerState {
pub controller_version: i32,
/// The current `status` of the controller, which represents the before
/// state during an update. This is just informational.
pub current_status: Status,
pub current_status: ControllerStatus,
/// ID of the data plane in which this specification lives. May be zero for tests.
pub data_plane_id: Id,
/// Name of the data plane in which this specification lives. May be `None` for tests.
Expand All @@ -76,8 +76,8 @@ impl ControllerState {
pub fn parse_db_row(
job: &agent_sql::controllers::ControllerJob,
) -> anyhow::Result<ControllerState> {
let status: Status = if job.controller_version == 0 {
Status::Uninitialized
let status: ControllerStatus = if job.controller_version == 0 {
ControllerStatus::Uninitialized
} else {
serde_json::from_str(job.status.get()).context("deserializing controller status")?
};
Expand Down Expand Up @@ -321,7 +321,7 @@ fn backoff_data_plane_activate(prev_failures: i32) -> NextRun {

/// The main logic of a controller run is performed as an update of the status.
async fn controller_update<C: ControlPlane>(
status: &mut Status,
status: &mut ControllerStatus,
state: &ControllerState,
control_plane: &mut C,
) -> anyhow::Result<Option<NextRun>> {
Expand Down
12 changes: 6 additions & 6 deletions crates/agent/src/integration_tests/periodic_publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
},
ControlPlane,
};
use models::{status::Status, CatalogType};
use models::{status::ControllerStatus, CatalogType};
use uuid::Uuid;

#[tokio::test]
Expand Down Expand Up @@ -150,11 +150,11 @@ async fn specs_are_published_periodically() {

let after_state = harness.get_controller_state(&name).await;
let pub_status = match after_state.current_status {
Status::Capture(cap) => cap.publications,
Status::Materialization(m) => m.publications,
Status::Collection(c) => c.publications,
Status::Test(t) => t.publications,
Status::Uninitialized => panic!("unexpected status"),
ControllerStatus::Capture(cap) => cap.publications,
ControllerStatus::Materialization(m) => m.publications,
ControllerStatus::Collection(c) => c.publications,
ControllerStatus::Test(t) => t.publications,
ControllerStatus::Uninitialized => panic!("unexpected status"),
};
assert_eq!(
Some("periodic publication"),
Expand Down
10 changes: 7 additions & 3 deletions crates/flowctl/src/catalog/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ impl crate::output::CliOutput for StatusOutput {
],
);
// Activation Complete is a computed column so we need to add it manually.
let activation_complete = self.0.status.activation_status().map(|activation| {
serde_json::Value::Bool(activation.last_activated == self.0.last_build_id)
});
let activation_complete = self
.0
.controller_status
.activation_status()
.map(|activation| {
serde_json::Value::Bool(activation.last_activated == self.0.last_build_id)
});
row.push(JsonCell(activation_complete));
row
}
Expand Down
60 changes: 30 additions & 30 deletions crates/models/src/status/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct StatusResponse {
#[schemars(schema_with = "datetime_schema")]
pub controller_updated_at: DateTime<Utc>,
/// The controller status json.
pub status: Status,
pub controller_status: ControllerStatus,
/// Error from the most recent controller run, or `null` if the run was
/// successful.
pub controller_error: Option<String>,
Expand All @@ -51,7 +51,7 @@ pub struct StatusResponse {
/// Represents the internal state of a controller.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema)]
#[serde(tag = "type")]
pub enum Status {
pub enum ControllerStatus {
Capture(capture::CaptureStatus),
Collection(collection::CollectionStatus),
Materialization(materialization::MaterializationStatus),
Expand All @@ -61,49 +61,49 @@ pub enum Status {
}

// Status types are serialized as plain json columns.
crate::sqlx_json::sqlx_json!(Status);
crate::sqlx_json::sqlx_json!(ControllerStatus);

impl Status {
impl ControllerStatus {
pub fn catalog_type(&self) -> Option<CatalogType> {
match self {
Status::Capture(_) => Some(CatalogType::Capture),
Status::Collection(_) => Some(CatalogType::Collection),
Status::Materialization(_) => Some(CatalogType::Materialization),
Status::Test(_) => Some(CatalogType::Test),
Status::Uninitialized => None,
ControllerStatus::Capture(_) => Some(CatalogType::Capture),
ControllerStatus::Collection(_) => Some(CatalogType::Collection),
ControllerStatus::Materialization(_) => Some(CatalogType::Materialization),
ControllerStatus::Test(_) => Some(CatalogType::Test),
ControllerStatus::Uninitialized => None,
}
}

pub fn is_uninitialized(&self) -> bool {
matches!(self, Status::Uninitialized)
matches!(self, ControllerStatus::Uninitialized)
}

/// Returns the activation status, if this status is for a capture, collection, or materialization.
pub fn activation_status(&self) -> Option<&publications::ActivationStatus> {
match self {
Status::Capture(c) => Some(&c.activation),
Status::Collection(c) => Some(&c.activation),
Status::Materialization(c) => Some(&c.activation),
ControllerStatus::Capture(c) => Some(&c.activation),
ControllerStatus::Collection(c) => Some(&c.activation),
ControllerStatus::Materialization(c) => Some(&c.activation),
_ => None,
}
}

pub fn as_capture_mut(&mut self) -> anyhow::Result<&mut capture::CaptureStatus> {
if self.is_uninitialized() {
*self = Status::Capture(Default::default());
*self = ControllerStatus::Capture(Default::default());
}
match self {
Status::Capture(c) => Ok(c),
ControllerStatus::Capture(c) => Ok(c),
_ => anyhow::bail!("expected capture status"),
}
}

pub fn as_collection_mut(&mut self) -> anyhow::Result<&mut collection::CollectionStatus> {
if self.is_uninitialized() {
*self = Status::Collection(Default::default());
*self = ControllerStatus::Collection(Default::default());
}
match self {
Status::Collection(c) => Ok(c),
ControllerStatus::Collection(c) => Ok(c),
_ => anyhow::bail!("expected collection status"),
}
}
Expand All @@ -112,48 +112,48 @@ impl Status {
&mut self,
) -> anyhow::Result<&mut materialization::MaterializationStatus> {
if self.is_uninitialized() {
*self = Status::Materialization(Default::default());
*self = ControllerStatus::Materialization(Default::default());
}
match self {
Status::Materialization(m) => Ok(m),
ControllerStatus::Materialization(m) => Ok(m),
_ => anyhow::bail!("expected materialization status"),
}
}

pub fn as_test_mut(&mut self) -> anyhow::Result<&mut catalog_test::TestStatus> {
if self.is_uninitialized() {
*self = Status::Test(Default::default());
*self = ControllerStatus::Test(Default::default());
}
match self {
Status::Test(t) => Ok(t),
ControllerStatus::Test(t) => Ok(t),
_ => anyhow::bail!("expected test status"),
}
}

pub fn unwrap_capture(&self) -> &capture::CaptureStatus {
match self {
Status::Capture(c) => c,
ControllerStatus::Capture(c) => c,
_ => panic!("expected capture status"),
}
}

pub fn unwrap_collection(&self) -> &collection::CollectionStatus {
match self {
Status::Collection(c) => c,
ControllerStatus::Collection(c) => c,
_ => panic!("expected collection status"),
}
}

pub fn unwrap_materialization(&self) -> &materialization::MaterializationStatus {
match self {
Status::Materialization(m) => m,
ControllerStatus::Materialization(m) => m,
_ => panic!("expected materialization status"),
}
}

pub fn unwrap_test(&self) -> &catalog_test::TestStatus {
match self {
Status::Test(t) => t,
ControllerStatus::Test(t) => t,
_ => panic!("expected test status"),
}
}
Expand Down Expand Up @@ -205,7 +205,7 @@ mod test {
let mut history = VecDeque::new();
history.push_front(pub_status);

let status = Status::Materialization(MaterializationStatus {
let status = ControllerStatus::Materialization(MaterializationStatus {
activation: ActivationStatus {
last_activated: Id::new([1, 2, 3, 4, 4, 3, 2, 1]),
},
Expand All @@ -221,15 +221,15 @@ mod test {
});

let as_json = serde_json::to_string_pretty(&status).expect("failed to serialize status");
let round_tripped: Status =
let round_tripped: ControllerStatus =
serde_json::from_str(&as_json).expect("failed to deserialize status");

#[derive(Debug)]
#[allow(unused)]
struct StatusSnapshot {
starting: Status,
starting: ControllerStatus,
json: String,
parsed: Status,
parsed: ControllerStatus,
}

insta::assert_debug_snapshot!(
Expand All @@ -246,7 +246,7 @@ mod test {
fn test_status_json_schema() {
let settings = schemars::gen::SchemaSettings::draft2019_09();
let generator = schemars::gen::SchemaGenerator::new(settings);
let schema_obj = generator.into_root_schema_for::<Status>();
let schema_obj = generator.into_root_schema_for::<ControllerStatus>();
let schema = serde_json::to_value(&schema_obj).unwrap();
insta::assert_json_snapshot!(schema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,5 +811,5 @@ expression: schema
"type": "object"
}
],
"title": "Status"
"title": "ControllerStatus"
}
14 changes: 14 additions & 0 deletions crates/validation/src/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,20 @@ async fn walk_materialization(
{
shard_template.id.clone()
} else {
let pub_id = match endpoint {
// Dekaf materializations don't create any shards, so the problem of
// deleting and re-creating tasks with the same name, which this
// shard id template logic was introduced to resolve, isn't applicable.
// Instead, since the Dekaf service uses the task name to authenticate
// whereas the authorization API expects the shard template id, it's
// useful to be able to generate the correct shard template id for a
// Dekaf materialization given only its task name, so we set the pub id
// to a well-known value of all zeros.
models::MaterializationEndpoint::Dekaf(_) => models::Id::zero(),
models::MaterializationEndpoint::Connector(_)
| models::MaterializationEndpoint::Local(_) => pub_id,
};

assemble::shard_id_prefix(pub_id, materialization, labels::TASK_TYPE_MATERIALIZATION)
};

Expand Down
4 changes: 2 additions & 2 deletions site/docs/concepts/advanced/evolutions.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ When you attempt to publish a breaking change to a collection in the Flow web ap

Click the **Apply** button to trigger an evolution and update all necessary specification to keep your Data Flow functioning. Then, review and publish your draft.

If you enabled [AutoDiscover](../captures.md#autodiscover) on a capture, any breaking changes that it introduces will trigger an automatic schema evolution, so long as you selected the **Breaking change re-versions collections** option(`evolveIncompatibleCollections`).
If you enabled [AutoDiscover](../captures.md#autodiscover) on a capture, any breaking changes that it introduces will trigger an automatic schema evolution, so long as you selected the **Breaking change re-versions collections** option (`evolveIncompatibleCollections`).

## What do schema evolutions do?

The schema evolution feature is available in the Flow web app when you're editing pre-existing Flow entities.
It notices when one of your edit would cause other components of the Data Flow to fail, alerts you, and gives you the option to automatically update the specs of these components to prevent failure.
It notices when one of your edits would cause other components of the Data Flow to fail, alerts you, and gives you the option to automatically update the specs of these components to prevent failure.

In other words, evolutions happen in the *draft* state. Whenever you edit, you create a draft.
Evolutions add to the draft so that when it is published and updates the active data flow, operations can continue seamlessly.
Expand Down
Loading

0 comments on commit 6776f37

Please sign in to comment.