Skip to content

Commit

Permalink
doc: refactor doc::Validator to be Clone
Browse files Browse the repository at this point in the history
Factor out into a doc::ValidationContext type which has an Arc reference
held by a Validator.

Validator and doc::combine::Spec now implement Clone.
  • Loading branch information
jgraettinger committed Nov 3, 2023
1 parent 8eeb6cd commit f4ad5e8
Show file tree
Hide file tree
Showing 17 changed files with 87 additions and 52 deletions.
9 changes: 6 additions & 3 deletions crates/derive/src/combine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ impl cgo::Service for API {
} else if infer_schema_json.len() > 0 {
let validator = new_validator(infer_schema_json.as_str())?;
Some(doc::Shape::infer(
&validator.schemas()[0],
validator.schema_index(),
&validator.context().schemas()[0],
validator.context().schema_index(),
))
} else {
None
Expand Down Expand Up @@ -776,7 +776,10 @@ pub mod test {
docs: &[(bool, serde_json::Value)],
) -> Vec<Vec<String>> {
let validator = crate::new_validator(&schema_json).unwrap();
let shape = doc::Shape::infer(&validator.schemas()[0], validator.schema_index());
let shape = doc::Shape::infer(
&validator.context().schemas()[0],
validator.context().schema_index(),
);

let projections: Vec<_> = shape
.locations()
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn new_validator(schema: &str) -> Result<doc::Validator, anyhow::Error> {
)
.context("building bundled JSON schema")?;

Ok(doc::Validator::new(schema).context("preparing schema validator")?)
Ok(doc::Validator::from_schema(schema).context("preparing schema validator")?)
}

/// Common test utilities used by sub-modules.
Expand Down
6 changes: 3 additions & 3 deletions crates/doc/src/combine/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ mod test {
json!("def"),
)],
None,
Validator::new(schema).unwrap(),
Validator::from_schema(schema).unwrap(),
)
})
.take(2),
Expand Down Expand Up @@ -708,7 +708,7 @@ mod test {
is_full,
vec![Extractor::new("/k", &SerPolicy::default())],
None,
Validator::new(
Validator::from_schema(
build_schema(
url::Url::parse("http://example").unwrap(),
&reduce::merge_patch_schema(),
Expand Down Expand Up @@ -973,7 +973,7 @@ mod test {
true, // Full reduction.
vec![Extractor::new("/key", &SerPolicy::default())],
None,
Validator::new(schema).unwrap(),
Validator::from_schema(schema).unwrap(),
)
})
.take(1),
Expand Down
1 change: 1 addition & 0 deletions crates/doc/src/combine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum Error {

/// Specification of how Combine operations are to be done
/// over one or more bindings.
#[derive(Clone)]
pub struct Spec {
is_full: Vec<bool>,
keys: Arc<[Box<[Extractor]>]>,
Expand Down
4 changes: 2 additions & 2 deletions crates/doc/src/combine/spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ mod test {
json!("def"),
)],
None,
Validator::new(schema).unwrap(),
Validator::from_schema(schema).unwrap(),
)
})
.take(3),
Expand Down Expand Up @@ -726,7 +726,7 @@ mod test {
true, // Full reduction.
vec![Extractor::new("/key", &SerPolicy::default())],
None,
Validator::new(schema).unwrap(),
Validator::from_schema(schema).unwrap(),
)
})
.take(1),
Expand Down
4 changes: 2 additions & 2 deletions crates/doc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ pub use annotation::Annotation;
// over AsNode implementations.
pub mod validation;
pub use validation::{
FailedValidation, RawValidator, Schema, SchemaIndex, SchemaIndexBuilder, Valid, Validation,
Validator,
Context as ValidationContext, FailedValidation, RawValidator, Schema, SchemaIndex,
SchemaIndexBuilder, Valid, Validation, Validator,
};

// Doc implementations may be reduced.
Expand Down
2 changes: 1 addition & 1 deletion crates/doc/src/reduce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ pub mod test {

pub fn run_reduce_cases(schema: Value, cases: Vec<Case>) {
let curi = url::Url::parse("http://example/schema").unwrap();
let mut validator = Validator::new(build_schema(curi, &schema).unwrap()).unwrap();
let mut validator = Validator::from_schema(build_schema(curi, &schema).unwrap()).unwrap();
let alloc = HeapNode::new_allocator();
let mut lhs: Option<HeapNode<'_>> = None;

Expand Down
4 changes: 2 additions & 2 deletions crates/doc/src/shape/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ mod tests {

let curi = url::Url::parse("flow://fixture").unwrap();
let schema = crate::validation::build_schema(curi, &fixture).unwrap();
let validator = crate::Validator::new(schema).unwrap();
let shape = crate::Shape::infer(&validator.schemas()[0], validator.schema_index());
let context = crate::ValidationContext::from_schema(schema).unwrap();
let shape = crate::Shape::infer(&context.schemas()[0], context.schema_index());
let output = serde_json::to_value(to_schema(shape)).unwrap();

assert_eq!(fixture, output);
Expand Down
76 changes: 52 additions & 24 deletions crates/doc/src/validation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{reduce, walker::walk_document, Annotation, AsNode, SerPolicy};
use json::validator::Context;
use json::validator::Context as JsonContext;
use std::pin::Pin;
use std::sync::Arc;

// Specialize json templates for the Flow `Annotation` type.
pub type Schema = json::schema::Schema<Annotation>;
Expand Down Expand Up @@ -41,20 +42,18 @@ pub fn build_bundle(bundle: &str) -> Result<Schema, json::schema::build::Error>
Ok(schema)
}

// Validator wraps a json::Validator and manages ownership of the schemas under validation.
pub struct Validator {
// Careful, order matters! Fields are dropped in declaration order.
inner: json::validator::Validator<'static, Annotation, SpanContext>,
index: Pin<Box<SchemaIndex<'static>>>,
schemas: Pin<Box<[Schema]>>,
// Validation Context which contains compiled and indexed JSON Schemas for use with Validator.
pub struct Context {
index: SchemaIndex<'static>,
schemas: Pin<Box<[Schema]>>, // Safety: must drop after `index`.
}

impl Validator {
pub fn new(schema: Schema) -> Result<Self, json::schema::index::Error> {
Self::new_from_iter(std::iter::once(schema))
impl Context {
pub fn from_schema(schema: Schema) -> Result<Arc<Self>, json::schema::index::Error> {
Self::from_iter(std::iter::once(schema))
}

pub fn new_from_iter<I>(it: I) -> Result<Self, json::schema::index::Error>
pub fn from_iter<I>(it: I) -> Result<Arc<Self>, json::schema::index::Error>
where
I: IntoIterator<Item = Schema>,
{
Expand All @@ -72,28 +71,51 @@ impl Validator {
}
index.verify_references()?;

// Safety: we manually keep the owned index alongside the associated validator,
// and drop it before the validator.
let index = Box::pin(index.into_index());
let index_static =
unsafe { std::mem::transmute::<&'_ SchemaIndex, &'static SchemaIndex>(&index) };

Ok(Self {
inner: json::validator::Validator::new(index_static),
index,
Ok(Arc::new(Self {
index: index.into_index(),
schemas,
})
}))
}

/// Fetch the SchemaIndex of this Validator.
/// Fetch the SchemaIndex of this validation Context.
pub fn schema_index(&self) -> &SchemaIndex<'static> {
&self.index
}

/// Fetch the Schemas indexed by this Validator.
/// Fetch the Schemas indexed by this validation Context.
pub fn schemas(&self) -> &[Schema] {
&self.schemas
}
}

// Validator wraps a json::Validator and holds a shared reference to the
// validation Context (schemas and their index) that it validates against.
pub struct Validator {
// Careful, order matters! Fields are dropped in declaration order.
// `inner` borrows the SchemaIndex owned by `context` under a lifetime extended to 'static.
inner: json::validator::Validator<'static, Annotation, SpanContext>,
context: Arc<Context>, // Safety: must drop after `inner`.
}

impl Validator {
/// Build a Validator over the schemas indexed by the given `context`.
/// The `context` is retained by the returned Validator.
pub fn new(context: Arc<Context>) -> Self {
// SAFETY: `inner` borrows the SchemaIndex owned by `context`, with its lifetime
// extended to 'static. The index is stored within the Arc's allocation, so moving
// the Arc into `Self` does not move the index, and the Arc held by `Self` keeps that
// allocation alive. `context` is declared after `inner` so that it is dropped after it.
let index_static = unsafe {
std::mem::transmute::<&'_ SchemaIndex, &'static SchemaIndex>(context.schema_index())
};

Self {
inner: json::validator::Validator::new(index_static),
context,
}
}

/// Build a Validator over a single `schema`, which is first indexed into a new Context.
/// Returns the index error if the Context cannot be built from `schema`.
pub fn from_schema(schema: Schema) -> Result<Self, json::schema::index::Error> {
    Context::from_schema(schema).map(Self::new)
}

/// Fetch the Context of this Validator.
/// The returned Arc may be cloned and passed to `Validator::new`
/// to build another Validator over the same Context.
pub fn context(&self) -> &Arc<Context> {
&self.context
}

/// Validate validates the given document against the given schema.
/// If schema is None, then the root_curi() of this Validator is validated.
Expand All @@ -104,7 +126,7 @@ impl Validator {
) -> Result<Validation<'static, 'doc, 'v, N>, json::schema::index::Error> {
let effective_schema = match schema {
Some(schema) => schema,
None if self.schemas.len() == 1 => &self.schemas[0].curi,
None if self.context.schemas.len() == 1 => &self.context.schemas[0].curi,
None => {
panic!("root_curi() may only be used with Validators having a single root schema")
}
Expand All @@ -123,6 +145,12 @@ impl Validator {
}
}

impl Clone for Validator {
fn clone(&self) -> Self {
Self::new(self.context.clone())
}
}

/// Validation represents the outcome of a document validation.
pub struct Validation<'schema, 'doc, 'tmp, N: AsNode> {
/// Document which was validated.
Expand Down
2 changes: 1 addition & 1 deletion crates/doc/tests/combiner_perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub fn combiner_perf() {
true, // Full reductions.
vec![Extractor::new("/key", &doc::SerPolicy::default())],
None,
Validator::new(schema).unwrap(),
Validator::from_schema(schema).unwrap(),
);
let mut accum = doc::combine::Accumulator::new(spec, tempfile::tempfile().unwrap()).unwrap();

Expand Down
5 changes: 3 additions & 2 deletions crates/doc/tests/merge_patch_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ fn reduce_stack(input: Vec<ArbitraryValue>) -> bool {
let alloc = HeapNode::new_allocator();
let curi = url::Url::parse("http://example").unwrap();
let mut validator =
Validator::new(build_schema(curi, &doc::reduce::merge_patch_schema()).unwrap()).unwrap();
Validator::from_schema(build_schema(curi, &doc::reduce::merge_patch_schema()).unwrap())
.unwrap();

let mut it = input.into_iter().map(|a| a.0);

Expand Down Expand Up @@ -94,7 +95,7 @@ fn reduce_combiner(input: Vec<ArbitraryValue>) -> bool {
is_full,
[], // Empty key (all docs are equal)
None,
Validator::new(schema).unwrap(),
Validator::from_schema(schema).unwrap(),
)
};
let memtable_1 = MemTable::new(Spec::with_bindings([spec(false), spec(true)].into_iter()));
Expand Down
4 changes: 2 additions & 2 deletions crates/doc/tests/reduce_annotations_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn test_validate_then_reduce() {
}),
)
.unwrap();
let mut validator = Validator::new(schema).unwrap();
let mut validator = Validator::from_schema(schema).unwrap();

let cases = vec![
(json!({"lww": "one"}), json!({"lww": "one"})),
Expand Down Expand Up @@ -346,7 +346,7 @@ struct TestMap {
}

fn reduce_tree(schema: Schema, docs: Vec<Value>) -> Value {
let mut validator = Validator::new(schema).unwrap();
let mut validator = Validator::from_schema(schema).unwrap();
let alloc = HeapNode::new_allocator();

let mut docs = docs
Expand Down
2 changes: 1 addition & 1 deletion crates/doc/tests/shape_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn assert_docs_fit_schema(docs: Vec<Value>, shape: Shape) -> bool {

let schema_yaml = serde_yaml::to_string(&to_schema(shape)).unwrap();

let mut validator = Validator::new(schema).unwrap();
let mut validator = Validator::from_schema(schema).unwrap();

for val in docs {
let res = validator.validate(None, &val);
Expand Down
2 changes: 1 addition & 1 deletion crates/doc/tests/spill_merge_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn run_sequence(seq: Vec<(u8, u8, bool)>) -> Result<(), FuzzError> {
true, // Full reductions.
vec![Extractor::new("/key", &ser_policy)],
None,
Validator::new(schema).unwrap(),
Validator::from_schema(schema).unwrap(),
)
})
.take(2),
Expand Down
2 changes: 1 addition & 1 deletion crates/flowctl/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ async fn do_combine(
&doc::SerPolicy::default(),
)?,
None,
doc::Validator::new(schema).unwrap(),
doc::Validator::from_schema(schema).unwrap(),
),
tempfile::tempfile().context("opening tempfile")?,
)?;
Expand Down
4 changes: 2 additions & 2 deletions crates/runtime/src/derive/combine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ impl Opened {

let write_schema_json = doc::validation::build_bundle(&write_schema_json)
.context("collection write_schema_json is not a JSON schema")?;
let validator =
doc::Validator::new(write_schema_json).context("could not build a schema validator")?;
let validator = doc::Validator::from_schema(write_schema_json)
.context("could not build a schema validator")?;

let combiner = doc::Combiner::new(
doc::combine::Spec::with_one_binding(
Expand Down
10 changes: 6 additions & 4 deletions crates/validation/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ pub struct Schema {
impl Schema {
pub fn new(bundle: &str) -> Result<Self, Error> {
let schema = doc::validation::build_bundle(bundle)?;
let validator = doc::Validator::new(schema)?;
let shape = Shape::infer(&validator.schemas()[0], validator.schema_index());
let context = doc::ValidationContext::from_schema(schema)?;

let schema = &context.schemas()[0];
let shape = Shape::infer(schema, context.schema_index());

Ok(Self {
curi: validator.schemas()[0].curi.clone(),
validator,
curi: context.schemas()[0].curi.clone(),
validator: doc::Validator::new(context),
shape,
})
}
Expand Down

0 comments on commit f4ad5e8

Please sign in to comment.