From 84f4e32bf3509bd0cb460e1a8eb00fa82ed81c70 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 26 Nov 2024 14:59:30 -0800 Subject: [PATCH 01/14] MINOR refactor env var lookup for consistency with other repos (#2368) --- .../HcVaultKmsClient.cs | 5 ----- .../HcVaultKmsDriver.cs | 5 +++++ src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs | 8 -------- src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs | 8 ++++++++ 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs index 128a531b1..b63fc703f 100644 --- a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs @@ -21,11 +21,6 @@ public class HcVaultKmsClient : IKmsClient public HcVaultKmsClient(string kekId, string ns, string tokenId) { - if (tokenId == null) - { - tokenId = Environment.GetEnvironmentVariable("VAULT_TOKEN"); - ns = Environment.GetEnvironmentVariable("VAULT_NAMESPACE"); - } KekId = kekId; Namespace = ns; TokenId = tokenId; diff --git a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs index e220afc7d..b2cfd17ca 100644 --- a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs +++ b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs @@ -23,6 +23,11 @@ public IKmsClient NewKmsClient(IDictionary config, string keyUrl { config.TryGetValue(TokenId, out string tokenId); config.TryGetValue(Namespace, out string ns); + if (tokenId == null) + { + tokenId = Environment.GetEnvironmentVariable("VAULT_TOKEN"); + ns = Environment.GetEnvironmentVariable("VAULT_NAMESPACE"); + } return new HcVaultKmsClient(keyUrl, ns, tokenId); } } diff --git a/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs index 66afb08ef..5b05dac89 100644 --- a/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs @@ -16,14 +16,6 @@ public class LocalKmsClient : IKmsClient public LocalKmsClient(string secret) { - if (secret == null) - { - secret = Environment.GetEnvironmentVariable("LOCAL_SECRET"); - } - if (secret == null) - { - throw new ArgumentNullException("Cannot load secret"); - } Secret = secret; cryptor = new Cryptor(DekFormat.AES128_GCM); byte[] rawKey = Hkdf.DeriveKey( diff --git a/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs b/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs index 87ad7bd91..2def8c054 100644 --- a/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs +++ b/src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs @@ -22,6 +22,14 @@ public string GetKeyUrlPrefix() public IKmsClient NewKmsClient(IDictionary config, string keyUrl) { config.TryGetValue(Secret, out string secret); + if (secret == null) + { + secret = Environment.GetEnvironmentVariable("LOCAL_SECRET"); + } + if (secret == null) + { + throw new ArgumentNullException("Cannot load secret"); + } return new LocalKmsClient(secret); } } From 7d82888458f4d7ace35f850eb46408d8a944a554 Mon Sep 17 00:00:00 2001 From: Fernando Luiz de Lima Date: Fri, 29 Nov 2024 03:58:12 -0300 Subject: [PATCH 02/14] Add Kafka Core to Third-Party Libraries (#2361) * Add Kafka Core to 3rd party libraries. --- 3RD_PARTY.md | 1 + 1 file changed, 1 insertion(+) diff --git a/3RD_PARTY.md b/3RD_PARTY.md index 6a4a20f57..df00ef05c 100644 --- a/3RD_PARTY.md +++ b/3RD_PARTY.md @@ -10,3 +10,4 @@ To add your project, open a pull request! - [Multi Schema Avro Deserializer](https://github.com/ycherkes/multi-schema-avro-desrializer) - Avro deserializer for reading messages serialized with multiple schemas. - [OpenSleigh.Transport.Kafka](https://github.com/mizrael/OpenSleigh/tree/develop/src/OpenSleigh.Transport.Kafka) - A Kafka Transport for OpenSleigh, a distributed saga management library. - [SlimMessageBus.Host.Kafka](https://github.com/zarusz/SlimMessageBus) - Apache Kafka transport for SlimMessageBus (lightweight message bus for .NET) +- [Kafka Core](https://github.com/ffernandolima/confluent-kafka-core-dotnet) - Kafka Core empowers developers to build robust .NET applications on top of Confluent Kafka, focusing on simplicity, maintainability, and extensibility with intuitive abstractions and builders. \ No newline at end of file From 224a56110ca9d0bb977efcc2c25d838e7d2b9fc7 Mon Sep 17 00:00:00 2001 From: Krzysztof Porebski Date: Mon, 2 Dec 2024 22:59:40 +0100 Subject: [PATCH 03/14] Improve lock utilization on the consumer hot path (#2370) --- src/Confluent.SchemaRegistry/AsyncSerde.cs | 10 ++++++++-- .../CachedSchemaRegistryClient.cs | 11 ++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index 70e1ee047..d63d3e2d9 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -18,6 +18,7 @@ #pragma warning disable CS0618 using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -41,7 +42,7 @@ public abstract class AsyncSerde protected SemaphoreSlim serdeMutex = new SemaphoreSlim(1); - private readonly IDictionary parsedSchemaCache = new Dictionary(); + private readonly IDictionary parsedSchemaCache = new ConcurrentDictionary(); private SemaphoreSlim parsedSchemaMutex = new SemaphoreSlim(1); protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig config, RuleRegistry ruleRegistry = null) @@ -98,10 +99,15 @@ protected string GetSubjectName(string topic, bool isKey, string recordType) protected async Task GetParsedSchema(Schema schema) { + if (parsedSchemaCache.TryGetValue(schema, out TParsedSchema parsedSchema)) + { + return parsedSchema; + } + await parsedSchemaMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - if (!parsedSchemaCache.TryGetValue(schema, out TParsedSchema parsedSchema)) + if (!parsedSchemaCache.TryGetValue(schema, out parsedSchema)) { if (parsedSchemaCache.Count > schemaRegistryClient.MaxCachedSchemas) { diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index 0379454c7..fe3ed684b 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -23,6 +23,7 @@ using System.Threading.Tasks; using System.Linq; using System; +using System.Collections.Concurrent; using System.Net; using System.Threading; using System.Security.Cryptography.X509Certificates; @@ -65,7 +66,7 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient private IRestService restService; private int identityMapCapacity; private int latestCacheTtlSecs; - private readonly Dictionary schemaById = new Dictionary(); + private readonly ConcurrentDictionary schemaById = new ConcurrentDictionary(); private readonly Dictionary> idBySchemaBySubject = new Dictionary>(); @@ -657,11 +658,15 @@ public async Task GetSchemaAsync(int id, string format = null) /// public async Task GetSchemaBySubjectAndIdAsync(string subject, int id, string format = null) { + if (this.schemaById.TryGetValue(id, out Schema schema) && checkSchemaMatchesFormat(format, schema.SchemaString)) + { + return schema; + } + await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - if (!this.schemaById.TryGetValue(id, out Schema schema) || - !checkSchemaMatchesFormat(format, schema.SchemaString)) + if (!this.schemaById.TryGetValue(id, out schema) || !checkSchemaMatchesFormat(format, schema.SchemaString)) { CleanCacheIfFull(); schema = (await restService.GetSchemaBySubjectAndIdAsync(subject, id, format) From 35b8f9938ff786c10912144becab37be509efe7f Mon Sep 17 00:00:00 2001 From: Krzysztof Porebski Date: Mon, 2 Dec 2024 23:00:55 +0100 Subject: [PATCH 04/14] Improve perf of base64 encoding check on a consumer hot path (#2371) Co-authored-by: Robert Yokota --- .../CachedSchemaRegistryClient.cs | 25 +++---------------- src/Confluent.SchemaRegistry/Utils.cs | 20 +++++++++++++++ 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index fe3ed684b..949b0f3ba 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -593,16 +593,8 @@ private bool checkSchemaMatchesFormat(string format, string schemaString) // if a format isn't specified, then assume text is desired. if (format == null) { - try - { - Convert.FromBase64String(schemaString); - } - catch (Exception) - { - return true; // Base64 conversion failed, infer the schemaString format is text. - } - - return false; // Base64 conversion succeeded, so infer the schamaString format is base64. + // If schemaString is not Base64, infer the schemaString format is text. + return !Utils.IsBase64String(schemaString); } else { @@ -610,17 +602,8 @@ private bool checkSchemaMatchesFormat(string format, string schemaString) { throw new ArgumentException($"Invalid schema format was specified: {format}."); } - - try - { - Convert.FromBase64String(schemaString); - } - catch (Exception) - { - return false; - } - - return true; + + return Utils.IsBase64String(schemaString); } } diff --git a/src/Confluent.SchemaRegistry/Utils.cs b/src/Confluent.SchemaRegistry/Utils.cs index 5c6e16cfc..f652f7b5f 100644 --- a/src/Confluent.SchemaRegistry/Utils.cs +++ b/src/Confluent.SchemaRegistry/Utils.cs @@ -20,6 +20,9 @@ using System.Linq; using Confluent.Kafka; +#if NET8_0_OR_GREATER +using System.Buffers.Text; +#endif namespace Confluent.SchemaRegistry { @@ -70,5 +73,22 @@ public static bool ListEquals(IList a, IList b) if (a == null || b == null) return false; return a.SequenceEqual(b); } + + internal static bool IsBase64String(string value) + { +#if NET8_0_OR_GREATER + return Base64.IsValid(value); +#else + try + { + _ = Convert.FromBase64String(value); + return true; + } + catch (FormatException) + { + return false; + } +#endif + } } } From 8656b582f2264cec51e7f9e0454af8c4dda659ca Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 2 Dec 2024 19:49:14 -0800 Subject: [PATCH 05/14] Add missing validate check (#2372) This branch was missed when adding the validate config --- src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs index 8a2e47300..9d75e708d 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs @@ -251,7 +251,7 @@ public override async Task DeserializeAsync(ReadOnlyMemory data, bool i { string serializedString = jsonReader.ReadToEnd(); - if (schema != null) + if (schema != null && validate) { var validationResult = validator.Validate(serializedString, schema); From 1a336456147c57298390ea14bff1cd62c243c56a Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 4 Dec 2024 08:44:14 -0800 Subject: [PATCH 06/14] Fix null reference exception (#2373) --- .../Rest/DataContracts/RuleSet.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs index 822ff74d7..6a9154020 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/RuleSet.cs @@ -45,14 +45,14 @@ public bool HasRules(RuleMode mode) { switch (mode) { case RuleMode.Upgrade: case RuleMode.Downgrade: - return MigrationRules.Any(r => r.Mode == mode || r.Mode == RuleMode.UpDown); + return MigrationRules != null && MigrationRules.Any(r => r.Mode == mode || r.Mode == RuleMode.UpDown); case RuleMode.UpDown: - return MigrationRules.Any(r => r.Mode == mode); + return MigrationRules != null && MigrationRules.Any(r => r.Mode == mode); case RuleMode.Write: case RuleMode.Read: - return DomainRules.Any(r => r.Mode == mode || r.Mode == RuleMode.Write); + return DomainRules != null && DomainRules.Any(r => r.Mode == mode || r.Mode == RuleMode.Write); case RuleMode.WriteRead: - return DomainRules.Any(r => r.Mode == mode); + return DomainRules != null && DomainRules.Any(r => r.Mode == mode); default: return false; } From 16367a7b701e990df72de1e6ab4390aa473df622 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 4 Dec 2024 11:29:37 -0800 Subject: [PATCH 07/14] Ensure different key ids use different client instances (#2374) * Ensure different key ids use different client instances * Minor cleanup * Minor cleanup --- src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs | 4 ++-- .../AzureKmsClient.cs | 2 +- src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs | 2 +- .../HcVaultKmsClient.cs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs index 0b4a0ff94..5a3c14ac8 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs @@ -12,7 +12,7 @@ public class AwsKmsClient : IKmsClient { private AmazonKeyManagementServiceClient kmsClient; private string keyId; - + public string KekId { get; } public AwsKmsClient(string kekId, AWSCredentials credentials) @@ -36,7 +36,7 @@ public AwsKmsClient(string kekId, AWSCredentials credentials) public bool DoesSupport(string uri) { - return uri.StartsWith(AwsKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs index 2ef3f1cd7..1a70524c6 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs @@ -25,7 +25,7 @@ public AzureKmsClient(string kekId, TokenCredential tokenCredential) public bool DoesSupport(string uri) { - return uri.StartsWith(AzureKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs index db5fd4683..c3a995bfe 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs @@ -36,7 +36,7 @@ public GcpKmsClient(string kekId, GoogleCredential credential) public bool DoesSupport(string uri) { - return uri.StartsWith(GcpKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) diff --git a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs index b63fc703f..781af5714 100644 --- a/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs +++ b/src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs @@ -48,7 +48,7 @@ public HcVaultKmsClient(string kekId, string ns, string tokenId) public bool DoesSupport(string uri) { - return uri.StartsWith(HcVaultKmsDriver.Prefix); + return KekId == uri; } public async Task Encrypt(byte[] plaintext) From 30671534eab07df0d2d62755237b79425e9cc588 Mon Sep 17 00:00:00 2001 From: Kunal Gupta Date: Thu, 5 Dec 2024 11:22:32 -0600 Subject: [PATCH 08/14] DGS-19008: Add PR template (#2375) --- .github/pull_request_template.md | 43 ++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 .github/pull_request_template.md diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 000000000..c4d1853ee --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,43 @@ + +What +---- + + +Checklist +------------------ +- [ ] Contains customer facing changes? Including API/behavior changes +- [ ] Did you add sufficient unit test and/or integration test coverage for this PR? + - If not, please explain why it is not required + +References +---------- +JIRA: + + +Test & Review +------------ + + +Open questions / Follow-ups +-------------------------- + + + From 16456fddbb1abd439d1c526bb9caeb092d36d3a2 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 6 Dec 2024 14:27:44 -0800 Subject: [PATCH 09/14] Set skipKnownTypes default to be consistent with other clients (#2376) * Change skipKnownTypes to be consistent with other clients * Enhance test --- .../ProtobufSerializer.cs | 11 +++++++++-- .../ProtobufSerializerConfig.cs | 2 +- src/Confluent.SchemaRegistry/AsyncSerde.cs | 7 ++++++- .../Tests_Protobuf/ProduceConsumeGoogleRef.cs | 3 ++- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs index 898ec21da..53ad08c4d 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs @@ -54,7 +54,7 @@ namespace Confluent.SchemaRegistry.Serdes /// public class ProtobufSerializer : AsyncSerializer where T : IMessage, new() { - private bool skipKnownTypes; + private bool skipKnownTypes = true; private bool useDeprecatedFormat; private ReferenceSubjectNameStrategyDelegate referenceSubjectNameStrategy; @@ -190,7 +190,7 @@ private async Task> RegisterOrGetReferences(FileDescriptor for (int i=0; i ParseSchema(Schema schema) .ConfigureAwait(continueOnCapturedContext: false); return ProtobufUtils.Parse(schema.SchemaString, references); } + + protected override bool IgnoreReference(string name) + { + return name.StartsWith("confluent/") || + name.StartsWith("google/protobuf/") || + name.StartsWith("google/type/"); + } } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs index b40a0d9ca..72071509b 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs @@ -195,7 +195,7 @@ public IDictionary UseLatestWithMetadata /// Specifies whether or not the Protobuf serializer should skip known types /// when resolving dependencies. /// - /// default: false + /// default: true /// public bool? SkipKnownTypes { diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index d63d3e2d9..70934d291 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -142,6 +142,11 @@ protected async Task> ResolveReferences(Schema schem .ConfigureAwait(continueOnCapturedContext: false); return result; } + + protected virtual bool IgnoreReference(string name) + { + return false; + } private async Task> ResolveReferences( Schema schema, IDictionary schemas, ISet visited) @@ -149,7 +154,7 @@ private async Task> ResolveReferences( IList references = schema.References; foreach (SchemaReference reference in references) { - if (visited.Contains(reference.Name)) + if (IgnoreReference(reference.Name) || visited.Contains(reference.Name)) { continue; } diff --git a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs index 427e609d6..f29048e2e 100644 --- a/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs +++ b/test/Confluent.SchemaRegistry.Serdes.IntegrationTests/Tests_Protobuf/ProduceConsumeGoogleRef.cs @@ -33,12 +33,13 @@ public static void ProduceConsumeGoogleRefProtobuf(string bootstrapServers, stri { var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers }; var schemaRegistryConfig = new SchemaRegistryConfig { Url = schemaRegistryServers }; + var serializerConfig = new ProtobufSerializerConfig() { SkipKnownTypes = false }; using (var topic = new TemporaryTopic(bootstrapServers, 1)) using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig)) using (var producer = new ProducerBuilder(producerConfig) - .SetValueSerializer(new ProtobufSerializer(schemaRegistry)) + .SetValueSerializer(new ProtobufSerializer(schemaRegistry, serializerConfig)) .Build()) { var u = new WithGoogleRefs.TheRecord(); From 87979d8ffa6cbaa42a35ed323ff7b2b83ed4f514 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 10 Dec 2024 10:12:08 -0800 Subject: [PATCH 10/14] Add ability to override disable flag and actions on a rule (#2377) * Add ability to override disable flag and actions on a rule * Add test --- src/Confluent.SchemaRegistry/AsyncSerde.cs | 47 ++++++++++++++-- src/Confluent.SchemaRegistry/RuleOverride.cs | 40 ++++++++++++++ src/Confluent.SchemaRegistry/RuleRegistry.cs | 54 +++++++++++++++++-- .../SerializeDeserialize.cs | 44 ++++++++++++++- 4 files changed, 174 insertions(+), 11 deletions(-) create mode 100644 src/Confluent.SchemaRegistry/RuleOverride.cs diff --git a/src/Confluent.SchemaRegistry/AsyncSerde.cs b/src/Confluent.SchemaRegistry/AsyncSerde.cs index 70934d291..698e6873b 100644 --- a/src/Confluent.SchemaRegistry/AsyncSerde.cs +++ b/src/Confluent.SchemaRegistry/AsyncSerde.cs @@ -359,7 +359,7 @@ protected async Task ExecuteRules( for (int i = 0; i < rules.Count; i++) { Rule rule = rules[i]; - if (rule.Disabled) + if (IsDisabled(rule)) { continue; } @@ -406,21 +406,21 @@ protected async Task ExecuteRules( default: throw new ArgumentException("Unsupported rule kind " + rule.Kind); } - await RunAction(ctx, ruleMode, rule, message != null ? rule.OnSuccess : rule.OnFailure, + await RunAction(ctx, ruleMode, rule, message != null ? GetOnSuccess(rule) : GetOnFailure(rule), message, null, message != null ? null : ErrorAction.ActionType, ruleRegistry) .ConfigureAwait(continueOnCapturedContext: false); } catch (RuleException ex) { - await RunAction(ctx, ruleMode, rule, rule.OnFailure, message, + await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message, ex, ErrorAction.ActionType, ruleRegistry) .ConfigureAwait(continueOnCapturedContext: false); } } else { - await RunAction(ctx, ruleMode, rule, rule.OnFailure, message, + await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message, new RuleException("Could not find rule executor of type " + rule.Type), ErrorAction.ActionType, ruleRegistry) .ConfigureAwait(continueOnCapturedContext: false); @@ -429,6 +429,45 @@ await RunAction(ctx, ruleMode, rule, rule.OnFailure, message, return message; } + private string GetOnSuccess(Rule rule) + { + if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride)) + { + if (ruleOverride.OnSuccess != null) + { + return ruleOverride.OnSuccess; + } + } + + return rule.OnSuccess; + } + + private string GetOnFailure(Rule rule) + { + if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride)) + { + if (ruleOverride.OnFailure != null) + { + return ruleOverride.OnFailure; + } + } + + return rule.OnFailure; + } + + private bool IsDisabled(Rule rule) + { + if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride)) + { + if (ruleOverride.Disabled.HasValue) + { + return ruleOverride.Disabled.Value; + } + } + + return rule.Disabled; + } + private static IRuleExecutor GetRuleExecutor(RuleRegistry ruleRegistry, string type) { if (ruleRegistry.TryGetExecutor(type, out IRuleExecutor result)) diff --git a/src/Confluent.SchemaRegistry/RuleOverride.cs b/src/Confluent.SchemaRegistry/RuleOverride.cs new file mode 100644 index 000000000..cc00e4316 --- /dev/null +++ b/src/Confluent.SchemaRegistry/RuleOverride.cs @@ -0,0 +1,40 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +namespace Confluent.SchemaRegistry +{ + /// + /// A rule override. + /// + public class RuleOverride + { + public string Type { get; set; } + + public string OnSuccess { get; set; } + + public string OnFailure { get; set; } + + public bool? Disabled { get; set; } + + public RuleOverride(string type, string onSuccess, string onFailure, bool? disabled) + { + Type = type; + OnSuccess = onSuccess; + OnFailure = onFailure; + Disabled = disabled; + } + } +} \ No newline at end of file diff --git a/src/Confluent.SchemaRegistry/RuleRegistry.cs b/src/Confluent.SchemaRegistry/RuleRegistry.cs index af71421a0..16ae348c7 100644 --- a/src/Confluent.SchemaRegistry/RuleRegistry.cs +++ b/src/Confluent.SchemaRegistry/RuleRegistry.cs @@ -26,19 +26,16 @@ public class RuleRegistry { private readonly SemaphoreSlim ruleExecutorsMutex = new SemaphoreSlim(1); private readonly SemaphoreSlim ruleActionsMutex = new SemaphoreSlim(1); + private readonly SemaphoreSlim ruleOverridesMutex = new SemaphoreSlim(1); private IDictionary ruleExecutors = new Dictionary(); private IDictionary ruleActions = new Dictionary(); + private IDictionary ruleOverrides = new Dictionary(); private static readonly RuleRegistry GLOBAL_INSTANCE = new RuleRegistry(); public static RuleRegistry GlobalInstance => GLOBAL_INSTANCE; - public static List GetRuleActions() - { - return GlobalInstance.GetActions(); - } - public void RegisterExecutor(IRuleExecutor executor) { ruleExecutorsMutex.Wait(); @@ -123,6 +120,48 @@ public List GetActions() } } + public void RegisterOverride(RuleOverride ruleOverride) + { + ruleOverridesMutex.Wait(); + try + { + if (!ruleOverrides.ContainsKey(ruleOverride.Type)) + { + ruleOverrides.Add(ruleOverride.Type, ruleOverride); + } + } + finally + { + ruleOverridesMutex.Release(); + } + } + + public bool TryGetOverride(string name, out RuleOverride ruleOverride) + { + ruleOverridesMutex.Wait(); + try + { + return ruleOverrides.TryGetValue(name, out ruleOverride); + } + finally + { + ruleOverridesMutex.Release(); + } + } + + public List GetOverrides() + { + ruleOverridesMutex.Wait(); + try + { + return new List(ruleOverrides.Values); + } + finally + { + ruleOverridesMutex.Release(); + } + } + public static void RegisterRuleExecutor(IRuleExecutor executor) { GlobalInstance.RegisterExecutor(executor); @@ -132,5 +171,10 @@ public static void RegisterRuleAction(IRuleAction action) { GlobalInstance.RegisterAction(action); } + + public static void RegisterRuleOverride(RuleOverride ruleOverride) + { + GlobalInstance.RegisterOverride(ruleOverride); + } } } diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs index 8843f882d..db021d651 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs @@ -275,12 +275,12 @@ public void ISpecificRecordCELFieldTransform() schema.RuleSet = new RuleSet(new List(), new List { - new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, "typeName == 'STRING' ; value + '-suffix'", null, null, false) } ); store[schemaStr] = 1; - subjectStore["topic-value"] = new List { schema }; + subjectStore["topic-value"] = new List { schema }; var config = new AvroSerializerConfig { AutoRegisterSchemas = false, @@ -305,6 +305,46 @@ public void ISpecificRecordCELFieldTransform() Assert.Equal(user.favorite_number, result.favorite_number); } + [Fact] + public void ISpecificRecordCELFieldTransformDisable() + { + var schemaStr = User._SCHEMA.ToString(); + var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Avro, null); + schema.RuleSet = new RuleSet(new List(), + new List + { + new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null, + "typeName == 'STRING' ; value + '-suffix'", null, null, false) + } + ); + store[schemaStr] = 1; + subjectStore["topic-value"] = new List { schema }; + var config = new AvroSerializerConfig + { + AutoRegisterSchemas = false, + UseLatestVersion = true + }; + RuleRegistry registry = new RuleRegistry(); + registry.RegisterOverride(new RuleOverride("CEL_FIELD", null, null, true)); + var serializer = new AvroSerializer(schemaRegistryClient, config, registry); + var deserializer = new AvroDeserializer(schemaRegistryClient, null); + + var user = new User + { + favorite_color = "blue", + favorite_number = 100, + name = "awesome" + }; + + Headers headers = new Headers(); + var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result; + + Assert.Equal("awesome", result.name); + Assert.Equal("blue", result.favorite_color); + Assert.Equal(user.favorite_number, result.favorite_number); + } + [Fact] public void ISpecificRecordCELFieldCondition() { From 29157859e532f1197d8b083e32005fc021ff7089 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 11 Dec 2024 11:02:48 -0800 Subject: [PATCH 11/14] Add AWS AssumeRole support to AWS KMS (#2379) --- .../AwsKmsDriver.cs | 59 ++++++++++++++++++- ...luent.SchemaRegistry.Encryption.Aws.csproj | 3 +- 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsDriver.cs b/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsDriver.cs index 8f605e13d..f4c26f61d 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsDriver.cs +++ b/src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsDriver.cs @@ -1,5 +1,7 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using Amazon.Runtime; +using Amazon.Runtime.CredentialManagement; namespace Confluent.SchemaRegistry.Encryption.Aws { @@ -13,7 +15,11 @@ public static void Register() public static readonly string Prefix = "aws-kms://"; public static readonly string AccessKeyId = "access.key.id"; public static readonly string SecretAccessKey = "secret.access.key"; - + public static readonly string Profile = "profile"; + public static readonly string RoleArn = "role.arn"; + public static readonly string RoleSessionName = "role.session.name"; + public static readonly string RoleExternalId = "role.external.id"; + public string GetKeyUrlPrefix() { return Prefix; @@ -21,12 +27,61 @@ public string GetKeyUrlPrefix() public IKmsClient NewKmsClient(IDictionary config, string keyUrl) { + config.TryGetValue(RoleArn, out string roleArn); + if (roleArn == null) + { + roleArn = Environment.GetEnvironmentVariable("AWS_ROLE_ARN"); + } + config.TryGetValue(RoleSessionName, out string roleSessionName); + if (roleSessionName == null) + { + roleSessionName = Environment.GetEnvironmentVariable("AWS_ROLE_SESSION_NAME"); + } + config.TryGetValue(RoleExternalId, out string roleExternalId); + if (roleExternalId == null) + { + roleExternalId = Environment.GetEnvironmentVariable("AWS_ROLE_EXTERNAL_ID"); + } AWSCredentials credentials = null; if (config.TryGetValue(AccessKeyId, out string accessKeyId) && config.TryGetValue(SecretAccessKey, out string secretAccessKey)) { credentials = new BasicAWSCredentials(accessKeyId, secretAccessKey); } + else if (config.TryGetValue(Profile, out string profile)) + { + var credentialProfileStoreChain = new CredentialProfileStoreChain(); + if (credentialProfileStoreChain.TryGetAWSCredentials( + profile, out AWSCredentials creds)) + credentials = creds; + } + if (credentials == null) + { + credentials = FallbackCredentialsFactory.GetCredentials(); + } + if (roleArn != null) + { + if (string.IsNullOrEmpty(roleExternalId)) + { + credentials = new AssumeRoleAWSCredentials( + credentials, + roleArn, + roleSessionName ?? "confluent-encrypt"); + } + else + { + var options = new AssumeRoleAWSCredentialsOptions + { + ExternalId = roleExternalId + }; + + credentials = new AssumeRoleAWSCredentials( + credentials, + roleArn, + roleSessionName ?? "confluent-encrypt", + options); + } + } return new AwsKmsClient(keyUrl, credentials); } } diff --git a/src/Confluent.SchemaRegistry.Encryption.Aws/Confluent.SchemaRegistry.Encryption.Aws.csproj b/src/Confluent.SchemaRegistry.Encryption.Aws/Confluent.SchemaRegistry.Encryption.Aws.csproj index 9cdf64968..c243c0b39 100644 --- a/src/Confluent.SchemaRegistry.Encryption.Aws/Confluent.SchemaRegistry.Encryption.Aws.csproj +++ b/src/Confluent.SchemaRegistry.Encryption.Aws/Confluent.SchemaRegistry.Encryption.Aws.csproj @@ -29,7 +29,8 @@ - + + From a4d58e04e1f8c6ac2b1ca809327a80ac15a15648 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Thu, 12 Dec 2024 15:46:45 -0800 Subject: [PATCH 12/14] Minor change to default for client retries (#2381) --- src/Confluent.SchemaRegistry/Rest/RestService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Confluent.SchemaRegistry/Rest/RestService.cs b/src/Confluent.SchemaRegistry/Rest/RestService.cs index 25e24d6d8..0ecee7dda 100644 --- a/src/Confluent.SchemaRegistry/Rest/RestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/RestService.cs @@ -34,7 +34,7 @@ public class RestService : IRestService private static readonly string acceptHeader = string.Join(", ", Versions.PreferredResponseTypes); - public const int DefaultMaxRetries = 2; + public const int DefaultMaxRetries = 3; public const int DefaultRetriesWaitMs = 1000; From 00f1fa817dcb238da828bd52ae7094bc0a850303 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 13 Dec 2024 09:54:55 -0800 Subject: [PATCH 13/14] MINOR fix disable test (#2382) --- src/Confluent.SchemaRegistry/Rest/RestService.cs | 3 +-- .../SerializeDeserialize.cs | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Confluent.SchemaRegistry/Rest/RestService.cs b/src/Confluent.SchemaRegistry/Rest/RestService.cs index 0ecee7dda..2a8d71e12 100644 --- a/src/Confluent.SchemaRegistry/Rest/RestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/RestService.cs @@ -238,8 +238,7 @@ private async Task ExecuteOnOneInstanceAsync(Func= 400 && (int)response.StatusCode < 500) + if (!IsRetriable((int)response.StatusCode)) { try { diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs index db021d651..a8ca546c2 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs @@ -27,6 +27,7 @@ using Avro; using Avro.Generic; using Confluent.SchemaRegistry.Encryption; +using Confluent.SchemaRegistry.Rules; namespace Confluent.SchemaRegistry.Serdes.UnitTests @@ -325,6 +326,7 @@ public void ISpecificRecordCELFieldTransformDisable() UseLatestVersion = true }; RuleRegistry registry = new RuleRegistry(); + registry.RegisterExecutor(new CelFieldExecutor()); registry.RegisterOverride(new RuleOverride("CEL_FIELD", null, null, true)); var serializer = new AvroSerializer(schemaRegistryClient, config, registry); var deserializer = new AvroDeserializer(schemaRegistryClient, null); From aef7faa922fb0e22a6e47c8a2d57df75d434b85d Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Thu, 19 Dec 2024 09:38:02 -0800 Subject: [PATCH 14/14] DGS-19409 Ensure Avro serde caches per subject (#2387) * DGS-19409 Ensure Avro serde caches per subject * Add test * Fix test --- .../GenericSerializerImpl.cs | 48 ++++----------- .../SpecificSerializerImpl.cs | 58 +++++-------------- .../BaseSerializeDeserialize.cs | 8 +++ .../SerializeDeserialize.cs | 37 ++++++++++++ 4 files changed, 70 insertions(+), 81 deletions(-) diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs index 13d92afef..3049ff6f5 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs @@ -31,9 +31,10 @@ namespace Confluent.SchemaRegistry.Serdes { internal class GenericSerializerImpl : AsyncSerializer { - private Dictionary knownSchemas = new Dictionary(); - private HashSet> registeredSchemas = new HashSet>(); - private Dictionary schemaIds = new Dictionary(); + private Dictionary knownSchemas = + new Dictionary(); + private Dictionary, int> registeredSchemas = + new Dictionary, int>(); public GenericSerializerImpl( ISchemaRegistryClient schemaRegistryClient, @@ -99,12 +100,10 @@ public async Task Serialize(string topic, Headers headers, GenericRecord // something more sophisticated than the below + not allow // the misuse to keep happening without warning. if (knownSchemas.Count > schemaRegistryClient.MaxCachedSchemas || - registeredSchemas.Count > schemaRegistryClient.MaxCachedSchemas || - schemaIds.Count > schemaRegistryClient.MaxCachedSchemas) + registeredSchemas.Count > schemaRegistryClient.MaxCachedSchemas) { knownSchemas.Clear(); registeredSchemas.Clear(); - schemaIds.Clear(); } // Determine a schema string corresponding to the schema object. @@ -139,41 +138,18 @@ public async Task Serialize(string topic, Headers headers, GenericRecord { schemaId = latestSchema.Id; } - else if (!registeredSchemas.Contains(subjectSchemaPair)) + else if (!registeredSchemas.TryGetValue(subjectSchemaPair, out schemaId)) { - int newSchemaId; - // first usage: register/get schema to check compatibility - if (autoRegisterSchema) - { - newSchemaId = await schemaRegistryClient + schemaId = autoRegisterSchema + ? await schemaRegistryClient .RegisterSchemaAsync(subject, writerSchemaString, normalizeSchemas) + .ConfigureAwait(continueOnCapturedContext: false) + : await schemaRegistryClient + .GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas) .ConfigureAwait(continueOnCapturedContext: false); - } - else - { - newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas) - .ConfigureAwait(continueOnCapturedContext: false); - } - - if (!schemaIds.ContainsKey(writerSchemaString)) - { - schemaIds.Add(writerSchemaString, newSchemaId); - } - else if (schemaIds[writerSchemaString] != newSchemaId) - { - schemaIds.Clear(); - registeredSchemas.Clear(); - throw new KafkaException(new Error(isKey ? ErrorCode.Local_KeySerialization : ErrorCode.Local_ValueSerialization, $"Duplicate schema registration encountered: Schema ids {schemaIds[writerSchemaString]} and {newSchemaId} are associated with the same schema.")); - } - - registeredSchemas.Add(subjectSchemaPair); - schemaId = schemaIds[writerSchemaString]; - } - else - { - schemaId = schemaIds[writerSchemaString]; + registeredSchemas.Add(subjectSchemaPair, schemaId); } } finally diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs index 90ebdb46f..f3effb5ba 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs @@ -35,23 +35,8 @@ internal class SerializerSchemaData { private string writerSchemaString; private global::Avro.Schema writerSchema; - - /// - /// A given schema is uniquely identified by a schema id, even when - /// registered against multiple subjects. - /// - private int? writerSchemaId; - private SpecificWriter avroWriter; - private HashSet subjectsRegistered = new HashSet(); - - public HashSet SubjectsRegistered - { - get => subjectsRegistered; - set => subjectsRegistered = value; - } - public string WriterSchemaString { get => writerSchemaString; @@ -64,12 +49,6 @@ public Avro.Schema WriterSchema set => writerSchema = value; } - public int? WriterSchemaId - { - get => writerSchemaId; - set => writerSchemaId = value; - } - public SpecificWriter AvroWriter { get => avroWriter; @@ -79,20 +58,14 @@ public SpecificWriter AvroWriter private Dictionary multiSchemaData = new Dictionary(); - - private SerializerSchemaData singleSchemaData; + private Dictionary, int> registeredSchemas = + new Dictionary, int>(); public SpecificSerializerImpl( ISchemaRegistryClient schemaRegistryClient, AvroSerializerConfig config, RuleRegistry ruleRegistry) : base(schemaRegistryClient, config, ruleRegistry) { - Type writerType = typeof(T); - if (writerType != typeof(ISpecificRecord)) - { - singleSchemaData = ExtractSchemaData(writerType); - } - if (config == null) { return; } if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; } @@ -177,24 +150,18 @@ public async Task Serialize(string topic, Headers headers, T data, bool { try { + int schemaId; string subject; RegisteredSchema latestSchema = null; SerializerSchemaData currentSchemaData; await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - if (singleSchemaData == null) - { - var key = data.GetType(); - if (!multiSchemaData.TryGetValue(key, out currentSchemaData)) - { - currentSchemaData = ExtractSchemaData(key); - multiSchemaData[key] = currentSchemaData; - } - } - else + var key = data != null ? data.GetType() : typeof(Null); + if (!multiSchemaData.TryGetValue(key, out currentSchemaData)) { - currentSchemaData = singleSchemaData; + currentSchemaData = ExtractSchemaData(key); + multiSchemaData[key] = currentSchemaData; } string fullname = null; @@ -204,17 +171,18 @@ public async Task Serialize(string topic, Headers headers, T data, bool } subject = GetSubjectName(topic, isKey, fullname); + var subjectSchemaPair = new KeyValuePair(subject, currentSchemaData.WriterSchemaString); latestSchema = await GetReaderSchema(subject) .ConfigureAwait(continueOnCapturedContext: false); if (latestSchema != null) { - currentSchemaData.WriterSchemaId = latestSchema.Id; + schemaId = latestSchema.Id; } - else if (!currentSchemaData.SubjectsRegistered.Contains(subject)) + else if (!registeredSchemas.TryGetValue(subjectSchemaPair, out schemaId)) { // first usage: register/get schema to check compatibility - currentSchemaData.WriterSchemaId = autoRegisterSchema + schemaId = autoRegisterSchema ? await schemaRegistryClient .RegisterSchemaAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas) .ConfigureAwait(continueOnCapturedContext: false) @@ -222,7 +190,7 @@ public async Task Serialize(string topic, Headers headers, T data, bool .GetSchemaIdAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas) .ConfigureAwait(continueOnCapturedContext: false); - currentSchemaData.SubjectsRegistered.Add(subject); + registeredSchemas.Add(subjectSchemaPair, schemaId); } } finally @@ -248,7 +216,7 @@ public async Task Serialize(string topic, Headers headers, T data, bool { stream.WriteByte(Constants.MagicByte); - writer.Write(IPAddress.HostToNetworkOrder(currentSchemaData.WriterSchemaId.Value)); + writer.Write(IPAddress.HostToNetworkOrder(schemaId)); currentSchemaData.AvroWriter.Write(data, new BinaryEncoder(stream)); // TODO: maybe change the ISerializer interface so that this copy isn't necessary. diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs index 0dbd34b67..e7db25386 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/BaseSerializeDeserialize.cs @@ -47,6 +47,14 @@ public BaseSerializeDeserializeTests() schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( (string subject, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1 ); + schemaRegistryMock.Setup(x => x.GetSchemaIdAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( + (string subject, string schema, bool normalize) => + { + return subjectStore[subject].First(x => + x.SchemaString == schema + ).Id; + } + ); schemaRegistryMock.Setup(x => x.LookupSchemaAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync( (string subject, Schema schema, bool ignoreDeleted, bool normalize) => { diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs index a8ca546c2..85a859ecc 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs @@ -154,6 +154,43 @@ public void ISpecificRecord() Assert.Equal(user.favorite_number, result.favorite_number); } + [Fact] + public void ISpecificRecordStrings() + { + var schemaStr = "{\"type\":\"string\"}"; + var schema = new RegisteredSchema("topic1-value", 1, 1, schemaStr, SchemaType.Avro, null); + store[schemaStr] = 1; + subjectStore["topic1-value"] = new List { schema }; + + schema = new RegisteredSchema("topic2-value", 1, 2, schemaStr, SchemaType.Avro, null); + schema.Metadata = new Metadata(null, new Dictionary + { + { "confluent:version", "1" } + }, null); + store[schemaStr] = 2; + subjectStore["topic2-value"] = new List { schema }; + + var config = new AvroSerializerConfig + { + AutoRegisterSchemas = false, + SubjectNameStrategy = SubjectNameStrategy.Topic + }; + var serializer = new AvroSerializer(schemaRegistryClient, config); + + Headers headers = new Headers(); + var bytes = serializer.SerializeAsync("hi", new SerializationContext(MessageComponentType.Value, "topic1", headers)).Result; + Assert.Equal(1, bytes[4]); + + bytes = serializer.SerializeAsync("world", new SerializationContext(MessageComponentType.Value, "topic2", headers)).Result; + Assert.Equal(2, bytes[4]); + + bytes = serializer.SerializeAsync("hi", new SerializationContext(MessageComponentType.Value, "topic1", headers)).Result; + Assert.Equal(1, bytes[4]); + + bytes = serializer.SerializeAsync("world", new SerializationContext(MessageComponentType.Value, "topic2", headers)).Result; + Assert.Equal(2, bytes[4]); + } + [Fact] public void ISpecificRecordRecordNameStrategy() {