Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Nov 2, 2023
1 parent 8ca7b7d commit f227805
Show file tree
Hide file tree
Showing 17 changed files with 1,776 additions and 431 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 12 additions & 13 deletions crates/flowctl/src/raw/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ pub async fn do_capture(
.map(|i| i.clone().into())
.unwrap_or(std::time::Duration::from_secs(1));

let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
_ = ticker.tick().await; // First tick is immediate.

let mut output = std::io::stdout();

// TODO(johnny): This is currently only partly implemented, but is awaiting
Expand All @@ -147,15 +143,18 @@ pub async fn do_capture(

// Upon a checkpoint, wait until the next tick interval has elapsed before acknowledging.
if let Some(_checkpoint) = response.checkpoint {
_ = ticker.tick().await;

request_tx
.send(Ok(capture::Request {
acknowledge: Some(capture::request::Acknowledge { checkpoints: 1 }),
..Default::default()
}))
.await
.unwrap();
let mut request_tx = request_tx.clone();
tokio::spawn(async move {
() = tokio::time::sleep(interval).await;

request_tx
.feed(Ok(capture::Request {
acknowledge: Some(capture::request::Acknowledge { checkpoints: 1 }),
..Default::default()
}))
.await
.unwrap();
});
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/ops/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::io::Write;
pub mod decode;
pub mod tracing;

pub use proto_flow::ops::log::Level as LogLevel;
pub use proto_flow::ops::Log;
pub use proto_flow::ops::TaskType;
// Re-export many types from proto_flow::ops, so that users of this crate
// don't also have to use that module.
pub use proto_flow::ops::{log::Level as LogLevel, stats, Log, Meta, ShardRef, Stats, TaskType};

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
Expand All @@ -22,7 +22,7 @@ pub struct Shard {
r_clock_begin: HexU32,
}

impl From<Shard> for proto_flow::ops::ShardRef {
impl From<Shard> for ShardRef {
fn from(
Shard {
kind,
Expand Down
47 changes: 47 additions & 0 deletions crates/proto-flow/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,47 @@ pub struct Container {
pub struct CaptureRequestExt {
#[prost(message, optional, tag = "1")]
pub labels: ::core::option::Option<super::ops::ShardLabeling>,
#[prost(message, optional, tag = "2")]
pub open: ::core::option::Option<capture_request_ext::Open>,
}
/// Nested message and enum types in `CaptureRequestExt`.
pub mod capture_request_ext {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Open {
/// RocksDB descriptor which should be opened.
#[prost(message, optional, tag = "1")]
pub rocksdb_descriptor: ::core::option::Option<super::RocksDbDescriptor>,
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CaptureResponseExt {
#[prost(message, optional, tag = "1")]
pub container: ::core::option::Option<Container>,
#[prost(message, optional, tag = "2")]
pub captured: ::core::option::Option<capture_response_ext::Captured>,
#[prost(message, optional, tag = "4")]
pub checkpoint: ::core::option::Option<capture_response_ext::Checkpoint>,
}
/// Nested message and enum types in `CaptureResponseExt`.
pub mod capture_response_ext {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Captured {
/// Packed key extracted from the captured document.
#[prost(bytes = "bytes", tag = "1")]
pub key_packed: ::prost::bytes::Bytes,
/// Packed partition values extracted from the captured document.
#[prost(bytes = "bytes", tag = "2")]
pub partitions_packed: ::prost::bytes::Bytes,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Checkpoint {
#[prost(message, optional, tag = "1")]
pub stats: ::core::option::Option<super::super::ops::Stats>,
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -227,6 +262,18 @@ pub mod derive_response_ext {
pub struct MaterializeRequestExt {
#[prost(message, optional, tag = "1")]
pub labels: ::core::option::Option<super::ops::ShardLabeling>,
#[prost(message, optional, tag = "2")]
pub open: ::core::option::Option<materialize_request_ext::Open>,
}
/// Nested message and enum types in `MaterializeRequestExt`.
pub mod materialize_request_ext {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Open {
/// RocksDB descriptor which should be opened.
#[prost(message, optional, tag = "1")]
pub rocksdb_descriptor: ::core::option::Option<super::RocksDbDescriptor>,
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
1 change: 1 addition & 0 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ coroutines = { path = "../coroutines" }
derive-sqlite = { path = "../derive-sqlite" }
doc = { path = "../doc" }
extractors = { path = "../extractors" }
json = { path = "../json" }
locate-bin = { path = "../locate-bin" }
models = { path = "../models" }
ops = { path = "../ops" }
Expand Down
Loading

0 comments on commit f227805

Please sign in to comment.