From 7f57054e1fcaace9fe1fb22b93e1dd8a388f9d56 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Sat, 2 Sep 2023 17:08:16 +0300 Subject: [PATCH] Added `externalHosts` config setting, refactored RSocketClientTransport to support connection on multiple addresses --- .../scalecube/services/ServiceEndpoint.java | 39 ++++--- .../scalecube/services/ServiceReference.java | 12 +- .../transport/StaticAddressRouter.java | 2 +- .../rsocket/RSocketClientChannel.java | 16 +-- .../rsocket/RSocketClientTransport.java | 108 +++++++++++------- .../io/scalecube/services/Microservices.java | 45 +++----- .../services/ServiceCallLocalTest.java | 2 +- .../services/ServiceCallRemoteTest.java | 2 +- 8 files changed, 128 insertions(+), 98 deletions(-) diff --git a/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java b/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java index ea7a1d4d0..4032f51ad 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java @@ -6,6 +6,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -23,7 +24,7 @@ public class ServiceEndpoint implements Externalizable { private static final long serialVersionUID = 1L; private String id; - private Address address; + private List
addresses; private Set contentTypes; private Map tags; private Collection serviceRegistrations; @@ -38,7 +39,8 @@ public ServiceEndpoint() {} private ServiceEndpoint(Builder builder) { this.id = Objects.requireNonNull(builder.id, "ServiceEndpoint.id is required"); - this.address = Objects.requireNonNull(builder.address, "ServiceEndpoint.address is required"); + this.addresses = + Objects.requireNonNull(builder.addresses, "ServiceEndpoint.addresses is required"); this.contentTypes = Collections.unmodifiableSet(new HashSet<>(builder.contentTypes)); this.tags = Collections.unmodifiableMap(new HashMap<>(builder.tags)); this.serviceRegistrations = @@ -57,8 +59,8 @@ public String id() { return id; } - public Address address() { - return address; + public List
addresses() { + return addresses; } public Set contentTypes() { @@ -88,7 +90,7 @@ public Collection serviceReferences() { public String toString() { return new StringJoiner(", ", ServiceEndpoint.class.getSimpleName() + "[", "]") .add("id=" + id) - .add("address=" + address) + .add("addresses=" + addresses) .add("contentTypes=" + contentTypes) .add("tags=" + tags) .add("serviceRegistrations(" + serviceRegistrations.size() + ")") @@ -100,8 +102,11 @@ public void writeExternal(ObjectOutput out) throws IOException { // id out.writeUTF(id); - // address - out.writeUTF(address.toString()); + // addresses + out.writeInt(addresses.size()); + for (Address address : addresses) { + out.writeUTF(address.toString()); + } // contentTypes out.writeInt(contentTypes.size()); @@ -128,8 +133,12 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept // id id = in.readUTF(); - // address - address = Address.from(in.readUTF()); + // addresses + final int addressesSize = in.readInt(); + addresses = new ArrayList<>(addressesSize); + for (int i = 0; i < addressesSize; i++) { + addresses.add(Address.from(in.readUTF())); + } // contentTypes int contentTypesSize = in.readInt(); @@ -161,7 +170,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept public static class Builder { private String id; - private Address address = Address.NULL_ADDRESS; + private List
addresses = new ArrayList<>(); private Set contentTypes = Collections.emptySet(); private Map tags = Collections.emptyMap(); private Collection serviceRegistrations = new ArrayList<>(); @@ -170,7 +179,7 @@ private Builder() {} private Builder(ServiceEndpoint other) { this.id = other.id; - this.address = other.address; + this.addresses = new ArrayList<>(other.addresses); this.contentTypes = new HashSet<>(other.contentTypes); this.tags = new HashMap<>(other.tags); this.serviceRegistrations = new ArrayList<>(other.serviceRegistrations); @@ -181,8 +190,12 @@ public Builder id(String id) { return this; } - public Builder address(Address address) { - this.address = Objects.requireNonNull(address, "address"); + public Builder addresses(Address... addresses) { + return addresses(Arrays.asList(addresses)); + } + + public Builder addresses(List
addresses) { + this.addresses = new ArrayList<>(Objects.requireNonNull(addresses, "addresses")); return this; } diff --git a/services-api/src/main/java/io/scalecube/services/ServiceReference.java b/services-api/src/main/java/io/scalecube/services/ServiceReference.java index 57238479e..7f35c3a13 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceReference.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceReference.java @@ -2,8 +2,10 @@ import io.scalecube.net.Address; import io.scalecube.services.api.Qualifier; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.StringJoiner; @@ -20,7 +22,7 @@ public class ServiceReference { private final Set contentTypes; private final Map tags; private final String action; - private final Address address; + private final List
addresses; private final boolean isSecured; /** @@ -40,7 +42,7 @@ public ServiceReference( this.tags = mergeTags(serviceMethodDefinition, serviceRegistration, serviceEndpoint); this.action = serviceMethodDefinition.action(); this.qualifier = Qualifier.asString(namespace, action); - this.address = serviceEndpoint.address(); + this.addresses = new ArrayList<>(serviceEndpoint.addresses()); this.isSecured = serviceMethodDefinition.isSecured(); } @@ -72,8 +74,8 @@ public String action() { return action; } - public Address address() { - return this.address; + public List
addresses() { + return addresses; } public boolean isSecured() { @@ -95,7 +97,7 @@ private Map mergeTags( public String toString() { return new StringJoiner(", ", ServiceReference.class.getSimpleName() + "[", "]") .add("endpointId=" + endpointId) - .add("address=" + address) + .add("addresses=" + addresses) .add("qualifier=" + qualifier) .add("contentTypes=" + contentTypes) .add("tags=" + tags) diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/StaticAddressRouter.java b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/StaticAddressRouter.java index 4979ff48c..1c8d76e2c 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/StaticAddressRouter.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/StaticAddressRouter.java @@ -28,7 +28,7 @@ public StaticAddressRouter(Address address) { new ServiceMethodDefinition(UUID.randomUUID().toString()), new ServiceRegistration( UUID.randomUUID().toString(), Collections.emptyMap(), Collections.emptyList()), - ServiceEndpoint.builder().id(UUID.randomUUID().toString()).address(address).build()); + ServiceEndpoint.builder().id(UUID.randomUUID().toString()).addresses(address).build()); } @Override diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java index c339c4ad4..854f6c432 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java @@ -8,27 +8,23 @@ import io.scalecube.services.transport.api.ClientChannel; import java.lang.reflect.Type; import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.channel.AbortedException; public class RSocketClientChannel implements ClientChannel { - private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientChannel.class); - - private final Mono rsocket; + private final Mono promise; private final ServiceMessageCodec messageCodec; - public RSocketClientChannel(Mono rsocket, ServiceMessageCodec codec) { - this.rsocket = rsocket; + public RSocketClientChannel(Mono promise, ServiceMessageCodec codec) { + this.promise = promise; this.messageCodec = codec; } @Override public Mono requestResponse(ServiceMessage message, Type responseType) { - return rsocket + return promise .flatMap(rsocket -> rsocket.requestResponse(toPayload(message))) .map(this::toMessage) .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) @@ -37,7 +33,7 @@ public Mono requestResponse(ServiceMessage message, Type respons @Override public Flux requestStream(ServiceMessage message, Type responseType) { - return rsocket + return promise .flatMapMany(rsocket -> rsocket.requestStream(toPayload(message))) .map(this::toMessage) .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) @@ -47,7 +43,7 @@ public Flux requestStream(ServiceMessage message, Type responseT @Override public Flux requestChannel( Publisher publisher, Type responseType) { - return rsocket + return promise .flatMapMany(rsocket -> rsocket.requestChannel(Flux.from(publisher).map(this::toPayload))) .map(this::toMessage) .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java index 99e97f95d..ac3ceeafe 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java @@ -21,8 +21,10 @@ import io.scalecube.utils.MaskUtil; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -31,7 +33,7 @@ public class RSocketClientTransport implements ClientTransport { private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientTransport.class); - private final ThreadLocal>> rsockets = + private final ThreadLocal>> connections = ThreadLocal.withInitial(ConcurrentHashMap::new); private final CredentialsSupplier credentialsSupplier; @@ -64,17 +66,72 @@ public RSocketClientTransport( @Override public ClientChannel create(ServiceReference serviceReference) { - final Map> monoMap = rsockets.get(); // keep reference for threadsafety - final Address address = serviceReference.address(); - Mono mono = - monoMap.computeIfAbsent( - address, + final String endpointId = serviceReference.endpointId(); + final Map> connections = this.connections.get(); + + Mono promise = + connections.computeIfAbsent( + endpointId, key -> - getCredentials(serviceReference) - .flatMap(creds -> connect(key, creds, monoMap)) + connect(serviceReference, connections) .cache() - .doOnError(ex -> monoMap.remove(key))); - return new RSocketClientChannel(mono, new ServiceMessageCodec(headersCodec, dataCodecs)); + .doOnError(ex -> connections.remove(key))); + + return new RSocketClientChannel(promise, new ServiceMessageCodec(headersCodec, dataCodecs)); + } + + private Mono connect( + ServiceReference serviceReference, Map> connections) { + return Mono.defer( + () -> { + final String endpointId = serviceReference.endpointId(); + final List
addresses = serviceReference.addresses(); + final AtomicInteger currentIndex = new AtomicInteger(0); + + return Mono.defer( + () -> { + final Address address = addresses.get(currentIndex.get()); + return connect(serviceReference, connections, address, endpointId); + }) + .doOnError(ex -> currentIndex.incrementAndGet()) + .retry(addresses.size() - 1) + .doOnError( + th -> + LOGGER.warn( + "Failed to connect ({}/{}), cause: {}", + endpointId, + addresses, + th.toString())); + }); + } + + private Mono connect( + ServiceReference serviceReference, + Map> connections, + Address address, + String endpointId) { + return getCredentials(serviceReference) + .flatMap( + creds -> + RSocketConnector.create() + .payloadDecoder(PayloadDecoder.DEFAULT) + .setupPayload(encodeConnectionSetup(new ConnectionSetup(creds))) + .connect(() -> clientTransportFactory.clientTransport(address))) + .doOnSuccess( + rsocket -> { + LOGGER.debug("[{}] Connected successfully", address); + // Setup shutdown hook + rsocket + .onClose() + .doFinally( + s -> { + connections.remove(endpointId); + LOGGER.debug("[{}] Connection closed", address); + }) + .doOnError( + th -> LOGGER.warn("[{}] Exception on close: {}", address, th.toString())) + .subscribe(); + }); } private Mono> getCredentials(ServiceReference serviceReference) { @@ -103,37 +160,6 @@ private Mono> getCredentials(ServiceReference serviceReferen }); } - private Mono connect( - Address address, Map creds, Map> monoMap) { - return RSocketConnector.create() - .payloadDecoder(PayloadDecoder.DEFAULT) - .setupPayload(encodeConnectionSetup(new ConnectionSetup(creds))) - .connect(() -> clientTransportFactory.clientTransport(address)) - .doOnSuccess( - rsocket -> { - LOGGER.debug("[rsocket][client][{}] Connected successfully", address); - // setup shutdown hook - rsocket - .onClose() - .doFinally( - s -> { - monoMap.remove(address); - LOGGER.debug("[rsocket][client][{}] Connection closed", address); - }) - .doOnError( - th -> - LOGGER.warn( - "[rsocket][client][{}][onClose] Exception occurred: {}", - address, - th.toString())) - .subscribe(); - }) - .doOnError( - th -> - LOGGER.warn( - "[rsocket][client][{}] Failed to connect, cause: {}", address, th.toString())); - } - private Payload encodeConnectionSetup(ConnectionSetup connectionSetup) { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); try { diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index c437dd52d..44ef52cbd 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -136,8 +136,7 @@ public final class Microservices implements AutoCloseable { private final Sinks.One shutdown = Sinks.one(); private final Sinks.One onShutdown = Sinks.one(); private ServiceEndpoint serviceEndpoint; - private final String externalHost; - private final Integer externalPort; + private final List externalHosts; private Microservices(Builder builder) { this.tags = Collections.unmodifiableMap(new HashMap<>(builder.tags)); @@ -152,8 +151,7 @@ private Microservices(Builder builder) { this.defaultDataDecoder = builder.defaultDataDecoder; this.defaultContentType = builder.defaultContentType; this.defaultPrincipalMapper = builder.defaultPrincipalMapper; - this.externalHost = builder.externalHost; - this.externalPort = builder.externalPort; + this.externalHosts = new ArrayList<>(builder.externalHosts); // Setup cleanup shutdown @@ -182,28 +180,26 @@ private Mono start() { .flatMap( transportBootstrap -> { final ServiceCall serviceCall = call(); - final Address serviceAddress = transportBootstrap.transportAddress; - final ServiceEndpoint.Builder serviceEndpointBuilder = + final ServiceEndpoint.Builder builder = ServiceEndpoint.builder() .id(id) - .address(serviceAddress) .contentTypes(DataCodec.getAllContentTypes()) .tags(tags); // invoke service providers and register services - List serviceInstances = + final List serviceInstances = serviceProviders.stream() .flatMap(serviceProvider -> serviceProvider.provide(serviceCall).stream()) .peek(this::registerService) .peek( serviceInfo -> - serviceEndpointBuilder.appendServiceRegistrations( + builder.appendServiceRegistrations( ServiceScanner.scanServiceInfo(serviceInfo))) .map(ServiceInfo::serviceInstance) .collect(Collectors.toList()); - serviceEndpoint = newServiceEndpoint(serviceEndpointBuilder.build()); + serviceEndpoint = newServiceEndpoint(builder); return concludeDiscovery( this, new ServiceDiscoveryOptions().serviceEndpoint(serviceEndpoint)) @@ -217,18 +213,16 @@ this, new ServiceDiscoveryOptions().serviceEndpoint(serviceEndpoint)) .onErrorResume(ex -> Mono.defer(this::shutdown).then(Mono.error(ex))); } - private ServiceEndpoint newServiceEndpoint(ServiceEndpoint serviceEndpoint) { - ServiceEndpoint.Builder builder = ServiceEndpoint.from(serviceEndpoint); + private ServiceEndpoint newServiceEndpoint(ServiceEndpoint.Builder builder) { + final List
addresses = new ArrayList<>(); - int port = Optional.ofNullable(externalPort).orElse(serviceEndpoint.address().port()); + addresses.add(transportBootstrap.transportAddress); - // calculate local service endpoint address - Address newAddress = - Optional.ofNullable(externalHost) - .map(host -> Address.create(host, port)) - .orElseGet(() -> Address.create(serviceEndpoint.address().host(), port)); + for (String externalHost : externalHosts) { + addresses.add(Address.create(externalHost, transportBootstrap.transportAddress.port())); + } - return builder.address(newAddress).build(); + return builder.addresses(addresses).build(); } private Mono startGateway(GatewayOptions options) { @@ -381,10 +375,10 @@ public static final class Builder { .orElse((message, dataType) -> message); private String defaultContentType = ServiceMessage.DEFAULT_DATA_FORMAT; private PrincipalMapper defaultPrincipalMapper = null; - private String externalHost; - private Integer externalPort; + private List externalHosts = new ArrayList<>(); public Mono start() { + //noinspection resource return Mono.defer(() -> new Microservices(this).start()); } @@ -421,13 +415,12 @@ public Builder services(ServiceProvider serviceProvider) { return this; } - public Builder externalHost(String externalHost) { - this.externalHost = externalHost; - return this; + public Builder externalHosts(String... externalHosts) { + return externalHosts(Arrays.asList(externalHosts)); } - public Builder externalPort(Integer externalPort) { - this.externalPort = externalPort; + public Builder externalHosts(List externalHosts) { + this.externalHosts = new ArrayList<>(externalHosts); return this; } diff --git a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java index aa3f5c7f1..2d297f120 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java @@ -217,7 +217,7 @@ private static Optional route( new ServiceRegistration("ns", Collections.emptyMap(), Collections.emptyList()), ServiceEndpoint.builder() .id(UUID.randomUUID().toString()) - .address(provider.serviceAddress()) + .addresses(provider.serviceAddress()) .build())); } } diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index 668722b7f..9a8774c78 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -294,7 +294,7 @@ private static Optional route( new ServiceRegistration("ns", Collections.emptyMap(), Collections.emptyList()), ServiceEndpoint.builder() .id(UUID.randomUUID().toString()) - .address(provider.serviceAddress()) + .addresses(provider.serviceAddress()) .build())); }