Skip to content

Commit

Permalink
Changes to use an address resolver which resolves the next address to…
Browse files Browse the repository at this point in the history
… attempt for a connection. This is towards issue jhalterman#17 (jhalterman#17)
  • Loading branch information
Srinath C committed Nov 22, 2013
1 parent 3de6e43 commit 755be9e
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 37 deletions.
13 changes: 13 additions & 0 deletions src/main/java/net/jodah/lyra/AddressResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package net.jodah.lyra;

import com.rabbitmq.client.Address;

/**
* Address resolver to resolve the addresses
*
* @author Srinath C
*/
public interface AddressResolver {

Address nextAddress();
}
86 changes: 49 additions & 37 deletions src/main/java/net/jodah/lyra/ConnectionOptions.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
package net.jodah.lyra;

import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.ConnectionFactory;
import net.jodah.lyra.internal.util.Addresses;
import net.jodah.lyra.internal.util.Assert;
import net.jodah.lyra.util.Duration;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.ConnectionFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
* Connection options. Changes will not effect connections that have already been created.
*
*
* @author Jonathan Halterman
*/
public class ConnectionOptions {
private ConnectionFactory factory;
private String host = "localhost";
private Address[] addresses;
private AddressResolver addressResolver;
private String name;
private ExecutorService executor;

Expand All @@ -34,7 +32,7 @@ public ConnectionOptions() {

/**
* Creates a new Options object for the {@code connectionFactory}.
*
*
* @throws NullPointerException if {@code connectionFactory} is null
*/
public ConnectionOptions(ConnectionFactory connectionFactory) {
Expand All @@ -57,7 +55,7 @@ private ConnectionOptions(ConnectionOptions options) {
factory.setSaslConfig(options.factory.getSaslConfig());
factory.setSocketFactory(options.factory.getSocketFactory());
host = options.host;
addresses = options.addresses;
addressResolver = options.addressResolver;
name = options.name;
executor = options.executor;
}
Expand All @@ -71,14 +69,15 @@ public ConnectionOptions copy() {

/**
* Returns the addresses to attempt connections to, in round-robin order.
*
*
* @see #withAddresses(Address...)
* @see #withAddresses(String)
* @see #withHost(String)
* @see #withHosts(String...)
*/
public Address[] getAddresses() {
return addresses == null ? new Address[] { new Address(host, factory.getPort()) } : addresses;
Address address = new Address(host, factory.getPort());
return addressResolver == null ? new Address[] {address} : new Address[] {addressResolver.nextAddress()};
}

/**
Expand All @@ -90,7 +89,7 @@ public ConnectionFactory getConnectionFactory() {

/**
* Returns the consumer executor.
*
*
* @see #withConsumerExecutor(ExecutorService)
*/
public ExecutorService getConsumerExecutor() {
Expand All @@ -101,30 +100,42 @@ public String getName() {
return name;
}

/**
* Sets the {@code addressResolver} to resolve addresses
*
* @throws NullPointerException if {@code addressResolver} is null
*/
public ConnectionOptions withAddressResolver(AddressResolver addressResolver) {
this.addressResolver = Assert.notNull(addressResolver, "addressResolver");
return this;
}

/**
* Sets the {@code addresses} to attempt connections to, in round-robin order.
*
*
* @throws NullPointerException if {@code addresses} is null
*/
public ConnectionOptions withAddresses(Address... addresses) {
this.addresses = Assert.notNull(addresses, "addresses");
Address[] _addresses = Assert.notNull(addresses, "addresses");
this.addressResolver = new DefaultAddressResolver(_addresses);
return this;
}

/**
* Sets the {@code addresses}.
*
*
* @param addresses formatted as "host1[:port],host2[:port]", etc.
* @throws NullPointerException if {@code addresses} is null
*/
public ConnectionOptions withAddresses(String addresses) {
this.addresses = Address.parseAddresses(Assert.notNull(addresses, "addresses"));
Address[] _addresses = Address.parseAddresses(Assert.notNull(addresses, "addresses"));
this.addressResolver = new DefaultAddressResolver(_addresses);
return this;
}

/**
* Sets the client properties.
*
*
* @throws NullPointerException if {@code clientProperties} is null
*/
public ConnectionOptions withClientProperties(Map<String, Object> clientProperties) {
Expand All @@ -134,7 +145,7 @@ public ConnectionOptions withClientProperties(Map<String, Object> clientProperti

/**
* Sets the {@code connectionFactory}.
*
*
* @throws NullPointerException if {@code connectionFactory} is null
*/
public ConnectionOptions withConnectionFactory(ConnectionFactory connectionFactory) {
Expand All @@ -144,7 +155,7 @@ public ConnectionOptions withConnectionFactory(ConnectionFactory connectionFacto

/**
* Set the connection timeout, zero for infinite, for an individual connection attempt.
*
*
* @throws NullPointerException if {@code connectionTimeout} is null
*/
public ConnectionOptions withConnectionTimeout(Duration connectionTimeout) {
Expand All @@ -155,7 +166,7 @@ public ConnectionOptions withConnectionTimeout(Duration connectionTimeout) {
/**
* Sets the executor used to handle consumer callbacks. The {@code executor} will not be shutdown
* when a connection is closed.
*
*
* @throws NullPointerException if {@code executor} is null
*/
public ConnectionOptions withConsumerExecutor(ExecutorService executor) {
Expand All @@ -165,7 +176,7 @@ public ConnectionOptions withConsumerExecutor(ExecutorService executor) {

/**
* Sets the {@code host}.
*
*
* @throws NullPointerException if {@code host} is null
*/
public ConnectionOptions withHost(String host) {
Expand All @@ -175,17 +186,18 @@ public ConnectionOptions withHost(String host) {

/**
* Sets the {@code hosts} to attempt connections to, in round-robin order.
*
*
* @throws NullPointerException if {@code hosts} is null
*/
public ConnectionOptions withHosts(String... hosts) {
this.addresses = Addresses.addressesFor(Assert.notNull(hosts, "hosts"), 5672);
Address[] _addresses = Addresses.addressesFor(Assert.notNull(hosts, "hosts"), 5672);
this.addressResolver = new DefaultAddressResolver(_addresses);
return this;
}

/**
* Sets the connection name. Used for logging and consumer thread naming.
*
*
* @throws NullPointerException if {@code name} is null
*/
public ConnectionOptions withName(String name) {
Expand All @@ -211,7 +223,7 @@ public ConnectionOptions withPort(int port) {

/**
* Set the requested heartbeat, zero for none.
*
*
* @throws NullPointerException if {@code requestedHeartbeat} is null
*/
public ConnectionOptions withRequestedHeartbeat(Duration requestedHeartbeat) {
Expand All @@ -221,7 +233,7 @@ public ConnectionOptions withRequestedHeartbeat(Duration requestedHeartbeat) {

/**
* Sets the SocketFactory to create connections with.
*
*
* @throws NullPointerException if {@code hosts} is null
*/
public ConnectionOptions withSocketFactory(SocketFactory socketFactory) {
Expand All @@ -239,7 +251,7 @@ public ConnectionOptions withSsl() throws NoSuchAlgorithmException, KeyManagemen

/**
* Sets the initialized {@code sslContext} to use.
*
*
* @throws NullPointerException if {@code sslContext} is null
*/
public ConnectionOptions withSslProtocol(SSLContext sslContext) {
Expand All @@ -249,7 +261,7 @@ public ConnectionOptions withSslProtocol(SSLContext sslContext) {

/**
* Sets the {@code sslProtocol} to use.
*
*
* @throws NullPointerException if {@code sslProtocol} is null
*/
public ConnectionOptions withSslProtocol(String sslProtocol) throws NoSuchAlgorithmException,
Expand All @@ -260,7 +272,7 @@ public ConnectionOptions withSslProtocol(String sslProtocol) throws NoSuchAlgori

/**
* Sets the {@code sslProtocol} and {@code trustManager} to use.
*
*
* @throws NullPointerException if {@code sslProtocol} or {@code trustManager} are null
*/
public ConnectionOptions withSslProtocol(String sslProtocol, TrustManager trustManager)
Expand All @@ -272,7 +284,7 @@ public ConnectionOptions withSslProtocol(String sslProtocol, TrustManager trustM

/**
* Sets the username.
*
*
* @throws NullPointerException if {@code username} is null
*/
public ConnectionOptions withUsername(String username) {
Expand All @@ -282,7 +294,7 @@ public ConnectionOptions withUsername(String username) {

/**
* Sets the virtual host.
*
*
* @throws NullPointerException if {@code virtualHost} is null
*/
public ConnectionOptions withVirtualHost(String virtualHost) {
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/net/jodah/lyra/DefaultAddressResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package net.jodah.lyra;

import com.rabbitmq.client.Address;

/**
* Address resolver to resolve the addresses
*
* @author Srinath C
*/
public class DefaultAddressResolver implements AddressResolver {

private final Address[] addresses;

private int nextIndex = 0;

public DefaultAddressResolver(Address[] addresses) {
this.addresses = addresses;
}

@Override
public Address nextAddress() {
if (addresses == null || addresses.length > 0) {
return null;
}
if (nextIndex >= addresses.length ) {
nextIndex = 0;
}
return addresses[nextIndex++];
}
}

0 comments on commit 755be9e

Please sign in to comment.