Handle situations with complex schemas in json converter (#4144) (#4151)
carlesarnal authored Dec 20, 2023
1 parent 29662c6 commit 1420c68
Showing 2 changed files with 143 additions and 8 deletions.
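
For context on the behaviour this change works around: Kafka Connect's stock JsonConverter, when configured with schemas.enable=false, writes only the payload JSON and deserializes objects back into plain Maps, so complex or nested schemas lose their Struct typing on the way back; this is why the testJson helper further down now casts the converted value to Struct instead of Map. A minimal standalone sketch (not part of the commit; the class name and sample values are illustrative):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class SchemalessRoundTripSketch {
    public static void main(String[] args) throws Exception {
        Schema valueSchema = SchemaBuilder.struct()
                .name("demo.Value")
                .field("id", Schema.INT32_SCHEMA)
                .build();
        Struct value = new Struct(valueSchema).put("id", 123);

        try (JsonConverter schemaless = new JsonConverter()) {
            // schemas.enable=false: only the payload is written, e.g. {"id":123}
            schemaless.configure(Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false), false);
            byte[] bytes = schemaless.fromConnectData("demo", valueSchema, value);
            System.out.println(new String(bytes, StandardCharsets.UTF_8));

            // Without a schema in the bytes, the value comes back as a Map, not a Struct.
            Object roundTripped = schemaless.toConnectData("demo", bytes).value();
            System.out.println(roundTripped.getClass()); // class java.util.HashMap
        }
    }
}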
@@ -239,6 +239,116 @@ public void testPrettyJson() throws Exception {
);
}

@Test
public void testConnectStruct() throws Exception {
try (ExtJsonConverter converter = new ExtJsonConverter()) {

converter.setFormatStrategy(new CompactFormatStrategy());
Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.REGISTRY_URL, getRegistryV2ApiUrl());
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
converter.configure(config, false);

org.apache.kafka.connect.data.Schema envelopeSchema = buildEnvelopeSchema();

// Create a Struct object for the Envelope
Struct envelopeStruct = new Struct(envelopeSchema);

// Set values for the fields in the Envelope
envelopeStruct.put("before", buildValueStruct());
envelopeStruct.put("after", buildValueStruct());
envelopeStruct.put("source", buildSourceStruct());
envelopeStruct.put("op", "insert");
envelopeStruct.put("ts_ms", 1638362438000L); // Replace with the actual timestamp
envelopeStruct.put("transaction", buildTransactionStruct());


String subject = TestUtils.generateArtifactId();

byte[] bytes = converter.fromConnectData(subject, envelopeSchema, envelopeStruct);

// some impl details ...
TestUtils.waitForSchema(globalId -> registryClient.getContentByGlobalId(globalId) != null, bytes);


Struct ir = (Struct) converter.toConnectData(subject, bytes).value();
Assertions.assertEquals(envelopeStruct, ir);
}
}

private static org.apache.kafka.connect.data.Schema buildEnvelopeSchema() {
// Define the Envelope schema
return SchemaBuilder.struct()
.name("dbserver1.public.aviation.Envelope")
.version(1)
.field("before", buildValueSchema())
.field("after", buildValueSchema())
.field("source", buildSourceSchema())
.field("op", SchemaBuilder.STRING_SCHEMA)
.field("ts_ms", SchemaBuilder.OPTIONAL_INT64_SCHEMA)
.field("transaction", buildTransactionSchema())
.build();
}

private static org.apache.kafka.connect.data.Schema buildValueSchema() {
// Define the Value schema
return SchemaBuilder.struct()
.name("dbserver1.public.aviation.Value")
.version(1)
.field("id", SchemaBuilder.INT32_SCHEMA)
.build();
}

private static Struct buildValueStruct() {
// Create a Struct object for the Value
Struct valueStruct = new Struct(buildValueSchema());

// Set value for the "id" field
valueStruct.put("id", 123); // Replace with the actual ID value

return valueStruct;
}

private static org.apache.kafka.connect.data.Schema buildSourceSchema() {
// Define the Source schema
return SchemaBuilder.struct()
.name("io.debezium.connector.postgresql.Source")
.version(1)
.field("id", SchemaBuilder.STRING_SCHEMA)
.field("version", SchemaBuilder.STRING_SCHEMA)
.build();
}

private static Struct buildSourceStruct() {
// Create a Struct object for the Source
Struct sourceStruct = new Struct(buildSourceSchema());

// Set values for the fields in the Source
sourceStruct.put("id", "source_id");
sourceStruct.put("version", "1.0");

return sourceStruct;
}

private static org.apache.kafka.connect.data.Schema buildTransactionSchema() {
// Define the Transaction schema
return SchemaBuilder.struct()
.name("event.block")
.version(1)
.field("id", SchemaBuilder.STRING_SCHEMA)
.build();
}

private static Struct buildTransactionStruct() {
// Create a Struct object for the Transaction
Struct transactionStruct = new Struct(buildTransactionSchema());

// Set value for the "id" field in Transaction
transactionStruct.put("id", "transaction_id");

return transactionStruct;
}

@Test
public void testCompactJson() throws Exception {
testJson(
@@ -270,7 +380,7 @@ private void testJson(RegistryClient restClient, FormatStrategy formatStrategy,
TestUtils.waitForSchemaCustom(globalId -> restClient.getContentByGlobalId(globalId) != null, bytes, fn);

//noinspection rawtypes
Map ir = (Map) converter.toConnectData("extjson", bytes).value();
Struct ir = (Struct) converter.toConnectData("extjson", bytes).value();
Assertions.assertEquals("somebar", ir.get("bar").toString());
}
}
Expand Up @@ -16,9 +16,11 @@

package io.apicurio.registry.utils.converter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.apicurio.registry.resolver.ParsedSchema;
import io.apicurio.registry.resolver.ParsedSchemaImpl;
import io.apicurio.registry.resolver.SchemaLookupResult;
@@ -33,12 +35,12 @@
import io.apicurio.registry.utils.converter.json.JsonConverterMetadata;
import io.apicurio.registry.utils.converter.json.JsonConverterRecord;
import io.apicurio.registry.utils.converter.json.PrettyFormatStrategy;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.storage.Converter;

import java.io.IOException;
@@ -53,19 +55,24 @@
*/
public class ExtJsonConverter extends SchemaResolverConfigurer<JsonNode, Object> implements Converter, SchemaParser<JsonNode, Object>, AutoCloseable {
private final JsonConverter jsonConverter;
private final JsonConverter deserializingConverter;
private final ObjectMapper mapper;
private FormatStrategy formatStrategy;
private boolean isKey;

private JsonDeserializer jsonDeserializer;

public ExtJsonConverter() {
this(null);
}

public ExtJsonConverter(RegistryClient client) {
super(client);
this.jsonConverter = new JsonConverter();
this.deserializingConverter = new JsonConverter();
this.mapper = new ObjectMapper();
this.formatStrategy = new PrettyFormatStrategy();
this.jsonDeserializer = new JsonDeserializer();
}

public ExtJsonConverter setFormatStrategy(FormatStrategy formatStrategy) {
@@ -75,11 +82,16 @@ public ExtJsonConverter setFormatStrategy(FormatStrategy formatStrategy) {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
super.configure((Map<String, Object>)configs, isKey, this);
super.configure((Map<String, Object>) configs, isKey, this);
this.isKey = isKey;
Map<String, Object> wrapper = new HashMap<>(configs);
wrapper.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
jsonConverter.configure(wrapper, isKey);

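// The deserializing converter is configured with schemas enabled so that it can rebuild typed Structs from the schema + payload envelope assembled in toConnectData()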
Map<String, Object> deserializingConfig = new HashMap<>(configs);
deserializingConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true);
deserializingConverter.configure(deserializingConfig, false);
jsonDeserializer.configure(wrapper, false);
}

@Override
@@ -110,10 +122,24 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {

SchemaLookupResult<JsonNode> schemaLookupResult = getSchemaResolver().resolveSchemaByArtifactReference(ArtifactReference.builder().globalId(globalId).build());

Schema schema = jsonConverter.asConnectSchema(schemaLookupResult.getParsedSchema().getParsedSchema());
JsonNode parsedSchema = schemaLookupResult.getParsedSchema().getParsedSchema();
JsonNode dataDeserialized = jsonDeserializer.deserialize(topic, ip.getPayload());

// Since the JSON converter expects the data to carry its schema in order to fully deserialize and validate it, we build an envelope object containing the schema fetched from the registry and the deserialized data
ObjectNode envelope = JsonNodeFactory.withExactBigDecimals(true).objectNode();
envelope.set("schema", parsedSchema);
envelope.set("payload", dataDeserialized);
dataDeserialized = envelope;

byte[] payload = ip.getPayload();
SchemaAndValue sav = jsonConverter.toConnectData(topic, payload);
SchemaAndValue sav;
try {
sav = deserializingConverter.toConnectData(topic, mapper.writeValueAsBytes(dataDeserialized));

} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

Schema schema = deserializingConverter.asConnectSchema(schemaLookupResult.getParsedSchema().getParsedSchema());

return new SchemaAndValue(schema, sav.value());
}
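
With the schema fetched from the registry, toConnectData can now hand a stock JsonConverter the schema-plus-payload envelope it expects when schemas are enabled. The sketch below is illustrative rather than output captured from this code; it assumes the registered schema is the Connect JSON schema produced on the serialization path, and it shows roughly what the reassembled envelope looks like for the simple Value schema used in the tests above, plus the fact that a schemas-enabled JsonConverter turns it back into a typed Struct:

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class EnvelopeShapeSketch {
    public static void main(String[] args) throws Exception {
        // Roughly the envelope assembled in toConnectData() for a Struct with a
        // single int32 "id" field (schema expressed in Connect's JSON schema format).
        String envelope = """
            {
              "schema": {
                "type": "struct",
                "name": "dbserver1.public.aviation.Value",
                "version": 1,
                "optional": false,
                "fields": [ { "field": "id", "type": "int32", "optional": false } ]
              },
              "payload": { "id": 123 }
            }
            """;

        try (JsonConverter schemaAware = new JsonConverter()) {
            schemaAware.configure(Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true), false);
            SchemaAndValue sav = schemaAware.toConnectData("demo", envelope.getBytes(StandardCharsets.UTF_8));
            Struct struct = (Struct) sav.value(); // typed Struct, not a Map
            System.out.println(struct.getInt32("id")); // 123
        }
    }
}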
@@ -163,5 +189,4 @@ public ParsedSchema<JsonNode> getSchemaFromData(Record<Object> data, boolean der
public void close() throws Exception {
jsonConverter.close();
}

}
