Skip to content

Commit

Permalink
Remove syncContext from watchers. Add checkNotNull, private and final…
Browse files Browse the repository at this point in the history
… as recommended by code review.
  • Loading branch information
larry-safran committed Jan 8, 2025
1 parent f58a49a commit 1fac607
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 83 deletions.
18 changes: 9 additions & 9 deletions xds/src/main/java/io/grpc/xds/XdsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ public ImmutableMap<String, StatusOr<XdsClusterConfig>> getClusters() {
return clusters;
}

public static class XdsClusterConfig {
final String clusterName;
final CdsUpdate clusterResource;
final StatusOr<EdsUpdate> endpoint;
static final class XdsClusterConfig {
private final String clusterName;
private final CdsUpdate clusterResource;
private final StatusOr<EdsUpdate> endpoint; //Will be null for non-EDS clusters

XdsClusterConfig(String clusterName, CdsUpdate clusterResource,
StatusOr<EdsUpdate> endpoint) {
this.clusterName = clusterName;
this.clusterResource = clusterResource;
this.clusterName = checkNotNull(clusterName, "clusterName");
this.clusterResource = checkNotNull(clusterResource, "clusterResource");
this.endpoint = endpoint;
}

Expand Down Expand Up @@ -140,18 +140,18 @@ public StatusOr<EdsUpdate> getEndpoint() {
}
}

static class XdsConfigBuilder {
static final class XdsConfigBuilder {
private LdsUpdate listener;
private RdsUpdate route;
private Map<String, StatusOr<XdsClusterConfig>> clusters = new HashMap<>();

XdsConfigBuilder setListener(LdsUpdate listener) {
this.listener = listener;
this.listener = checkNotNull(listener, "listener");
return this;
}

XdsConfigBuilder setRoute(RdsUpdate route) {
this.route = route;
this.route = checkNotNull(route, "route");
return this;
}

Expand Down
127 changes: 58 additions & 69 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();

XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
SynchronizationContext syncContext, String dataPlaneAuthority,
String listenerName) {
SynchronizationContext syncContext, String dataPlaneAuthority,
String listenerName) {
logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
logger = XdsLogger.withLogId(logId);
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
Expand Down Expand Up @@ -209,6 +209,7 @@ private void cancelClusterWatcherTree(CdsWatcher root) {
* the watchers.
*/
private void maybePublishConfig() {
syncContext.throwIfNotInThisSynchronizationContext();
boolean waitingOnResource = resourceWatchers.values().stream()
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
.anyMatch(watcher -> !watcher.hasResult());
Expand Down Expand Up @@ -381,23 +382,21 @@ public void onChanged(XdsListenerResource.LdsUpdate update) {
List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
String rdsName = httpConnectionManager.rdsName();

syncContext.execute(() -> {
boolean changedRdsName = rdsName != null && !rdsName.equals(this.rdsName);
if (changedRdsName) {
cleanUpRdsWatcher();
}
boolean changedRdsName = rdsName != null && !rdsName.equals(this.rdsName);
if (changedRdsName) {
cleanUpRdsWatcher();
}

if (virtualHosts != null) {
updateRoutes(virtualHosts);
} else if (changedRdsName) {
this.rdsName = rdsName;
addWatcher(new RdsWatcher(rdsName));
logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
}
if (virtualHosts != null) {
updateRoutes(virtualHosts);
} else if (changedRdsName) {
this.rdsName = rdsName;
addWatcher(new RdsWatcher(rdsName));
logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
}

setData(update);
maybePublishConfig();
});
setData(update);
maybePublishConfig();
}

@Override
Expand Down Expand Up @@ -433,10 +432,8 @@ public RdsWatcher(String resourceName) {
@Override
public void onChanged(RdsUpdate update) {
setData(update);
syncContext.execute(() -> {
updateRoutes(update.virtualHosts);
maybePublishConfig();
});
updateRoutes(update.virtualHosts);
maybePublishConfig();
}

@Override
Expand All @@ -460,55 +457,51 @@ private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {

@Override
public void onChanged(XdsClusterResource.CdsUpdate update) {
syncContext.execute(() -> {
switch (update.clusterType()) {
case EDS:
switch (update.clusterType()) {
case EDS:
setData(update);
if (!hasWatcher(ENDPOINT_RESOURCE, update.edsServiceName())) {
addWatcher(new EdsWatcher(update.edsServiceName()));
} else {
maybePublishConfig();
}
break;
case LOGICAL_DNS:
setData(update);
maybePublishConfig();
// no eds needed
break;
case AGGREGATE:
if (data != null && data.hasValue()) {
Set<String> oldNames = new HashSet<>(data.getValue().prioritizedClusterNames());
Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());

setData(update);
if (!hasWatcher(ENDPOINT_RESOURCE, update.edsServiceName())) {
addWatcher(new EdsWatcher(update.edsServiceName()));
} else {

Set<String> addedClusters = Sets.difference(newNames, oldNames);
Set<String> deletedClusters = Sets.difference(oldNames, newNames);
addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster)));
deletedClusters.forEach((cluster) -> cancelClusterWatcherTree(getCluster(cluster)));

if (!addedClusters.isEmpty()) {
maybePublishConfig();
}
break;
case LOGICAL_DNS:
} else {
setData(update);
maybePublishConfig();
// no eds needed
break;
case AGGREGATE:
if (data != null && data.hasValue()) {
Set<String> oldNames = new HashSet<>(data.getValue().prioritizedClusterNames());
Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());

setData(update);

Set<String> addedClusters = Sets.difference(newNames, oldNames);
Set<String> deletedClusters = Sets.difference(oldNames, newNames);
addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster)));
deletedClusters.forEach((cluster) -> cancelClusterWatcherTree(getCluster(cluster)));

if (!addedClusters.isEmpty()) {
maybePublishConfig();
}
} else {
setData(update);
for (String name : update.prioritizedClusterNames()) {
addWatcher(new CdsWatcher(name));
}
for (String name : update.prioritizedClusterNames()) {
addWatcher(new CdsWatcher(name));
}
break;
default:
throw new AssertionError("Unknown cluster type: " + update.clusterType());
}
});
}
break;
default:
throw new AssertionError("Unknown cluster type: " + update.clusterType());
}
}

@Override
public void onResourceDoesNotExist(String resourceName) {
syncContext.execute(() -> {
handleDoesNotExist(resourceName);
maybePublishConfig();
});
handleDoesNotExist(resourceName);
maybePublishConfig();
}
}

