Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro references #31

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def normalize_schema_str(
except JSONDecodeError as e:
LOG.info("Schema is not valid JSON")
raise e

elif schema_type == SchemaType.PROTOBUF:
if schema:
schema_str = str(schema)
Expand Down Expand Up @@ -180,6 +181,17 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
return parsed_typed_schema.schema


def avro_schema_merge(schema_str: str, dependencies: Mapping[str, Dependency]) -> str:
"""To support references in AVRO we recursively merge all referenced schemas with current schema"""
if dependencies:
merged_schema = ""
for dependency in dependencies.values():
merged_schema += avro_schema_merge(dependency.schema.schema_str, dependency.schema.dependencies) + ",\n"
merged_schema += schema_str
return "[\n" + merged_schema + "\n]"
return schema_str


def parse(
schema_type: SchemaType,
schema_str: str,
Expand All @@ -196,7 +208,7 @@ def parse(
if schema_type is SchemaType.AVRO:
try:
parsed_schema = parse_avro_schema_definition(
schema_str,
avro_schema_merge(schema_str, dependencies),
validate_enum_symbols=validate_avro_enum_symbols,
validate_names=validate_avro_names,
)
Expand Down
34 changes: 23 additions & 11 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

try:
schema_type_parsed = SchemaType(schema_type)
except ValueError:
except ValueError as e:
LOG.warning("Invalid schema type: %s", schema_type)
return
raise e

# This does two jobs:
# - Validates the schema's JSON
Expand All @@ -513,12 +513,24 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema | None = None
resolved_dependencies: dict[str, Dependency] | None = None
if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
if schema_type_parsed == SchemaType.AVRO:
try:
if schema_references:
candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError as e:
LOG.warning("Schema is not valid JSON")
raise e
except InvalidReferences as e:
LOG.exception("Invalid AVRO references")
raise e
elif schema_type_parsed == SchemaType.JSONSCHEMA:
try:
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError:
except json.JSONDecodeError as e:
LOG.warning("Schema is not valid JSON")
return
raise e
elif schema_type_parsed == SchemaType.PROTOBUF:
try:
if schema_references:
Expand All @@ -532,12 +544,12 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
normalize=False,
)
schema_str = str(parsed_schema)
except InvalidSchema:
except InvalidSchema as e:
LOG.exception("Schema is not valid ProtoBuf definition")
return
except InvalidReferences:
raise e
except InvalidReferences as e:
LOG.exception("Invalid Protobuf references")
return
raise e

try:
typed_schema = TypedSchema(
Expand All @@ -547,8 +559,8 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
dependencies=resolved_dependencies,
schema=parsed_schema,
)
except (InvalidSchema, JSONDecodeError):
return
except (InvalidSchema, JSONDecodeError) as e:
raise e

self.database.insert_schema_version(
subject=schema_subject,
Expand Down
2 changes: 1 addition & 1 deletion karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ def _validate_references(
content_type=content_type,
status=HTTPStatus.BAD_REQUEST,
)
if references and schema_type != SchemaType.PROTOBUF:
if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO:
self.r(
body={
"error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value,
Expand Down
58 changes: 58 additions & 0 deletions tests/integration/test_schema_avro_references.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
karapace - schema tests

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.client import Client

import json

baseurl = "http://localhost:8081"


async def test_avro_references(registry_async_client: Client) -> None:
schema_country = {
"type": "record",
"name": "Country",
"namespace": "com.netapp",
"fields": [{"name": "name", "type": "string"}, {"name": "code", "type": "string"}],
}

schema_address = {
"type": "record",
"name": "Address",
"namespace": "com.netapp",
"fields": [
{"name": "street", "type": "string"},
{"name": "city", "type": "string"},
{"name": "postalCode", "type": "string"},
{"name": "country", "type": "Country"},
],
}

res = await registry_async_client.post("subjects/country/versions", json={"schema": json.dumps(schema_country)})
assert res.status_code == 200
assert "id" in res.json()
country_references = [{"name": "country.proto", "subject": "country", "version": 1}]

res = await registry_async_client.post(
"subjects/address/versions",
json={"schemaType": "AVRO", "schema": json.dumps(schema_address), "references": country_references},
)
assert res.status_code == 200
assert "id" in res.json()
address_id = res.json()["id"]

# Check if the schema has now been registered under the subject

res = await registry_async_client.post(
"subjects/address",
json={"schemaType": "AVRO", "schema": json.dumps(schema_address), "references": country_references},
)
assert res.status_code == 200
assert "subject" in res.json()
assert "id" in res.json()
assert address_id == res.json()["id"]
assert "version" in res.json()
assert "schema" in res.json()
Loading