diff --git a/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java b/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java index 7ef739c8048..7edc4c507af 100644 --- a/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java @@ -41,6 +41,9 @@ @Internal public abstract class BootstrapperImpl extends Bootstrapper { + public static final String GRPC_EXPERIMENTAL_XDS_FALLBACK = + "GRPC_EXPERIMENTAL_XDS_FALLBACK"; + // Client features. @VisibleForTesting public static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING = @@ -59,11 +62,18 @@ protected BootstrapperImpl() { logger = XdsLogger.withLogId(InternalLogId.allocate("bootstrapper", null)); } + // Delayed initialization of xdsFallbackEnabled to allow for flag initialization. + private static boolean isEnabledXdsFallback() { + return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, false); + } + protected abstract String getJsonContent() throws IOException, XdsInitializationException; protected abstract Object getImplSpecificConfig(Map serverConfig, String serverUri) throws XdsInitializationException; + + /** * Reads and parses bootstrap config. The config is expected to be in JSON format. */ @@ -102,6 +112,9 @@ protected BootstrapInfo.Builder bootstrapBuilder(Map rawData) throw new XdsInitializationException("Invalid bootstrap: 'xds_servers' does not exist."); } List servers = parseServerInfos(rawServerConfigs, logger); + if (servers.size() > 1 && !isEnabledXdsFallback()) { + servers = ImmutableList.of(servers.get(0)); + } builder.servers(servers); Node.Builder nodeBuilder = Node.newBuilder(); @@ -208,6 +221,9 @@ protected BootstrapInfo.Builder bootstrapBuilder(Map rawData) if (rawAuthorityServers == null || rawAuthorityServers.isEmpty()) { authorityServers = servers; } else { + if (rawAuthorityServers.size() > 1 && !isEnabledXdsFallback()) { + rawAuthorityServers = ImmutableList.of(rawAuthorityServers.get(0)); + } authorityServers = parseServerInfos(rawAuthorityServers, logger); } authorityInfoMapBuilder.put( diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index 3074d1120ad..25056789f1d 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -16,7 +16,6 @@ package io.grpc.xds.client; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -37,10 +36,8 @@ import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.EnvoyProtoData.Node; import io.grpc.xds.client.XdsClient.ProcessingTracker; -import io.grpc.xds.client.XdsClient.ResourceStore; import io.grpc.xds.client.XdsClient.XdsResponseHandler; import io.grpc.xds.client.XdsLogger.XdsLogLevel; -import io.grpc.xds.client.XdsTransportFactory.EventHandler; import io.grpc.xds.client.XdsTransportFactory.StreamingCall; import io.grpc.xds.client.XdsTransportFactory.XdsTransport; import java.util.Collection; @@ -67,12 +64,11 @@ final class ControlPlaneClient { private final ServerInfo serverInfo; private final XdsTransport xdsTransport; private final XdsResponseHandler xdsResponseHandler; - private final ResourceStore resourceStore; + private final XdsClient.ResourceStore resourceStore; private final ScheduledExecutorService timeService; private final BackoffPolicy.Provider backoffPolicyProvider; private final Stopwatch stopwatch; private final Node bootstrapNode; - private final XdsClient xdsClient; // Last successfully applied version_info for each resource type. Starts with empty string. // A version_info is used to update management server with client's most recent knowledge of @@ -80,13 +76,16 @@ final class ControlPlaneClient { private final Map, String> versions = new HashMap<>(); private boolean shutdown; + private boolean hasBeenActive; + private boolean lastStateWasReady; + private boolean startingUp; @Nullable private AdsStream adsStream; @Nullable private BackoffPolicy retryBackoffPolicy; @Nullable private ScheduledHandle rpcRetryTimer; - private MessagePrettyPrinter messagePrinter; + private final MessagePrettyPrinter messagePrinter; /** An entity that manages ADS RPCs over a single channel. */ ControlPlaneClient( @@ -94,13 +93,12 @@ final class ControlPlaneClient { ServerInfo serverInfo, Node bootstrapNode, XdsResponseHandler xdsResponseHandler, - ResourceStore resourceStore, + XdsClient.ResourceStore resourceStore, ScheduledExecutorService timeService, SynchronizationContext syncContext, BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier, - XdsClient xdsClient, MessagePrettyPrinter messagePrinter) { this.serverInfo = checkNotNull(serverInfo, "serverInfo"); this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport"); @@ -110,7 +108,6 @@ final class ControlPlaneClient { this.timeService = checkNotNull(timeService, "timeService"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); - this.xdsClient = checkNotNull(xdsClient, "xdsClient"); this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter"); stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); logId = InternalLogId.allocate("xds-client", serverInfo.target()); @@ -140,6 +137,10 @@ public String toString() { return logId.toString(); } + public ServerInfo getServerInfo() { + return serverInfo; + } + /** * Updates the resource subscription for the given resource type. */ @@ -150,7 +151,15 @@ void adjustResourceSubscription(XdsResourceType resourceType) { } if (adsStream == null) { startRpcStream(); + // when the stream becomes ready, it will send the discovery requests + return; + } + + // We will do the rest of the method as part of the readyHandler when the stream is ready. + if (!lastStateWasReady) { + return; } + Collection resources = resourceStore.getSubscribedResources(serverInfo, resourceType); if (resources == null) { resources = Collections.emptyList(); @@ -200,7 +209,7 @@ void nackResponse(XdsResourceType type, String nonce, String errorDetail) { */ // Must be synchronized. boolean isInBackoff() { - return rpcRetryTimer != null && rpcRetryTimer.isPending(); + return rpcRetryTimer != null || (hasBeenActive && !lastStateWasReady); } // Must be synchronized. @@ -208,6 +217,14 @@ boolean isReady() { return adsStream != null && adsStream.call != null && adsStream.call.isReady(); } + boolean isResponseRecieved() { + return adsStream != null && adsStream.responseReceived; + } + + boolean isConnected() { + return lastStateWasReady; + } + /** * Starts a timer for each requested resource that hasn't been responded to and * has been waiting for the channel to get ready. @@ -218,12 +235,22 @@ void readyHandler() { return; } - if (isInBackoff()) { + if (rpcRetryTimer != null) { rpcRetryTimer.cancel(); rpcRetryTimer = null; } - xdsClient.startSubscriberTimersIfNeeded(serverInfo); + hasBeenActive = true; + if (!lastStateWasReady) { + lastStateWasReady = true; + xdsResponseHandler.handleStreamRestarted(serverInfo); + } + } + + void connect() { + if (adsStream == null) { + startRpcStream(); + } } /** @@ -234,27 +261,63 @@ void readyHandler() { private void startRpcStream() { checkState(adsStream == null, "Previous adsStream has not been cleared yet"); adsStream = new AdsStream(); + adsStream.start(); logger.log(XdsLogLevel.INFO, "ADS stream started"); stopwatch.reset().start(); } + void sendDiscoveryRequests() { + if (adsStream == null) { + startRpcStream(); + // when the stream becomes ready, it will send the discovery requests + return; + } + + if (isConnected()) { + adjustAllResourceSubscriptions(); + } + } + + void adjustAllResourceSubscriptions() { + if (isInBackoff()) { + return; + } + + Set> subscribedResourceTypes = + new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values()); + + for (XdsResourceType type : subscribedResourceTypes) { + adjustResourceSubscription(type); + } + } + + public boolean isStartingUp() { + return startingUp; + } + + void markStartingUp() { + startingUp = true; + } + + public void startingUpCompleted() { + startingUp = false; + } + @VisibleForTesting public final class RpcRetryTask implements Runnable { @Override public void run() { - if (shutdown) { + logger.log(XdsLogLevel.DEBUG, "Retry timeout. Restart ADS stream {0}", logId); + if (shutdown || isReady()) { return; } - startRpcStream(); - Set> subscribedResourceTypes = - new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values()); - for (XdsResourceType type : subscribedResourceTypes) { - Collection resources = resourceStore.getSubscribedResources(serverInfo, type); - if (resources != null) { - adsStream.sendDiscoveryRequest(type, resources); - } + + if (adsStream == null) { + startingUp = true; + startRpcStream(); } - xdsResponseHandler.handleStreamRestarted(serverInfo); + + // handling CPC management is triggered in readyHandler } } @@ -264,7 +327,7 @@ XdsResourceType fromTypeUrl(String typeUrl) { return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl); } - private class AdsStream implements EventHandler { + private class AdsStream implements XdsTransportFactory.EventHandler { private boolean responseReceived; private boolean closed; // Response nonce for the most recently received discovery responses of each resource type. @@ -281,6 +344,9 @@ private class AdsStream implements EventHandler { private AdsStream() { this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(), methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller()); + } + + void start() { call.start(this); } @@ -326,6 +392,11 @@ final void sendDiscoveryRequest(XdsResourceType type, Collection reso @Override public void onReady() { + logger.log(XdsLogLevel.DEBUG, "ADS stream ready {0}", logId); + if (shutdown || closed) { + return; + } + syncContext.execute(ControlPlaneClient.this::readyHandler); } @@ -357,12 +428,10 @@ public void run() { @Override public void onStatusReceived(final Status status) { + boolean inRetry = hasBeenActive && !lastStateWasReady && isStartingUp(); + lastStateWasReady = false; syncContext.execute(() -> { - if (status.isOk()) { - handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER)); - } else { - handleRpcStreamClosed(status); - } + handleRpcStreamClosed(status, inRetry); }); } @@ -381,8 +450,8 @@ final void handleRpcResponse(XdsResourceType type, String versionInfo, ListMust be synchronized. + */ void handleStreamRestarted(ServerInfo serverInfo); } public interface ResourceStore { + /** - * Returns the collection of resources currently subscribing to or {@code null} if not - * subscribing to any resources for the given type. + * Returns the collection of resources currently subscribing to with the specified authority + * or {@code null} if not subscribing to any resources for this authority for the given type. * *

Note an empty collection indicates subscribing to resources of the given type with * wildcard mode. + * + * @param serverInfo the xds server to get the resources from + * @param type the type of the resources that should be retrieved */ - // Must be synchronized. @Nullable - Collection getSubscribedResources(ServerInfo serverInfo, - XdsResourceType type); + Collection getSubscribedResources( + ServerInfo serverInfo, XdsResourceType type); Map> getSubscribedResourceTypesWithTypeUrl(); } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 79147cd9862..320a6adf34a 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME; +import static io.grpc.xds.client.ControlPlaneClient.CLOSED_BY_SERVER; import static io.grpc.xds.client.XdsResourceType.ParsedResource; import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate; @@ -26,8 +27,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Any; @@ -40,13 +41,14 @@ import io.grpc.internal.TimeProvider; import io.grpc.xds.client.Bootstrapper.AuthorityInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo; -import io.grpc.xds.client.XdsClient.ResourceStore; -import io.grpc.xds.client.XdsClient.XdsResponseHandler; import io.grpc.xds.client.XdsLogger.XdsLogLevel; +import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -54,13 +56,14 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** * XdsClient implementation. */ @Internal -public final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore { +public final class XdsClientImpl extends XdsClient implements XdsClient.ResourceStore { // Longest time to wait, since the subscription to some resource, for concluding its absence. @VisibleForTesting @@ -74,21 +77,25 @@ public void uncaughtException(Thread t, Throwable e) { XdsLogLevel.ERROR, "Uncaught exception in XdsClient SynchronizationContext. Panic!", e); - // TODO(chengyuanzhang): better error handling. + // TODO: better error handling. throw new AssertionError(e); } }); - private final Map loadStatsManagerMap = - new HashMap<>(); - final Map serverLrsClientMap = - new HashMap<>(); - + private final Map loadStatsManagerMap = new HashMap<>(); + final Map serverLrsClientMap = new HashMap<>(); + /** Map of authority to its activated control plane client (affected by xds fallback). + * The last entry in the list for each value is the "active" CPC for the matching key */ + private final Map> activatedCpClients = new HashMap<>(); private final Map serverCpClientMap = new HashMap<>(); + + /** Maps resource type to the corresponding map of subscribers (keyed by resource name). */ private final Map, Map>> resourceSubscribers = new HashMap<>(); + /** Maps typeUrl to the corresponding XdsResourceType. */ private final Map> subscribedResourceTypeUrls = new HashMap<>(); + private final XdsTransportFactory xdsTransportFactory; private final Bootstrapper.BootstrapInfo bootstrapInfo; private final ScheduledExecutorService timeService; @@ -123,48 +130,6 @@ public XdsClientImpl( logger.log(XdsLogLevel.INFO, "Created"); } - @Override - public void handleResourceResponse( - XdsResourceType xdsResourceType, ServerInfo serverInfo, String versionInfo, - List resources, String nonce, ProcessingTracker processingTracker) { - checkNotNull(xdsResourceType, "xdsResourceType"); - syncContext.throwIfNotInThisSynchronizationContext(); - Set toParseResourceNames = - xdsResourceType.shouldRetrieveResourceKeysForArgs() - ? getResourceKeys(xdsResourceType) - : null; - XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce, - bootstrapInfo, securityConfig, toParseResourceNames); - handleResourceUpdate(args, resources, xdsResourceType, processingTracker); - } - - @Override - public void handleStreamClosed(Status error) { - syncContext.throwIfNotInThisSynchronizationContext(); - cleanUpResourceTimers(); - for (Map> subscriberMap : - resourceSubscribers.values()) { - for (ResourceSubscriber subscriber : subscriberMap.values()) { - if (!subscriber.hasResult()) { - subscriber.onError(error, null); - } - } - } - } - - @Override - public void handleStreamRestarted(ServerInfo serverInfo) { - syncContext.throwIfNotInThisSynchronizationContext(); - for (Map> subscriberMap : - resourceSubscribers.values()) { - for (ResourceSubscriber subscriber : subscriberMap.values()) { - if (subscriber.serverInfo.equals(serverInfo)) { - subscriber.restartTimer(); - } - } - } - } - @Override public void shutdown() { syncContext.execute( @@ -181,7 +146,8 @@ public void run() { for (final LoadReportClient lrsClient : serverLrsClientMap.values()) { lrsClient.stopLoadReporting(); } - cleanUpResourceTimers(); + cleanUpResourceTimers(null); + activatedCpClients.clear(); } }); } @@ -196,19 +162,42 @@ public Map> getSubscribedResourceTypesWithTypeUrl() { return Collections.unmodifiableMap(subscribedResourceTypeUrls); } + private ControlPlaneClient getActiveCpc(String authority) { + List controlPlaneClients = activatedCpClients.get(authority); + if (controlPlaneClients == null || controlPlaneClients.isEmpty()) { + return null; + } + + return controlPlaneClients.get(controlPlaneClients.size() - 1); + } + + @VisibleForTesting + public Object getActiveCpcForTest(String authority) { + return getActiveCpc(authority); + } + @Nullable @Override - public Collection getSubscribedResources(ServerInfo serverInfo, - XdsResourceType type) { + public Collection getSubscribedResources( + ServerInfo serverInfo, XdsResourceType type) { + ControlPlaneClient targetCpc = serverCpClientMap.get(serverInfo); + if (targetCpc == null) { + return null; + } + + Set authorities = activatedCpClients.entrySet().stream() + .filter(entry -> entry.getValue().contains(targetCpc)) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + Map> resources = resourceSubscribers.getOrDefault(type, Collections.emptyMap()); - ImmutableSet.Builder builder = ImmutableSet.builder(); - for (String key : resources.keySet()) { - if (resources.get(key).serverInfo.equals(serverInfo)) { - builder.add(key); - } - } - Collection retVal = builder.build(); + + Collection retVal = resources.entrySet().stream() + .filter(entry -> authorities.contains(entry.getValue().authority)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + return retVal.isEmpty() ? null : retVal; } @@ -225,7 +214,7 @@ public void run() { // A map from a "resource type" to a map ("resource name": "resource metadata") ImmutableMap.Builder, Map> metadataSnapshot = ImmutableMap.builder(); - for (XdsResourceType resourceType: resourceSubscribers.keySet()) { + for (XdsResourceType resourceType : resourceSubscribers.keySet()) { ImmutableMap.Builder metadataMap = ImmutableMap.builder(); for (Map.Entry> resourceEntry : resourceSubscribers.get(resourceType).entrySet()) { @@ -246,9 +235,9 @@ public Object getSecurityConfig() { @Override public void watchXdsResource(XdsResourceType type, - String resourceName, - ResourceWatcher watcher, - Executor watcherExecutor) { + String resourceName, + ResourceWatcher watcher, + Executor watcherExecutor) { syncContext.execute(new Runnable() { @Override @SuppressWarnings("unchecked") @@ -259,35 +248,119 @@ public void run() { } ResourceSubscriber subscriber = (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName); + if (subscriber == null) { logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName); subscriber = new ResourceSubscriber<>(type, resourceName); resourceSubscribers.get(type).put(resourceName, subscriber); - if (subscriber.controlPlaneClient != null) { - subscriber.controlPlaneClient.adjustResourceSubscription(type); + subscriber.addWatcher(watcher, watcherExecutor); + if (subscriber.errorDescription != null) { + logger.log(XdsLogLevel.WARNING, + "Attempt to watch resource {0}/{1} which has no Xds servers", + type.typeName(), resourceName); + return; + } + + CpcWithFallbackState cpcToUse = manageControlPlaneClient(subscriber); + if (cpcToUse.cpc != null) { + subscriber.restartTimer(); } + } else { + subscriber.addWatcher(watcher, watcherExecutor); } - subscriber.addWatcher(watcher, watcherExecutor); } }); } + /** + * Gets a ControlPlaneClient for the subscriber's authority, creating one if necessary. + * If there already was an active CPC for this authority, and it is different from the one + * identified, then do fallback to the identified one (cpcToUse). + * + * @return identified CPC or {@code null} (if there are no valid ServerInfos associated with the + * subscriber's authority or CPC's for all are in backoff), and whether did a fallback. + */ + private CpcWithFallbackState manageControlPlaneClient( + ResourceSubscriber subscriber) { + + ControlPlaneClient cpcToUse; + boolean didFallback = false; + try { + cpcToUse = getOrCreateControlPlaneClient(subscriber.authority); + } catch (IllegalArgumentException e) { + subscriber.errorDescription = "Bad configuration: " + e.getMessage(); + + subscriber.onError( + Status.INVALID_ARGUMENT.withDescription(subscriber.errorDescription), null); + return new CpcWithFallbackState(null, false); + } catch (IOException e) { + logger.log(XdsLogLevel.WARNING, + "Could not create a control plane client for authority {0}", + subscriber.authority); + return new CpcWithFallbackState(null, false); + } + + if (!cpcToUse.isConnected()) { + cpcToUse.connect(); // Make sure we are at least trying to connect + } + + ControlPlaneClient activeCpClient = getActiveCpc(subscriber.authority); + if (cpcToUse != activeCpClient) { + if (activeCpClient != null) { + didFallback = fallBackToCpc(cpcToUse, subscriber.authority, activeCpClient); + } + + addCpcToAuthority(subscriber.authority, cpcToUse); // makes it active + } + cpcToUse.adjustResourceSubscription(subscriber.type); + + return new CpcWithFallbackState(cpcToUse, didFallback); + } + + private void addCpcToAuthority(String authority, ControlPlaneClient cpcToUse) { + List controlPlaneClients = + activatedCpClients.computeIfAbsent(authority, k -> new ArrayList<>()); + + if (controlPlaneClients.contains(cpcToUse)) { + return; + } + + // if there are any missing CPCs between the last one and cpcToUse, add them + add cpcToUse + ImmutableList serverInfos = getServerInfos(authority); + for (int i = controlPlaneClients.size(); i < serverInfos.size(); i++) { + ServerInfo serverInfo = serverInfos.get(i); + ControlPlaneClient cpc = serverCpClientMap.get(serverInfo); + controlPlaneClients.add(cpc); + logger.log(XdsLogLevel.DEBUG, "Adding control plane client {0} to authority {1}", + cpc, authority); + if (cpc == cpcToUse) { + break; + } + } + } + @Override public void cancelXdsResourceWatch(XdsResourceType type, - String resourceName, - ResourceWatcher watcher) { + String resourceName, + ResourceWatcher watcher) { syncContext.execute(new Runnable() { @Override @SuppressWarnings("unchecked") public void run() { ResourceSubscriber subscriber = (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName); + if (subscriber == null) { + logger.log(XdsLogLevel.WARNING, "double cancel of resource watch for {0}:{1}", + type.typeName(), resourceName); + return; + } subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.cancelResourceWatch(); resourceSubscribers.get(type).remove(resourceName); - if (subscriber.controlPlaneClient != null) { - subscriber.controlPlaneClient.adjustResourceSubscription(type); + ControlPlaneClient controlPlaneClient = getActiveCpc(subscriber.authority); + if (controlPlaneClient != null) { + controlPlaneClient.adjustResourceSubscription(type); } if (resourceSubscribers.get(type).isEmpty()) { resourceSubscribers.remove(type); @@ -341,28 +414,12 @@ public String toString() { return logId.toString(); } - @Override - protected void startSubscriberTimersIfNeeded(ServerInfo serverInfo) { - if (isShutDown()) { - return; + @VisibleForTesting + public boolean isCpcBlobConnected(Object cpcBlob) { + if (cpcBlob instanceof ControlPlaneClient) { + return ((ControlPlaneClient) cpcBlob).isConnected(); } - - syncContext.execute(new Runnable() { - @Override - public void run() { - if (isShutDown()) { - return; - } - - for (Map> subscriberMap : resourceSubscribers.values()) { - for (ResourceSubscriber subscriber : subscriberMap.values()) { - if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) { - subscriber.restartTimer(); - } - } - } - } - }); + throw new IllegalArgumentException("Blob must be a ControlPlaneClient"); } private Set getResourceKeys(XdsResourceType xdsResourceType) { @@ -373,33 +430,65 @@ private Set getResourceKeys(XdsResourceType xdsResourceType) { return resourceSubscribers.get(xdsResourceType).keySet(); } - private void cleanUpResourceTimers() { + // cpcForThisStream is null when doing shutdown + private void cleanUpResourceTimers(ControlPlaneClient cpcForThisStream) { + List authoritiesForCpc = getActiveAuthorities(cpcForThisStream); + for (Map> subscriberMap : resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { - subscriber.stopTimer(); + if (cpcForThisStream == null || authoritiesForCpc.contains(subscriber.authority)) { + subscriber.stopTimer(); + } + } + } + } + + private ControlPlaneClient getOrCreateControlPlaneClient(String authority) throws IOException { + // Optimize for the common case of a working ads stream already exists for the authority + ControlPlaneClient activeCpc = getActiveCpc(authority); + if (activeCpc != null && !activeCpc.isInBackoff()) { + return activeCpc; + } + + ImmutableList serverInfos = getServerInfos(authority); + if (serverInfos == null) { + throw new IllegalArgumentException("No xds servers found for authority " + authority); + } + + for (ServerInfo serverInfo : serverInfos) { + ControlPlaneClient cpc = getOrCreateControlPlaneClient(serverInfo); + if (cpc.isInBackoff()) { + continue; } + return cpc; } + + // Everything existed and is in backoff so throw + throw new IOException("All xds transports for authority " + authority + " are in backoff"); } - public ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) { + private ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) { syncContext.throwIfNotInThisSynchronizationContext(); if (serverCpClientMap.containsKey(serverInfo)) { return serverCpClientMap.get(serverInfo); } + logger.log(XdsLogLevel.DEBUG, "Creating control plane client for {0}", serverInfo.target()); XdsTransportFactory.XdsTransport xdsTransport = xdsTransportFactory.create(serverInfo); + ControlPlaneClient controlPlaneClient = new ControlPlaneClient( xdsTransport, serverInfo, bootstrapInfo.node(), - this, + new ResponseHandler(serverInfo), this, timeService, syncContext, backoffPolicyProvider, stopwatchSupplier, - this, - messagePrinter); + messagePrinter + ); + serverCpClientMap.put(serverInfo, controlPlaneClient); LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier); @@ -418,21 +507,31 @@ public Map getServerLrsClientMap() { return ImmutableMap.copyOf(serverLrsClientMap); } - @Nullable - private ServerInfo getServerInfo(String resource) { + private String getAuthority(String resource) { + String authority; if (resource.startsWith(XDSTP_SCHEME)) { URI uri = URI.create(resource); - String authority = uri.getAuthority(); + authority = uri.getAuthority(); if (authority == null) { authority = ""; } + } else { + authority = null; + } + + return authority; + } + + @Nullable + private ImmutableList getServerInfos(String authority) { + if (authority != null) { AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority); if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) { return null; } - return authorityInfo.xdsServers().get(0); + return authorityInfo.xdsServers(); } else { - return bootstrapInfo.servers().get(0); // use first server + return bootstrapInfo.servers(); } } @@ -440,25 +539,31 @@ private ServerInfo getServerInfo(String resource) { private void handleResourceUpdate( XdsResourceType.Args args, List resources, XdsResourceType xdsResourceType, ProcessingTracker processingTracker) { + ControlPlaneClient controlPlaneClient = serverCpClientMap.get(args.serverInfo); + ValidatedResourceUpdate result = xdsResourceType.parse(args, resources); logger.log(XdsLogger.XdsLogLevel.INFO, "Received {0} Response version {1} nonce {2}. Parsed resources: {3}", - xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources); + xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources); Map> parsedResources = result.parsedResources; Set invalidResources = result.invalidResources; List errors = result.errors; String errorDetail = null; if (errors.isEmpty()) { checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors"); - serverCpClientMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo, + controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo, args.nonce); + if (controlPlaneClient.isStartingUp()) { + controlPlaneClient.startingUpCompleted(); + } } else { errorDetail = Joiner.on('\n').join(errors); logger.log(XdsLogLevel.WARNING, "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail); - serverCpClientMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail); + controlPlaneClient.nackResponse(xdsResourceType, args.nonce, errorDetail); } + shutdownLowerPriorityCpcs(controlPlaneClient); long updateTime = timeProvider.currentTimeNanos(); Map> subscribedResources = @@ -496,60 +601,124 @@ private void handleResourceUpdate( // For State of the World services, notify watchers when their watched resource is missing // from the ADS update. Note that we can only do this if the resource update is coming from // the same xDS server that the ResourceSubscriber is subscribed to. - if (subscriber.serverInfo.equals(args.serverInfo)) { + if (activatedCpClients.get(subscriber.authority).contains(controlPlaneClient)) { subscriber.onAbsent(processingTracker); } } } + private void shutdownLowerPriorityCpcs(ControlPlaneClient activatedCpc) { + // For each authority, remove any control plane clients, with lower priority than the activated + // one, from activatedCpClients storing them all in cpcsToShutdown. + Set cpcsToShutdown = new HashSet<>(); + for ( List cpcsForAuth : activatedCpClients.values()) { + if (cpcsForAuth == null) { + continue; + } + int index = cpcsForAuth.indexOf(activatedCpc); + if (index > -1) { + cpcsToShutdown.addAll(cpcsForAuth.subList(index + 1, cpcsForAuth.size())); + cpcsForAuth.subList(index + 1, cpcsForAuth.size()).clear(); // remove lower priority cpcs + } + } + + // Shutdown any lower priority control plane clients identified above that aren't still being + // used by another authority. If they are still being used let the XDS server know that we + // no longer are interested in subscriptions for authorities we are no longer responsible for. + for (ControlPlaneClient cpc : cpcsToShutdown) { + if (activatedCpClients.values().stream().noneMatch(list -> list.contains(cpc))) { + cpc.shutdown(); + serverCpClientMap.remove(cpc.getServerInfo()); + } else { + cpc.adjustAllResourceSubscriptions(); + } + } + } + + private boolean fallBackToCpc(ControlPlaneClient fallbackCpc, String authority, + ControlPlaneClient oldCpc) { + boolean didFallback = false; + if (fallbackCpc != null && ! fallbackCpc.isInBackoff()) { + logger.log(XdsLogLevel.INFO, "Falling back to XDS server {0}", + fallbackCpc.getServerInfo().target()); + + // Get authorities that aren't falling back + // If we don't already have a cached LDS resource, cache the current data value + + addCpcToAuthority(authority, fallbackCpc); + restartMatchingSubscriberTimers(authority); + fallbackCpc.sendDiscoveryRequests(); + didFallback = true; + } else { + logger.log(XdsLogLevel.WARNING, "No working fallback XDS Servers found from {0}", + oldCpc.getServerInfo().target()); + } + return didFallback; + } + + private void restartMatchingSubscriberTimers(String authority) { + // Restart the timers for all the watched resources that are associated with this stream + for (Map> subscriberMap : + resourceSubscribers.values()) { + for (ResourceSubscriber subscriber : subscriberMap.values()) { + if (!subscriber.hasResult() && Objects.equals(subscriber.authority, authority)) { + subscriber.restartTimer(); + } + } + } + } + + /** * Tracks a single subscribed resource. */ private final class ResourceSubscriber { - @Nullable private final ServerInfo serverInfo; - @Nullable private final ControlPlaneClient controlPlaneClient; + @Nullable + private final String authority; private final XdsResourceType type; private final String resource; private final Map, Executor> watchers = new HashMap<>(); - @Nullable private T data; + @Nullable + private T data; private boolean absent; // Tracks whether the deletion has been ignored per bootstrap server feature. // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md private boolean resourceDeletionIgnored; - @Nullable private ScheduledHandle respTimer; - @Nullable private ResourceMetadata metadata; - @Nullable private String errorDescription; + @Nullable + private ScheduledHandle respTimer; + @Nullable + private ResourceMetadata metadata; + @Nullable + private String errorDescription; ResourceSubscriber(XdsResourceType type, String resource) { syncContext.throwIfNotInThisSynchronizationContext(); this.type = type; this.resource = resource; - this.serverInfo = getServerInfo(resource); - if (serverInfo == null) { + this.authority = getAuthority(resource); + if (getServerInfos(authority) == null) { this.errorDescription = "Wrong configuration: xds server does not exist for resource " + resource; - this.controlPlaneClient = null; return; } + // Initialize metadata in UNKNOWN state to cover the case when resource subscriber, // is created but not yet requested because the client is in backoff. this.metadata = ResourceMetadata.newResourceMetadataUnknown(); + } - ControlPlaneClient controlPlaneClient = null; - try { - controlPlaneClient = getOrCreateControlPlaneClient(serverInfo); - if (controlPlaneClient.isInBackoff()) { - return; - } - } catch (IllegalArgumentException e) { - controlPlaneClient = null; - this.errorDescription = "Bad configuration: " + e.getMessage(); - return; - } finally { - this.controlPlaneClient = controlPlaneClient; - } - - restartTimer(); + @Override + public String toString() { + return "ResourceSubscriber{" + + "resource='" + resource + '\'' + + ", authority='" + authority + '\'' + + ", type=" + type + + ", watchers=" + watchers.size() + + ", data=" + data + + ", absent=" + absent + + ", resourceDeletionIgnored=" + resourceDeletionIgnored + + ", errorDescription='" + errorDescription + '\'' + + '}'; } void addWatcher(ResourceWatcher watcher, Executor watcherExecutor) { @@ -570,7 +739,7 @@ void addWatcher(ResourceWatcher watcher, Executor watcherExecutor) { }); } - void removeWatcher(ResourceWatcher watcher) { + void removeWatcher(ResourceWatcher watcher) { checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher); watchers.remove(watcher); } @@ -579,7 +748,9 @@ void restartTimer() { if (data != null || absent) { // resource already resolved return; } - if (!controlPlaneClient.isReady()) { // When client becomes ready, it triggers a restartTimer + ControlPlaneClient activeCpc = getActiveCpc(authority); + if (activeCpc == null || !activeCpc.isReady()) { + // When client becomes ready, it triggers a restartTimer for all relevant subscribers. return; } @@ -601,6 +772,9 @@ public String toString() { // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED. metadata = ResourceMetadata.newResourceMetadataRequested(); + if (respTimer != null) { + respTimer.cancel(); + } respTimer = syncContext.schedule( new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); @@ -624,8 +798,7 @@ void cancelResourceWatch() { message += " for which we previously ignored a deletion"; logLevel = XdsLogLevel.FORCE_INFO; } - logger.log(logLevel, message, type, resource, - serverInfo != null ? serverInfo.target() : "unknown"); + logger.log(logLevel, message, type, resource, getTarget()); } boolean isWatched() { @@ -650,7 +823,7 @@ void onData(ParsedResource parsedResource, String version, long updateTime, if (resourceDeletionIgnored) { logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version " + "of resource for which we previously ignored a deletion: type {1} name {2}", - serverInfo != null ? serverInfo.target() : "unknown", type, resource); + getTarget(), type, resource); resourceDeletionIgnored = false; } if (!Objects.equals(oldData, data)) { @@ -667,6 +840,13 @@ void onData(ParsedResource parsedResource, String version, long updateTime, } } + private String getTarget() { + ControlPlaneClient activeCpc = getActiveCpc(authority); + return (activeCpc != null) + ? activeCpc.getServerInfo().target() + : "unknown"; + } + void onAbsent(@Nullable ProcessingTracker processingTracker) { if (respTimer != null && respTimer.isPending()) { // too early to conclude absence return; @@ -674,13 +854,14 @@ void onAbsent(@Nullable ProcessingTracker processingTracker) { // Ignore deletion of State of the World resources when this feature is on, // and the resource is reusable. + ControlPlaneClient activeCpc = getActiveCpc(authority); boolean ignoreResourceDeletionEnabled = - serverInfo != null && serverInfo.ignoreResourceDeletion(); + activeCpc != null && activeCpc.getServerInfo().ignoreResourceDeletion(); if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) { if (!resourceDeletionIgnored) { logger.log(XdsLogLevel.FORCE_WARNING, "xds server {0}: ignoring deletion for resource type {1} name {2}}", - serverInfo.target(), type, resource); + activeCpc.getServerInfo().target(), type, resource); resourceDeletionIgnored = true; } return; @@ -747,4 +928,128 @@ private void notifyWatcher(ResourceWatcher watcher, T update) { } } + private class ResponseHandler implements XdsResponseHandler { + final ServerInfo serverInfo; + + ResponseHandler(ServerInfo serverInfo) { + this.serverInfo = serverInfo; + } + + @Override + public void handleResourceResponse( + XdsResourceType xdsResourceType, ServerInfo serverInfo, String versionInfo, + List resources, String nonce, ProcessingTracker processingTracker) { + checkNotNull(xdsResourceType, "xdsResourceType"); + syncContext.throwIfNotInThisSynchronizationContext(); + Set toParseResourceNames = + xdsResourceType.shouldRetrieveResourceKeysForArgs() + ? getResourceKeys(xdsResourceType) + : null; + XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce, + bootstrapInfo, securityConfig, toParseResourceNames); + handleResourceUpdate(args, resources, xdsResourceType, processingTracker); + } + + @Override + public void handleStreamClosed(Status status, boolean inRetry) { + syncContext.throwIfNotInThisSynchronizationContext(); + + ControlPlaneClient cpcClosed = serverCpClientMap.get(serverInfo); + if (cpcClosed == null) { + return; + } + + if (!inRetry) { + cleanUpResourceTimers(cpcClosed); + } + if (cpcClosed.isResponseRecieved()) { + return; // Not considered an error + } + + Status error = status.isOk() ? Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER) : status; + boolean checkForFallback = !status.isOk() && !inRetry; + + List authoritiesForClosedCpc = getActiveAuthorities(cpcClosed); + for (Map> subscriberMap : + resourceSubscribers.values()) { + for (ResourceSubscriber subscriber : subscriberMap.values()) { + if (subscriber.hasResult() || !authoritiesForClosedCpc.contains(subscriber.authority)) { + continue; + } + + // try to fallback to lower priority control plane client + if (checkForFallback && manageControlPlaneClient(subscriber).didFallback) { + authoritiesForClosedCpc.remove(subscriber.authority); + if (authoritiesForClosedCpc.isEmpty()) { + return; // optimization: no need to continue once all authorities have done fallback + } + continue; // since we did fallback, don't consider it an error + } + + subscriber.onError(error, null); + } + } + } + + @Override + public void handleStreamRestarted(ServerInfo serverInfo) { + syncContext.throwIfNotInThisSynchronizationContext(); + + if (isShutDown()) { + return; + } + + ControlPlaneClient controlPlaneClient = serverCpClientMap.get(serverInfo); + if (controlPlaneClient == null) { + return; + } + + boolean needToSendDiscoveryRequests = false; + for (Map.Entry> me : activatedCpClients.entrySet()) { + if (me.getValue().contains(controlPlaneClient)) { + needToSendDiscoveryRequests |= internalHandleStreamReady(controlPlaneClient, me.getKey()); + } + } + if (needToSendDiscoveryRequests) { + controlPlaneClient.sendDiscoveryRequests(); + } + } + + private boolean internalHandleStreamReady(ControlPlaneClient readyCpc, String authority) { + ControlPlaneClient activeCpClient = getActiveCpc(authority); + + if (activeCpClient.isReady() + && !activatedCpClients.get(authority).contains(readyCpc)) { + logger.log(XdsLogLevel.INFO, "Ignoring stream restart for lower priority server {0}", + readyCpc.getServerInfo().target()); + return false; + } + + if (activeCpClient != readyCpc) { + readyCpc.markStartingUp(); + } + + restartMatchingSubscriberTimers(authority); + return true; + } + } + + private static class CpcWithFallbackState { + ControlPlaneClient cpc; + boolean didFallback; + + private CpcWithFallbackState(ControlPlaneClient cpc, boolean didFallback) { + this.cpc = cpc; + this.didFallback = didFallback; + } + } + + private List getActiveAuthorities(ControlPlaneClient cpc) { + return activatedCpClients.entrySet().stream() + .filter(entry -> !entry.getValue().isEmpty() + && cpc == entry.getValue().get(entry.getValue().size() - 1)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/client/XdsTransportFactory.java index ec700bd6dc9..d6974f57d97 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsTransportFactory.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsTransportFactory.java @@ -36,6 +36,7 @@ StreamingCall createStreamingCall( MethodDescriptor.Marshaller respMarshaller); void shutdown(); + } /** diff --git a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java index 1ddf9620434..5421504b35b 100644 --- a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java +++ b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java @@ -55,6 +55,7 @@ import io.grpc.InsecureServerCredentials; import io.grpc.NameResolverRegistry; import io.grpc.Server; +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.UUID; @@ -86,9 +87,15 @@ public class ControlPlaneRule extends TestWatcher { private XdsTestControlPlaneService controlPlaneService; private XdsTestLoadReportingService loadReportingService; private XdsNameResolverProvider nameResolverProvider; + private final int port; public ControlPlaneRule() { + this(0); + } + + public ControlPlaneRule(int port) { serverHostName = "test-server"; + this.port = port; } public ControlPlaneRule setServerHostName(String serverHostName) { @@ -115,11 +122,7 @@ public Server getServer() { try { controlPlaneService = new XdsTestControlPlaneService(); loadReportingService = new XdsTestLoadReportingService(); - server = Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create()) - .addService(controlPlaneService) - .addService(loadReportingService) - .build() - .start(); + createAndStartXdsServer(); } catch (Exception e) { throw new AssertionError("unable to start the control plane server", e); } @@ -144,6 +147,38 @@ public Server getServer() { NameResolverRegistry.getDefaultRegistry().deregister(nameResolverProvider); } + /** + * Will shutdown existing server if needed. + * Then creates a new server in the same way as {@link #starting(Description)} and starts it. + */ + public void restartTdServer() { + + if (getServer() != null && !getServer().isShutdown()) { + getServer().shutdownNow(); + try { + if (!getServer().awaitTermination(5, TimeUnit.SECONDS)) { + logger.log(Level.SEVERE, "Timed out waiting for server shutdown"); + } + } catch (InterruptedException e) { + throw new AssertionError("unable to shut down control plane server", e); + } + } + + try { + createAndStartXdsServer(); + } catch (Exception e) { + throw new AssertionError("unable to restart the control plane server", e); + } + } + + private void createAndStartXdsServer() throws IOException { + server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(controlPlaneService) + .addService(loadReportingService) + .build() + .start(); + } + /** * For test purpose, use boostrapOverride to programmatically provide bootstrap info. */ @@ -173,44 +208,67 @@ void setLdsConfig(Listener serverListener, Listener clientListener) { } void setRdsConfig(RouteConfiguration routeConfiguration) { - getService().setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of(RDS_NAME, routeConfiguration)); + setRdsConfig(RDS_NAME, routeConfiguration); + } + + public void setRdsConfig(String rdsName, RouteConfiguration routeConfiguration) { + getService().setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of(rdsName, routeConfiguration)); } void setCdsConfig(Cluster cluster) { + setCdsConfig(CLUSTER_NAME, cluster); + } + + void setCdsConfig(String clusterName, Cluster cluster) { getService().setXdsConfig(ADS_TYPE_URL_CDS, - ImmutableMap.of(CLUSTER_NAME, cluster)); + ImmutableMap.of(clusterName, cluster)); } void setEdsConfig(ClusterLoadAssignment clusterLoadAssignment) { + setEdsConfig(EDS_NAME, clusterLoadAssignment); + } + + void setEdsConfig(String edsName, ClusterLoadAssignment clusterLoadAssignment) { getService().setXdsConfig(ADS_TYPE_URL_EDS, - ImmutableMap.of(EDS_NAME, clusterLoadAssignment)); + ImmutableMap.of(edsName, clusterLoadAssignment)); } /** * Builds a new default RDS configuration. */ static RouteConfiguration buildRouteConfiguration(String authority) { - io.envoyproxy.envoy.config.route.v3.VirtualHost virtualHost = VirtualHost.newBuilder() + return buildRouteConfiguration(authority, RDS_NAME, CLUSTER_NAME); + } + + static RouteConfiguration buildRouteConfiguration(String authority, String rdsName, + String clusterName) { + VirtualHost.Builder vhBuilder = VirtualHost.newBuilder() + .setName(rdsName) .addDomains(authority) .addRoutes( Route.newBuilder() .setMatch( RouteMatch.newBuilder().setPrefix("/").build()) .setRoute( - RouteAction.newBuilder().setCluster(CLUSTER_NAME).build()).build()).build(); - return RouteConfiguration.newBuilder().setName(RDS_NAME).addVirtualHosts(virtualHost).build(); + RouteAction.newBuilder().setCluster(clusterName).build()).build()); + VirtualHost virtualHost = vhBuilder.build(); + return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build(); } /** * Builds a new default CDS configuration. */ static Cluster buildCluster() { + return buildCluster(CLUSTER_NAME, EDS_NAME); + } + + static Cluster buildCluster(String clusterName, String edsName) { return Cluster.newBuilder() - .setName(CLUSTER_NAME) + .setName(clusterName) .setType(Cluster.DiscoveryType.EDS) .setEdsClusterConfig( Cluster.EdsClusterConfig.newBuilder() - .setServiceName(EDS_NAME) + .setServiceName(edsName) .setEdsConfig( ConfigSource.newBuilder() .setAds(AggregatedConfigSource.newBuilder().build()) @@ -224,6 +282,11 @@ static Cluster buildCluster() { * Builds a new default EDS configuration. */ static ClusterLoadAssignment buildClusterLoadAssignment(String hostName, int port) { + return buildClusterLoadAssignment(hostName, port, EDS_NAME); + } + + static ClusterLoadAssignment buildClusterLoadAssignment(String hostName, int port, + String edsName) { Address address = Address.newBuilder() .setSocketAddress( SocketAddress.newBuilder().setAddress(hostName).setPortValue(port).build()).build(); @@ -237,7 +300,7 @@ static ClusterLoadAssignment buildClusterLoadAssignment(String hostName, int por .setHealthStatus(HealthStatus.HEALTHY) .build()).build(); return ClusterLoadAssignment.newBuilder() - .setClusterName(EDS_NAME) + .setClusterName(edsName) .addEndpoints(endpoints) .build(); } @@ -246,8 +309,17 @@ static ClusterLoadAssignment buildClusterLoadAssignment(String hostName, int por * Builds a new client listener. */ static Listener buildClientListener(String name) { + return buildClientListener(name, "terminal-filter"); + } + + + static Listener buildClientListener(String name, String identifier) { + return buildClientListener(name, identifier, RDS_NAME); + } + + static Listener buildClientListener(String name, String identifier, String rdsName) { HttpFilter httpFilter = HttpFilter.newBuilder() - .setName("terminal-filter") + .setName(identifier) .setTypedConfig(Any.pack(Router.newBuilder().build())) .setIsOptional(true) .build(); @@ -256,7 +328,7 @@ static Listener buildClientListener(String name) { .HttpConnectionManager.newBuilder() .setRds( Rds.newBuilder() - .setRouteConfigName(RDS_NAME) + .setRouteConfigName(rdsName) .setConfigSource( ConfigSource.newBuilder() .setAds(AggregatedConfigSource.getDefaultInstance()))) @@ -312,4 +384,5 @@ static Listener buildServerListener() { .addFilterChains(filterChain) .build(); } + } diff --git a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java index 63b9cda043c..867a9590b4f 100644 --- a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java +++ b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java @@ -107,7 +107,7 @@ public void setUp() { // because true->false return mutation prevents fetchClientStatus from completing the request. csdsStub = ClientStatusDiscoveryServiceGrpc .newBlockingStub(grpcServerRule.getChannel()) - .withDeadline(Deadline.after(3, TimeUnit.SECONDS)); + .withDeadline(Deadline.after(30, TimeUnit.SECONDS)); csdsAsyncStub = ClientStatusDiscoveryServiceGrpc.newStub(grpcServerRule.getChannel()); } @@ -497,8 +497,8 @@ public BootstrapInfo getBootstrapInfo() { @Nullable @Override - public Collection getSubscribedResources(ServerInfo serverInfo, - XdsResourceType type) { + public Collection getSubscribedResources( + ServerInfo serverInfo, XdsResourceType type) { return null; } @@ -506,13 +506,11 @@ public Collection getSubscribedResources(ServerInfo serverInfo, public Map> getSubscribedResourceTypesWithTypeUrl() { return ImmutableMap.of(); } - } private static class FakeXdsClientPoolFactory implements XdsClientPoolFactory { private final Map xdsClientMap = new HashMap<>(); - private boolean isOldStyle - ; + private boolean isOldStyle; private FakeXdsClientPoolFactory(@Nullable XdsClient xdsClient) { if (xdsClient != null) { diff --git a/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java b/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java index 30ea76b54f2..5a701c4b045 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java +++ b/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java @@ -61,10 +61,12 @@ public class GrpcBootstrapperImplTest { private String originalBootstrapPathFromSysProp; private String originalBootstrapConfigFromEnvVar; private String originalBootstrapConfigFromSysProp; + private String originalExperimentalXdsFallbackFlag; @Before public void setUp() { saveEnvironment(); + System.setProperty(BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK, "true"); bootstrapper.bootstrapPathFromEnvVar = BOOTSTRAP_FILE_PATH; } @@ -73,6 +75,8 @@ private void saveEnvironment() { originalBootstrapPathFromSysProp = bootstrapper.bootstrapPathFromSysProp; originalBootstrapConfigFromEnvVar = bootstrapper.bootstrapConfigFromEnvVar; originalBootstrapConfigFromSysProp = bootstrapper.bootstrapConfigFromSysProp; + originalExperimentalXdsFallbackFlag = + System.getProperty(BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK); } @After @@ -81,6 +85,12 @@ public void restoreEnvironment() { bootstrapper.bootstrapPathFromSysProp = originalBootstrapPathFromSysProp; bootstrapper.bootstrapConfigFromEnvVar = originalBootstrapConfigFromEnvVar; bootstrapper.bootstrapConfigFromSysProp = originalBootstrapConfigFromSysProp; + if (originalExperimentalXdsFallbackFlag != null) { + System.setProperty( + BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK, originalExperimentalXdsFallbackFlag); + } else { + System.clearProperty(BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK); + } } @Test diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index d41630cdb4a..37165f824f8 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -3406,6 +3406,11 @@ public void streamClosedAndRetryWithBackoff() { call.verifyRequest(RDS, RDS_RESOURCE, "5", "6764", NODE); call.sendError(Status.DEADLINE_EXCEEDED.asException()); + fakeClock.forwardNanos(100L); + call = resourceDiscoveryCalls.poll(); + call.sendError(Status.DEADLINE_EXCEEDED.asException()); + + // Already received LDS and RDS, so they only error twice. verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture()); @@ -3415,11 +3420,10 @@ public void streamClosedAndRetryWithBackoff() { // Reset backoff sequence and retry after backoff. inOrder.verify(backoffPolicyProvider).get(); - inOrder.verify(backoffPolicy2).nextBackoffNanos(); + inOrder.verify(backoffPolicy2, times(2)).nextBackoffNanos(); retryTask = Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); - assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(20L); - fakeClock.forwardNanos(20L); + fakeClock.forwardNanos(retryTask.getDelay(TimeUnit.NANOSECONDS)); call = resourceDiscoveryCalls.poll(); call.verifyRequest(LDS, LDS_RESOURCE, "63", "", NODE); call.verifyRequest(RDS, RDS_RESOURCE, "5", "", NODE); @@ -3490,7 +3494,8 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { } @Test - public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedResources() { + public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedResources() + throws InterruptedException { xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, rdsResourceWatcher); @@ -3516,11 +3521,20 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe assertThat(edsResourceTimeout.isCancelled()).isTrue(); verify(ldsResourceWatcher, never()).onError(errorCaptor.capture()); verify(rdsResourceWatcher, never()).onError(errorCaptor.capture()); + verify(cdsResourceWatcher, never()).onError(errorCaptor.capture()); // We had a response + + fakeClock.forwardTime(5, TimeUnit.SECONDS); + DiscoveryRpcCall call2 = resourceDiscoveryCalls.poll(); + call2.sendError(Status.UNAVAILABLE.asException()); verify(cdsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(edsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + fakeClock.forwardTime(5, TimeUnit.SECONDS); + DiscoveryRpcCall call3 = resourceDiscoveryCalls.poll(); + assertThat(call3).isNotNull(); + fakeClock.forwardNanos(10L); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0); assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java new file mode 100644 index 00000000000..82dd5ab6437 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java @@ -0,0 +1,386 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static junit.framework.TestCase.assertFalse; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.grpc.Status; +import io.grpc.internal.ObjectPool; +import io.grpc.xds.client.BootstrapperImpl; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClientImpl; +import io.grpc.xds.client.XdsInitializationException; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class XdsClientFallbackTest { + private static final Logger log = Logger.getLogger(XdsClientFallbackTest.class.getName()); + + private static final String MAIN_SERVER = "main-server"; + private static final String FALLBACK_SERVER = "fallback-server"; + private static final String DUMMY_TARGET = "TEST_TARGET"; + private static final String RDS_NAME = "route-config.googleapis.com"; + private static final String FALLBACK_RDS_NAME = "fallback-" + RDS_NAME; + private static final String CLUSTER_NAME = "cluster0"; + private static final String FALLBACK_CLUSTER_NAME = "fallback-" + CLUSTER_NAME; + private static final String EDS_NAME = "eds-service-0"; + private static final String FALLBACK_EDS_NAME = "fallback-" + EDS_NAME; + private static final HttpConnectionManager MAIN_HTTP_CONNECTION_MANAGER = + HttpConnectionManager.forRdsName(0, RDS_NAME, ImmutableList.of( + new Filter.NamedFilterConfig(MAIN_SERVER, RouterFilter.ROUTER_CONFIG))); + private static final HttpConnectionManager FALLBACK_HTTP_CONNECTION_MANAGER = + HttpConnectionManager.forRdsName(0, RDS_NAME, ImmutableList.of( + new Filter.NamedFilterConfig(FALLBACK_SERVER, RouterFilter.ROUTER_CONFIG))); + private ObjectPool xdsClientPool; + private XdsClient xdsClient; + + private XdsClient.ResourceWatcher raalLdsWatcher = + new XdsClient.ResourceWatcher() { + + @Override + public void onChanged(XdsListenerResource.LdsUpdate update) { + log.log(Level.FINE, "LDS update: " + update); + } + + @Override + public void onError(Status error) { + log.log(Level.FINE, "LDS update error: " + error.getDescription()); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + log.log(Level.FINE, "LDS resource does not exist: " + resourceName); + } + }; + + @SuppressWarnings("unchecked") + private XdsClient.ResourceWatcher ldsWatcher = + mock(XdsClient.ResourceWatcher.class, delegatesTo(raalLdsWatcher)); + @Mock + private XdsClient.ResourceWatcher ldsWatcher2; + + @Mock + private XdsClient.ResourceWatcher rdsWatcher; + @Mock + private XdsClient.ResourceWatcher rdsWatcher2; + @Mock + private XdsClient.ResourceWatcher rdsWatcher3; + + private XdsClient.ResourceWatcher raalCdsWatcher = + new XdsClient.ResourceWatcher() { + + @Override + public void onChanged(XdsClusterResource.CdsUpdate update) { + log.log(Level.FINE, "CDS update: " + update); + } + + @Override + public void onError(Status error) { + log.log(Level.FINE, "CDS update error: " + error.getDescription()); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + log.log(Level.FINE, "CDS resource does not exist: " + resourceName); + } + }; + + @SuppressWarnings("unchecked") + private XdsClient.ResourceWatcher cdsWatcher = + mock(XdsClient.ResourceWatcher.class, delegatesTo(raalCdsWatcher)); + @Mock + private XdsClient.ResourceWatcher cdsWatcher2; + + @Rule(order = 0) + public ControlPlaneRule mainTdServer = + new ControlPlaneRule(8090).setServerHostName(MAIN_SERVER); + + @Rule(order = 1) + public ControlPlaneRule fallbackServer = + new ControlPlaneRule(8095).setServerHostName(MAIN_SERVER); + + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + + @Before + public void setUp() throws XdsInitializationException { + System.setProperty(BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK, "true"); + if (mainTdServer == null) { + throw new XdsInitializationException("Failed to create ControlPlaneRule for main TD server"); + } + setAdsConfig(mainTdServer, MAIN_SERVER); + setAdsConfig(fallbackServer, FALLBACK_SERVER); + + SharedXdsClientPoolProvider clientPoolProvider = new SharedXdsClientPoolProvider(); + clientPoolProvider.setBootstrapOverride(defaultBootstrapOverride()); + xdsClientPool = clientPoolProvider.getOrCreate(DUMMY_TARGET); + } + + @After + public void cleanUp() { + if (xdsClientPool != null) { + xdsClientPool.returnObject(xdsClient); + } + } + + private static void setAdsConfig(ControlPlaneRule controlPlane, String serverName) { + InetSocketAddress edsInetSocketAddress = + (InetSocketAddress) controlPlane.getServer().getListenSockets().get(0); + boolean isMainServer = serverName.equals(MAIN_SERVER); + String rdsName = isMainServer + ? RDS_NAME + : FALLBACK_RDS_NAME; + String clusterName = isMainServer ? CLUSTER_NAME : FALLBACK_CLUSTER_NAME; + String edsName = isMainServer ? EDS_NAME : FALLBACK_EDS_NAME; + + controlPlane.setLdsConfig(ControlPlaneRule.buildServerListener(), + ControlPlaneRule.buildClientListener(MAIN_SERVER, serverName)); + + controlPlane.setRdsConfig(rdsName, + ControlPlaneRule.buildRouteConfiguration(MAIN_SERVER, rdsName, clusterName)); + controlPlane.setCdsConfig(clusterName, ControlPlaneRule.buildCluster(clusterName, edsName)); + + controlPlane.setEdsConfig(edsName, + ControlPlaneRule.buildClusterLoadAssignment(edsInetSocketAddress.getHostName(), + edsInetSocketAddress.getPort(), edsName)); + log.log(Level.FINE, + String.format("Set ADS config for %s with address %s", serverName, edsInetSocketAddress)); + } + + private void restartServer(TdServerType type) { + switch (type) { + case MAIN: + mainTdServer.restartTdServer(); + setAdsConfig(mainTdServer, MAIN_SERVER); + break; + case FALLBACK: + fallbackServer.restartTdServer(); + setAdsConfig(fallbackServer, FALLBACK_SERVER); + break; + default: + throw new IllegalArgumentException("Unknown server type: " + type); + } + } + + // This is basically a control test to make sure everything is set up correctly. + @Test + public void everything_okay() { + restartServer(TdServerType.MAIN); + restartServer(TdServerType.FALLBACK); + xdsClient = xdsClientPool.getObject(); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener( + MAIN_HTTP_CONNECTION_MANAGER)); + + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher); + verify(rdsWatcher, timeout(5000)).onChanged(any()); + } + + @Test + public void mainServerDown_fallbackServerUp() { + mainTdServer.getServer().shutdownNow(); + restartServer(TdServerType.FALLBACK); + xdsClient = xdsClientPool.getObject(); + log.log(Level.FINE, "Fallback port = " + fallbackServer.getServer().getPort()); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener( + FALLBACK_HTTP_CONNECTION_MANAGER)); + } + + @Test + public void both_down_restart_main() { + mainTdServer.getServer().shutdownNow(); + fallbackServer.getServer().shutdownNow(); + xdsClient = xdsClientPool.getObject(); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + verify(ldsWatcher, timeout(5000).times(0)).onChanged(any()); + + restartServer(TdServerType.MAIN); + + xdsClient.watchXdsResource( + XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher); + + verify(ldsWatcher, timeout(300000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + mainTdServer.getServer().shutdownNow(); + } + + @Test + public void mainDown_fallbackUp_restart_main() { + mainTdServer.getServer().shutdownNow(); + fallbackServer.restartTdServer(); + xdsClient = xdsClientPool.getObject(); + InOrder inOrder = inOrder(ldsWatcher, rdsWatcher, cdsWatcher, cdsWatcher2); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + inOrder.verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), FALLBACK_CLUSTER_NAME, cdsWatcher); + inOrder.verify(cdsWatcher, timeout(5000)).onChanged(any()); + Object fallbackCpcBlob = ((XdsClientImpl) xdsClient).getActiveCpcForTest(null); + + restartServer(TdServerType.MAIN); + + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher); + inOrder.verify(rdsWatcher, timeout(5000)).onChanged(any()); + + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CLUSTER_NAME, cdsWatcher2); + inOrder.verify(cdsWatcher2, timeout(5000)).onChanged(any()); + + // verify that connection to fallback server is closed + assertFalse("Should have disconnected from fallback server", + ((XdsClientImpl) xdsClient).isCpcBlobConnected(fallbackCpcBlob)); + + } + + // This test takes a long time because of the 16 sec timeout for non-existent resource + @Test + public void connect_then_mainServerDown_fallbackServerUp() throws InterruptedException { + restartServer(TdServerType.MAIN); + restartServer(TdServerType.FALLBACK); + xdsClient = xdsClientPool.getObject(); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher); + verify(rdsWatcher, timeout(5000)).onChanged(any()); + + mainTdServer.getServer().shutdownNow(); + TimeUnit.SECONDS.sleep(5); // TODO(lsafran) Use FakeClock so test runs faster + + // Shouldn't do fallback since all watchers are loaded + verify(ldsWatcher, never()).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + + // Should just get from cache + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher2); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher2); + verify(ldsWatcher2, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + verify(ldsWatcher, never()).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + // Make sure that rdsWatcher wasn't called again + verify(rdsWatcher, times(1)).onChanged(any()); + verify(rdsWatcher2, timeout(5000)).onChanged(any()); + + // Asking for something not in cache should force a fallback + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), FALLBACK_CLUSTER_NAME, cdsWatcher); + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + verify(ldsWatcher2, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + verify(cdsWatcher, timeout(16000)).onChanged(any()); + + xdsClient.watchXdsResource( + XdsRouteConfigureResource.getInstance(), FALLBACK_RDS_NAME, rdsWatcher3); + verify(rdsWatcher3, timeout(5000)).onChanged(any()); + + // Test that resource defined in main but not fallback is handled correctly + xdsClient.watchXdsResource( + XdsClusterResource.getInstance(), CLUSTER_NAME, cdsWatcher2); + verify(cdsWatcher2, timeout(16000)).onResourceDoesNotExist(eq(CLUSTER_NAME)); + } + + @Test + public void connect_then_mainServerRestart_fallbackServerdown() { + restartServer(TdServerType.MAIN); + xdsClient = xdsClientPool.getObject(); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + + mainTdServer.getServer().shutdownNow(); + fallbackServer.getServer().shutdownNow(); + + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CLUSTER_NAME, cdsWatcher); + + restartServer(TdServerType.MAIN); + + verify(cdsWatcher, timeout(5000)).onChanged(any()); + verify(ldsWatcher, timeout(5000).atLeastOnce()).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + } + + private Map defaultBootstrapOverride() { + return ImmutableMap.of( + "node", ImmutableMap.of( + "id", UUID.randomUUID().toString(), + "cluster", CLUSTER_NAME), + "xds_servers", ImmutableList.of( + ImmutableMap.of( + "server_uri", "localhost:" + mainTdServer.getServer().getPort(), + "channel_creds", Collections.singletonList( + ImmutableMap.of("type", "insecure") + ), + "server_features", Collections.singletonList("xds_v3") + ), + ImmutableMap.of( + "server_uri", "localhost:" + fallbackServer.getServer().getPort(), + "channel_creds", Collections.singletonList( + ImmutableMap.of("type", "insecure") + ), + "server_features", Collections.singletonList("xds_v3") + ) + ), + "fallback-policy", "fallback" + ); + } + + private enum TdServerType { + MAIN, + FALLBACK + } +}