Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Pulsar cluster level auto failover (PIP-121) #217

Open
LeoSht opened this issue Jan 17, 2023 · 5 comments
Open

Add support for Pulsar cluster level auto failover (PIP-121) #217

LeoSht opened this issue Jan 17, 2023 · 5 comments
Labels
enhancement New feature or request

Comments

@LeoSht
Copy link
Contributor

LeoSht commented Jan 17, 2023

Please add support for cluster-level auto failover similar to Java client.
Related information:
Cluster Level Fail Over PIP-121
Java Client MR

@Lanayx
Copy link
Member

Lanayx commented Jan 17, 2023

Hi! I'm not actively using Pulsar and not adding new functionality to the library myself (only doing bug fixes), but always happy to support if someone wants to add smth, so if you need this feature - you can send a PR

@fjod
Copy link
Contributor

fjod commented Jul 16, 2023

So far I get to the point:
in java code when the cluster address changed, they do this:

   pulsarClient.updateServiceUrl(serviceUrl);
   pulsarClient.reloadLookUp();

On update service url:

 log.info("Updating service URL to {}", serviceUrl);
        conf.setServiceUrl(serviceUrl);
        lookup.updateServiceUrl(serviceUrl);
        cnxPool.closeAllConnections();

and on reload lookup:

 if (conf.getServiceUrl().startsWith("http")) {
            lookup = new HttpLookupService(conf, eventLoopGroup);
        } else {
            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
                    externalExecutorProvider.getExecutor());
        }

So far I did same things in pulsarClient

 backgroundTask {
       let q = { config with ServiceAddresses = addresses } 
       do! connectionPool.CloseAsync()       
       connectionPool <- ConnectionPool(q)
       lookupService <- BinaryLookupService(q, connectionPool)       
       }

but all producers are still connected to old address. It looks like in Java code when all connections are closed, producers/consumers will go to new lookup to get new addresses(?), but in F# they do not.

@Lanayx
Copy link
Member

Lanayx commented Jul 16, 2023

I think you need to ensure that once connection is dropped consumers/producers should already have new address for reconnection. In your code you make connection drop first, so probably reconnect happens immediately to the same address

@fjod
Copy link
Contributor

fjod commented Aug 13, 2023

once connection is dropped consumers/producers should already have new address for reconnection

I tried to rewrite like this:

 backgroundTask {
       let q = { config with ServiceAddresses = addresses }
       let oldPool = connectionPool
       connectionPool <- ConnectionPool(q)
       lookupService <- BinaryLookupService(q, connectionPool)
       do! oldPool.CloseAsync()      
       }

Interesting, if I start demo app + 2 clusters on 6650 and 6651 addresses, and a webapp which tells pulsar to swtich from 6650 to 6651 - it works :) But if I start with 6650, then boot up the app to broadcast new address 6651, it fails to reconnect tp 6651 with inner exception

Unhandled exception. System.TimeoutException: Could not send message to broker within given timeout
  at <StartupCode$Pulsar-Client>[email protected]() in F:\work\pulsar-client-dotnet\src\Pulsar.Client\Internal\ProducerImpl.fs:line 767

So, it seems like if there is an established connection to broker, my code fails to change the address. But if there is no connection yet (address is switched before client is connected to broker), it works ok.

@Lanayx
Copy link
Member

Lanayx commented Aug 13, 2023

@fjod I think you can create a PR already so I could try it myself and help

@Lanayx Lanayx unassigned fjod Oct 4, 2023
@Lanayx Lanayx added enhancement New feature or request and removed developer needed labels May 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants