Skip to content

Commit

Permalink
xds: retain locality stats counter when the child balancer for that l…
Browse files Browse the repository at this point in the history
…ocality is deactivated (backport v1.30.x) (#7096) (#7098)

Create the counter for recording per locality stats upon creating the child balancer for that locality. When the locality is deactivated (due to EDS response update removes it), the counter is not deleted from the LoadStatsStore. Delete it when the child balancer for that locality is shut down. In this way, the lifecycle of the load stats counter for a certain locality stays same with the child balancer for that locality. This is exactly what will happen after we refactor LocalityStore to PriorityLoadBalancer and LrsLoadBalancer (i.e., when some priority is deactivated, its subtree is not deleted immediately, so the LrsLoadBalancer instances for localities still hold the load stats counters).
  • Loading branch information
voidzcy authored Jun 6, 2020
1 parent 9a4657c commit 238c952
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 174 deletions.
220 changes: 79 additions & 141 deletions xds/src/main/java/io/grpc/xds/LocalityStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
Expand Down Expand Up @@ -55,7 +54,6 @@
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -115,8 +113,6 @@ final class LocalityStoreImpl implements LocalityStore {
private final OrcaOobUtil orcaOobUtil;
private final PriorityManager priorityManager = new PriorityManager();
private final Map<Locality, LocalityLbInfo> localityMap = new HashMap<>();
// Most current set of localities instructed by traffic director
private Set<Locality> localities = ImmutableSet.of();
private List<DropOverload> dropOverloads = ImmutableList.of();
private long metricsReportIntervalNano = -1;

Expand Down Expand Up @@ -202,56 +198,20 @@ public void reset() {
localityMap.get(locality).shutdown();
}
localityMap.clear();

for (Locality locality : localities) {
loadStatsStore.removeLocality(locality);
}
localities = ImmutableSet.of();

priorityManager.reset();
}

@Override
public void updateLocalityStore(final Map<Locality, LocalityLbEndpoints> localityInfoMap) {

Set<Locality> newLocalities = localityInfoMap.keySet();
// TODO: put endPointWeights into attributes for WRR.
for (Locality locality : newLocalities) {
if (localityMap.containsKey(locality)) {
LocalityLbInfo localityLbInfo = localityMap.get(locality);
LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality);
handleEagsOnChildBalancer(helper, localityLbInfo, localityLbEndpoints, locality);
}
}

for (Locality newLocality : newLocalities) {
if (!localities.contains(newLocality)) {
loadStatsStore.addLocality(newLocality);
localityLbInfo.refreshEndpoints(localityInfoMap.get(locality));
}
}
final Set<Locality> toBeRemovedFromStatsStore = new HashSet<>();
// There is a race between picking a subchannel and updating localities, which leads to
// the possibility that RPCs will be sent to a removed locality. As a result, those RPC
// loads will not be recorded. We consider this to be natural. By removing locality counters
// after updating subchannel pickers, we eliminate the race and conservatively record loads
// happening in that period.
for (Locality oldLocality : localities) {
if (!localityInfoMap.containsKey(oldLocality)) {
toBeRemovedFromStatsStore.add(oldLocality);
}
}
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
for (Locality locality : toBeRemovedFromStatsStore) {
loadStatsStore.removeLocality(locality);
}
}
});
localities = newLocalities;

priorityManager.updateLocalities(localityInfoMap);

for (Locality oldLocality : localityMap.keySet()) {
if (!newLocalities.contains(oldLocality)) {
deactivate(oldLocality);
Expand Down Expand Up @@ -346,14 +306,39 @@ private final class LocalityLbInfo {
final Locality locality;
final LoadBalancer childBalancer;
final ChildHelper childHelper;

@Nullable
private ScheduledHandle delayedDeletionTimer;

LocalityLbInfo(Locality locality, LoadBalancer childBalancer, ChildHelper childHelper) {
LocalityLbInfo(Locality locality) {
this.locality = checkNotNull(locality, "locality");
this.childBalancer = checkNotNull(childBalancer, "childBalancer");
this.childHelper = checkNotNull(childHelper, "childHelper");
loadStatsStore.addLocality(locality);
childHelper = new ChildHelper();
childBalancer = loadBalancerProvider.newLoadBalancer(childHelper);
}

void refreshEndpoints(LocalityLbEndpoints localityLbEndpoints) {
final List<EquivalentAddressGroup> eags = new ArrayList<>();
for (LbEndpoint endpoint : localityLbEndpoints.getEndpoints()) {
if (endpoint.isHealthy()) {
eags.add(endpoint.getAddress());
}
}
// In extreme case handleResolvedAddresses() may trigger updateBalancingState()
// immediately, so execute handleResolvedAddresses() after all the setup in the caller is
// complete.
childHelper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
if (eags.isEmpty() && !childBalancer.canHandleEmptyAddressListFromNameResolution()) {
childBalancer.handleNameResolutionError(
Status.UNAVAILABLE.withDescription(
"Locality " + locality + " has no healthy endpoint"));
} else {
childBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(eags).build());
}
}
});
}

void shutdown() {
Expand All @@ -362,6 +347,7 @@ void shutdown() {
delayedDeletionTimer = null;
}
childBalancer.shutdown();
loadStatsStore.removeLocality(locality);
logger.log(XdsLogLevel.INFO, "Shut down child balancer for locality {0}", locality);
}

Expand All @@ -375,73 +361,62 @@ void reactivate() {
boolean isDeactivated() {
return delayedDeletionTimer != null;
}
}

class ChildHelper extends ForwardingLoadBalancerHelper {

private final OrcaReportingHelperWrapper orcaReportingHelperWrapper;
class ChildHelper extends ForwardingLoadBalancerHelper {

private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER;
private ConnectivityState currentChildState = CONNECTING;
private final OrcaReportingHelperWrapper orcaReportingHelperWrapper;
private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER;
private ConnectivityState currentChildState = CONNECTING;

ChildHelper(final Locality locality, final ClientLoadCounter counter,
OrcaOobUtil orcaOobUtil) {
checkNotNull(locality, "locality");
checkNotNull(counter, "counter");
checkNotNull(orcaOobUtil, "orcaOobUtil");
Helper delegate = new ForwardingLoadBalancerHelper() {
@Override
protected Helper delegate() {
return helper;
}
ChildHelper() {
final ClientLoadCounter counter = loadStatsStore.getLocalityCounter(locality);
Helper delegate = new ForwardingLoadBalancerHelper() {
@Override
protected Helper delegate() {
return helper;
}

@Override
public void updateBalancingState(ConnectivityState newState,
final SubchannelPicker newPicker) {
checkNotNull(newState, "newState");
checkNotNull(newPicker, "newPicker");
logger.log(
XdsLogLevel.INFO,
"Update load balancing state for locality {0} to {1}", locality, newState);
currentChildState = newState;
currentChildPicker =
new LoadRecordingSubchannelPicker(counter,
new MetricsObservingSubchannelPicker(new MetricsRecordingListener(counter),
newPicker, orcaPerRequestUtil));

// delegate to parent helper
priorityManager.updatePriorityState(priorityManager.getPriority(locality));
}
@Override
public void updateBalancingState(
ConnectivityState newState, SubchannelPicker newPicker) {
logger.log(
XdsLogLevel.INFO,
"Update load balancing state for locality {0} to {1}", locality, newState);
currentChildState = newState;
currentChildPicker =
new LoadRecordingSubchannelPicker(
counter,
new MetricsObservingSubchannelPicker(new MetricsRecordingListener(counter),
newPicker, orcaPerRequestUtil));

priorityManager.updatePriorityState(priorityManager.getPriority(locality));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("locality", locality).toString();
}
@Override
public String getAuthority() {
//FIXME: This should be a new proposed field of Locality, locality_name
return locality.getSubZone();
}
};

@Override
public String getAuthority() {
//FIXME: This should be a new proposed field of Locality, locality_name
return locality.getSubZone();
orcaReportingHelperWrapper =
orcaOobUtil.newOrcaReportingHelperWrapper(
delegate, new MetricsRecordingListener(counter));
if (metricsReportIntervalNano > 0) {
updateMetricsReportInterval(metricsReportIntervalNano);
}
};
orcaReportingHelperWrapper =
checkNotNull(orcaOobUtil, "orcaOobUtil")
.newOrcaReportingHelperWrapper(delegate, new MetricsRecordingListener(counter));

if (metricsReportIntervalNano > 0) {
updateMetricsReportInterval(metricsReportIntervalNano);
}
}

void updateMetricsReportInterval(long intervalNanos) {
orcaReportingHelperWrapper
.setReportingConfig(OrcaReportingConfig.newBuilder()
.setReportInterval(intervalNanos, TimeUnit.NANOSECONDS).build());
}
void updateMetricsReportInterval(long intervalNanos) {
orcaReportingHelperWrapper
.setReportingConfig(OrcaReportingConfig.newBuilder()
.setReportInterval(intervalNanos, TimeUnit.NANOSECONDS).build());
}

@Override
protected Helper delegate() {
return orcaReportingHelperWrapper.asHelper();
@Override
protected Helper delegate() {
return orcaReportingHelperWrapper.asHelper();
}
}
}

Expand Down Expand Up @@ -594,47 +569,10 @@ public void run() {

private void initLocality(Locality locality) {
logger.log(XdsLogLevel.INFO, "Create child balancer for locality {0}", locality);
ChildHelper childHelper =
new ChildHelper(locality, loadStatsStore.getLocalityCounter(locality),
orcaOobUtil);
LocalityLbInfo localityLbInfo =
new LocalityLbInfo(
locality,
loadBalancerProvider.newLoadBalancer(childHelper),
childHelper);
LocalityLbInfo localityLbInfo = new LocalityLbInfo(locality);
localityMap.put(locality, localityLbInfo);
LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality);
handleEagsOnChildBalancer(childHelper, localityLbInfo, localityLbEndpoints, locality);
}
}

private static void handleEagsOnChildBalancer(
Helper childHelper, final LocalityLbInfo localityLbInfo,
final LocalityLbEndpoints localityLbEndpoints, final Locality locality) {
final List<EquivalentAddressGroup> eags = new ArrayList<>();
for (LbEndpoint endpoint : localityLbEndpoints.getEndpoints()) {
if (endpoint.isHealthy()) {
eags.add(endpoint.getAddress());
}
localityLbInfo.refreshEndpoints(localityInfoMap.get(locality));
}
// In extreme case handleResolvedAddresses() may trigger updateBalancingState()
// immediately, so execute handleResolvedAddresses() after all the setup in the caller is
// complete.
childHelper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
if (eags.isEmpty()
&& !localityLbInfo.childBalancer.canHandleEmptyAddressListFromNameResolution()) {
localityLbInfo.childBalancer.handleNameResolutionError(
Status.UNAVAILABLE.withDescription(
"Locality " + locality + " has no healthy endpoint"));
} else {
localityLbInfo.childBalancer
.handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags).build());
}
}
});
}
}
}
33 changes: 0 additions & 33 deletions xds/src/test/java/io/grpc/xds/LocalityStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,39 +235,6 @@ public OrcaReportingHelperWrapper answer(InvocationOnMock invocation) {
orcaPerRequestUtil, orcaOobUtil);
}

