Skip to content

fix: Potential fix for failing to reconnect to the broker#341

Closed
chriscameron-vertexinc wants to merge 1 commit intofsprojects:developfrom
chriscameron-vertexinc:fix-connection-pool
Closed

fix: Potential fix for failing to reconnect to the broker#341
chriscameron-vertexinc wants to merge 1 commit intofsprojects:developfrom
chriscameron-vertexinc:fix-connection-pool

Conversation

@chriscameron-vertexinc
Copy link
Contributor

My project has been struggling with intermittent issues where, after losing connection to the broker, the Pulsar client is unable to recover. We have to restart the application to get a good connection to the broker again.

Our best guess is that the DNS for our broker stays the same, but the physical address of the broker changes between deployments. Pulsar.Client may be recycling stale connections for the "old" broker. Unfortunately I've been unable to reproduce the problem in isolation, and our live systems will run for weeks without failure until suddenly we see these problems again.

Our logs are filled with variations of:

Pulsar.Client.Api.TimeoutException: clientCnx(60, LogicalAddres Unspecified/pulsar-prod-us-east-2-broker-0.pulsar-prod-us-east-2-broker-headless:6650) request 19986 type Producer timed out after 30491.6917ms
   at <StartupCode$Pulsar-Client>.$ProducerImpl.-ctor@506-48.MoveNext()

And:

System.IO.IOException: Unable to write data to the transport connection: Broken pipe.
 ---> System.Net.Sockets.SocketException (32): Broken pipe
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.CreateException(SocketError error, Boolean forAsyncThrow)
   at System.Net.Sockets.NetworkStream.WriteAsync(ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
   at System.Net.Security.SslStream.WriteSingleChunk[TIOAdapter](ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
   at System.Net.Security.SslStream.WriteAsyncInternal[TIOAdapter](ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at Pipelines.Sockets.Unofficial.StreamConnection.AsyncStreamPipe.CopyFromWritePipeToStream()
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1.AsyncStateMachineBox`1.MoveNext(Thread threadPoolThread)
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
--- End of stack trace from previous location ---

   --- End of inner exception stack trace ---
   at System.Net.Security.SslStream.<WriteSingleChunk>g__CompleteWriteAsync|157_1[TIOAdapter](ValueTask writeTask, Byte[] bufferToReturn)
   at System.Net.Security.SslStream.WriteAsyncInternal[TIOAdapter](ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
   at Pipelines.Sockets.Unofficial.StreamConnection.AsyncStreamPipe.CopyFromWritePipeToStream() in /_/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncStreamPipe.cs:line 148
   at System.IO.Pipelines.Pipe.FlushAsync(CancellationToken cancellationToken)
   at System.IO.Pipelines.PipeWriter.CopyFromAsync(Stream source, CancellationToken cancellationToken)
   at Pulsar.Client.Common.Commands.serializeSimpleCommand@44-1.MoveNext()
   at <StartupCode$Pulsar-Client>.$ClientCnx.clo@303-21.MoveNext()

This PR is a stab in the dark at trying to remove stale connections in the ConnectionPool.GetConnection method. It simply adds a check to ensure the connection coming out of the pool is active, otherwise it gets removed.

@Lanayx
Copy link
Member

Lanayx commented Jan 21, 2026

This PR is a stab in the dark at trying to remove stale connections in the ConnectionPool.GetConnection method.

I don't think it's a good strategy. I think good strategy would be to make desired change, compile binaries from source code and use it for several weeks on staging environment or even in production and then create a PR.

As for the change itself, I don't see how it helps because setting ClientCnx IsActive to false happens at the same time as removing it from the pool: https://github.com/fsprojects/pulsar-client-dotnet/blob/develop/src/Pulsar.Client/Internal/ClientCnx.fs#L203-L204

@Lanayx
Copy link
Member

Lanayx commented Jan 28, 2026

Closing as there is no evidence that this change fixes any issues

@Lanayx Lanayx closed this Jan 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants