Skip to content

Commit

Permalink
AVRO-3894: [Rust] Record field aliases are not taken into account whe…
Browse files Browse the repository at this point in the history
…n serializing (apache#2566)

* AVRO-3894: [Rust] Record field aliases are not taken into account when serializing

Also don't treat field's aliases as custom attributes.

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3894: [Rust] Add a unit test for schema_compatibility

Provided-by: Josua Stingelin

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

---------

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
  • Loading branch information
martin-g authored and Ranbir Kumar committed May 13, 2024
1 parent 282c76d commit 660e66e
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 18 deletions.
37 changes: 21 additions & 16 deletions lang/rust/avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,28 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(

for schema_field in schema_fields.iter() {
let name = &schema_field.name;
let value = match lookup.get(name) {
Some(value) => value,
None => {
return Err(Error::NoEntryInLookupTable(
name.clone(),
format!("{lookup:?}"),
));
let value_opt = lookup.get(name).or_else(|| {
if let Some(aliases) = &schema_field.aliases {
aliases.iter().find_map(|alias| lookup.get(alias))
} else {
None
}
};

encode_internal(
value,
&schema_field.schema,
names,
&record_namespace,
buffer,
)?;
});

if let Some(value) = value_opt {
encode_internal(
value,
&schema_field.schema,
names,
&record_namespace,
buffer,
)?;
} else {
return Err(Error::NoEntryInLookupTable(
name.clone(),
format!("{lookup:?}"),
));
}
}
} else {
error!("invalid schema type for Record: {:?}", schema);
Expand Down
2 changes: 1 addition & 1 deletion lang/rust/avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ pub enum Error {
#[error("Signed decimal bytes length {0} not equal to fixed schema size {1}.")]
EncodeDecimalAsFixedError(usize, usize),

#[error("There is no entry for {0} in the lookup table: {1}.")]
#[error("There is no entry for '{0}' in the lookup table: {1}.")]
NoEntryInLookupTable(String, String),

#[error("Can only encode value type {value_kind:?} as one of {supported_schema:?}")]
Expand Down
2 changes: 1 addition & 1 deletion lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ impl RecordField {
let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
for (key, value) in field {
match key.as_str() {
"type" | "name" | "doc" | "default" | "order" | "position" => continue,
"type" | "name" | "doc" | "default" | "order" | "position" | "aliases" => continue,
_ => custom_attributes.insert(key.clone(), value.clone()),
};
}
Expand Down
45 changes: 45 additions & 0 deletions lang/rust/avro/src/schema_compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,4 +1038,49 @@ mod tests {

Ok(())
}

#[test]
fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
{
use serde::{Deserialize, Serialize};

const RAW_SCHEMA_V1: &str = r#"
{
"type": "record",
"name": "Conference",
"namespace": "advdaba",
"fields": [
{"type": "string", "name": "name"},
{"type": "long", "name": "date"}
]
}"#;
const RAW_SCHEMA_V2: &str = r#"
{
"type": "record",
"name": "Conference",
"namespace": "advdaba",
"fields": [
{"type": "string", "name": "name"},
{"type": "long", "name": "date", "aliases" : [ "time" ]}
]
}"#;

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct Conference {
pub name: String,
pub date: i64,
}
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct ConferenceV2 {
pub name: String,
pub time: i64,
}

let schema_v1 = Schema::parse_str(RAW_SCHEMA_V1)?;
let schema_v2 = Schema::parse_str(RAW_SCHEMA_V2)?;

assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2));

Ok(())
}
}
33 changes: 33 additions & 0 deletions lang/rust/avro/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1343,4 +1343,37 @@ mod tests {

Ok(())
}

#[test]
fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
const SCHEMA: &str = r#"
{
"type": "record",
"name": "Conference",
"fields": [
{"type": "string", "name": "name"},
{"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
]
}"#;

#[derive(Debug, PartialEq, Eq, Clone, Serialize)]
pub struct Conference {
pub name: String,
pub time: Option<i64>,
}

let conf = Conference {
name: "RustConf".to_string(),
time: Some(1234567890),
};

let schema = Schema::parse_str(SCHEMA)?;
let mut writer = Writer::new(&schema, Vec::new());

let bytes = writer.append_ser(conf)?;

assert_eq!(198, bytes);

Ok(())
}
}

0 comments on commit 660e66e

Please sign in to comment.