Reconnection loop after server PersistenceError #284

Open
adrianotr opened this issue Nov 28, 2024 · 10 comments

@adrianotr
Contributor

Hello there! I think I might have found an issue.

The client connection seems to enter a loop trying to reconnect after we receive a PersistenceError from the server, as seen in the first line of the log.

Logs:

"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Received send error from server: PersistenceError : "Cannot determine whether the message is a duplicate at this time"
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Socket was disconnected exceptionally on writing Send
Connection backgroundTask "LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650" removed
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" requestsMb mailbox has stopped normally
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" operationsMb mailbox has stopped normally
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" sendMb mailbox has stopped normally
"producer(0, , -1) ConnectionHandler" Closed connection to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1" Current state "Ready   clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" -- Will try again in 100ms
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
Connecting to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)" with maxMessageSize: 5242880
Connected to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)"
"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Connected ProtocolVersion: 19 ServerVersion: "Pulsar Server3.0.5" MaxMessageSize: 5242880
"producer(0, nonprod-use1-191-16442, -1)" starting register to topic "persistent://stg-auction/auction-pvt/phase-action-submitted-v1"
"producer(0, nonprod-use1-191-16442, -1)" registered
"producer(0, nonprod-use1-191-16442, -1)" resending 8 pending messages
"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Socket was disconnected exceptionally on writing Send
Connection backgroundTask "LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650" removed
"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" operationsMb mailbox has stopped normally
"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" sendMb mailbox has stopped normally
"producer(0, , -1) ConnectionHandler" Closed connection to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1" Current state "Ready   clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" -- Will try again in 100ms
"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" requestsMb mailbox has stopped normally
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
Connecting to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)" with maxMessageSize: 5242880
Connected to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)"
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
"clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Connected ProtocolVersion: 19 ServerVersion: "Pulsar Server3.0.5" MaxMessageSize: 5242880
"producer(0, nonprod-use1-191-16442, -1)" starting register to topic "persistent://stg-auction/auction-pvt/phase-action-submitted-v1"
"producer(0, nonprod-use1-191-16442, -1)" registered
"producer(0, nonprod-use1-191-16442, -1)" resending 12 pending messages
"clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Socket was disconnected exceptionally on writing Send
Connection backgroundTask "LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650" removed
"producer(0, , -1) ConnectionHandler" Closed connection to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1" Current state "Ready   clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" -- Will try again in 100ms
"clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" requestsMb mailbox has stopped normally
"clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" sendMb mailbox has stopped normally
"clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" operationsMb mailbox has stopped normally
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
Connecting to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)" with maxMessageSize: 5242880
"producer(0, nonprod-use1-191-16442, -1)" not connected, skipping send
Connected to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)"
"clientCnx(5, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Connected ProtocolVersion: 19 ServerVersion: "Pulsar Server3.0.5" MaxMessageSize: 5242880
"producer(0, nonprod-use1-191-16442, -1)" starting register to topic "persistent://stg-auction/auction-pvt/phase-action-submitted-v1"
"producer(0, nonprod-use1-191-16442, -1)" registered
"producer(0, nonprod-use1-191-16442, -1)" resending 14 pending messages
"clientCnx(5, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Socket was disconnected exceptionally on writing Send
Connection backgroundTask "LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650" removed
"producer(0, , -1) ConnectionHandler" Closed connection to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1" Current state "Ready   clientCnx(5, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" -- Will try again in 100ms

and the reconnection attempts continue.

Eventually the producer times out.
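To see how the backlog grows from one reconnect to the next, the pending-message counts can be pulled out of a saved copy of the client output. This is only a minimal sketch: the helper names `pending_per_reconnect` and `retry_count` are hypothetical and not part of Pulsar.Client, and it assumes the log was written to a plain-text file.

```shell
#!/bin/sh
# Hypothetical helpers for inspecting a saved client log; $1 is the log file path.

# Print N from every "resending N pending messages" line, one value per line.
pending_per_reconnect() {
    sed -n 's/.*resending \([0-9][0-9]*\) pending messages.*/\1/p' "$1"
}

# Count how many times the connection handler scheduled another retry.
retry_count() {
    grep -c 'Will try again in' "$1"
}
```

Run against the excerpt above, `pending_per_reconnect` would print 8, 12 and 14, which shows the queue of unsent messages growing on every attempt.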

@Lanayx
Member

Lanayx commented Nov 28, 2024

@adrianotr The exception seems to be missing from the logs, so it's not clear what goes wrong. Can you please check why the exception is not logged? Here is the logging line

This is the recommended logging setup (change Warning level to Debug):

let configureLogging() =
    Log.Logger <-
        LoggerConfiguration()
            .MinimumLevel.Warning()
            .WriteTo.Console(theme = AnsiConsoleTheme.Code, outputTemplate="[{Timestamp:HH:mm:ss.fff} {Level:u3} {ThreadId}] {Message:lj}{NewLine}{Exception}")
            .Enrich.FromLogContext()
            .Enrich.WithThreadId()
            .CreateLogger()
    let serviceCollection = ServiceCollection()
    let sp =
        serviceCollection
            .AddLogging(fun configure -> configure.AddSerilog(dispose = true) |> ignore)
            .BuildServiceProvider()
    let logger = sp.GetService<ILogger<PulsarClient>>()
    PulsarClient.Logger <- logger

@adrianotr
Contributor Author

Here are the debug level logs:

"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Sending message of type Ack
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Got message of type SendError
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Received send error from server: PersistenceError : "Cannot determine whether the message is a duplicate at this time"
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Got message of type SendError
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Received send error from server: PersistenceError : "Cannot determine whether the message is a duplicate at this time"
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Got message of type SendReceipt
"producer(2, nonprod-use1-191-21804, -1)" Received ack for message "(SequenceId=763,LedgerId=743051,EntryId=707,HighestSequenceId=766)"
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Got message of type SendReceipt
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Got message of type SendReceipt
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=769
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Sending message of type Send
"producer(2, nonprod-use1-191-21804, -1)" Got ack for msg. expecting: 764 - 764 - got: 767 - 767 - queue-size: 5
"producer(2, nonprod-use1-191-21804, -1)" Got ack for msg. expecting: 764 - 764 - got: 768 - 768 - queue-size: 5
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Socket was disconnected exceptionally while reading
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" ChannelInactive, currently IsActive=True
Connection backgroundTask "LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650" removed
"producer(2, nonprod-use1-191-21804, -1)" ConnectionClosed
"producer(2, , -1) ConnectionHandler" Closed connection to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1" Current state "Ready
  clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" -- Will try again in 100ms 
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" fail requests
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" sendMb stopped
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" has stopped
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" operationsMb stopped
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" readSocket stopped
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" operationsMb mailbox has stopped normally
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" requestsMb mailbox has stopped normally
"clientCnx(105, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" sendMb mailbox has stopped normally
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=770
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 1
"producer(2, , -1) ConnectionHandler" Starting reconnect to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1"
"clientCnx(0, LogicalAddres Unspecified/pulsar-proxy.pulsar.svc.cluster.local:6650)" add request 217 type Lookup
"clientCnx(0, LogicalAddres Unspecified/pulsar-proxy.pulsar.svc.cluster.local:6650)" Sending message of type Lookup
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 2
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=771
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=772
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=773
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=774
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Sending message of type Ack
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=775
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=776
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=777
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"clientCnx(0, LogicalAddres Unspecified/pulsar-proxy.pulsar.svc.cluster.local:6650)" Got message of type LookupResponse
"clientCnx(0, LogicalAddres Unspecified/pulsar-proxy.pulsar.svc.cluster.local:6650)" complete request 217 type LookupResponse
Connecting to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.pulsar.svc.cluster.local:6650)" with maxMessageSize: 5242880
Socket connecting to "Unspecified/pulsar-proxy.pulsar.svc.cluster.local:6650"
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=778
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 1
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=779
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=780
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
Socket connected to "Unspecified/pulsar-proxy.pulsar.svc.cluster.local:6650"
Connection established for "Unspecified/pulsar-proxy.pulsar.svc.cluster.local:6650"
"clientCnx(106, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Started read socket
"clientCnx(106, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Sending message of type Connect
Connected to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.pulsar.svc.cluster.local:6650)"
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=781
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=782
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=783
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=784
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send
"producer(2, nonprod-use1-191-21804, -1)" BeginSendMessage
"producer(2, , -1) KeyBasedBatcher" add message to batch, num messages in batch so far is 0
"producer(2, nonprod-use1-191-21804, -1)" CryptoKeyReader not present, encryption not possible
"producer(2, nonprod-use1-191-21804, -1)" sendMessage sequenceId=785
"producer(2, nonprod-use1-191-21804, -1)" not connected, skipping send

@adrianotr
Contributor Author

I think the issue might be related to this one.

It seems to be an issue with KeyBased batching. When I disabled batching, the error went away; however, producer throughput dropped sharply, as expected.

@Lanayx
Member

Lanayx commented Nov 29, 2024

@adrianotr You still have to fix the logging to see the error. For example, the previous issue had the same error text, but because the exception was logged it was easy to detect the problem. BTW, are you using the latest version, 3.6.1?

@adrianotr
Contributor Author

I'm sorry, I forgot to include the exception from the logging tool I use.

Also, I was on 3.6.0, but this time I tried with 3.6.1. Here are the logs:

"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Received send error from server: PersistenceError : "Cannot determine whether the message is a duplicate at this time"
"producer(0, nonprod-use1-191-22527, -1)" Got ack for msg. expecting: 255 - 255 - got: 257 - 257 - queue-size: 3
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Socket was disconnected exceptionally on writing Send
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Security.SslStream'.
   at System.Net.Security.SslStream.<ThrowIfExceptional>g__ThrowExceptional|126_0(ExceptionDispatchInfo e)
   at System.Net.Security.SslStream.WriteAsync(ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
   at Pipelines.Sockets.Unofficial.StreamConnection.AsyncStreamPipe.WriteBuffer(Stream target, ReadOnlySequence`1& data, String name) in /_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncStreamPipe.cs:line 187
   at Pipelines.Sockets.Unofficial.StreamConnection.AsyncStreamPipe.CopyFromWritePipeToStream() in /_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncStreamPipe.cs:line 148
   at System.IO.Pipelines.Pipe.FlushAsync(CancellationToken cancellationToken)
   at System.IO.Pipelines.PipeWriter.CopyFromAsync(Stream source, CancellationToken cancellationToken)
   at Pulsar.Client.Common.Commands.serializePayloadCommand@103-1.MoveNext()
   at <StartupCode$Pulsar-Client>[email protected]()
Connection backgroundTask "LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650" removed
"producer(0, , -1) ConnectionHandler" Closed connection to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1" Current state "Ready
  clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" -- Will try again in 100ms 
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" sendMb mailbox has stopped normally
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" operationsMb mailbox has stopped normally
"clientCnx(1, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" requestsMb mailbox has stopped normally
"producer(0, nonprod-use1-191-22527, -1)" not connected, skipping send
Connecting to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)" with maxMessageSize: 5242880
"producer(0, nonprod-use1-191-22527, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-22527, -1)" not connected, skipping send
Connected to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)"
"producer(0, nonprod-use1-191-22527, -1)" not connected, skipping send
"clientCnx(2, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Connected ProtocolVersion: 19 ServerVersion: "Pulsar Server3.0.5" MaxMessageSize: 5242880
"producer(0, nonprod-use1-191-22527, -1)" starting register to topic "persistent://stg-auction/auction-pvt/phase-action-submitted-v1"
"producer(0, nonprod-use1-191-22527, -1)" registered
"producer(0, nonprod-use1-191-22527, -1)" resending 11 pending messages
"clientCnx(2, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Socket was disconnected exceptionally on writing Send
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'The stream with Id ddbb6215-cb6a-4a05-bd81-09ee1c0c41b9 and Tag  is disposed.'.
   at Microsoft.IO.RecyclableMemoryStream.ThrowDisposedException() in /_/src/RecyclableMemoryStream.cs:line 1352
   at Microsoft.IO.RecyclableMemoryStream.Seek(Int64 offset, SeekOrigin loc) in /_/src/RecyclableMemoryStream.cs:line 1164
   at [email protected](PipeWriter output)
   at <StartupCode$Pulsar-Client>[email protected]()
Connection backgroundTask "LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650" removed
"producer(0, , -1) ConnectionHandler" Closed connection to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1" Current state "Ready
  clientCnx(2, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" -- Will try again in 100ms 
"clientCnx(2, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" requestsMb mailbox has stopped normally
"clientCnx(2, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" sendMb mailbox has stopped normally
"clientCnx(2, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" operationsMb mailbox has stopped normally
"producer(0, nonprod-use1-191-22527, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-22527, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-22527, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-22527, -1)" not connected, skipping send
"producer(0, nonprod-use1-191-22527, -1)" not connected, skipping send
Connecting to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)" with maxMessageSize: 5242880
Connected to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)"
"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Connected ProtocolVersion: 19 ServerVersion: "Pulsar Server3.0.5" MaxMessageSize: 5242880
"producer(0, nonprod-use1-191-22527, -1)" starting register to topic "persistent://stg-auction/auction-pvt/phase-action-submitted-v1"
"producer(0, nonprod-use1-191-22527, -1)" registered
"producer(0, nonprod-use1-191-22527, -1)" resending 20 pending messages
"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Socket was disconnected exceptionally on writing Send
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'The stream with Id ddbb6215-cb6a-4a05-bd81-09ee1c0c41b9 and Tag  is disposed.'.
   at Microsoft.IO.RecyclableMemoryStream.ThrowDisposedException() in /_/src/RecyclableMemoryStream.cs:line 1352
   at Microsoft.IO.RecyclableMemoryStream.Seek(Int64 offset, SeekOrigin loc) in /_/src/RecyclableMemoryStream.cs:line 1164
   at [email protected](PipeWriter output)
   at <StartupCode$Pulsar-Client>[email protected]()
Connection backgroundTask "LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650" removed
"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" operationsMb mailbox has stopped normally
"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" sendMb mailbox has stopped normally
"producer(0, , -1) ConnectionHandler" Closed connection to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1" Current state "Ready
  clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" -- Will try again in 100ms 

"clientCnx(3, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" requestsMb mailbox has stopped normally
Connecting to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)" with maxMessageSize: 5242880
Connected to "Broker (LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650, PhysicalAddress Unspecified/pulsar-proxy.apps-01.npa.us-east-1.kar-services.net:6651)"
"clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Connected ProtocolVersion: 19 ServerVersion: "Pulsar Server3.0.5" MaxMessageSize: 5242880
"producer(0, nonprod-use1-191-22527, -1)" starting register to topic "persistent://stg-auction/auction-pvt/phase-action-submitted-v1"
"producer(0, nonprod-use1-191-22527, -1)" registered
"producer(0, nonprod-use1-191-22527, -1)" resending 20 pending messages
"clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" Socket was disconnected exceptionally on writing Send
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'The stream with Id ddbb6215-cb6a-4a05-bd81-09ee1c0c41b9 and Tag  is disposed.'.
   at Microsoft.IO.RecyclableMemoryStream.ThrowDisposedException() in /_/src/RecyclableMemoryStream.cs:line 1352
   at Microsoft.IO.RecyclableMemoryStream.Seek(Int64 offset, SeekOrigin loc) in /_/src/RecyclableMemoryStream.cs:line 1164
   at [email protected](PipeWriter output)
   at <StartupCode$Pulsar-Client>[email protected]()
Connection backgroundTask "LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650" removed
"clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" operationsMb mailbox has stopped normally
"clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" sendMb mailbox has stopped normally
"producer(0, , -1) ConnectionHandler" Closed connection to "persistent://stg-auction/auction-pvt/phase-action-submitted-v1" Current state "Ready
  clientCnx(4, LogicalAddres Unspecified/pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650)" -- Will try again in 100ms 

@adrianotr
Contributor Author

You can use this to reproduce the issue:

PulsarClient.Logger = LoggerFactory.Create(b => b
        .AddConsole()
        .SetMinimumLevel(LogLevel.Debug))
    .CreateLogger("Pulsar.Client");
    
var client = await new PulsarClientBuilder()
    .ServiceUrl("pulsar://localhost:6650")
    .BuildAsync();

var producer = await client.NewProducer()
    .Topic("public/test-namespace-with-deduplication-enabled/test-topic")
    .EnableBatching(true)
    .BatchBuilder(BatchBuilder.KeyBased)
    .CreateAsync();

await Task.WhenAll(Enumerable.Range(0, 100).Select(async i =>
{
    var randomKey = (byte)(i % 2);
    var message = producer.NewMessage([0])
        .WithOrderingKey(new byte[] { randomKey });
    await Task.Delay(Random.Shared.Next(0, 5));
    await producer.SendAsync(message);
}));

You need to create that namespace and enable deduplication as well:

$ pulsar-admin namespaces create public/test-namespace-with-deduplication-enabled
$ pulsar-admin namespaces set-deduplication public/test-namespace-with-deduplication-enabled --enable
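Before running the reproduction it may be worth confirming that the policy actually took effect. A minimal sketch, assuming `pulsar-admin` is on the PATH and that the namespace policies output contains a `deduplicationEnabled` field; the `show_dedup_policy` helper and the `PULSAR_ADMIN` override variable are hypothetical names introduced here for illustration:

```shell
#!/bin/sh
# Hypothetical helper: print the deduplication entry from a namespace's policies.
# $1 is the namespace, e.g. public/test-namespace-with-deduplication-enabled.
# PULSAR_ADMIN can point at a different pulsar-admin binary (assumption, defaults to PATH lookup).
show_dedup_policy() {
    "${PULSAR_ADMIN:-pulsar-admin}" namespaces policies "$1" | grep -i 'deduplicationEnabled'
}
```

Calling `show_dedup_policy public/test-namespace-with-deduplication-enabled` should print the matching policy line if the field is present; no output would suggest the policy is not set as expected.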

@Lanayx
Member

Lanayx commented Nov 29, 2024

@adrianotr Can you please double-check that you are using 3.6.1? I can't reproduce the error on that version, although I can on the previous one.

@adrianotr
Contributor Author

Yes, I'm sure.

If I add this to my program:

var assembly = Assembly.Load("Pulsar.Client");
Console.WriteLine(assembly.GetName().Version);

it prints out 3.6.1.0.

@Lanayx
Member

Lanayx commented Nov 30, 2024

I can't reproduce the error with this example in 3.6.1. You can open a PR that adds a failing integration test to the integration test suite, which would make it possible to fix.

@Lanayx
Member

Lanayx commented Dec 9, 2024

@adrianotr I've just published version 3.6.2 with all tags specified, so you should no longer see an exception with an empty tag like
The stream with Id ddbb6215-cb6a-4a05-bd81-09ee1c0c41b9 and Tag is disposed.
Instead, the concrete tag will be shown.
