Skip to content

Commit

Permalink
all working
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Nov 15, 2023
1 parent 8d4a2fc commit 330db0e
Show file tree
Hide file tree
Showing 27 changed files with 1,983 additions and 853 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 8 additions & 20 deletions crates/build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,19 @@ pub async fn load(source: &url::Url, file_root: &Path) -> tables::Sources {
/// Perform validations and produce built specifications for `sources`.
/// * If `generate_ops_collections` is set, then ops collections are added into `sources`.
/// * If any of `noop_*` is true, then validations are skipped for connectors of that type.
pub async fn validate<L>(
pub async fn validate(
allow_local: bool,
build_id: &str,
connector_network: &str,
control_plane: &dyn validation::ControlPlane,
generate_ops_collections: bool,
log_handler: L,
log_handler: impl runtime::LogHandler,
noop_captures: bool,
noop_derivations: bool,
noop_materializations: bool,
project_root: &url::Url,
mut sources: tables::Sources,
) -> (tables::Sources, tables::Validations)
where
L: Fn(&ops::Log) + Send + Sync + Clone + 'static,
{
) -> (tables::Sources, tables::Validations) {
// TODO(johnny): We *really* need to kill this, and have ops collections
// be injected exclusively from the control-plane.
if generate_ops_collections {
Expand Down Expand Up @@ -168,19 +165,16 @@ where
/// This function is used to produce builds by managed control-plane
/// components but not the `flowctl` CLI, which requires finer-grain
/// control over build behavior.
pub async fn managed_build<L>(
pub async fn managed_build(
allow_local: bool,
build_id: String,
connector_network: String,
control_plane: Box<dyn validation::ControlPlane>,
file_root: PathBuf,
log_handler: L,
log_handler: impl runtime::LogHandler,
project_root: url::Url,
source: url::Url,
) -> Result<(tables::Sources, tables::Validations), tables::Errors>
where
L: Fn(&ops::Log) + Send + Sync + Clone + 'static,
{
) -> Result<(tables::Sources, tables::Validations), tables::Errors> {
let (sources, validations) = validate(
allow_local,
&build_id,
Expand Down Expand Up @@ -363,20 +357,14 @@ impl sources::Fetcher for Fetcher {

/// Connectors is a general-purpose implementation of validation::Connectors
/// that dispatches to its contained runtime::Runtime.
pub struct Connectors<L>
where
L: Fn(&ops::Log) + Send + Sync + Clone + 'static,
{
pub struct Connectors<L: runtime::LogHandler> {
noop_captures: bool,
noop_derivations: bool,
noop_materializations: bool,
runtime: runtime::Runtime<L>,
}

impl<L> validation::Connectors for Connectors<L>
where
L: Fn(&ops::Log) + Send + Sync + Clone + 'static,
{
impl<L: runtime::LogHandler> validation::Connectors for Connectors<L> {
fn validate_capture<'a>(
&'a self,
request: capture::Request,
Expand Down
1 change: 1 addition & 0 deletions crates/flowctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ assemble = { path = "../assemble" }
async-process = { path = "../async-process" }
build = { path = "../build" }
connector-init = { path = "../connector-init" }
coroutines = { path = "../coroutines" }
doc = { path = "../doc" }
extractors = { path = "../extractors" }
journal-client = { path = "../journal-client" }
Expand Down
244 changes: 244 additions & 0 deletions crates/flowctl/src/preview/journal_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
use anyhow::Context;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use proto_flow::flow;
use proto_gazette::{broker, consumer};

#[derive(Clone)]
pub struct Reader {
pub control_plane: crate::controlplane::Client,
pub delay: std::time::Duration,
}

impl runtime::harness::Reader for Reader {
type Stream = futures::stream::BoxStream<'static, anyhow::Result<runtime::harness::Read>>;

fn start_for_derivation(
self,
derivation: &flow::CollectionSpec,
resume: proto_gazette::consumer::Checkpoint,
) -> Self::Stream {
let transforms = &derivation.derivation.as_ref().unwrap().transforms;

let sources = transforms
.iter()
.map(|t| {
let collection = t.collection.as_ref().unwrap();
Source {
collection: collection.name.clone(),
partition_selector: t.partition_selector.clone().unwrap(),
not_before: t.not_before.clone(),
}
})
.collect();

self.start(sources, resume).boxed()
}

fn start_for_materialization(
self,
materialization: &flow::MaterializationSpec,
resume: proto_gazette::consumer::Checkpoint,
) -> Self::Stream {
let sources = materialization
.bindings
.iter()
.map(|b| {
let collection = b.collection.as_ref().unwrap();
Source {
collection: collection.name.clone(),
partition_selector: b.partition_selector.clone().unwrap(),
not_before: b.not_before.clone(),
}
})
.collect();

self.start(sources, resume).boxed()
}
}

struct Source {
collection: String,
partition_selector: broker::LabelSelector,
not_before: Option<pbjson_types::Timestamp>,
}

impl Reader {
fn start(
self,
sources: Vec<Source>,
mut resume: proto_gazette::consumer::Checkpoint,
) -> impl futures::Stream<Item = anyhow::Result<runtime::harness::Read>> {
coroutines::try_coroutine(move |mut co| async move {
// We must be able to access all of its sourced collections.
let access_prefixes = sources
.iter()
.map(|source| source.collection.clone())
.collect();

let data_plane_client =
crate::dataplane::journal_client_for(self.control_plane, access_prefixes).await?;

// Concurrently list the journals of every Source.
let journals: Vec<(&Source, Vec<broker::JournalSpec>)> =
futures::future::try_join_all(sources.iter().map(|source| {
Self::list_journals(source, data_plane_client.clone())
.map_ok(move |l| (source, l))
}))
.await?;

// Flatten into (binding, source, journal).
let journals: Vec<(u32, &Source, broker::JournalSpec)> = journals
.into_iter()
.enumerate()
.flat_map(|(binding, (source, journals))| {
journals
.into_iter()
.map(move |journal| (binding as u32, source, journal))
})
.collect();

// Map into a stream that yields lines from across all journals, as they're ready.
let mut journals =
futures::stream::select_all(journals.iter().map(|(binding, source, journal)| {
Self::read_journal_lines(
*binding,
data_plane_client.clone(),
journal,
&resume,
source,
)
.boxed()
}));

// Reset-able timer for delivery of delayed checkpoints.
let deadline = tokio::time::sleep(std::time::Duration::MAX);
tokio::pin!(deadline);

let mut in_txn = false; // Have we emitted a document that awaits a checkpoint?

loop {
let step = tokio::select! {
Some(read) = journals.next() => Ok(read?),
() = deadline.as_mut(), if in_txn => Err(())
};

match step {
Ok((binding, doc_json, journal, read_through)) => {
let resume = match resume.sources.get_mut(&journal.name) {
Some(entry) => entry,
None => resume.sources.entry(journal.name.clone()).or_default(),
};
resume.read_through = read_through;

() = co
.yield_(runtime::harness::Read::Document {
binding: binding as u32,
doc: doc_json,
})
.await;

// If this is the first Read of this transaction,
// schedule when it will Checkpoint.
if !in_txn {
in_txn = true;
deadline
.as_mut()
.reset(tokio::time::Instant::now() + self.delay);
}
}
Err(()) => {
() = co
.yield_(runtime::harness::Read::Checkpoint(resume.clone()))
.await;
in_txn = false;
}
}
}
})
.boxed()
}

async fn list_journals(
source: &Source,
mut client: journal_client::Client,
) -> anyhow::Result<Vec<broker::JournalSpec>> {
let listing = journal_client::list::list_journals(&mut client, &source.partition_selector)
.await
.with_context(|| {
format!(
"failed to list journals for collection {}",
&source.collection
)
})?;

if listing.is_empty() {
anyhow::bail!(
"no journals were returned by the selector: {}",
serde_json::to_string_pretty(&source.partition_selector).unwrap()
);
}

Ok(listing)
}

fn read_journal_lines<'s>(
binding: u32,
client: journal_client::Client,
journal: &'s broker::JournalSpec,
resume: &consumer::Checkpoint,
source: &Source,
) -> impl futures::Stream<Item = anyhow::Result<(u32, String, &'s broker::JournalSpec, i64)>>
{
use futures::AsyncBufReadExt;
use journal_client::read::uncommitted::{
ExponentialBackoff, JournalRead, ReadStart, ReadUntil, Reader,
};

let offset = resume
.sources
.get(&journal.name)
.map(|s| s.read_through)
.unwrap_or_default();

let read = JournalRead::new(journal.name.clone())
.starting_at(ReadStart::Offset(offset as u64))
.begin_mod_time(
source
.not_before
.as_ref()
.map(|b| b.seconds)
.unwrap_or_default(),
)
.read_until(ReadUntil::Forever);

coroutines::try_coroutine(move |mut co| async move {
let backoff = ExponentialBackoff::new(2);
let reader = Reader::start_read(client, read, backoff);
let mut reader = futures::io::BufReader::new(reader);

// Fill the buffer and establish the first read byte offset.
let buf_len = reader.fill_buf().await?.len();
let mut offset = reader.get_ref().current_offset() - buf_len as i64;

let mut lines = reader.lines();

loop {
let Some(doc_json) = lines.try_next().await? else {
break;
};
// Attempt to keep the offset up to date.
// TODO(johnny): This is subtly broken because it doesn't handle offset jumps.
// Fixing requires a deeper refactor of journal_client::Reader.
offset += doc_json.len() as i64 + 1;

// TODO(johnny): This is pretty janky.
if doc_json.starts_with("{\"_meta\":{\"ack\":true,") {
continue;
}

() = co.yield_((binding, doc_json, journal, offset)).await;
}
Ok(())
})
}
}
Loading

0 comments on commit 330db0e

Please sign in to comment.