From f5090ab62f0276466b8485b67ba2142e4f6075a2 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Tue, 3 Dec 2024 18:30:02 -0800 Subject: [PATCH] Framework definition to support A74 --- xds/build.gradle | 2 + xds/src/main/java/io/grpc/xds/XdsConfig.java | 142 +++++ .../io/grpc/xds/XdsDependencyManager.java | 600 ++++++++++++++++++ .../grpc/xds/XdsRouteConfigureResource.java | 2 +- .../java/io/grpc/xds/ControlPlaneRule.java | 21 +- .../io/grpc/xds/XdsClientFallbackTest.java | 2 +- .../io/grpc/xds/XdsDependencyManagerTest.java | 178 ++++++ .../test/java/io/grpc/xds/XdsTestUtils.java | 413 ++++++++++++ .../client/CommonBootstrapperTestUtils.java | 5 + 9 files changed, 1343 insertions(+), 22 deletions(-) create mode 100644 xds/src/main/java/io/grpc/xds/XdsConfig.java create mode 100644 xds/src/main/java/io/grpc/xds/XdsDependencyManager.java create mode 100644 xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java create mode 100644 xds/src/test/java/io/grpc/xds/XdsTestUtils.java diff --git a/xds/build.gradle b/xds/build.gradle index e09b42d06a9..ec5c21bb2d4 100644 --- a/xds/build.gradle +++ b/xds/build.gradle @@ -46,6 +46,7 @@ dependencies { thirdpartyImplementation project(':grpc-protobuf'), project(':grpc-stub') compileOnly sourceSets.thirdparty.output + testCompileOnly sourceSets.thirdparty.output implementation project(':grpc-stub'), project(':grpc-core'), project(':grpc-util'), @@ -59,6 +60,7 @@ dependencies { libraries.protobuf.java.util def nettyDependency = implementation project(':grpc-netty') + testImplementation project(':grpc-api') testImplementation project(':grpc-rls') testImplementation project(':grpc-inprocess') testImplementation testFixtures(project(':grpc-core')), diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java new file mode 100644 index 00000000000..12c88c29011 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -0,0 +1,142 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.grpc.StatusOr; +import io.grpc.xds.XdsClusterResource.CdsUpdate; +import io.grpc.xds.XdsEndpointResource.EdsUpdate; +import io.grpc.xds.XdsListenerResource.LdsUpdate; +import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Represents the xDS configuration tree for a specified Listener. + */ +public class XdsConfig { + final LdsUpdate listener; + final RdsUpdate route; + final Map> clusters; + private final int hashCode; + + XdsConfig(LdsUpdate listener, RdsUpdate route, Map> clusters) { + this.listener = listener; + this.route = route; + this.clusters = clusters; + + hashCode = Objects.hash(listener, route, clusters); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof XdsConfig)) { + return false; + } + + XdsConfig o = (XdsConfig) obj; + + return Objects.equals(listener, o.listener) && Objects.equals(route, o.route) + && Objects.equals(clusters, o.clusters); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("XdsConfig{listener=").append(listener) + .append(", route=").append(route) + .append(", clusters={").append(clusters).append("}}"); + return builder.toString(); + } + + public static class XdsClusterConfig { + final String clusterName; + final CdsUpdate clusterResource; + final StatusOr endpoint; + + XdsClusterConfig(String clusterName, CdsUpdate clusterResource, + StatusOr endpoint) { + this.clusterName = clusterName; + this.clusterResource = clusterResource; + this.endpoint = endpoint; + } + + @Override + public int hashCode() { + return clusterName.hashCode() + clusterResource.hashCode() + endpoint.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof XdsClusterConfig)) { + return false; + } + XdsClusterConfig o = (XdsClusterConfig) obj; + return Objects.equals(clusterName, o.clusterName) + && Objects.equals(clusterResource, o.clusterResource) + && Objects.equals(endpoint, o.endpoint); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("XdsClusterConfig{clusterName=").append(clusterName) + .append(", clusterResource=").append(clusterResource) + .append(", endpoint=").append(endpoint).append("}"); + return builder.toString(); + } + } + + static class XdsConfigBuilder { + private LdsUpdate listener; + private RdsUpdate route; + private Map> clusters = new HashMap<>(); + + XdsConfigBuilder setListener(LdsUpdate listener) { + this.listener = listener; + return this; + } + + XdsConfigBuilder setRoute(RdsUpdate route) { + this.route = route; + return this; + } + + XdsConfigBuilder addCluster(String name, StatusOr clusterConfig) { + clusters.put(name, clusterConfig); + return this; + } + + XdsConfig build() { + checkNotNull(listener, "listener"); + checkNotNull(route, "route"); + return new XdsConfig(listener, route, clusters); + } + } + + public interface XdsClusterSubscriptionRegistry { + Closeable subscribeToCluster(String clusterName); + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java new file mode 100644 index 00000000000..35acda9a65f --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -0,0 +1,600 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.xds.client.XdsClient.ResourceUpdate; +import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG; + +import com.google.common.collect.Sets; +import io.grpc.InternalLogId; +import io.grpc.Status; +import io.grpc.StatusOr; +import io.grpc.SynchronizationContext; +import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; +import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClient.ResourceWatcher; +import io.grpc.xds.client.XdsLogger; +import io.grpc.xds.client.XdsResourceType; +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + +/** + * This class acts as a layer of indirection between the XdsClient and the NameResolver. It + * maintains the watchers for the xds resources and when an update is received, it either requests + * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher. Each instance + * applies to a single data plane authority. + */ +@SuppressWarnings("unused") // TODO remove when changes for A74 are fully implemented +final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry { + public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance(); + public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance(); + private final XdsClient xdsClient; + private final XdsConfigWatcher xdsConfigWatcher; + private final SynchronizationContext syncContext; + private final String dataPlaneAuthority; + private final Map> clusterSubscriptions = new HashMap<>(); + + private final InternalLogId logId; + private final XdsLogger logger; + private XdsConfig lastXdsConfig = null; + private final Map, TypeWatchers> resourceWatchers = new HashMap<>(); + + XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher, + SynchronizationContext syncContext, String dataPlaneAuthority, + String listenerName) { + logId = InternalLogId.allocate("xds-dependency-manager", listenerName); + logger = XdsLogger.withLogId(logId); + this.xdsClient = xdsClient; + this.xdsConfigWatcher = xdsConfigWatcher; + this.syncContext = syncContext; + this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority"); + + // start the ball rolling + addWatcher(new LdsWatcher(listenerName)); + } + + @Override + public ClusterSubscription subscribeToCluster(String clusterName) { + + checkNotNull(clusterName, "clusterName"); + ClusterSubscription subscription = new ClusterSubscription(clusterName); + + Set localSubscriptions = + clusterSubscriptions.computeIfAbsent(clusterName, k -> new HashSet<>()); + localSubscriptions.add(subscription); + addWatcher(new CdsWatcher(clusterName)); + + return subscription; + } + + private boolean hasWatcher(XdsResourceType type, String resourceName) { + TypeWatchers typeWatchers = resourceWatchers.get(type); + return typeWatchers != null && typeWatchers.watchers.containsKey(resourceName); + } + + @SuppressWarnings("unchecked") + private void addWatcher(XdsWatcherBase watcher) { + XdsResourceType type = watcher.type; + String resourceName = watcher.resourceName; + + this.syncContext.execute(() -> { + TypeWatchers typeWatchers = (TypeWatchers)resourceWatchers.get(type); + if (typeWatchers == null) { + typeWatchers = new TypeWatchers<>(type); + resourceWatchers.put(type, typeWatchers); + } + + typeWatchers.add(resourceName, watcher); + xdsClient.watchXdsResource(type, resourceName, watcher); + }); + } + + @SuppressWarnings("unchecked") + private void cancelWatcher(XdsWatcherBase watcher) { + if (watcher == null) { + return; + } + + XdsResourceType type = watcher.type; + String resourceName = watcher.resourceName; + + this.syncContext.execute(() -> { + TypeWatchers typeWatchers = (TypeWatchers)resourceWatchers.get(type); + if (typeWatchers == null) { + logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher); + return; + } + + typeWatchers.watchers.remove(resourceName); + xdsClient.cancelXdsResourceWatch(type, resourceName, watcher); + }); + + } + + public void shutdown() { + for (TypeWatchers watchers : resourceWatchers.values()) { + shutdownWatchersForType(watchers); + } + resourceWatchers.clear(); + } + + private void shutdownWatchersForType(TypeWatchers watchers) { + for (Map.Entry> watcherEntry : watchers.watchers.entrySet()) { + xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(), + watcherEntry.getValue()); + } + } + + private void releaseSubscription(ClusterSubscription subscription) { + checkNotNull(subscription, "subscription"); + String clusterName = subscription.getClusterName(); + Set subscriptions = clusterSubscriptions.get(clusterName); + if (subscriptions == null) { + logger.log(DEBUG, "Subscription already released for {0}", clusterName); + return; + } + + subscriptions.remove(subscription); + if (subscriptions.isEmpty()) { + clusterSubscriptions.remove(clusterName); + XdsWatcherBase cdsWatcher = + resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName); + cancelClusterWatcherTree((CdsWatcher) cdsWatcher); + + maybePublishConfig(); + } + } + + private void cancelClusterWatcherTree(CdsWatcher root) { + checkNotNull(root, "root"); + cancelWatcher(root); + + if (root.getData() == null || !root.getData().hasValue()) { + return; + } + + XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue(); + switch (cdsUpdate.clusterType()) { + case EDS: + String edsServiceName = cdsUpdate.edsServiceName(); + EdsWatcher edsWatcher = + (EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName); + cancelWatcher(edsWatcher); + break; + case AGGREGATE: + for (String cluster : cdsUpdate.prioritizedClusterNames()) { + CdsWatcher clusterWatcher = + (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster); + if (clusterWatcher != null) { + cancelClusterWatcherTree(clusterWatcher); + } + } + break; + case LOGICAL_DNS: + // no eds needed + break; + default: + throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType()); + } + } + + /** + * Check if all resources have results, and if so, generate a new XdsConfig and send it to all + * the watchers. + */ + private void maybePublishConfig() { + syncContext.execute(() -> { + boolean waitingOnResource = resourceWatchers.values().stream() + .flatMap(typeWatchers -> typeWatchers.watchers.values().stream()) + .anyMatch(watcher -> !watcher.hasResult()); + if (waitingOnResource) { + return; + } + + buildConfig(); + xdsConfigWatcher.onUpdate(lastXdsConfig); + }); + } + + private void buildConfig() { + XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder(); + + // Iterate watchers and build the XdsConfig + + // Will only be 1 listener and 1 route resource + resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values().stream() + .map(watcher -> (LdsWatcher) watcher) + .forEach(watcher -> builder.setListener(watcher.getData().getValue())); + + resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream() + .map(watcher -> (RdsWatcher) watcher) + .forEach(watcher -> builder.setRoute(watcher.getData().getValue())); + + Map> edsWatchers = + resourceWatchers.get(ENDPOINT_RESOURCE).watchers; + Map> cdsWatchers = + resourceWatchers.get(CLUSTER_RESOURCE).watchers; + + // Iterate CDS watchers + for (XdsWatcherBase watcher : cdsWatchers.values()) { + CdsWatcher cdsWatcher = (CdsWatcher) watcher; + String clusterName = cdsWatcher.resourceName(); + StatusOr cdsUpdate = cdsWatcher.getData(); + if (cdsUpdate.hasValue()) { + XdsConfig.XdsClusterConfig clusterConfig; + String edsName = cdsUpdate.getValue().edsServiceName(); + EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(edsName); + assert edsWatcher != null; + clusterConfig = new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate.getValue(), + edsWatcher.getData()); + builder.addCluster(clusterName, StatusOr.fromValue(clusterConfig)); + } else { + builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdate.getStatus())); + } + } + + lastXdsConfig = builder.build(); + } + + @Override + public String toString() { + return logId.toString(); + } + + private static class TypeWatchers { + final Map> watchers = new HashMap<>(); + final XdsResourceType resourceType; + + TypeWatchers(XdsResourceType resourceType) { + this.resourceType = resourceType; + } + + public void add(String resourceName, XdsWatcherBase watcher) { + watchers.put(resourceName, watcher); + } + } + + public interface XdsConfigWatcher { + + void onUpdate(XdsConfig config); + + // These 2 methods are invoked when there is an error or + // does-not-exist on LDS or RDS only. The context will be a + // human-readable string indicating the scope in which the error + // occurred (e.g., the resource type and name). + void onError(String resourceContext, Status status); + + void onResourceDoesNotExist(String resourceContext); + } + + private class ClusterSubscription implements Closeable { + String clusterName; + + public ClusterSubscription(String clusterName) { + this.clusterName = clusterName; + } + + public String getClusterName() { + return clusterName; + } + + @Override + public void close() throws IOException { + releaseSubscription(this); + } + } + + @SuppressWarnings("ClassCanBeStatic") + private abstract class XdsWatcherBase + implements ResourceWatcher { + private final XdsResourceType type; + private final String resourceName; + @Nullable + protected StatusOr data; + protected boolean transientError = false; + + + private XdsWatcherBase(XdsResourceType type, String resourceName) { + this.type = type; + this.resourceName = resourceName; + } + + @Override + public void onError(Status error) { + checkNotNull(error, "error"); + data = StatusOr.fromStatus(error); + transientError = true; + } + + protected void handleDoesNotExist(String resourceName) { + checkArgument(this.resourceName.equals(resourceName), "Resource name does not match"); + data = StatusOr.fromStatus( + Status.UNAVAILABLE.withDescription("No " + type + " resource: " + resourceName)); + transientError = false; + } + + boolean hasResult() { + return data != null; + } + + @Nullable + StatusOr getData() { + return data; + } + + String resourceName() { + return resourceName; + } + + protected void setData(T data) { + checkNotNull(data, "data"); + this.data = StatusOr.fromValue(data); + transientError = false; + } + + boolean isTransientError() { + return data != null && !data.hasValue() && transientError; + } + + String toContextString() { + return type + " resource: " + resourceName; + } + } + + private class LdsWatcher extends XdsWatcherBase { + String rdsName; + XdsListenerResource.LdsUpdate currentLdsUpdate; + + private LdsWatcher(String resourceName) { + super(XdsListenerResource.getInstance(), resourceName); + } + + @Override + public void onChanged(XdsListenerResource.LdsUpdate update) { + HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); + List virtualHosts = httpConnectionManager.virtualHosts(); + String rdsName = httpConnectionManager.rdsName(); + + syncContext.execute(() -> { + boolean changedRdsName = rdsName != null && !rdsName.equals(this.rdsName); + if (changedRdsName) { + cleanUpRdsWatcher(); + } + + if (virtualHosts != null) { + updateRoutes(virtualHosts); + } else if (changedRdsName) { + this.rdsName = rdsName; + addWatcher(new RdsWatcher(rdsName)); + logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); + } + + setData(update); + maybePublishConfig(); + }); + } + + @Override + public void onError(Status error) { + super.onError(error); + xdsConfigWatcher.onError(toContextString(), error); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + handleDoesNotExist(resourceName); + xdsConfigWatcher.onResourceDoesNotExist(toContextString()); + } + + private void cleanUpRdsWatcher() { + TypeWatchers watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance()); + if (watchers == null) { + return; + } + RdsWatcher oldRdsWatcher = (RdsWatcher) watchers.watchers.remove(rdsName); + if (oldRdsWatcher != null) { + cancelWatcher(oldRdsWatcher); + } + } + } + + private class RdsWatcher extends XdsWatcherBase { + + public RdsWatcher(String resourceName) { + super(XdsRouteConfigureResource.getInstance(), resourceName); + } + + @Override + public void onChanged(RdsUpdate update) { + setData(update); + syncContext.execute(() -> { + updateRoutes(update.virtualHosts); + maybePublishConfig(); + }); + } + + @Override + public void onError(Status error) { + super.onError(error); + xdsConfigWatcher.onError(toContextString(), error); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + handleDoesNotExist(resourceName); + xdsConfigWatcher.onResourceDoesNotExist(toContextString()); + } + } + + private class CdsWatcher extends XdsWatcherBase { + + CdsWatcher(String resourceName) { + super(CLUSTER_RESOURCE, resourceName); + } + + @Override + public void onChanged(XdsClusterResource.CdsUpdate update) { + syncContext.execute(() -> { + switch (update.clusterType()) { + case EDS: + setData(update); + if (!hasWatcher(ENDPOINT_RESOURCE, update.edsServiceName())) { + addWatcher(new EdsWatcher(update.edsServiceName())); + } else { + maybePublishConfig(); + } + break; + case LOGICAL_DNS: + setData(update); + maybePublishConfig(); + // no eds needed + break; + case AGGREGATE: + if (data.hasValue()) { + TypeWatchers cdsWatchers = resourceWatchers.get(CLUSTER_RESOURCE); + Set oldNames = new HashSet<>(data.getValue().prioritizedClusterNames()); + Set newNames = new HashSet<>(update.prioritizedClusterNames()); + + setData(update); + + Set addedClusters = Sets.difference(newNames, oldNames); + Set deletedClusters = Sets.difference(oldNames, newNames); + addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster))); + deletedClusters.forEach((cluster) -> cancelWatcher(getCluster(cluster))); + + if (!addedClusters.isEmpty()) { + maybePublishConfig(); + } + } else { + setData(update); + update.prioritizedClusterNames().forEach(name -> addWatcher(new CdsWatcher(name))); + } + break; + default: + throw new AssertionError("Unknown cluster type: " + update.clusterType()); + } + }); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + handleDoesNotExist(resourceName); + } + + } + + private class EdsWatcher extends XdsWatcherBase { + private EdsWatcher(String resourceName) { + super(ENDPOINT_RESOURCE, resourceName); + } + + @Override + public void onChanged(XdsEndpointResource.EdsUpdate update) { + syncContext.execute(() -> { + setData(update); + maybePublishConfig(); + }); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + handleDoesNotExist(resourceName); + } + } + + private void updateRoutes(List virtualHosts) { + String authority = dataPlaneAuthority; + + VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority); + if (virtualHost == null) { + String error = "Failed to find virtual host matching hostname: " + authority; + logger.log(XdsLogger.XdsLogLevel.WARNING, error); + cleanUpRoutes(error); + xdsConfigWatcher.onError( + "xDS node ID:" + dataPlaneAuthority, Status.UNAVAILABLE.withDescription(error)); + return; + } + + // Get all cluster names to which requests can be routed through the virtual host. + Set clusters = new HashSet<>(); + for (VirtualHost.Route route : virtualHost.routes()) { + VirtualHost.Route.RouteAction action = route.routeAction(); + if (action == null) { + continue; + } + if (action.cluster() != null) { + clusters.add(action.cluster()); + } else if (action.weightedClusters() != null) { + for (ClusterWeight weighedCluster : action.weightedClusters()) { + clusters.add(weighedCluster.name()); + } + } + } + + // Get existing cluster names + TypeWatchers clusterWatchers = resourceWatchers.get(CLUSTER_RESOURCE); + Set oldClusters = + (clusterWatchers != null) ? clusterWatchers.watchers.keySet() : Collections.emptySet(); + + // Calculate diffs. + Set addedClusters = + oldClusters == null ? clusters : Sets.difference(clusters, oldClusters); + Set deletedClusters = + oldClusters == null ? Collections.emptySet() : Sets.difference(oldClusters, clusters); + + addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster))); + deletedClusters.forEach(watcher -> cancelWatcher(getCluster(watcher))); + } + + // Must be in SyncContext + private void cleanUpRoutes(String error) { + // Remove RdsWatcher & CDS Watchers + TypeWatchers rdsWatcher = resourceWatchers.get(XdsRouteConfigureResource.getInstance()); + if (rdsWatcher != null) { + for (XdsWatcherBase watcher : rdsWatcher.watchers.values()) { + cancelWatcher(watcher); + } + } + + // Remove all CdsWatchers + TypeWatchers cdsWatcher = resourceWatchers.get(CLUSTER_RESOURCE); + if (cdsWatcher != null) { + for (XdsWatcherBase watcher : cdsWatcher.watchers.values()) { + cancelWatcher(watcher); + } + } + } + + private static String prefixedClusterName(String name) { + return "cluster:" + name; + } + + private XdsWatcherBase getCluster(String clusterName) { + return resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName); + } + +} diff --git a/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java b/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java index 0e065c6ba9a..6fa825a64bf 100644 --- a/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsRouteConfigureResource.java @@ -134,7 +134,7 @@ protected RdsUpdate doParse(XdsResourceType.Args args, Message unpackedMessage) (RouteConfiguration) unpackedMessage, FilterRegistry.getDefaultRegistry(), args); } - private static RdsUpdate processRouteConfiguration( + static RdsUpdate processRouteConfiguration( RouteConfiguration routeConfig, FilterRegistry filterRegistry, XdsResourceType.Args args) throws ResourceInvalidException { return new RdsUpdate(extractVirtualHosts(routeConfig, filterRegistry, args)); diff --git a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java index ac1c4829c74..39761912ea5 100644 --- a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java +++ b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.protobuf.Any; -import com.google.protobuf.BoolValue; import com.google.protobuf.Message; import com.google.protobuf.UInt32Value; import io.envoyproxy.envoy.config.cluster.v3.Cluster; @@ -45,7 +44,6 @@ import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.NonForwardingAction; import io.envoyproxy.envoy.config.route.v3.Route; -import io.envoyproxy.envoy.config.route.v3.RouteAction; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.config.route.v3.RouteMatch; import io.envoyproxy.envoy.config.route.v3.VirtualHost; @@ -239,24 +237,7 @@ void setEdsConfig(String edsName, ClusterLoadAssignment clusterLoadAssignment) { * Builds a new default RDS configuration. */ static RouteConfiguration buildRouteConfiguration(String authority) { - return buildRouteConfiguration(authority, RDS_NAME, CLUSTER_NAME); - } - - static RouteConfiguration buildRouteConfiguration(String authority, String rdsName, - String clusterName) { - VirtualHost.Builder vhBuilder = VirtualHost.newBuilder() - .setName(rdsName) - .addDomains(authority) - .addRoutes( - Route.newBuilder() - .setMatch( - RouteMatch.newBuilder().setPrefix("/").build()) - .setRoute( - RouteAction.newBuilder().setCluster(clusterName) - .setAutoHostRewrite(BoolValue.newBuilder().setValue(true).build()) - .build())); - VirtualHost virtualHost = vhBuilder.build(); - return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build(); + return XdsTestUtils.buildRouteConfiguration(authority, RDS_NAME, CLUSTER_NAME); } /** diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java index 6df27db0450..17055a9e1e2 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java @@ -200,7 +200,7 @@ private static void setAdsConfig(ControlPlaneRule controlPlane, String serverNam ControlPlaneRule.buildClientListener(MAIN_SERVER, serverName)); controlPlane.setRdsConfig(rdsName, - ControlPlaneRule.buildRouteConfiguration(MAIN_SERVER, rdsName, clusterName)); + XdsTestUtils.buildRouteConfiguration(MAIN_SERVER, rdsName, clusterName)); controlPlane.setCdsConfig(clusterName, ControlPlaneRule.buildCluster(clusterName, edsName)); controlPlane.setEdsConfig(edsName, diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java new file mode 100644 index 00000000000..e056226649d --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -0,0 +1,178 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import io.grpc.BindableService; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.ExponentialBackoffPolicy; +import io.grpc.internal.FakeClock; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.client.CommonBootstrapperTestUtils; +import io.grpc.xds.client.XdsClientImpl; +import io.grpc.xds.client.XdsClientMetricReporter; +import io.grpc.xds.client.XdsTransportFactory; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +/** Unit tests for {@link XdsNameResolverProvider}. */ +@RunWith(JUnit4.class) +public class XdsDependencyManagerTest { + private static final Logger log = Logger.getLogger(XdsDependencyManagerTest.class.getName()); + + @Mock + private XdsClientMetricReporter xdsClientMetricReporter; + + private final SynchronizationContext syncContext = + new SynchronizationContext(mock(Thread.UncaughtExceptionHandler.class)); + + private ManagedChannel channel; + private XdsClientImpl xdsClient; + private XdsDependencyManager xdsDependencyManager; + private TestWatcher xdsConfigWatcher; + private Server xdsServer; + + private final FakeClock fakeClock = new FakeClock(); + private final BlockingDeque resourceDiscoveryCalls = + new LinkedBlockingDeque<>(1); + private final String serverName = InProcessServerBuilder.generateName(); + private final Queue loadReportCalls = new ArrayDeque<>(); + private final AtomicBoolean adsEnded = new AtomicBoolean(true); + private final AtomicBoolean lrsEnded = new AtomicBoolean(true); + private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService(); + private final BindableService lrsService = + XdsTestUtils.createLrsService(lrsEnded, loadReportCalls); + + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + private TestWatcher testWatcher; + private XdsConfig defaultXdsConfig; // set in setUp() + + @Before + public void setUp() throws Exception { + xdsServer = cleanupRule.register(InProcessServerBuilder + .forName(serverName) + .addService(controlPlaneService) + .addService(lrsService) + .directExecutor() + .build() + .start()); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName); + + channel = cleanupRule.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + XdsTransportFactory xdsTransportFactory = + ignore -> new GrpcXdsTransportFactory.GrpcXdsTransport(channel); + + xdsClient = CommonBootstrapperTestUtils.createXdsClient( + Collections.singletonList(SERVER_URI), xdsTransportFactory, fakeClock, + new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter); + + testWatcher = new TestWatcher(); + xdsConfigWatcher = mock(TestWatcher.class, delegatesTo(testWatcher)); + defaultXdsConfig = XdsTestUtils.getDefaultXdsConfig(serverName); + } + + @After + public void tearDown() throws InterruptedException { + if (xdsDependencyManager != null) { + xdsDependencyManager.shutdown(); + } + xdsClient.shutdown(); + channel.shutdown(); // channel not owned by XdsClient + + xdsServer.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + + assertThat(adsEnded.get()).isTrue(); + assertThat(lrsEnded.get()).isTrue(); + assertThat(fakeClock.getPendingTasks()).isEmpty(); + } + + @Test + public void verify_basic_config() { + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + testWatcher.verifyStats(1, 0, 0); + } + + private static class TestWatcher implements XdsDependencyManager.XdsConfigWatcher { + XdsConfig lastConfig; + int numUpdates = 0; + int numError = 0; + int numDoesNotExist = 0; + + @Override + public void onUpdate(XdsConfig config) { + log.fine("Config changed: " + config); + lastConfig = config; + numUpdates++; + } + + @Override + public void onError(String resourceContext, Status status) { + log.fine(String.format("Error %s for %s: ", status, resourceContext)); + numError++; + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + log.fine("Resource does not exist: " + resourceName); + numDoesNotExist++; + } + + private List getStats() { + return Arrays.asList(numUpdates, numError, numDoesNotExist); + } + + private void verifyStats(int updt, int err, int notExist) { + assertThat(getStats()).isEqualTo(Arrays.asList(updt, err, notExist)); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java new file mode 100644 index 00000000000..c3ea3ce7fda --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java @@ -0,0 +1,413 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_LDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.BoolValue; +import com.google.protobuf.Message; +import com.google.protobuf.util.Durations; +import com.google.rpc.Code; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.core.v3.Node; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.Route; +import io.envoyproxy.envoy.config.route.v3.RouteAction; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.config.route.v3.RouteMatch; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; +import io.grpc.BindableService; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.StatusOr; +import io.grpc.internal.JsonParser; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; +import io.grpc.stub.ServerCallStreamObserver; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.Endpoints.LbEndpoint; +import io.grpc.xds.Endpoints.LocalityLbEndpoints; +import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.EnvoyProtoData; +import io.grpc.xds.client.Locality; +import io.grpc.xds.client.XdsResourceType; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; +import org.mockito.Mockito; +import org.mockito.verification.VerificationMode; + +public class XdsTestUtils { + private static final Logger log = Logger.getLogger(XdsTestUtils.class.getName()); + private static final String RDS_NAME = "route-config.googleapis.com"; + private static final String CLUSTER_NAME = "cluster0"; + private static final String EDS_NAME = "eds-service-0"; + private static final String SERVER_LISTENER = "grpc/server?udpa.resource.listening_address="; + public static final String ENDPOINT_HOSTNAME = "data-host"; + public static final int ENDPOINT_PORT = 1234; + + static BindableService createLrsService(AtomicBoolean lrsEnded, + Queue loadReportCalls) { + return new LoadReportingServiceGrpc.LoadReportingServiceImplBase() { + @Override + public StreamObserver streamLoadStats( + StreamObserver responseObserver) { + assertThat(lrsEnded.get()).isTrue(); + lrsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + LrsRpcCall call = new LrsRpcCall(requestObserver, responseObserver); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + lrsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + loadReportCalls.offer(call); + return requestObserver; + } + }; + } + + static boolean matchErrorDetail( + com.google.rpc.Status errorDetail, int expectedCode, List expectedMessages) { + if (expectedCode != errorDetail.getCode()) { + return false; + } + List errors = Splitter.on('\n').splitToList(errorDetail.getMessage()); + if (errors.size() != expectedMessages.size()) { + return false; + } + for (int i = 0; i < errors.size(); i++) { + if (!errors.get(i).startsWith(expectedMessages.get(i))) { + return false; + } + } + return true; + } + + static void setAdsConfig(XdsTestControlPlaneService service, String serverName) { + setAdsConfig(service, serverName, RDS_NAME, CLUSTER_NAME, EDS_NAME, ENDPOINT_HOSTNAME, + ENDPOINT_PORT); + } + + static void setAdsConfig(XdsTestControlPlaneService service, String serverName, String rdsName, + String clusterName, String edsName, String endpointHostname, + int endpointPort) { + + Listener serverListener = ControlPlaneRule.buildServerListener(); + Listener clientListener = ControlPlaneRule.buildClientListener(serverName, serverName, rdsName); + service.setXdsConfig(ADS_TYPE_URL_LDS, + ImmutableMap.of(SERVER_LISTENER, serverListener, serverName, clientListener)); + + RouteConfiguration routeConfig = + buildRouteConfiguration(serverName, rdsName, clusterName); + service.setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of(rdsName, routeConfig));; + + Cluster cluster = ControlPlaneRule.buildCluster(clusterName, edsName); + service.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(clusterName, cluster)); + + ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment( + serverName, endpointHostname, endpointPort); + service.setXdsConfig(ADS_TYPE_URL_EDS, + ImmutableMap.of(edsName, clusterLoadAssignment)); + + log.log(Level.FINE, String.format("Set ADS config for %s with address %s:%d", + serverName, endpointHostname, endpointPort)); + + } + + static XdsConfig getDefaultXdsConfig(String serverHostName) + throws XdsResourceType.ResourceInvalidException, IOException { + XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder(); + + Filter.NamedFilterConfig routerFilterConfig = new Filter.NamedFilterConfig( + serverHostName, RouterFilter.ROUTER_CONFIG); + + HttpConnectionManager httpConnectionManager = HttpConnectionManager.forRdsName( + 0L, RDS_NAME, Collections.singletonList(routerFilterConfig)); + XdsListenerResource.LdsUpdate ldsUpdate = + XdsListenerResource.LdsUpdate.forApiListener(httpConnectionManager); + + RouteConfiguration routeConfiguration = + buildRouteConfiguration(serverHostName, RDS_NAME, CLUSTER_NAME); + Bootstrapper.ServerInfo serverInfo = null; + XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, "0", "0", null, null, null); + XdsRouteConfigureResource.RdsUpdate rdsUpdate = + XdsRouteConfigureResource.processRouteConfiguration( + routeConfiguration, FilterRegistry.getDefaultRegistry(), args); + + // Need to create endpoints to create locality endpoints map to create edsUpdate + Map lbEndpointsMap = new HashMap<>(); + LbEndpoint lbEndpoint = + LbEndpoint.create(serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME); + lbEndpointsMap.put( + Locality.create("", "", ""), + LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0)); + + // Need to create EdsUpdate to create CdsUpdate to create XdsClusterConfig for builder + XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate( + EDS_NAME, lbEndpointsMap, Collections.emptyList()); + XdsClusterResource.CdsUpdate cdsUpdate = XdsClusterResource.CdsUpdate.forEds( + CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null) + .lbPolicyConfig(getWrrLbConfigAsMap()).build(); + XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig( + CLUSTER_NAME, cdsUpdate, StatusOr.fromValue(edsUpdate)); + + builder.setListener(ldsUpdate) + .setRoute(rdsUpdate) + .addCluster(CLUSTER_NAME, StatusOr.fromValue(clusterConfig)); + + return builder.build(); + } + + private static ConfigOrError getWrrLbConfig() throws IOException { + Map lbParsed = getWrrLbConfigAsMap(); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(lbParsed); + + return ConfigOrError.fromConfig(lbConfig); + } + + private static ImmutableMap getWrrLbConfigAsMap() throws IOException { + String lbConfigStr = "{\"wrr_locality_experimental\" : " + + "{ \"childPolicy\" : [{\"round_robin\" : {}}]}}"; + + return ImmutableMap.copyOf((Map) JsonParser.parse(lbConfigStr)); + } + + static RouteConfiguration buildRouteConfiguration(String authority, String rdsName, + String clusterName) { + io.envoyproxy.envoy.config.route.v3.VirtualHost.Builder vhBuilder = + io.envoyproxy.envoy.config.route.v3.VirtualHost.newBuilder() + .setName(rdsName) + .addDomains(authority) + .addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder().setPrefix("/").build()) + .setRoute( + RouteAction.newBuilder().setCluster(clusterName) + .setAutoHostRewrite(BoolValue.newBuilder().setValue(true).build()) + .build())); + io.envoyproxy.envoy.config.route.v3.VirtualHost virtualHost = vhBuilder.build(); + return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build(); + } + + /** + * Matches a {@link DiscoveryRequest} with the same node metadata, versionInfo, typeUrl, + * response nonce and collection of resource names regardless of order. + */ + static class DiscoveryRequestMatcher implements ArgumentMatcher { + private final Node node; + private final String versionInfo; + private final String typeUrl; + private final Set resources; + private final String responseNonce; + @Nullable + private final Integer errorCode; + private final List errorMessages; + + private DiscoveryRequestMatcher( + Node node, String versionInfo, List resources, + String typeUrl, String responseNonce, @Nullable Integer errorCode, + @Nullable List errorMessages) { + this.node = node; + this.versionInfo = versionInfo; + this.resources = new HashSet<>(resources); + this.typeUrl = typeUrl; + this.responseNonce = responseNonce; + this.errorCode = errorCode; + this.errorMessages = errorMessages != null ? errorMessages : ImmutableList.of(); + } + + @Override + public boolean matches(DiscoveryRequest argument) { + if (!typeUrl.equals(argument.getTypeUrl())) { + return false; + } + if (!versionInfo.equals(argument.getVersionInfo())) { + return false; + } + if (!responseNonce.equals(argument.getResponseNonce())) { + return false; + } + if (!resources.equals(new HashSet<>(argument.getResourceNamesList()))) { + return false; + } + if (errorCode == null && argument.hasErrorDetail()) { + return false; + } + if (errorCode != null + && !matchErrorDetail(argument.getErrorDetail(), errorCode, errorMessages)) { + return false; + } + return node.equals(argument.getNode()); + } + + @Override + public String toString() { + return "DiscoveryRequestMatcher{" + + "node=" + node + + ", versionInfo='" + versionInfo + '\'' + + ", typeUrl='" + typeUrl + '\'' + + ", resources=" + resources + + ", responseNonce='" + responseNonce + '\'' + + ", errorCode=" + errorCode + + ", errorMessages=" + errorMessages + + '}'; + } + } + + /** + * Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with + * the same list of clusterName:clusterServiceName pair. + */ + static class LrsRequestMatcher implements ArgumentMatcher { + private final List expected; + + private LrsRequestMatcher(List clusterNames) { + expected = new ArrayList<>(); + for (String[] pair : clusterNames) { + expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1])); + } + Collections.sort(expected); + } + + @Override + public boolean matches(LoadStatsRequest argument) { + List actual = new ArrayList<>(); + for (ClusterStats clusterStats : argument.getClusterStatsList()) { + actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName()); + } + Collections.sort(actual); + return actual.equals(expected); + } + } + + static class DiscoveryRpcCall { + StreamObserver requestObserver; + StreamObserver responseObserver; + + private DiscoveryRpcCall(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + } + + protected void verifyRequest( + XdsResourceType type, List resources, String versionInfo, String nonce, + EnvoyProtoData.Node node, VerificationMode verificationMode) { + verify(requestObserver, verificationMode).onNext(argThat(new DiscoveryRequestMatcher( + node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null))); + } + + protected void verifyRequestNack( + XdsResourceType type, List resources, String versionInfo, String nonce, + EnvoyProtoData.Node node, List errorMessages) { + verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher( + node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, + Code.INVALID_ARGUMENT_VALUE, errorMessages))); + } + + protected void verifyNoMoreRequest() { + verifyNoMoreInteractions(requestObserver); + } + + protected void sendResponse( + XdsResourceType type, List resources, String versionInfo, String nonce) { + DiscoveryResponse response = + DiscoveryResponse.newBuilder() + .setVersionInfo(versionInfo) + .addAllResources(resources) + .setTypeUrl(type.typeUrl()) + .setNonce(nonce) + .build(); + responseObserver.onNext(response); + } + + protected void sendError(Throwable t) { + responseObserver.onError(t); + } + + protected void sendCompleted() { + responseObserver.onCompleted(); + } + + protected boolean isReady() { + return ((ServerCallStreamObserver)responseObserver).isReady(); + } + } + + static class LrsRpcCall { + private final StreamObserver requestObserver; + private final StreamObserver responseObserver; + private final InOrder inOrder; + + private LrsRpcCall(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + inOrder = inOrder(requestObserver); + } + + protected void verifyNextReportClusters(List clusters) { + inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters))); + } + + protected void sendResponse(List clusters, long loadReportIntervalNano) { + LoadStatsResponse response = + LoadStatsResponse.newBuilder() + .addAllClusters(clusters) + .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano)) + .build(); + responseObserver.onNext(response); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java b/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java index 27a0d4ba1d9..b10f3155905 100644 --- a/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java @@ -36,9 +36,14 @@ import javax.annotation.Nullable; public class CommonBootstrapperTestUtils { + public static final String SERVER_URI = "trafficdirector.googleapis.com"; private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create(); private static final String SERVER_URI_CUSTOM_AUTHORITY = "trafficdirector2.googleapis.com"; private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com"; + public static final String LDS_RESOURCE = "listener.googleapis.com"; + public static final String RDS_RESOURCE = "route-configuration.googleapis.com"; + public static final String CDS_RESOURCE = "cluster.googleapis.com"; + public static final String EDS_RESOURCE = "cluster-load-assignment.googleapis.com"; private static final long TIME_INCREMENT = TimeUnit.SECONDS.toNanos(1);