diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java
index b0f15582..00edafbb 100644
--- a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java
+++ b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java
@@ -3,6 +3,7 @@
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Mono;
@@ -14,7 +15,7 @@ public interface Cluster {
*
* @return cluster address
*/
- Address address();
+ List
addresses();
/**
* Send a msg from this member (src) to target member (specified in parameters).
@@ -34,6 +35,15 @@ public interface Cluster {
*/
Mono send(Address address, Message message);
+ /**
+ * Send a msg from this member (src) to target member (specified in parameters).
+ *
+ * @param addresses target addresses
+ * @param message msg
+ * @return promise telling success or failure
+ */
+ Mono send(List addresses, Message message);
+
/**
* Sends message to the given address. It will issue connect in case if no transport channel by
* given transport {@code address} exists already. Send is an async operation and expecting a
diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Member.java b/cluster-api/src/main/java/io/scalecube/cluster/Member.java
index b9798687..42b958c9 100644
--- a/cluster-api/src/main/java/io/scalecube/cluster/Member.java
+++ b/cluster-api/src/main/java/io/scalecube/cluster/Member.java
@@ -7,6 +7,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
@@ -42,6 +43,18 @@ public Member(String id, String alias, List addresses, String namespace
this.namespace = Objects.requireNonNull(namespace, "namespace");
}
+ /**
+ * Constructor.
+ *
+ * @param id member id
+ * @param alias member alias (optional)
+ * @param address member address
+ * @param namespace namespace
+ */
+ public Member(String id, String alias, Address address, String namespace) {
+ this(id, alias, Collections.singletonList(address), namespace);
+ }
+
/**
* Returns cluster member local id.
*
diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java
index d51a7526..d3e6ab3d 100644
--- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java
+++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java
@@ -5,6 +5,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
@@ -213,15 +214,40 @@ public InboundSettings inboundSettings(Address destination) {
return inboundSettings.getOrDefault(destination, defaultInboundSettings);
}
+ /**
+ * Returns network inbound settings applied to the given destination.
+ *
+ * @param destinations addresses of target endpoint
+ * @return network inbound settings
+ */
+ public InboundSettings inboundSettings(List destinations) {
+ if (destinations.isEmpty()) {
+ return defaultInboundSettings;
+ }
+
+ for (Address destination : destinations) {
+ InboundSettings inboundSettings = this.inboundSettings.get(destination);
+
+ if (inboundSettings != null) {
+ return inboundSettings;
+ }
+ }
+
+ return defaultInboundSettings;
+ }
+
/**
* Setter for network emulator inbound settings for specific destination.
*
* @param shallPass shallPass inbound flag
*/
- public void inboundSettings(Address destination, boolean shallPass) {
+ public void inboundSettings(List destinations, boolean shallPass) {
InboundSettings settings = new InboundSettings(shallPass);
- inboundSettings.put(destination, settings);
- LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
+
+ destinations.forEach(destination -> {
+ inboundSettings.put(destination, settings);
+ LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
+ });
}
/**
diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java
index 381042c5..c75ab24f 100644
--- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java
+++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java
@@ -3,6 +3,7 @@
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
+import java.util.Collections;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -83,6 +84,6 @@ public Flux listen() {
}
private Message enhanceWithSender(Message message) {
- return Message.with(message).sender(transport.address()).build();
+ return Message.with(message).sender(Collections.singletonList(transport.address())).build();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
index f38f5e97..a530fd98 100644
--- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
@@ -16,6 +16,7 @@
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.transport.api.TransportFactory;
+import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.net.Address;
import io.scalecube.utils.ServiceLoaderUtil;
import java.io.Serializable;
@@ -243,7 +244,8 @@ private Mono doStart0() {
.flatMap(
boundTransport -> {
localMember = createLocalMember(boundTransport.address());
- transport = new SenderAwareTransport(boundTransport, localMember.address());
+
+ transport = new SenderAwareTransport(boundTransport, localMember.addresses());
final String name =
"sc-cluster-" + Integer.toHexString(System.identityHashCode(this));
@@ -379,7 +381,7 @@ private Member createLocalMember(Address address) {
// First address comes as "fair" listen address
memberAddresses.add(address);
- // Tail goes as externalHosts, if the exist
+ // Tail goes as externalHosts, if exists
final List externalHosts = config.externalHosts();
if (externalHosts != null) {
for (String externalHost : externalHosts) {
@@ -396,13 +398,13 @@ private Member createLocalMember(Address address) {
}
@Override
- public Address address() {
- return member().address();
+ public List addresses() {
+ return member().addresses();
}
@Override
public Mono send(Member member, Message message) {
- return send(member.address(), message);
+ return TransportWrapper.send(transport, member.addresses(), message);
}
@Override
@@ -410,6 +412,11 @@ public Mono send(Address address, Message message) {
return transport.send(address, message);
}
+ @Override
+ public Mono send(List addresses, Message message) {
+ return TransportWrapper.send(transport, addresses, message);
+ }
+
@Override
public Mono requestResponse(Address address, Message request) {
return transport.requestResponse(address, request);
@@ -417,7 +424,7 @@ public Mono requestResponse(Address address, Message request) {
@Override
public Mono requestResponse(Member member, Message request) {
- return transport.requestResponse(member.address(), request);
+ return TransportWrapper.requestResponse(transport, member.addresses(), request);
}
@Override
@@ -526,11 +533,11 @@ public Mono onShutdown() {
private static class SenderAwareTransport implements Transport {
private final Transport transport;
- private final Address address;
+ private final List addresses;
- private SenderAwareTransport(Transport transport, Address address) {
+ private SenderAwareTransport(Transport transport, List addresses) {
this.transport = Objects.requireNonNull(transport);
- this.address = Objects.requireNonNull(address);
+ this.addresses = Objects.requireNonNull(addresses);
}
@Override
@@ -569,7 +576,7 @@ public Flux listen() {
}
private Message enhanceWithSender(Message message) {
- return Message.with(message).sender(address).build();
+ return Message.with(message).sender(addresses).build();
}
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
index ac270c4e..719437b5 100644
--- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
@@ -8,6 +8,7 @@
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.ArrayList;
@@ -42,6 +43,8 @@ public final class FailureDetectorImpl implements FailureDetector {
private final Transport transport;
private final FailureDetectorConfig config;
+ private final TransportWrapper transportWrapper;
+
// State
private final List pingMembers = new ArrayList<>();
@@ -81,6 +84,8 @@ public FailureDetectorImpl(
this.config = Objects.requireNonNull(config);
this.scheduler = Objects.requireNonNull(scheduler);
+ this.transportWrapper = new TransportWrapper(this.transport);
+
// Subscribe
actionsDisposables.addAll(
Arrays.asList(
@@ -145,9 +150,9 @@ private void doPing() {
Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build();
LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember);
- Address address = pingMember.address();
- transport
- .requestResponse(address, pingMsg)
+ List addresses = pingMember.addresses();
+ transportWrapper
+ .requestResponse(addresses, pingMsg)
.timeout(Duration.ofMillis(config.pingTimeout()), scheduler)
.publishOn(scheduler)
.subscribe(
@@ -189,8 +194,8 @@ private void doPingReq(
Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout());
pingReqMembers.forEach(
member ->
- transport
- .requestResponse(member.address(), pingReqMsg)
+ transportWrapper
+ .requestResponse(member.addresses(), pingReqMsg)
.timeout(timeout, scheduler)
.publishOn(scheduler)
.subscribe(
@@ -232,7 +237,7 @@ private void onMessage(Message message) {
/** Listens to PING message and answers with ACK. */
private void onPing(Message message) {
long period = this.currentPeriod;
- Address sender = message.sender();
+ List sender = message.sender();
LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender);
PingData data = message.data();
data = data.withAckType(AckType.DEST_OK);
@@ -249,10 +254,10 @@ private void onPing(Message message) {
String correlationId = message.correlationId();
Message ackMessage =
Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build();
- Address address = data.getFrom().address();
- LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address);
- transport
- .send(address, ackMessage)
+ List addresses = data.getFrom().addresses();
+ LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, addresses);
+ transportWrapper
+ .send(addresses, ackMessage)
.subscribe(
null,
ex ->
@@ -260,7 +265,7 @@ private void onPing(Message message) {
"[{}][{}] Failed to send PingAck to {}, cause: {}",
localMember,
period,
- address,
+ addresses,
ex.toString()));
}
@@ -275,10 +280,10 @@ private void onPingReq(Message message) {
PingData pingReqData = new PingData(localMember, target, originalIssuer);
Message pingMessage =
Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build();
- Address address = target.address();
- LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address);
- transport
- .send(address, pingMessage)
+ List addresses = target.addresses();
+ LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, addresses);
+ transportWrapper
+ .send(addresses, pingMessage)
.subscribe(
null,
ex ->
@@ -286,7 +291,7 @@ private void onPingReq(Message message) {
"[{}][{}] Failed to send transit Ping to {}, cause: {}",
localMember,
period,
- address,
+ addresses,
ex.toString()));
}
@@ -305,10 +310,10 @@ private void onTransitPingAck(Message message) {
PingData originalAckData = new PingData(target, data.getTo()).withAckType(ackType);
Message originalAckMessage =
Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build();
- Address address = target.address();
- LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address);
- transport
- .send(address, originalAckMessage)
+ List addresses = target.addresses();
+ LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, addresses);
+ transportWrapper
+ .send(addresses, originalAckMessage)
.subscribe(
null,
ex ->
@@ -316,7 +321,7 @@ private void onTransitPingAck(Message message) {
"[{}][{}] Failed to resend transit PingAck to {}, cause: {}",
localMember,
period,
- address,
+ addresses,
ex.toString()));
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
index 2f24beb8..1406dad9 100644
--- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
@@ -7,6 +7,7 @@
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.net.Address;
import java.util.ArrayList;
import java.util.Arrays;
@@ -287,14 +288,13 @@ private void spreadGossipsTo(long period, Member member) {
}
// Send gossip request
- Address address = member.address();
+ List addresses = member.addresses();
gossips.stream()
.map(this::buildGossipRequestMessage)
.forEach(
message ->
- transport
- .send(address, message)
+ TransportWrapper.send(transport, addresses, message)
.subscribe(
null,
ex ->
@@ -303,7 +303,7 @@ private void spreadGossipsTo(long period, Member member) {
localMember,
period,
message,
- address,
+ addresses,
ex.toString())));
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
index 53fed376..af16480e 100644
--- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
@@ -15,6 +15,7 @@
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
@@ -39,7 +40,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
@@ -80,6 +80,8 @@ private enum MembershipUpdateReason {
private final GossipProtocol gossipProtocol;
private final MetadataStore metadataStore;
+ private final TransportWrapper transportWrapper;
+
// State
private final Map membershipTable = new HashMap<>();
@@ -127,6 +129,8 @@ public MembershipProtocolImpl(
this.membershipConfig = Objects.requireNonNull(config).membershipConfig();
this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig();
+ this.transportWrapper = new TransportWrapper(this.transport);
+
// Prepare seeds
seedMembers = cleanUpSeedMembers(membershipConfig.seedMembers());
@@ -170,24 +174,31 @@ private List cleanUpSeedMembers(Collection seedMembers) {
String hostAddress = localIpAddress.getHostAddress();
String hostName = localIpAddress.getHostName();
- Address memberAddr = localMember.address();
Address transportAddr = transport.address();
- Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port());
Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port());
- Address memberAddByHostName = Address.create(hostName, memberAddr.port());
Address transportAddrByHostName = Address.create(hostName, transportAddr.port());
return new LinkedHashSet<>(seedMembers)
.stream()
- .filter(addr -> checkAddressesNotEqual(addr, memberAddr))
+ .filter(addr -> checkAddressesNotEqual(addr, localMember, hostAddress, hostName))
.filter(addr -> checkAddressesNotEqual(addr, transportAddr))
- .filter(addr -> checkAddressesNotEqual(addr, memberAddrByHostAddress))
.filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostAddress))
- .filter(addr -> checkAddressesNotEqual(addr, memberAddByHostName))
.filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostName))
.collect(Collectors.toList());
}
+ private boolean checkAddressesNotEqual(
+ Address smAddress, Member localMember, String hostAddress, String hostName) {
+ return localMember.addresses().stream()
+ .allMatch(
+ memberAddress ->
+ checkAddressesNotEqual(smAddress, memberAddress)
+ && checkAddressesNotEqual(
+ smAddress, Address.create(hostAddress, memberAddress.port()))
+ && checkAddressesNotEqual(
+ smAddress, Address.create(hostName, memberAddress.port())));
+ }
+
private boolean checkAddressesNotEqual(Address address0, Address address1) {
if (!address0.equals(address1)) {
return true;
@@ -330,27 +341,29 @@ public Optional member(String id) {
@Override
public Optional member(Address address) {
- return new ArrayList<>(members.values())
- .stream().filter(member -> member.address().equals(address)).findFirst();
+ return members.values().stream()
+ .filter(member -> member.addresses().stream().anyMatch(address::equals))
+ .findFirst();
}
private void doSync() {
- Address address = selectSyncAddress().orElse(null);
- if (address == null) {
+ List addresses = selectSyncAddress();
+
+ if (addresses.isEmpty()) {
return;
}
Message message = prepareSyncDataMsg(SYNC, null);
- LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, address);
- transport
- .send(address, message)
+ LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, addresses);
+ transportWrapper
+ .send(addresses, message)
.subscribe(
null,
ex ->
LOGGER.debug(
"[{}][doSync] Failed to send Sync to {}, cause: {}",
localMember,
- address,
+ addresses,
ex.toString()));
}
@@ -394,13 +407,13 @@ private Mono onSyncAck(Message syncAckMsg, boolean onStart) {
private Mono onSync(Message syncMsg) {
return Mono.defer(
() -> {
- final Address sender = syncMsg.sender();
+ final List sender = syncMsg.sender();
LOGGER.debug("[{}] Received Sync from {}", localMember, sender);
return syncMembership(syncMsg.data(), false)
.doOnSuccess(
avoid -> {
Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
- transport
+ transportWrapper
.send(sender, message)
.subscribe(
null,
@@ -429,16 +442,16 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
// Alive won't override SUSPECT so issue instead extra sync with member to force it spread
// alive with inc + 1
Message syncMsg = prepareSyncDataMsg(SYNC, null);
- Address address = fdEvent.member().address();
- transport
- .send(address, syncMsg)
+ List addresses = fdEvent.member().addresses();
+ transportWrapper
+ .send(addresses, syncMsg)
.subscribe(
null,
ex ->
LOGGER.debug(
"[{}][onFailureDetectorEvent] Failed to send Sync to {}, cause: {}",
localMember,
- address,
+ addresses,
ex.toString()));
} else {
MembershipRecord record =
@@ -468,17 +481,24 @@ private void onMembershipGossip(Message message) {
}
}
- private Optional selectSyncAddress() {
- List addresses =
- Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address))
- .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new));
- Collections.shuffle(addresses);
- if (addresses.isEmpty()) {
- return Optional.empty();
- } else {
- int i = ThreadLocalRandom.current().nextInt(addresses.size());
- return Optional.of(addresses.get(i));
+ private List selectSyncAddress() {
+ Collection otherMembers = otherMembers();
+
+ if (seedMembers.isEmpty() && otherMembers.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ int totalSize = seedMembers.size() + otherMembers.size();
+ int randomIndex = ThreadLocalRandom.current().nextInt(totalSize);
+
+ if (randomIndex < seedMembers.size()) {
+ return Collections.singletonList(seedMembers.get(randomIndex));
}
+
+ List otherMembersList = new ArrayList<>(otherMembers);
+ Member member = otherMembersList.get(randomIndex - seedMembers.size());
+
+ return member.addresses();
}
// ================================================
@@ -593,7 +613,7 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason
}
// If received updated for local member then increase incarnation and spread Alive gossip
- if (r1.member().address().equals(localMember.address())) {
+ if (r1.member().addresses().equals(localMember.addresses())) {
if (r1.member().id().equals(localMember.id())) {
return onSelfMemberDetected(r0, r1, reason);
} else {
diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java
index 35ba5328..1ace90eb 100644
--- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java
@@ -4,10 +4,12 @@
import io.scalecube.cluster.Member;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.net.Address;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -37,6 +39,8 @@ public class MetadataStoreImpl implements MetadataStore {
private final Transport transport;
private final ClusterConfig config;
+ private final TransportWrapper transportWrapper;
+
// State
private final Map membersMetadata = new HashMap<>();
@@ -69,6 +73,8 @@ public MetadataStoreImpl(
this.config = Objects.requireNonNull(config);
this.scheduler = Objects.requireNonNull(scheduler);
this.localMetadata = localMetadata; // optional
+
+ this.transportWrapper = new TransportWrapper(this.transport);
}
@Override
@@ -148,7 +154,6 @@ public Mono fetchMetadata(Member member) {
return Mono.defer(
() -> {
final String cid = UUID.randomUUID().toString();
- final Address targetAddress = member.address();
LOGGER.debug("[{}][{}] Getting metadata for member {}", localMember, cid, member);
@@ -159,17 +164,18 @@ public Mono fetchMetadata(Member member) {
.data(new GetMetadataRequest(member))
.build();
- return transport
- .requestResponse(targetAddress, request)
+ // TODO. Make transport abstraction around this logic
+
+ List addresses = member.addresses();
+
+ return transportWrapper
+ .requestResponse(addresses, request)
.timeout(Duration.ofMillis(config.metadataTimeout()), scheduler)
.publishOn(scheduler)
.doOnSuccess(
s ->
LOGGER.debug(
- "[{}][{}] Received GetMetadataResp from {}",
- localMember,
- cid,
- targetAddress))
+ "[{}][{}] Received GetMetadataResp from {}", localMember, cid, addresses))
.map(Message::data)
.map(GetMetadataResponse::getMetadata)
.doOnError(
@@ -179,7 +185,7 @@ public Mono fetchMetadata(Member member) {
+ "from {} within {} ms, cause: {}",
localMember,
cid,
- targetAddress,
+ addresses,
config.metadataTimeout(),
th.toString()));
});
@@ -196,7 +202,7 @@ private void onMessage(Message message) {
}
private void onMetadataRequest(Message message) {
- final Address sender = message.sender();
+ final List sender = message.sender(); // TODO. Log
LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender);
GetMetadataRequest reqData = message.data();
@@ -224,7 +230,7 @@ private void onMetadataRequest(Message message) {
.build();
LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender);
- transport
+ transportWrapper
.send(sender, response)
.subscribe(
null,
diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java
index bbcbd1ab..6c41fd3e 100644
--- a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java
@@ -66,14 +66,14 @@ public void testSeparateEmptyNamespaces() {
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root1"))
- .membership(opts -> opts.seedMembers(root.address()))
+ .membership(opts -> opts.seedMembers(root.addresses()))
.startAwait();
Cluster root2 =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root2"))
- .membership(opts -> opts.seedMembers(root.address()))
+ .membership(opts -> opts.seedMembers(root.addresses()))
.startAwait();
assertThat(root.otherMembers(), iterableWithSize(0));
@@ -93,21 +93,21 @@ public void testSeparateNonEmptyNamespaces() {
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root"))
- .membership(opts -> opts.seedMembers(root.address()))
+ .membership(opts -> opts.seedMembers(root.addresses()))
.startAwait();
Cluster carol =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root"))
- .membership(opts -> opts.seedMembers(root.address(), bob.address()))
+ .membership(opts -> opts.seedMembers(root.addresses().get(0), bob.addresses().get(0)))
.startAwait();
Cluster root2 =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("root2"))
- .membership(opts -> opts.seedMembers(root.address()))
+ .membership(opts -> opts.seedMembers(root.addresses()))
.startAwait();
Cluster dan =
@@ -117,7 +117,7 @@ public void testSeparateNonEmptyNamespaces() {
.membership(
opts ->
opts.seedMembers(
- root.address(), root2.address(), bob.address(), carol.address()))
+ root.addresses().get(0), root2.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0)))
.startAwait();
Cluster eve =
@@ -127,11 +127,11 @@ public void testSeparateNonEmptyNamespaces() {
.membership(
opts ->
opts.seedMembers(
- root.address(),
- root2.address(),
- dan.address(),
- bob.address(),
- carol.address()))
+ root.addresses().get(0),
+ root2.addresses().get(0),
+ dan.addresses().get(0),
+ bob.addresses().get(0),
+ carol.addresses().get(0)))
.startAwait();
assertThat(root.otherMembers(), containsInAnyOrder(bob.member(), carol.member()));
@@ -155,14 +155,14 @@ public void testSimpleNamespacesHierarchy() {
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop/develop"))
- .membership(opts -> opts.seedMembers(rootDevelop.address()))
+ .membership(opts -> opts.seedMembers(rootDevelop.addresses()))
.startAwait();
Cluster carol =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop/develop"))
- .membership(opts -> opts.seedMembers(rootDevelop.address(), bob.address()))
+ .membership(opts -> opts.seedMembers(rootDevelop.addresses().get(0), bob.addresses().get(0)))
.startAwait();
Cluster dan =
@@ -170,7 +170,7 @@ public void testSimpleNamespacesHierarchy() {
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("develop/develop-2"))
.membership(
- opts -> opts.seedMembers(rootDevelop.address(), bob.address(), carol.address()))
+ opts -> opts.seedMembers(rootDevelop.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0)))
.startAwait();
Cluster eve =
@@ -180,7 +180,7 @@ public void testSimpleNamespacesHierarchy() {
.membership(
opts ->
opts.seedMembers(
- rootDevelop.address(), bob.address(), carol.address(), dan.address()))
+ rootDevelop.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0), dan.addresses().get(0)))
.startAwait();
assertThat(
@@ -206,14 +206,14 @@ public void testIsolatedParentNamespaces() {
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("a/1/c"))
- .membership(opts -> opts.seedMembers(parent1.address()))
+ .membership(opts -> opts.seedMembers(parent1.addresses()))
.startAwait();
Cluster carol =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.membership(opts -> opts.namespace("a/1/c"))
- .membership(opts -> opts.seedMembers(parent1.address(), bob.address()))
+ .membership(opts -> opts.seedMembers(parent1.addresses().get(0), bob.addresses().get(0)))
.startAwait();
Cluster parent2 =
@@ -229,7 +229,7 @@ public void testIsolatedParentNamespaces() {
.membership(
opts ->
opts.seedMembers(
- parent1.address(), parent2.address(), bob.address(), carol.address()))
+ parent1.addresses().get(0), parent2.addresses().get(0), bob.addresses().get(0), carol.addresses().get(0)))
.startAwait();
//noinspection unused
@@ -240,11 +240,11 @@ public void testIsolatedParentNamespaces() {
.membership(
opts ->
opts.seedMembers(
- parent1.address(),
- parent2.address(),
- bob.address(),
- carol.address(),
- dan.address()))
+ parent1.addresses().get(0),
+ parent2.addresses().get(0),
+ bob.addresses().get(0),
+ carol.addresses().get(0),
+ dan.addresses().get(0)))
.startAwait();
assertThat(parent1.otherMembers(), containsInAnyOrder(bob.member(), carol.member()));
diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
index b99f6189..906d7e2f 100644
--- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
@@ -92,7 +92,7 @@ public void testMembersAccessFromScheduler() {
Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait();
Cluster otherNode =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -101,8 +101,8 @@ public void testMembersAccessFromScheduler() {
// Members by address
- Optional otherNodeOnSeedNode = seedNode.member(otherNode.address());
- Optional seedNodeOnOtherNode = otherNode.member(seedNode.address());
+ Optional otherNodeOnSeedNode = seedNode.member(otherNode.addresses().get(0));
+ Optional seedNodeOnOtherNode = otherNode.member(seedNode.addresses().get(0));
assertEquals(otherNode.member(), otherNodeOnSeedNode.orElse(null));
assertEquals(seedNode.member(), seedNodeOnOtherNode.orElse(null));
@@ -181,7 +181,7 @@ public void testJoinDynamicPort() {
for (int i = 0; i < membersNum; i++) {
otherNodes.add(
new ClusterImpl()
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait());
}
@@ -212,7 +212,7 @@ public void testUpdateMetadata() throws Exception {
metadataNode =
new ClusterImpl()
.config(opts -> opts.metadata(metadata))
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -221,7 +221,7 @@ public void testUpdateMetadata() throws Exception {
.flatMap(
integer ->
new ClusterImpl()
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster ->
@@ -285,7 +285,7 @@ public void testUpdateMetadataProperty() throws Exception {
metadataNode =
new ClusterImpl()
.config(opts -> opts.metadata(metadata))
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -294,7 +294,7 @@ public void testUpdateMetadataProperty() throws Exception {
.flatMap(
integer ->
new ClusterImpl()
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster ->
@@ -363,7 +363,7 @@ public void testRemoveMetadataProperty() throws Exception {
metadataNode =
new ClusterImpl()
.config(opts -> opts.metadata(metadata))
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -372,7 +372,7 @@ public void testRemoveMetadataProperty() throws Exception {
.flatMap(
integer ->
new ClusterImpl()
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster ->
@@ -452,19 +452,19 @@ public void onMembershipEvent(MembershipEvent event) {
// Start nodes
final Cluster node1 =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(cluster -> listener)
.startAwait();
final Cluster node2 =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(cluster -> listener)
.startAwait();
final Cluster node3 =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(cluster -> listener)
.startAwait();
@@ -506,7 +506,7 @@ public void onMembershipEvent(MembershipEvent event) {
final Cluster node1 =
new ClusterImpl()
.config(opts -> opts.metadata(node1Metadata))
- .membership(opts -> opts.seedMembers(seedNode.address()))
+ .membership(opts -> opts.seedMembers(seedNode.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster ->
@@ -559,9 +559,11 @@ public void testJoinSeedClusterWithNoExistingSeedMember() {
// Start seed node
Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait();
- Address nonExistingSeed1 = Address.from("localhost:1234");
- Address nonExistingSeed2 = Address.from("localhost:5678");
- Address[] seeds = new Address[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()};
+ List seeds = new ArrayList<>();
+
+ seeds.add(Address.from("localhost:1234")); // Not existent
+ seeds.add(Address.from("localhost:5678")); // Not existent
+ seeds.addAll(seedNode.addresses());
Cluster otherNode =
new ClusterImpl()
diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java
index 316f814e..bf762570 100644
--- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java
@@ -445,7 +445,8 @@ private void assertStatus(
events.stream()
.filter(event -> event.status() == status)
.map(FailureDetectorEvent::member)
- .map(Member::address)
+ .map(Member::addresses)
+ .flatMap(Collection::stream)
.collect(Collectors.toList());
String msg1 =
@@ -472,7 +473,8 @@ private Future> listenNextEventFor(
List> resultFuture = new ArrayList<>();
for (final Address member : addresses) {
final CompletableFuture future = new CompletableFuture<>();
- fd.listen().filter(event -> event.member().address() == member).subscribe(future::complete);
+ fd.listen()
+ .filter(event -> event.member().addresses().contains(member)).subscribe(future::complete);
resultFuture.add(future);
}
diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
index ef559256..0678bd45 100644
--- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
@@ -750,19 +750,19 @@ public void testOverrideMemberAddress() throws UnknownHostException {
NetworkEmulatorTransport e = createTransport();
MembershipProtocolImpl cmA =
- createMembership(a, testConfig(Collections.emptyList()).externalHost(localAddress));
+ createMembership(a, testConfig(Collections.emptyList()).externalHosts(localAddress));
MembershipProtocolImpl cmB =
createMembership(
- b, testConfig(Collections.singletonList(a.address())).externalHost(localAddress));
+ b, testConfig(Collections.singletonList(a.address())).externalHosts(localAddress));
MembershipProtocolImpl cmC =
createMembership(
- c, testConfig(Collections.singletonList(a.address())).externalHost(localAddress));
+ c, testConfig(Collections.singletonList(a.address())).externalHosts(localAddress));
MembershipProtocolImpl cmD =
createMembership(
- d, testConfig(Collections.singletonList(b.address())).externalHost(localAddress));
+ d, testConfig(Collections.singletonList(b.address())).externalHosts(localAddress));
MembershipProtocolImpl cmE =
createMembership(
- e, testConfig(Collections.singletonList(b.address())).externalHost(localAddress));
+ e, testConfig(Collections.singletonList(b.address())).externalHosts(localAddress));
try {
awaitSeconds(3);
diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java
index cff607f2..b2312f15 100644
--- a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java
+++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java
@@ -30,7 +30,7 @@ public static void main(String[] args) {
Cluster bob =
new ClusterImpl()
.config(opts -> opts.memberAlias("Bob"))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -39,7 +39,7 @@ public static void main(String[] args) {
Cluster carol =
new ClusterImpl()
.config(opts -> opts.memberAlias("Carol").metadata(metadata))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -47,7 +47,7 @@ public static void main(String[] args) {
ClusterConfig configWithFixedPort =
new ClusterConfig()
.memberAlias("Dan")
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transport(opts -> opts.port(3000));
Cluster dan =
new ClusterImpl(configWithFixedPort)
@@ -61,10 +61,10 @@ public static void main(String[] args) {
.membership(
opts ->
opts.seedMembers(
- alice.address(),
- bob.address(),
- carol.address(),
- dan.address()) // won't join anyway
+ alice.addresses().get(0),
+ bob.addresses().get(0),
+ carol.addresses().get(0),
+ dan.addresses().get(0)) // won't join anyway
.namespace("another-cluster"));
Cluster eve =
new ClusterImpl(configWithSyncGroup)
@@ -75,31 +75,31 @@ public static void main(String[] args) {
System.out.println(
"Alice ("
- + alice.address()
+ + alice.addresses()
+ ") cluster: "
+ alice.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
System.out.println(
"Bob ("
- + bob.address()
+ + bob.addresses()
+ ") cluster: "
+ bob.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
System.out.println(
"Carol ("
- + carol.address()
+ + carol.addresses()
+ ") cluster: "
+ carol.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
System.out.println(
"Dan ("
- + dan.address()
+ + dan.addresses()
+ ") cluster: "
+ dan.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
System.out.println(
"Eve ("
- + eve.address()
+ + eve.addresses()
+ ") cluster: " // alone in cluster
+ eve.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
}
diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java
index bb1910b1..611c5cfa 100644
--- a/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java
+++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java
@@ -24,7 +24,7 @@ public static void main(String[] args) {
new ClusterImpl()
.config(opts -> opts.memberAlias("Bob"))
.membership(opts -> opts.namespace("alice/bob-and-carol"))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -33,7 +33,7 @@ public static void main(String[] args) {
new ClusterImpl()
.config(opts -> opts.memberAlias("Carol"))
.membership(opts -> opts.namespace("alice/bob-and-carol"))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -41,7 +41,7 @@ public static void main(String[] args) {
new ClusterImpl()
.config(opts -> opts.memberAlias("Bob-and-Carol-Child-1"))
.membership(opts -> opts.namespace("alice/bob-and-carol/child-1"))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -49,7 +49,7 @@ public static void main(String[] args) {
new ClusterImpl()
.config(opts -> opts.memberAlias("Bob-and-Carol-Child-2"))
.membership(opts -> opts.namespace("alice/bob-and-carol/child-2"))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -58,7 +58,7 @@ public static void main(String[] args) {
new ClusterImpl()
.config(opts -> opts.memberAlias("Dan"))
.membership(opts -> opts.namespace("alice/dan-and-eve"))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -67,7 +67,7 @@ public static void main(String[] args) {
new ClusterImpl()
.config(opts -> opts.memberAlias("Eve"))
.membership(opts -> opts.namespace("alice/dan-and-eve"))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
@@ -75,37 +75,37 @@ public static void main(String[] args) {
System.out.println(
"Alice ("
- + alice.address()
+ + alice.addresses()
+ ") cluster: "
+ alice.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
System.out.println(
"Bob ("
- + bob.address()
+ + bob.addresses()
+ ") cluster: "
+ bob.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
System.out.println(
"Carol ("
- + carol.address()
+ + carol.addresses()
+ ") cluster: "
+ carol.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
System.out.println(
"Dan ("
- + dan.address()
+ + dan.addresses()
+ ") cluster: "
+ dan.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
System.out.println(
"Eve ("
- + eve.address()
+ + eve.addresses()
+ ") cluster: " // alone in cluster
+ eve.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n")));
System.out.println(
"Bob-And-Carol-Child-1 ("
- + bobAndCarolChild1.address()
+ + bobAndCarolChild1.addresses()
+ ") cluster: " // alone in cluster
+ bobAndCarolChild1.members().stream()
.map(Member::toString)
@@ -113,7 +113,7 @@ public static void main(String[] args) {
System.out.println(
"Bob-And-Carol-Child-2 ("
- + carolChild2.address()
+ + carolChild2.addresses()
+ ") cluster: " // alone in cluster
+ carolChild2.members().stream()
.map(Member::toString)
diff --git a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java
index 95824c93..14ce4056 100644
--- a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java
+++ b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java
@@ -30,7 +30,7 @@ public static void main(String[] args) throws Exception {
Cluster joe =
new ClusterImpl()
.config(opts -> opts.metadata(Collections.singletonMap("name", "Joe")))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster -> {
diff --git a/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java b/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java
index 90604932..6dd1f2fe 100644
--- a/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java
+++ b/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java
@@ -24,7 +24,7 @@ public static void main(String[] args) throws Exception {
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.config(opts -> opts.metadataCodec(new LongMetadataCodec()).metadata(123L))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.startAwait();
System.out.println(
"[" + joe.member().id() + "] Joe's metadata: " + joe.metadata().orElse(null));
@@ -33,7 +33,7 @@ public static void main(String[] args) throws Exception {
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
.config(opts -> opts.metadataCodec(new LongMetadataCodec()).metadata(456L))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.startAwait();
System.out.println(
"[" + bob.member().id() + "] Bob's metadata: " + bob.metadata().orElse(null));
diff --git a/examples/src/main/java/io/scalecube/examples/GossipExample.java b/examples/src/main/java/io/scalecube/examples/GossipExample.java
index 2135880c..d4919541 100644
--- a/examples/src/main/java/io/scalecube/examples/GossipExample.java
+++ b/examples/src/main/java/io/scalecube/examples/GossipExample.java
@@ -34,7 +34,7 @@ public void onGossip(Message gossip) {
//noinspection unused
Cluster bob =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster -> {
@@ -50,7 +50,7 @@ public void onGossip(Message gossip) {
//noinspection unused
Cluster carol =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster -> {
@@ -66,7 +66,7 @@ public void onGossip(Message gossip) {
//noinspection unused
Cluster dan =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster -> {
@@ -82,7 +82,7 @@ public void onGossip(Message gossip) {
// Start cluster node Eve that joins cluster and spreads gossip
Cluster eve =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.startAwait();
eve.spreadGossip(Message.fromData("Gossip from Eve"))
diff --git a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java
index 2040588f..b68bb3b0 100644
--- a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java
+++ b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java
@@ -47,7 +47,7 @@ public void onMembershipEvent(MembershipEvent event) {
new ClusterImpl()
.config(opts -> opts.memberAlias("Bob"))
.config(opts -> opts.metadata(Collections.singletonMap("name", "Bob")))
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster -> {
@@ -66,7 +66,7 @@ public void onMembershipEvent(MembershipEvent event) {
new ClusterImpl()
.config(opts -> opts.memberAlias("Carol"))
.config(opts -> opts.metadata(Collections.singletonMap("name", "Carol")))
- .membership(opts -> opts.seedMembers(bob.address()))
+ .membership(opts -> opts.seedMembers(bob.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster -> {
diff --git a/examples/src/main/java/io/scalecube/examples/MessagingExample.java b/examples/src/main/java/io/scalecube/examples/MessagingExample.java
index b747b229..37dd445a 100644
--- a/examples/src/main/java/io/scalecube/examples/MessagingExample.java
+++ b/examples/src/main/java/io/scalecube/examples/MessagingExample.java
@@ -39,7 +39,7 @@ public void onMessage(Message msg) {
// messages
Cluster bob =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster -> {
@@ -58,7 +58,7 @@ public void onMessage(Message msg) {
// Join cluster node Carol to cluster with Alice and Bob
Cluster carol =
new ClusterImpl()
- .membership(opts -> opts.seedMembers(alice.address(), bob.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses().get(0), bob.addresses().get(0)))
.transportFactory(TcpTransportFactory::new)
.handler(
cluster -> {
diff --git a/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java b/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java
index 7049e1a1..cb888573 100644
--- a/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java
+++ b/examples/src/main/java/io/scalecube/examples/WebsocketMessagingExample.java
@@ -40,7 +40,7 @@ public void onMessage(Message msg) {
Cluster bob =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
- .membership(opts -> opts.seedMembers(alice.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
@@ -59,15 +59,13 @@ public void onMessage(Message msg) {
Cluster carol =
new ClusterImpl()
.transportFactory(WebsocketTransportFactory::new)
- .membership(opts -> opts.seedMembers(alice.address(), bob.address()))
+ .membership(opts -> opts.seedMembers(alice.addresses().get(0), bob.addresses().get(0)))
.handler(
- cluster -> {
- return new ClusterMessageHandler() {
- @Override
- public void onMessage(Message msg) {
- System.out.println("Carol received: " + msg.data());
- }
- };
+ cluster -> new ClusterMessageHandler() {
+ @Override
+ public void onMessage(Message msg) {
+ System.out.println("Carol received: " + msg.data());
+ }
})
.startAwait();
diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java
index b5e3e879..e9a3c7a9 100644
--- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java
+++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java
@@ -5,13 +5,15 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import java.util.Optional;
import java.util.StringJoiner;
+import java.util.stream.Collectors;
/**
* The Class Message introduces generic protocol used for point to point communication by transport.
@@ -36,7 +38,8 @@ public final class Message implements Externalizable {
* This header represents sender address of type {@link Address}. It's an address of message
* originator. This header is optional.
*/
- public static final String HEADER_SENDER = "sender";
+ public static final String HEADER_SENDER =
+ "sender"; // TODO. Value should be list of addresses (comma separated)
private Map headers = Collections.emptyMap();
private Object data;
@@ -190,8 +193,17 @@ public T data() {
*
* @return address
*/
- public Address sender() {
- return Optional.ofNullable(header(HEADER_SENDER)).map(Address::from).orElse(null);
+ public List sender() {
+ String headerValue = header(HEADER_SENDER);
+
+ if (headerValue == null) {
+ return Collections.emptyList();
+ }
+
+ return Arrays.stream(headerValue.split(","))
+ .map(String::trim) // Removes leading and trailing spaces.
+ .map(Address::from)
+ .collect(Collectors.toList());
}
@Override
@@ -281,8 +293,16 @@ public Builder correlationId(String correlationId) {
return header(HEADER_CORRELATION_ID, correlationId);
}
- public Builder sender(Address sender) {
- return header(HEADER_SENDER, sender.toString());
+ /**
+ * Setter for header.
+ *
+ * @param addresses addresses
+ * @return builder
+ */
+ public Builder sender(List addresses) {
+ return header(
+ HEADER_SENDER,
+ addresses.stream().map(Address::toString).collect(Collectors.joining(",")));
}
public Message build() {
diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java
new file mode 100644
index 00000000..2d025a8b
--- /dev/null
+++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java
@@ -0,0 +1,53 @@
+package io.scalecube.cluster.transport.api;
+
+import io.scalecube.net.Address;
+import java.util.List;
+import reactor.core.publisher.Mono;
+
+public class TransportWrapper {
+
+ private final Transport transport;
+
+ public TransportWrapper(Transport transport) {
+ this.transport = transport;
+ }
+
+ public Mono requestResponse(List addresses, Message request) {
+ return requestResponse(transport, addresses, 0, request);
+ }
+
+ public static Mono requestResponse(
+ Transport transport, List addresses, Message request) {
+ return requestResponse(transport, addresses, 0, request);
+ }
+
+ private static Mono requestResponse(
+ Transport transport, List addresses, int currentIndex, Message request) {
+ if (currentIndex >= addresses.size()) {
+ return Mono.error(new RuntimeException("All addresses have been tried and failed."));
+ }
+
+ return transport
+ .requestResponse(addresses.get(currentIndex), request)
+ .onErrorResume(th -> requestResponse(transport, addresses, currentIndex + 1, request));
+ }
+
+ public Mono send(List addresses, Message request) {
+ return send(transport, addresses, 0, request);
+ }
+
+ public static Mono send(Transport transport, List addresses, Message request) {
+ return send(transport, addresses, 0, request);
+ }
+
+ private static Mono send(
+ Transport transport, List addresses, int currentIndex, Message request) {
+ if (currentIndex >= addresses.size()) {
+ return Mono.error(new RuntimeException("All addresses have been tried and failed."));
+ }
+
+ return transport
+ .send(addresses.get(currentIndex), request)
+ .onErrorResume(th -> send(transport, addresses, currentIndex + 1, request));
+ }
+}
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java
index 3d07e84d..3f24d0bd 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java
@@ -3,11 +3,14 @@
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
+import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.cluster.utils.NetworkEmulatorTransport;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.tcp.TcpTransportFactory;
import io.scalecube.transport.netty.websocket.WebsocketTransportFactory;
import java.time.Duration;
+import java.util.List;
+
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
@@ -15,6 +18,8 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
+import javax.sql.rowset.spi.TransactionalWriter;
+
/** Base test class. */
public class BaseTest {
@@ -50,6 +55,25 @@ protected Mono send(Transport transport, Address to, Message msg) {
th.toString()));
}
+ /**
+ * Sending message from src to destination.
+ *
+ * @param transport src
+ * @param to destinations
+ * @param msg request
+ */
+ protected Mono send(Transport transport, List to, Message msg) {
+ return TransportWrapper.send(transport, to, msg)
+ .doOnError(
+ th ->
+ LOGGER.error(
+ "Failed to send {} to {} from transport: {}, cause: {}",
+ msg,
+ to,
+ transport,
+ th.toString()));
+ }
+
/**
* Stopping transport.
*
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java
index f8bb8daa..2bd14e96 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java
@@ -7,6 +7,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.cluster.utils.NetworkEmulatorTransport;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.BaseTest;
@@ -94,9 +95,11 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception {
.listen()
.subscribe(
message -> {
- Address address = message.sender();
- assertEquals(client.address(), address, "Expected clientAddress");
- send(server, address, Message.fromQualifier("hi client")).subscribe();
+ List addresses = message.sender();
+ assertTrue(
+ addresses.stream().anyMatch(a -> client.address().equals(a)),
+ "Expected clientAddress");
+ send(server, addresses, Message.fromQualifier("hi client")).subscribe();
});
CompletableFuture messageFuture = new CompletableFuture<>();
@@ -145,8 +148,7 @@ public void testPingPongOnSingleChannel() throws Exception {
messages -> {
for (Message message : messages) {
Message echo = Message.withData("echo/" + message.qualifier()).build();
- server
- .send(message.sender(), echo)
+ TransportWrapper.send(server, message.sender(), echo)
.subscribe(null, th -> LOGGER.error("Failed to send message", th));
}
});
@@ -215,8 +217,7 @@ public void testPingPongOnSeparateChannel() throws Exception {
messages -> {
for (Message message : messages) {
Message echo = Message.withData("echo/" + message.qualifier()).build();
- server
- .send(message.sender(), echo)
+ TransportWrapper.send(server, message.sender(), echo)
.subscribe(null, th -> LOGGER.error("Failed to send message", th));
}
});
@@ -280,8 +281,7 @@ public void testObserverThrowsException() throws Exception {
}
if (qualifier.startsWith("q")) {
Message echo = Message.withData("echo/" + message.qualifier()).build();
- server
- .send(message.sender(), echo)
+ TransportWrapper.send(server, message.sender(), echo)
.subscribe(null, th -> LOGGER.error("Failed to send message", th));
}
},
@@ -320,7 +320,9 @@ public void testBlockAndUnblockTraffic() throws Exception {
client = createTcpTransport();
server = createTcpTransport();
- server.listen().subscribe(message -> server.send(message.sender(), message).subscribe());
+ server
+ .listen()
+ .subscribe(message -> TransportWrapper.send(server, message.sender(), message).subscribe());
Sinks.Many responses = Sinks.many().replay().all();
client
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java
index 050474af..da9b00d3 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java
@@ -7,6 +7,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.TransportWrapper;
import io.scalecube.cluster.utils.NetworkEmulatorTransport;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.BaseTest;
@@ -94,9 +95,11 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception {
.listen()
.subscribe(
message -> {
- Address address = message.sender();
- assertEquals(client.address(), address, "Expected clientAddress");
- send(server, address, Message.fromQualifier("hi client")).subscribe();
+ List addresses = message.sender();
+ assertTrue(
+ addresses.stream().anyMatch(a -> client.address().equals(a)),
+ "Expected clientAddress");
+ send(server, addresses, Message.fromQualifier("hi client")).subscribe();
});
CompletableFuture messageFuture = new CompletableFuture<>();
@@ -145,8 +148,7 @@ public void testPingPongOnSingleChannel() throws Exception {
messages -> {
for (Message message : messages) {
Message echo = Message.withData("echo/" + message.qualifier()).build();
- server
- .send(message.sender(), echo)
+ TransportWrapper.send(server, message.sender(), echo)
.subscribe(null, th -> LOGGER.error("Failed to send message", th));
}
});
@@ -215,8 +217,7 @@ public void testPingPongOnSeparateChannel() throws Exception {
messages -> {
for (Message message : messages) {
Message echo = Message.withData("echo/" + message.qualifier()).build();
- server
- .send(message.sender(), echo)
+ TransportWrapper.send(server, message.sender(), echo)
.subscribe(null, th -> LOGGER.error("Failed to send message", th));
}
});
@@ -280,8 +281,7 @@ public void testObserverThrowsException() throws Exception {
}
if (qualifier.startsWith("q")) {
Message echo = Message.withData("echo/" + message.qualifier()).build();
- server
- .send(message.sender(), echo)
+ TransportWrapper.send(server, message.sender(), echo)
.subscribe(null, th -> LOGGER.error("Failed to send message", th));
}
},
@@ -320,7 +320,9 @@ public void testBlockAndUnblockTraffic() throws Exception {
client = createWebsocketTransport();
server = createWebsocketTransport();
- server.listen().subscribe(message -> server.send(message.sender(), message).subscribe());
+ server
+ .listen()
+ .subscribe(message -> TransportWrapper.send(server, message.sender(), message).subscribe());
Sinks.Many responses = Sinks.many().replay().all();
client