Expand All @@ -519,18 +512,14 @@ private EdsWatcher(String resourceName) {

@Override
public void onChanged(XdsEndpointResource.EdsUpdate update) {
syncContext.execute(() -> {
setData(update);
maybePublishConfig();
});
setData(update);
maybePublishConfig();
}

@Override
public void onResourceDoesNotExist(String resourceName) {
syncContext.execute(() -> {
handleDoesNotExist(resourceName);
maybePublishConfig();
});
handleDoesNotExist(resourceName);
maybePublishConfig();
}
}

Expand Down
10 changes: 5 additions & 5 deletions xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,14 @@ public void verify_simple_aggregate() {
testWatcher.lastConfig.getClusters();
assertThat(lastConfigClusters).hasSize(childNames.size() + 1);
StatusOr<XdsConfig.XdsClusterConfig> rootC = lastConfigClusters.get(rootName);
XdsClusterResource.CdsUpdate rootUpdate = rootC.getValue().clusterResource;
XdsClusterResource.CdsUpdate rootUpdate = rootC.getValue().getClusterResource();
assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE);
assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames);

for (String childName : childNames) {
assertThat(lastConfigClusters).containsKey(childName);
XdsClusterResource.CdsUpdate childResource =
lastConfigClusters.get(childName).getValue().clusterResource;
lastConfigClusters.get(childName).getValue().getClusterResource();
assertThat(childResource.clusterType()).isEqualTo(EDS);
assertThat(childResource.edsServiceName()).isEqualTo(getEdsNameForCluster(childName));

Expand Down Expand Up @@ -283,9 +283,9 @@ public void testMissingCdsAndEds() {
// Check that missing EDS reported Status, the other one is present and the garbage EDS is not
Status expectedEdsStatus = Status.UNAVAILABLE.withDescription(
"No " + toContextStr(ENDPOINT_TYPE_NAME , XdsTestUtils.EDS_NAME + 1));
assertThat(returnedClusters.get(0).getValue().endpoint.hasValue()).isTrue();
assertThat(returnedClusters.get(1).getValue().endpoint.hasValue()).isFalse();
assertThat(returnedClusters.get(1).getValue().endpoint.getStatus().toString())
assertThat(returnedClusters.get(0).getValue().getEndpoint().hasValue()).isTrue();
assertThat(returnedClusters.get(1).getValue().getEndpoint().hasValue()).isFalse();
assertThat(returnedClusters.get(1).getValue().getEndpoint().getStatus().toString())
.isEqualTo(expectedEdsStatus.toString());

verify(xdsConfigWatcher, never()).onResourceDoesNotExist(any());
Expand Down

0 comments on commit 1fac607

Please sign in to comment.