diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs index 83daf504abf..aca5156e645 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs @@ -1,7 +1,7 @@ -using System.Data; using System.Threading.Channels; using HotChocolate.Subscriptions.Diagnostics; using Npgsql; +using NpgsqlTypes; using static HotChocolate.Subscriptions.Postgres.PostgresResources; namespace HotChocolate.Subscriptions.Postgres; @@ -136,9 +136,20 @@ private async Task HandleMessage(NpgsqlConnection connection, CancellationToken _diagnosticEvents.ProviderInfo(msg); // if we cannot send the message we put it back into the channel + // however as the channel is bounded, we might not able to requeue the message and will be forced to drop them if they can't be written + var failedCount = 0; + foreach (var message in messages) { - await _channel.Writer.WriteAsync(message, CancellationToken.None); + if (!_channel.Writer.TryWrite(message)) + { + failedCount++; + } + } + + if (failedCount > 0) + { + _diagnosticEvents.ProviderInfo(string.Format(ChannelWriter_FailedToRequeueMessage, failedCount)); } } } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs index a614a375658..22fb294d943 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs @@ -104,5 +104,11 @@ internal static string PostgresMessageEnvelope_PayloadTooLarge { return ResourceManager.GetString("PostgresMessageEnvelope_PayloadTooLarge", resourceCulture); } } + + internal static string ChannelWriter_FailedToRequeueMessage { + get { + return ResourceManager.GetString("ChannelWriter_FailedToRequeueMessage", resourceCulture); + } + } } } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx index cb4593fb29c..adc265e1c6d 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx @@ -55,4 +55,7 @@ Payload is too long to we written to Postgres. Serialized message is {0} bytes but limit is {1} bytes + + The postgres writer was unable to requeue messages. {0} messages have been lost +