Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xds fallback #11254

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open

Xds fallback #11254

wants to merge 28 commits into from

Conversation

larry-safran
Copy link
Contributor

@ejona86 ejona86 requested a review from YifeiZhuang June 17, 2024 20:49
@larry-safran larry-safran marked this pull request as ready for review June 17, 2024 20:49
Copy link
Contributor

@YifeiZhuang YifeiZhuang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good overall. I finally start to understand the spirit of it.

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
*/
default void assignResourcesToOwner(XdsResourceType<?> type, Collection<String> resources,
Object owner) {
// no-op - useful for test cases where everything is mocked
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:I'm confused by the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When everything is mocked there is no reason to do anything, so the cases where you want your test to implement ResourceStore you don't need to explicitly define this method that you don't want to use.

}

@Override
public void handleStreamReady(ServerInfo serverInfo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the onReady() callback also supported by stubby xdsTransportFactory? @ejona86

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They have to have something that recognizes that a stream has come ready that can feed into these calls.

if (resources != null) {

Collection<String> resources =
resourceStore.getSubscribedResources(serverInfo, resourceType, authority);
Copy link
Contributor

@YifeiZhuang YifeiZhuang Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

targetA(channelA) ----> xdsClientA resource1 (authority1) --> subscriber1 authority1 : [serverInfo1, serverInfo2] controlPlaneClient ---> serverInfo1 ---> serverInfo2 resource2 (authority2) --> subscriber2 authority2 : [serverInfo3, serverInfo4] controlPlaneClient ---> serverInfo3 ---> serverInfo4 resource3 (authority1) --> subscriber3 authority1 : [serverInfo1, serverInfo2] ---> serverInfo1 controlPlaneClient ---> serverInfo2(never happens)
targetA(channelB) ----> xdsClientB resource1 (authority1) --> subscriber1 ...

Each xdsClinet is an instance and has its own the resourceSubscriber map, and separate ads streams for it's resources.
The resources to send request for in this adstream adsStream.sendDiscoveryReuqest should be the resources with the authority the same as in this adsStream.

xds/src/main/java/io/grpc/xds/client/XdsClient.java Outdated Show resolved Hide resolved
…d completely disjoint resource names to assuming that any duplicate names would be compatible, particularly cross authority.
@larry-safran
Copy link
Contributor Author

Fallback is ready for review again.

Copy link
Contributor

@YifeiZhuang YifeiZhuang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending what I have, mostly minor, I haven't looked deep enough.

@Nullable
@Override
public Collection<String> getSubscribedResources(ServerInfo serverInfo,
XdsResourceType<? extends ResourceUpdate> type) {
public Collection<String> getSubscribedResources(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we simplify this method to something like this?

resources.entrySet().filter( e -> e.serverInfos.contains(serverInfo)).collect...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with the authority approach instead of the ServerInfo because fallback is per authority, so you can have a situation where one authority was complete and another did fallback and requests resources, the first authority's resources would include the fallback server's ServerInfo. We don't want to get an update from the fallback server for the first authority so we don't want to request its resources.

census/src/test/java/io/grpc/census/CensusModulesTest.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
Copy link
Member

@ejona86 ejona86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending what I have.

xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClient.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
…figure out the current CPC.

Add all live ControlPlaneClients to active list rather than skipping ones in backoff.
Changed isConnected() logic to be based on AdsStream call's success or failure
@ejona86
Copy link
Member

ejona86 commented Oct 31, 2024

Ugh. Github sent the review before I was done with it. Looking over the sent comments, I think they make sense alone/aren't wrong. But I'm still working on the review.

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
args.nonce);
if (controlPlaneClient.isStartingUp()) {
controlPlaneClient.startingUpCompleted();
shutdownLowerPriorityCpcs(controlPlaneClient);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks wrong being only for if errors.isEmpty(). Why is it dependent on whether we are ACKing or NACKing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private void internalHandleStreamReady(
ServerInfo serverInfo, ControlPlaneClient controlPlaneClient, String authority) {
ControlPlaneClient activeCpClient = getActiveCpc(authority);
addCpcToAuthority(authority, controlPlaneClient);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we adding to the list here? Shouldn't we have done that in fallBackToCpc()? It looks like if it isn't registered for the authority yet, then that means it shouldn't be added because a higher priority server is being used for that authority.

Using getAuthoritiesForServerInfo() before getting here looks to be a mistake. It should be based on activeCpClients, not bootstrapinfo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a need to add the first one in this method. You are right that it is cleaner to make that explicit rather than use the general method which could potentially cover up some problem in the fallback logic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a need to add the first one in this method.

Is there? Why would the first be special? I see nothing that would guarantee this is the first ServerInfo in the authority's list, so it seems this could also make things out-of-order.

We're looping through all authorities that this serverInfo can potentially serve, not just the ones it has been requested on. By definition if it isn't in the list already it isn't supposed to be answering for this authority. So it seems clearly wrong to go about adding to the list.

We got here because the authority has subscribers. Is it even possible for an authority to have a subscriber, non-empty serverInfos, and yet not have any cpc in the list? It looks like watchXdsResource() would add to the list (via addCpcToAuthority()) when the subscriber was created.

I don't see the point of using getAuthoritiesForServerInfo() (which loops through all server info registrations) and then figure out if we care about the authority with hasSubscribers() and activeCpClients.get(authority).contains(readyCpc). Seems all we need to do is:

for (Map.Entry<String, List<ControlPlaneClient>> me : activeCpClients.entrySet()) {
  if (me.getValue().contains(controlPlaneClient)) {
    internalHandleStreamReady(controlPlaneClient, me.getValue());
  }
}

There's not need to check isShutDown() as shut down cleans up activeCpClients. With this construction, it is impossible for getActiveCpc() to return null. There's no need to check activeCpClient.isReady() && !activeCpClients.get(authority).contains(readyCpc).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct that it doesn't need to be added here. The cpc would already have been added in manageControlPlaneClient() before we did the adjustResourceSubscription() that would trigger the stream ready.

Added clearing out activatedCpClients in XdsClient.shutdown() to be able to remove the shutdown check

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
serverInfo.target(), type, resource);
getTarget(), type, resource);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of re-discovering what caused this, can we just pass ControlPlaneClient, ServerInfo, or target into onAbsent()? We have that information in the two places we call it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to get target from activeCpc.

Copy link
Member

@ejona86 ejona86 Nov 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to use activeCpc (to call getActiveCpc() at all). Why re-discover what cpc caused this? It seems easy to add the argument. Would it be a pain to update tests or something like that?

Fix fallback logic for new watch triggering fallback to use the right CPC.
Some non-functional clarity improvement
Change hasSubscribers detection to be across all types and not trying to involve any logic in ControlPlaneClient.
Copy link
Contributor Author

@larry-safran larry-safran left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Things done in commit 003348b

… ADS stream gets an error. Retry after error is an error.
… there hasn't been an active client for that authority rather than using the generic add method all of the time.
…ill in the list for some other authority, call adjustResourceSubscriptions on all resource types.
Copy link
Member

@ejona86 ejona86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending what I have.

xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClient.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/test/java/io/grpc/xds/ControlPlaneRule.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClient.java Outdated Show resolved Hide resolved
// try to fallback to lower priority control plane client
if (checkForFallback
&& doFallbackForAuthority(
cpcThatClosed, serverInfo, subscriber.serverInfos, subscriber.authority)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs for doFallbackForAuthority() say server info should be for the active CPC. But here it is identical to cpcThatClosed.getServerInfo().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are only trying to do fallback for authorities for which the closed cpc was the active cpc, they are the same. However, since they are the same, and this is the only place that this method is called, there is no reason to pass the serverInfo.

authoritiesForClosedCpc = getActiveAuthorities(cpcThatClosed)
...
          if (subscriber.hasResult() || !authoritiesForClosedCpc.contains(subscriber.authority)) {
            continue;
          }

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
Copy link
Member

@ejona86 ejona86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending what I have.

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
}
}
}

public ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) {
public ControlPlaneClient getOrCreateControlPlaneClient(ImmutableList<ServerInfo> serverInfos) {
if (serverInfos == null || serverInfos.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove isEmpty(), as it isn't needed.

The callers are going out of their way to avoid passing null here, so either delete this check or simplify the callers.

(A combined method with fallBackToCpc() as I discuss in other comments would also probably change what happens here, as you'd probably pass in the authority as the argument and do the lookup of the serverInfos within this method.)

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved

ControlPlaneClient cpcToUse = subscriber.loadControlPlane();

if (subscriber.errorDescription != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition can leak a freshly-created cpcToUse. Move loadControlPlane() after the condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

doFallbackIfNecessary(cpcToUse, subscriber.authority);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method re-discovers the code flow taken by getOrCreateControlPlaneClient(). Can that be integrated into one method to avoid the error-prone handling? Every call to getOrCreateControlPlaneClient() needs to add the fallback if necessary, and the only other call to getOrCreateControlPlaneClient() just calls fallBackToCpc() unconditionally. It would be better to create and update the state together in one method so we know the state remains consistent.

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java Outdated Show resolved Hide resolved
private void internalHandleStreamReady(
ServerInfo serverInfo, ControlPlaneClient controlPlaneClient, String authority) {
ControlPlaneClient activeCpClient = getActiveCpc(authority);
addCpcToAuthority(authority, controlPlaneClient);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a need to add the first one in this method.

Is there? Why would the first be special? I see nothing that would guarantee this is the first ServerInfo in the authority's list, so it seems this could also make things out-of-order.

We're looping through all authorities that this serverInfo can potentially serve, not just the ones it has been requested on. By definition if it isn't in the list already it isn't supposed to be answering for this authority. So it seems clearly wrong to go about adding to the list.

We got here because the authority has subscribers. Is it even possible for an authority to have a subscriber, non-empty serverInfos, and yet not have any cpc in the list? It looks like watchXdsResource() would add to the list (via addCpcToAuthority()) when the subscriber was created.

I don't see the point of using getAuthoritiesForServerInfo() (which loops through all server info registrations) and then figure out if we care about the authority with hasSubscribers() and activeCpClients.get(authority).contains(readyCpc). Seems all we need to do is:

for (Map.Entry<String, List<ControlPlaneClient>> me : activeCpClients.entrySet()) {
  if (me.getValue().contains(controlPlaneClient)) {
    internalHandleStreamReady(controlPlaneClient, me.getValue());
  }
}

There's not need to check isShutDown() as shut down cleans up activeCpClients. With this construction, it is impossible for getActiveCpc() to return null. There's no need to check activeCpClient.isReady() && !activeCpClients.get(authority).contains(readyCpc).

}

restartMatchingSubscriberTimers(authority);
readyCpc.sendDiscoveryRequests();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't limited to the current authority, so won't this send discovery multiple times?

…eateControlPlaneClient and do fallback.

Move some logic into watchXdsResource from private methods, such as doFallbackIfNecessary, and streamline the execution.
Stop storing serverInfos in ResourceSubscriber and just construct it from authority as needed.
… term 'active'.

Minor change to stream ready process to avoid sending discovery requests multiple times if the CPC was supporting multiple authorities.
…n return a null to better match the name.

Do some cleanup
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants