Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Schema::independent_canonical_form #66

Merged
merged 10 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 87 additions & 22 deletions avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use serde::{
};
use serde_json::{Map, Value};
use std::{
borrow::{Borrow, Cow},
borrow::Borrow,
collections::{BTreeMap, HashMap, HashSet},
fmt,
fmt::Debug,
Expand Down Expand Up @@ -1041,7 +1041,19 @@ impl Schema {
pub fn canonical_form(&self) -> String {
// Serialize to a generic JSON value first; the canonicalizer below walks plain
// `serde_json::Value`s rather than `Schema` itself.
let json = serde_json::to_value(self)
.unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}"));
parsing_canonical_form(&json)
// Full names of the named types written out so far. The canonicalizer consults this
// set so that a named type seen again is emitted as just its name.
let mut defined_names = HashSet::new();
parsing_canonical_form(&json, &mut defined_names)
}

/// Produces the [Parsing Canonical Form] of `self` in a self-contained shape, i.e. one that
/// does not depend on any of the definitions held in `schemata`.
///
/// `self` is not modified: references are resolved on a clone, and the canonical form is
/// computed from that clone.
///
/// # Errors
///
/// Propagates the error raised while resolving the references of `self` against `schemata`.
///
/// [Parsing Canonical Form]:
/// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas
pub fn independent_canonical_form(&self, schemata: &Vec<Schema>) -> Result<String, Error> {
    // Work on a private copy so the caller's schema keeps its references.
    let mut resolved = self.clone();
    resolved.denormalize(schemata)?;

    let canonical = resolved.canonical_form();
    Ok(canonical)
}

/// Generate [fingerprint] of Schema's [Parsing Canonical Form].
Expand Down Expand Up @@ -1246,6 +1258,41 @@ impl Schema {
attributes,
})
}

fn denormalize(&mut self, schemata: &Vec<Schema>) -> AvroResult<()> {
match self {
Schema::Ref { name } => {
let replacement_schema = schemata
.iter()
.find(|s| s.name().map(|n| *n == *name).unwrap_or(false));
if let Some(schema) = replacement_schema {
let mut denorm = schema.clone();
denorm.denormalize(schemata)?;
*self = denorm;
} else {
return Err(Error::SchemaResolutionError(name.clone()));
}
}
Schema::Record(record_schema) => {
for field in &mut record_schema.fields {
field.schema.denormalize(schemata)?;
}
}
Schema::Array(array_schema) => {
array_schema.items.denormalize(schemata)?;
}
Schema::Map(map_schema) => {
map_schema.types.denormalize(schemata)?;
}
Schema::Union(union_schema) => {
for schema in &mut union_schema.schemas {
schema.denormalize(schemata)?;
}
}
_ => (),
}
Ok(())
}
}

impl Parser {
Expand Down Expand Up @@ -2245,19 +2292,39 @@ impl Serialize for RecordField {

/// Parses a **valid** avro schema into the Parsing Canonical Form.
/// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas
///
/// Dispatches on the JSON shape of `schema`: objects go to `pcf_map`, strings to
/// `pcf_string` and arrays to `pcf_array`. `defined_names` is handed on unchanged to the
/// object and array handlers; `pcf_map` records in it the full names of the named types it
/// has already written and emits only the name when one of them is seen again.
///
/// # Panics
///
/// Panics if `schema` is any other kind of JSON value (not an object, string or array).
fn parsing_canonical_form(schema: &Value) -> String {
fn parsing_canonical_form(schema: &Value, defined_names: &mut HashSet<String>) -> String {
match schema {
Value::Object(map) => pcf_map(map),
Value::Object(map) => pcf_map(map, defined_names),
Value::String(s) => pcf_string(s),
Value::Array(v) => pcf_array(v),
Value::Array(v) => pcf_array(v, defined_names),
// Callers promise a valid schema, so any other JSON kind is treated as a broken invariant.
json => panic!("got invalid JSON value for canonical form of schema: {json}"),
}
}

fn pcf_map(schema: &Map<String, Value>) -> String {
fn pcf_map(schema: &Map<String, Value>, defined_names: &mut HashSet<String>) -> String {
// Look for the namespace variant up front.
let ns = schema.get("namespace").and_then(|v| v.as_str());
let typ = schema.get("type").and_then(|v| v.as_str());
let raw_name = schema.get("name").and_then(|v| v.as_str());
let name = if is_named_type(typ) {
Some(format!(
"{}{}",
ns.map_or("".to_string(), |n| { format!("{n}.") }),
raw_name.unwrap_or_default()
))
} else {
None
};

// If this named type has already been defined, emit only its full name and return early.
if let Some(ref n) = name {
if defined_names.contains(n) {
return pcf_string(n);
} else {
defined_names.insert(n.clone());
}
}

let mut fields = Vec::new();
for (k, v) in schema {
// Reduce primitive types to their simple form. ([PRIMITIVE] rule)
Expand All @@ -2280,17 +2347,10 @@ fn pcf_map(schema: &Map<String, Value>) -> String {

// Fully qualify the name, if it isn't already ([FULLNAMES] rule).
if k == "name" {
// Invariant: Only valid schemas. Must be a string.
let name = v.as_str().unwrap();
let n = match ns {
Some(namespace) if is_named_type(typ) && !name.contains('.') => {
Cow::Owned(format!("{namespace}.{name}"))
}
_ => Cow::Borrowed(name),
};

fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&n))));
continue;
if let Some(ref n) = name {
fields.push(("name", format!("{}:{}", pcf_string(k), pcf_string(n))));
continue;
}
}

// Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule).
Expand All @@ -2306,7 +2366,11 @@ fn pcf_map(schema: &Map<String, Value>) -> String {
// For anything else, recursively process the result.
fields.push((
k,
format!("{}:{}", pcf_string(k), parsing_canonical_form(v)),
format!(
"{}:{}",
pcf_string(k),
parsing_canonical_form(v, defined_names)
),
));
}

Expand All @@ -2327,10 +2391,10 @@ fn is_named_type(typ: Option<&str>) -> bool {
)
}

fn pcf_array(arr: &[Value]) -> String {
fn pcf_array(arr: &[Value], defined_names: &mut HashSet<String>) -> String {
let inter = arr
.iter()
.map(parsing_canonical_form)
.map(|a| parsing_canonical_form(a, defined_names))
.collect::<Vec<String>>()
.join(",");
format!("[{inter}]")
Expand Down Expand Up @@ -2376,6 +2440,7 @@ pub trait AvroSchema {
#[cfg(feature = "derive")]
pub mod derive {
use super::*;
use std::borrow::Cow;

/// Trait for types that serve as fully defined components inside an Avro data model. Derive
/// implementation available through `derive` feature. This is what is implemented by
Expand Down Expand Up @@ -3424,7 +3489,7 @@ mod tests {
assert_eq!(schema, expected);

let canonical_form = &schema.canonical_form();
let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}}]}"#;
let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":"enum"}]}"#;
martin-g marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(canonical_form, &expected);

Ok(())
Expand Down Expand Up @@ -3508,7 +3573,7 @@ mod tests {
assert_eq!(schema, expected);

let canonical_form = &schema.canonical_form();
let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":{"name":"fixed","type":"fixed","size":456}}]}"#;
let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":"fixed"}]}"#;
martin-g marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(canonical_form, &expected);

Ok(())
Expand Down
Loading
Loading