Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Oct 14, 2023
1 parent 027395b commit f6aa489
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 18 deletions.
25 changes: 12 additions & 13 deletions crates/flowctl/src/raw/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ pub async fn do_capture(
.map(|i| i.clone().into())
.unwrap_or(std::time::Duration::from_secs(1));

let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
_ = ticker.tick().await; // First tick is immediate.

let mut output = std::io::stdout();

// TODO(johnny): This is currently only partly implemented, but is awaiting
Expand All @@ -147,15 +143,18 @@ pub async fn do_capture(

// Upon a checkpoint, wait until the next tick interval has elapsed before acknowledging.
if let Some(_checkpoint) = response.checkpoint {
_ = ticker.tick().await;

request_tx
.send(Ok(capture::Request {
acknowledge: Some(capture::request::Acknowledge { checkpoints: 1 }),
..Default::default()
}))
.await
.unwrap();
let mut request_tx = request_tx.clone();
tokio::spawn(async move {
() = tokio::time::sleep(interval).await;

request_tx
.feed(Ok(capture::Request {
acknowledge: Some(capture::request::Acknowledge { checkpoints: 1 }),
..Default::default()
}))
.await
.unwrap();
});
}
}

Expand Down
15 changes: 10 additions & 5 deletions crates/runtime/src/capture/combine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ where
R: Stream<Item = anyhow::Result<Request>>,
{
// Channel for receiving checkpoints of each request::Acknowledge from the response stream.
let (ack_tx, mut ack_rx) = mpsc::channel(1);
let (ack_tx, mut ack_rx) = mpsc::channel(0);
// Channel for passing request::Open to the response stream.
let (mut open_tx, open_rx) = mpsc::channel(1);
let (mut open_tx, open_rx) = mpsc::channel(0);

let request_rx = coroutines::try_coroutine(move |mut co| async move {
let mut request_rx = std::pin::pin!(request_rx);
Expand Down Expand Up @@ -152,7 +152,7 @@ where
let response = Response {
checkpoint: Some(response::Checkpoint {
state: Some(flow::ConnectorState {
merge_patch: std::mem::take(&mut checkpoint_state_merge_patch),
merge_patch: std::mem::replace(&mut checkpoint_state_merge_patch, true),
updated_json: std::mem::take(&mut checkpoint_state).to_string(),
}),
}),
Expand All @@ -169,6 +169,8 @@ where
// If inferred schemas were updated, log them out for continuous schema inference.
combiner.log_updated_schemas();

checkpoint_bytes = 0;
checkpoints = 0;
maybe_opened = Some(combiner);
continue;
};
Expand All @@ -189,8 +191,11 @@ where
.as_mut()
.context("connector sent Captured before Opened")?;

if !in_checkpoint && checkpoints == 0 {
started_at = SystemTime::now();
if !in_checkpoint {
if checkpoints == 0 {
started_at = SystemTime::now();
}
in_checkpoint = true;
}

combiner.combine_right(*binding, doc_json)?;
Expand Down
1 change: 1 addition & 0 deletions crates/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod derive;
mod image_connector;
mod local_connector;
mod materialize;
mod state;
mod task_service;
mod tokio_context;
mod unary;
Expand Down
130 changes: 130 additions & 0 deletions crates/runtime/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use anyhow::Context;
use proto_flow::flow;
use std::collections::BTreeMap;

/// Connector state tracked across calls to `State::reduce`.
pub struct State {
/// The tracked state document.
val: Skim,
/// NOTE(review): judging by the placeholder branches of `reduce`, `true`
/// appears to mean that `val` is itself a not-yet-applied merge patch, and
/// `false` that it is a fully reduced value. Confirm once `reduce` is complete.
merge_patch: bool,
}

impl State {
    /// Folds a connector state `update` into this `State`.
    ///
    /// Returns an error if `update.updated_json` does not decode as a JSON
    /// object.
    ///
    /// Work in progress: the decoded document is not yet applied to
    /// `self.val`. The only effect so far is clearing `self.merge_patch`
    /// when the update is not a merge patch.
    pub fn reduce(&mut self, update: &flow::ConnectorState) -> anyhow::Result<()> {
        // Destructure every field, so that a newly added field is a compile error here.
        let flow::ConnectorState {
            merge_patch: update_is_patch,
            updated_json: raw_json,
        } = update;

        // Currently decoded for validation only; see the TODOs below.
        let _decoded: BTreeMap<String, models::RawValue> = serde_json::from_str(raw_json)
            .context("failed to decode connector state as JSON object")?;

        match (*update_is_patch, self.merge_patch) {
            (false, _) => {
                // TODO: self.val = update;
                self.merge_patch = false;
            }
            (true, false) => {
                // TODO: apply merge-patch to fully reduced value
            }
            (true, true) => {
                // TODO: apply merge-patch to patch.
            }
        }

        Ok(())
    }
}

/// Applies `patch` onto `doc`, in the manner of a JSON merge patch.
///
/// * A `patch` that is not a parsed object replaces `doc` outright.
/// * Otherwise `doc` is coerced into an object: a raw value that parses as a
///   JSON object is expanded in place, and anything else becomes an empty
///   object.
///
/// Work in progress: the per-property merge is still disabled (see the
/// commented loop), so the properties of `patch` and the `delete_null` flag
/// are not yet used.
fn merge_patch(doc: &mut Skim, patch: Skim, delete_null: bool) {
    let patch = match patch {
        Skim::Parsed(fields) => fields,
        raw @ Skim::Raw(_) => {
            // Patch is not an object. Stop now, taking its value.
            *doc = raw;
            return;
        }
    };

    if doc.as_object().is_none() {
        // `doc` is not already an object. Convert into an empty one.
        *doc = Skim::Parsed(BTreeMap::new());
    }
    // Cannot fail: `doc` was just ensured to be a parsed object.
    let doc = doc.as_object().unwrap();

    /*
    for (key, value) in patch {
        if value.is_null() && delete_null {
            doc.remove(&key);
        } else {
            merge_patch(
                doc.entry(key).or_insert_with(Skim::default),
                value,
                delete_null,
            );
        }
    }
    */
}

/// A JSON value that is either an object whose properties are themselves
/// `Skim` values (`Parsed`), or a value held as raw JSON (`Raw`).
///
/// The enum is `#[serde(untagged)]`, so serde attempts the variants in
/// declaration order: `Parsed` first, then `Raw`.
///
/// NOTE(review): `serde_json::value::RawValue` is reported not to deserialize
/// when nested inside an untagged enum. Verify that the `Raw` variant
/// round-trips as intended.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
pub enum Skim {
/// A JSON object, keyed by property name.
Parsed(BTreeMap<String, Skim>),
/// A value kept as a boxed `RawValue`.
Raw(Box<serde_json::value::RawValue>),
}

impl Skim {
    // Disabled sketch: null test used by the disabled loop in `merge_patch`.
    /*
    fn is_null(&self) -> bool {
        match self {
            Self::Raw(value) => value.is_null(),
            Self::Parsed(_) => false,
        }
    }
    */

    /// Returns a mutable view of this value as an object, if it is one.
    ///
    /// A `Raw` value is parsed on demand: when its text decodes as a JSON
    /// object, `self` is replaced by the `Parsed` form and the new map is
    /// returned. When it does not decode, `self` is left untouched and `None`
    /// is returned.
    fn as_object(&mut self) -> Option<&mut BTreeMap<String, Skim>> {
        if let Self::Raw(raw) = self {
            let fields: BTreeMap<String, Skim> = serde_json::from_str(raw.get()).ok()?;
            *self = Self::Parsed(fields);
        }

        match self {
            Self::Parsed(fields) => Some(fields),
            Self::Raw(_) => None,
        }
    }

    // Disabled sketch: shallow (one-level) parse of an input document.
    /*
    fn shallow(input: &str) -> serde_json::Result<Self> {
        #[derive(serde::Deserialize)]
        #[serde(untagged)]
        pub enum Shallow {
            Raw(models::RawValue),
            Parsed(BTreeMap<String, Shallow>),
        }
        let skim: BTreeMap<String>::RawValue = serde_json::from_str(input)?;
    }
    */
}

/*
impl Default for Skim {
fn default() -> Self {
Self::Raw(models::RawValue::default())
}
}
*/

// Exploratory tests probing how raw JSON values deserialize through an
// untagged enum.
#[cfg(test)]
mod test {
// NOTE(review): `Skim` is only referenced by the commented-out `Parsed`
// variant below, so this import is currently unused.
use super::Skim;
use serde_json::json;

// Probe type: an untagged enum whose only active variant wraps a boxed
// `RawValue`.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
pub enum Foo {
//Parsed(BTreeMap<String, Skim>),
Raw(Box<serde_json::value::RawValue>),
}

// Serializes a nested JSON object to text, deserializes it into `Foo`
// (panicking on failure), and snapshots the `Debug` output.
// NOTE(review): the inline snapshot is still empty (`@""`), presumably
// awaiting a first recorded value.
#[test]
fn foobar() {
let fixture = json!({"hello": {"world": 42, "foo": "bar"}}).to_string();
let out: Foo = serde_json::from_str(&fixture).unwrap();

insta::assert_debug_snapshot!(out, @"");
}
}
10 changes: 10 additions & 0 deletions foobar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import json, sys, base64, typing as t

# Print one summary line per key/value entry read from stdin, ordered by
# creation revision (oldest first).
# NOTE(review): the input shape (a "kvs" list whose items carry base64 "key" and
# "value" plus "create_revision", "mod_revision" and "lease") looks like etcd
# JSON output; confirm against the producer.
members: t.List[dict] = sorted(
    json.load(sys.stdin)["kvs"],
    key=lambda kv: kv["create_revision"],
)

for entry in members:
    decoded_key = base64.b64decode(entry["key"]).decode('utf8')
    # Drop the first 30 characters of the key; presumably a fixed-length
    # prefix shared by every key. Verify.
    broker = decoded_key[30:]
    # The value is shown as the repr() of its decoded bytes.
    spec = repr(base64.b64decode(entry["value"]))
    # The lease id is rendered as lowercase hexadecimal.
    lease = "%x" % entry["lease"]
    print(
        f"{broker} CREATE {entry['create_revision']} LEASE {lease} "
        f"MOD {entry['mod_revision']} SPEC {spec}"
    )

0 comments on commit f6aa489

Please sign in to comment.