Skip to content

Commit

Permalink
changes to keep
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Nov 6, 2023
1 parent 9e405ab commit 6b01669
Show file tree
Hide file tree
Showing 14 changed files with 2,099 additions and 491 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/doc/src/combine/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ impl MemTable {
&self.zz_alloc
}

/// Parse a JSON document string into a HeapNode using this MemTable's allocator.
pub fn parse_json_str<'s>(&'s self, doc_json: &str) -> serde_json::Result<HeapNode<'s>> {
let mut de = serde_json::Deserializer::from_str(doc_json);
HeapNode::from_serde(&mut de, self.alloc())
}

/// Add the document to the MemTable.
pub fn add<'s>(&'s self, binding: u32, root: HeapNode<'s>, front: bool) -> Result<(), Error> {
// Safety: mutable borrow does not escape this function.
Expand Down
8 changes: 8 additions & 0 deletions crates/doc/src/combine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ impl Iterator for Drainer {
}

impl Drainer {
/// Drain the next document of this Drainer.
pub fn drain_next(&mut self) -> Result<Option<DrainedDoc>, Error> {
match self {
Self::Mem { drainer, .. } => drainer.drain_next(),
Self::Spill { drainer } => drainer.drain_next(),
}
}

/// Map this Drainer into a new and empty Accumulator.
/// Any un-drained documents are dropped.
pub fn into_new_accumulator(self) -> Result<Accumulator, Error> {
Expand Down
2 changes: 1 addition & 1 deletion crates/doc/src/combine/spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ pub struct SpillDrainer<F: io::Read + io::Seek> {
unsafe impl<F: io::Read + io::Seek> Send for SpillDrainer<F> {}

impl<F: io::Read + io::Seek> SpillDrainer<F> {
fn drain_next(&mut self) -> Result<Option<DrainedDoc>, Error> {
pub fn drain_next(&mut self) -> Result<Option<DrainedDoc>, Error> {
let Some(cmp::Reverse(segment)) = self.heap.pop() else {
return Ok(None);
};
Expand Down
3 changes: 2 additions & 1 deletion crates/doc/src/reduce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,12 @@ fn compare_lazy<L: AsNode, R: AsNode>(
/// merge_patch_schema returns a JSON-Schema implementing the RFC-7396 Merge patch algorithm.
pub fn merge_patch_schema() -> serde_json::Value {
serde_json::json!({
"$id": "flow://merge-patch-schema",
"oneOf": [
{
"type": "object",
"reduce": {"strategy": "merge"},
"additionalProperties": {"$ref": "#"}
"additionalProperties": {"$ref": "flow://merge-patch-schema"}
},
{
"type": "null",
Expand Down
3 changes: 1 addition & 2 deletions crates/flowctl/src/raw/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ pub async fn do_spec(
format!("spec/{}", capture.capture),
)
.unary_capture(spec_req, build::CONNECTOR_TIMEOUT)
.await
.map_err(crate::status_to_anyhow)?
.await?
.spec
.context("connector didn't send expected Spec response")?;

Expand Down
8 changes: 4 additions & 4 deletions crates/ops/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::io::Write;
pub mod decode;
pub mod tracing;

pub use proto_flow::ops::log::Level as LogLevel;
pub use proto_flow::ops::Log;
pub use proto_flow::ops::TaskType;
// Re-export many types from proto_flow::ops, so that users of this crate
// don't also have to use that module.
pub use proto_flow::ops::{log::Level as LogLevel, stats, Log, Meta, ShardRef, Stats, TaskType};

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
Expand All @@ -22,7 +22,7 @@ pub struct Shard {
r_clock_begin: HexU32,
}

impl From<Shard> for proto_flow::ops::ShardRef {
impl From<Shard> for ShardRef {
fn from(
Shard {
kind,
Expand Down
58 changes: 58 additions & 0 deletions crates/proto-flow/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,58 @@ pub struct Container {
pub struct CaptureRequestExt {
#[prost(message, optional, tag = "1")]
pub labels: ::core::option::Option<super::ops::ShardLabeling>,
#[prost(message, optional, tag = "2")]
pub open: ::core::option::Option<capture_request_ext::Open>,
#[prost(message, optional, tag = "3")]
pub start_commit: ::core::option::Option<capture_request_ext::StartCommit>,
}
/// Nested message and enum types in `CaptureRequestExt`.
pub mod capture_request_ext {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Open {
/// RocksDB descriptor which should be opened.
#[prost(message, optional, tag = "1")]
pub rocksdb_descriptor: ::core::option::Option<super::RocksDbDescriptor>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StartCommit {
/// Flow runtime checkpoint associated with this transaction.
#[prost(message, optional, tag = "1")]
pub runtime_checkpoint: ::core::option::Option<
::proto_gazette::consumer::Checkpoint,
>,
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CaptureResponseExt {
#[prost(message, optional, tag = "1")]
pub container: ::core::option::Option<Container>,
#[prost(message, optional, tag = "2")]
pub captured: ::core::option::Option<capture_response_ext::Captured>,
#[prost(message, optional, tag = "4")]
pub checkpoint: ::core::option::Option<capture_response_ext::Checkpoint>,
}
/// Nested message and enum types in `CaptureResponseExt`.
pub mod capture_response_ext {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Captured {
/// Packed key extracted from the captured document.
#[prost(bytes = "bytes", tag = "1")]
pub key_packed: ::prost::bytes::Bytes,
/// Packed partition values extracted from the captured document.
#[prost(bytes = "bytes", tag = "2")]
pub partitions_packed: ::prost::bytes::Bytes,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Checkpoint {
#[prost(message, optional, tag = "1")]
pub stats: ::core::option::Option<super::super::ops::Stats>,
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -227,6 +273,18 @@ pub mod derive_response_ext {
pub struct MaterializeRequestExt {
#[prost(message, optional, tag = "1")]
pub labels: ::core::option::Option<super::ops::ShardLabeling>,
#[prost(message, optional, tag = "2")]
pub open: ::core::option::Option<materialize_request_ext::Open>,
}
/// Nested message and enum types in `MaterializeRequestExt`.
pub mod materialize_request_ext {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Open {
/// RocksDB descriptor which should be opened.
#[prost(message, optional, tag = "1")]
pub rocksdb_descriptor: ::core::option::Option<super::RocksDbDescriptor>,
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
1 change: 1 addition & 0 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ coroutines = { path = "../coroutines" }
derive-sqlite = { path = "../derive-sqlite" }
doc = { path = "../doc" }
extractors = { path = "../extractors" }
json = { path = "../json" }
locate-bin = { path = "../locate-bin" }
models = { path = "../models" }
ops = { path = "../ops" }
Expand Down
1 change: 1 addition & 0 deletions crates/runtime/src/materialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ where
request_rx.inspect_ok(move |request| {
let Ok(MaterializeRequestExt {
labels: Some(ops::ShardLabeling { log_level, .. }),
..
}) = request.get_internal()
else {
return;
Expand Down
Loading

0 comments on commit 6b01669

Please sign in to comment.