Skip to content

Commit

Permalink
DGS-19409 Ensure Avro serde caches per subject (#2387)
Browse files Browse the repository at this point in the history
* DGS-19409 Ensure Avro serde caches per subject

* Add test

* Fix test
  • Loading branch information
rayokota authored Dec 19, 2024
1 parent 00f1fa8 commit aef7faa
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 81 deletions.
48 changes: 12 additions & 36 deletions src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ namespace Confluent.SchemaRegistry.Serdes
{
internal class GenericSerializerImpl : AsyncSerializer<GenericRecord, Avro.Schema>
{
private Dictionary<Avro.Schema, string> knownSchemas = new Dictionary<global::Avro.Schema, string>();
private HashSet<KeyValuePair<string, string>> registeredSchemas = new HashSet<KeyValuePair<string, string>>();
private Dictionary<string, int> schemaIds = new Dictionary<string, int>();
private Dictionary<Avro.Schema, string> knownSchemas =
new Dictionary<global::Avro.Schema, string>();
private Dictionary<KeyValuePair<string, string>, int> registeredSchemas =
new Dictionary<KeyValuePair<string, string>, int>();

public GenericSerializerImpl(
ISchemaRegistryClient schemaRegistryClient,
Expand Down Expand Up @@ -99,12 +100,10 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
// something more sophisticated than the below + not allow
// the misuse to keep happening without warning.
if (knownSchemas.Count > schemaRegistryClient.MaxCachedSchemas ||
registeredSchemas.Count > schemaRegistryClient.MaxCachedSchemas ||
schemaIds.Count > schemaRegistryClient.MaxCachedSchemas)
registeredSchemas.Count > schemaRegistryClient.MaxCachedSchemas)
{
knownSchemas.Clear();
registeredSchemas.Clear();
schemaIds.Clear();
}

// Determine a schema string corresponding to the schema object.
Expand Down Expand Up @@ -139,41 +138,18 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
{
schemaId = latestSchema.Id;
}
else if (!registeredSchemas.Contains(subjectSchemaPair))
else if (!registeredSchemas.TryGetValue(subjectSchemaPair, out schemaId))
{
int newSchemaId;

// first usage: register/get schema to check compatibility
if (autoRegisterSchema)
{
newSchemaId = await schemaRegistryClient
schemaId = autoRegisterSchema
? await schemaRegistryClient
.RegisterSchemaAsync(subject, writerSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false)
: await schemaRegistryClient
.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);
}
else
{
newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);
}

if (!schemaIds.ContainsKey(writerSchemaString))
{
schemaIds.Add(writerSchemaString, newSchemaId);
}
else if (schemaIds[writerSchemaString] != newSchemaId)
{
schemaIds.Clear();
registeredSchemas.Clear();
throw new KafkaException(new Error(isKey ? ErrorCode.Local_KeySerialization : ErrorCode.Local_ValueSerialization, $"Duplicate schema registration encountered: Schema ids {schemaIds[writerSchemaString]} and {newSchemaId} are associated with the same schema."));
}

registeredSchemas.Add(subjectSchemaPair);

schemaId = schemaIds[writerSchemaString];
}
else
{
schemaId = schemaIds[writerSchemaString];
registeredSchemas.Add(subjectSchemaPair, schemaId);
}
}
finally
Expand Down
58 changes: 13 additions & 45 deletions src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,8 @@ internal class SerializerSchemaData
{
private string writerSchemaString;
private global::Avro.Schema writerSchema;

/// <remarks>
/// A given schema is uniquely identified by a schema id, even when
/// registered against multiple subjects.
/// </remarks>
private int? writerSchemaId;

private SpecificWriter<T> avroWriter;

private HashSet<string> subjectsRegistered = new HashSet<string>();

public HashSet<string> SubjectsRegistered
{
get => subjectsRegistered;
set => subjectsRegistered = value;
}

public string WriterSchemaString
{
get => writerSchemaString;
Expand All @@ -64,12 +49,6 @@ public Avro.Schema WriterSchema
set => writerSchema = value;
}

public int? WriterSchemaId
{
get => writerSchemaId;
set => writerSchemaId = value;
}

public SpecificWriter<T> AvroWriter
{
get => avroWriter;
Expand All @@ -79,20 +58,14 @@ public SpecificWriter<T> AvroWriter

private Dictionary<Type, SerializerSchemaData> multiSchemaData =
new Dictionary<Type, SerializerSchemaData>();

private SerializerSchemaData singleSchemaData;
private Dictionary<KeyValuePair<string, string>, int> registeredSchemas =
new Dictionary<KeyValuePair<string, string>, int>();

public SpecificSerializerImpl(
ISchemaRegistryClient schemaRegistryClient,
AvroSerializerConfig config,
RuleRegistry ruleRegistry) : base(schemaRegistryClient, config, ruleRegistry)
{
Type writerType = typeof(T);
if (writerType != typeof(ISpecificRecord))
{
singleSchemaData = ExtractSchemaData(writerType);
}

if (config == null) { return; }

if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; }
Expand Down Expand Up @@ -177,24 +150,18 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
{
try
{
int schemaId;
string subject;
RegisteredSchema latestSchema = null;
SerializerSchemaData currentSchemaData;
await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
if (singleSchemaData == null)
{
var key = data.GetType();
if (!multiSchemaData.TryGetValue(key, out currentSchemaData))
{
currentSchemaData = ExtractSchemaData(key);
multiSchemaData[key] = currentSchemaData;
}
}
else
var key = data != null ? data.GetType() : typeof(Null);
if (!multiSchemaData.TryGetValue(key, out currentSchemaData))
{
currentSchemaData = singleSchemaData;
currentSchemaData = ExtractSchemaData(key);
multiSchemaData[key] = currentSchemaData;
}

string fullname = null;
Expand All @@ -204,25 +171,26 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
}

subject = GetSubjectName(topic, isKey, fullname);
var subjectSchemaPair = new KeyValuePair<string, string>(subject, currentSchemaData.WriterSchemaString);
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);

if (latestSchema != null)
{
currentSchemaData.WriterSchemaId = latestSchema.Id;
schemaId = latestSchema.Id;
}
else if (!currentSchemaData.SubjectsRegistered.Contains(subject))
else if (!registeredSchemas.TryGetValue(subjectSchemaPair, out schemaId))
{
// first usage: register/get schema to check compatibility
currentSchemaData.WriterSchemaId = autoRegisterSchema
schemaId = autoRegisterSchema
? await schemaRegistryClient
.RegisterSchemaAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false)
: await schemaRegistryClient
.GetSchemaIdAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);

currentSchemaData.SubjectsRegistered.Add(subject);
registeredSchemas.Add(subjectSchemaPair, schemaId);
}
}
finally
Expand All @@ -248,7 +216,7 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
{
stream.WriteByte(Constants.MagicByte);

writer.Write(IPAddress.HostToNetworkOrder(currentSchemaData.WriterSchemaId.Value));
writer.Write(IPAddress.HostToNetworkOrder(schemaId));
currentSchemaData.AvroWriter.Write(data, new BinaryEncoder(stream));

// TODO: maybe change the ISerializer interface so that this copy isn't necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public BaseSerializeDeserializeTests()
schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>())).ReturnsAsync(
(string subject, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1
);
schemaRegistryMock.Setup(x => x.GetSchemaIdAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>())).ReturnsAsync(
(string subject, string schema, bool normalize) =>
{
return subjectStore[subject].First(x =>
x.SchemaString == schema
).Id;
}
);
schemaRegistryMock.Setup(x => x.LookupSchemaAsync(It.IsAny<string>(), It.IsAny<Schema>(), It.IsAny<bool>(), It.IsAny<bool>())).ReturnsAsync(
(string subject, Schema schema, bool ignoreDeleted, bool normalize) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,43 @@ public void ISpecificRecord()
Assert.Equal(user.favorite_number, result.favorite_number);
}