@Test
@SuppressWarnings("unchecked")
public void updateLocalityStore_updateStatsStoreLocalityTracking() {
Map<Locality, LocalityLbEndpoints> localityInfoMap = new HashMap<>();
localityInfoMap
.put(locality1,
new LocalityLbEndpoints(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1, 0));
localityInfoMap
.put(locality2,
new LocalityLbEndpoints(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2, 0));
localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap));
verify(loadStatsStore).addLocality(locality1);
verify(loadStatsStore).addLocality(locality2);

localityInfoMap
.put(locality3,
new LocalityLbEndpoints(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3, 0));
localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap));
verify(loadStatsStore).addLocality(locality3);

localityInfoMap = ImmutableMap
.of(locality4,
new LocalityLbEndpoints(ImmutableList.of(lbEndpoint41, lbEndpoint42), 4, 0));
localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap));
verify(loadStatsStore).removeLocality(locality1);
verify(loadStatsStore).removeLocality(locality2);
verify(loadStatsStore).removeLocality(locality3);
verify(loadStatsStore).addLocality(locality4);

localityStore.updateLocalityStore(ImmutableMap.copyOf(Collections.EMPTY_MAP));
verify(loadStatsStore).removeLocality(locality4);
}

@Test
public void updateLocalityStore_pickResultInterceptedForLoadRecordingWhenSubchannelReady() {
// Simulate receiving two localities.
Expand Down

0 comments on commit 238c952

Please sign in to comment.