[Fact]
public async Task ISpecificRecordStrings()
{
    // Regression test for per-subject schema-id caching (DGS-19409):
    // the SAME schema string is registered under two different subjects
    // with DIFFERENT ids, so a cache keyed only by schema string would
    // return the wrong id for one of the topics.
    var schemaStr = "{\"type\":\"string\"}";
    var schema = new RegisteredSchema("topic1-value", 1, 1, schemaStr, SchemaType.Avro, null);
    store[schemaStr] = 1;
    subjectStore["topic1-value"] = new List<RegisteredSchema> { schema };

    // Second subject: same schema string, id 2, tagged with
    // "confluent:version" metadata so it is discoverable as a latest version.
    schema = new RegisteredSchema("topic2-value", 1, 2, schemaStr, SchemaType.Avro, null);
    schema.Metadata = new Metadata(null, new Dictionary<string, string>
    {
        { "confluent:version", "1" }
    }, null);
    store[schemaStr] = 2;
    subjectStore["topic2-value"] = new List<RegisteredSchema> { schema };

    var config = new AvroSerializerConfig
    {
        AutoRegisterSchemas = false,
        SubjectNameStrategy = SubjectNameStrategy.Topic
    };
    var serializer = new AvroSerializer<String>(schemaRegistryClient, config);

    var headers = new Headers();

    // First serialization for each topic resolves and caches the id.
    // bytes[4] is the low-order byte of the big-endian schema id that
    // follows the magic byte in the wire format.
    var bytes = await serializer.SerializeAsync("hi", new SerializationContext(MessageComponentType.Value, "topic1", headers));
    Assert.Equal(1, bytes[4]);

    bytes = await serializer.SerializeAsync("world", new SerializationContext(MessageComponentType.Value, "topic2", headers));
    Assert.Equal(2, bytes[4]);

    // Repeat both topics to exercise the cached path: each subject must
    // still see its own id, not the other subject's cached value.
    bytes = await serializer.SerializeAsync("hi", new SerializationContext(MessageComponentType.Value, "topic1", headers));
    Assert.Equal(1, bytes[4]);

    bytes = await serializer.SerializeAsync("world", new SerializationContext(MessageComponentType.Value, "topic2", headers));
    Assert.Equal(2, bytes[4]);
}

[Fact]
public void ISpecificRecordRecordNameStrategy()
{
Expand Down

0 comments on commit aef7faa

Please sign in to comment.