Skip to content

Commit

Permalink
fix(discovery): retry failed target connections (backport #1593) (#1600)
Browse files Browse the repository at this point in the history
* fix(discovery): retry failed target connections (#1593)

(cherry picked from commit bf86535)

# Conflicts:
#	compose/compose-cryostat.yaml
#	compose/compose-vertx-jmx.yaml
#	smoketest-docker.sh
#	smoketest.sh

* cherrypick fixup

* remove cherry-pick added unneeded files

* fix podman discovery labels for older version

---------

Co-authored-by: Andrew Azores <[email protected]>
  • Loading branch information
mergify[bot] and andrewazores authored Jul 27, 2023
1 parent 6c1a320 commit 07238cc
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 121 deletions.
15 changes: 12 additions & 3 deletions smoketest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ runDemoApps() {
--label io.cryostat.connectUrl="service:jmx:rmi:///jndi/rmi://localhost:51423/jmxrmi" \
--rm -d quay.io/roberttoyonaga/jmx:jmxquarkus@sha256:b067f29faa91312d20d43c55d194a2e076de7d0d094da3d43ee7d2b2b5a6f100

podman run \
--name vertx-fib-demo-0 \
--env HTTP_PORT=8079 \
--env JMX_PORT=9089 \
--env START_DELAY=60 \
--pod cryostat-pod \
--label io.cryostat.connectUrl="service:jmx:rmi:///jndi/rmi://localhost:9089/jmxrmi" \
--rm -d quay.io/andrewazores/vertx-fib-demo:0.13.0

podman run \
--name vertx-fib-demo-1 \
--env HTTP_PORT=8081 \
Expand All @@ -129,7 +138,7 @@ runDemoApps() {
--env CRYOSTAT_AGENT_AUTHORIZATION="Basic $(echo user:pass | base64)" \
--pod cryostat-pod \
--label io.cryostat.connectUrl="service:jmx:rmi:///jndi/rmi://localhost:9093/jmxrmi" \
--rm -d quay.io/andrewazores/vertx-fib-demo:0.12.3
--rm -d quay.io/andrewazores/vertx-fib-demo:0.13.0

podman run \
--name vertx-fib-demo-2 \
Expand All @@ -147,7 +156,7 @@ runDemoApps() {
--env CRYOSTAT_AGENT_AUTHORIZATION="Basic $(echo user:pass | base64)" \
--pod cryostat-pod \
--label io.cryostat.connectUrl="service:jmx:rmi:///jndi/rmi://localhost:9094/jmxrmi" \
--rm -d quay.io/andrewazores/vertx-fib-demo:0.12.3
--rm -d quay.io/andrewazores/vertx-fib-demo:0.13.0

podman run \
--name vertx-fib-demo-3 \
Expand All @@ -166,7 +175,7 @@ runDemoApps() {
--env CRYOSTAT_AGENT_AUTHORIZATION="Basic $(echo user:pass | base64)" \
--pod cryostat-pod \
--label io.cryostat.connectUrl="service:jmx:rmi:///jndi/rmi://localhost:9095/jmxrmi" \
--rm -d quay.io/andrewazores/vertx-fib-demo:0.12.3
--rm -d quay.io/andrewazores/vertx-fib-demo:0.13.0

# this config is broken on purpose (missing required env vars) to test the agent's behaviour
# when not properly set up
Expand Down
110 changes: 76 additions & 34 deletions src/main/java/io/cryostat/discovery/DiscoveryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Predicate;

import javax.script.ScriptException;

Expand All @@ -57,14 +60,14 @@
import io.cryostat.configuration.StoredCredentials;
import io.cryostat.core.log.Logger;
import io.cryostat.core.net.discovery.JvmDiscoveryClient.EventKind;
import io.cryostat.net.web.http.AbstractAuthenticatedRequestHandler;
import io.cryostat.platform.ServiceRef;
import io.cryostat.platform.ServiceRef.AnnotationKey;
import io.cryostat.platform.discovery.AbstractNode;
import io.cryostat.platform.discovery.BaseNodeType;
import io.cryostat.platform.discovery.EnvironmentNode;
import io.cryostat.platform.discovery.TargetNode;
import io.cryostat.recordings.JvmIdHelper;
import io.cryostat.recordings.JvmIdHelper.JvmIdGetException;
import io.cryostat.rules.MatchExpressionEvaluator;
import io.cryostat.util.HttpStatusCodeIdentifier;

Expand Down Expand Up @@ -96,9 +99,10 @@ public class DiscoveryStorage extends AbstractPlatformClientVerticle {
private final Gson gson;
private final WebClient http;
private final Logger logger;
private long timerId = -1L;
private long pluginPruneTimerId = -1L;
private long targetRetryTimeId = -1L;

private final Map<TargetNode, UUID> targetsToUpdate = new HashMap<>();
private final Map<TargetNode, UUID> nonConnectableTargets = new HashMap<>();

public static final String DISCOVERY_STARTUP_ADDRESS = "discovery-startup";

Expand Down Expand Up @@ -143,34 +147,53 @@ public void start(Promise<Void> future) throws Exception {
+ " deployed")))
.onFailure(future::fail);

this.timerId = getVertx().setPeriodic(pingPeriod.toMillis(), i -> pingPrune());
this.pluginPruneTimerId = getVertx().setPeriodic(pingPeriod.toMillis(), i -> pingPrune());
this.targetRetryTimeId =
getVertx()
.setPeriodic(
// TODO make this configurable, use an exponential backoff, have a
// maximum retry policy, etc.
15_000,
i -> {
testNonConnectedTargets(
entry -> {
TargetNode targetNode = entry.getKey();
try {
String id =
jvmIdHelper
.get()
.resolveId(
targetNode.getTarget())
.getJvmId();
return StringUtils.isNotBlank(id);
} catch (JvmIdGetException e) {
logger.info(
"Retain null jvmId for node [{}]",
targetNode.getName());
logger.info(e);
return false;
}
});
});
this.credentialsManager
.get()
.addListener(
event -> {
switch (event.getEventType()) {
case ADDED:
Map<TargetNode, UUID> copy = new HashMap<>(targetsToUpdate);
for (var entry : copy.entrySet()) {
try {
if (matchExpressionEvaluator
.get()
.applies(
event.getPayload(),
entry.getKey().getTarget())) {
targetsToUpdate.remove(entry.getKey());
UUID id = entry.getValue();
PluginInfo plugin = getById(id).orElseThrow();
EnvironmentNode original =
gson.fromJson(
plugin.getSubtree(),
EnvironmentNode.class);
update(id, original.getChildren());
}
} catch (JsonSyntaxException | ScriptException e) {
throw new RuntimeException(e);
}
}
testNonConnectedTargets(
entry -> {
try {
return matchExpressionEvaluator
.get()
.applies(
event.getPayload(),
entry.getKey().getTarget());
} catch (ScriptException e) {
logger.error(e);
return false;
}
});
break;
case REMOVED:
break;
Expand All @@ -181,9 +204,33 @@ public void start(Promise<Void> future) throws Exception {
});
}

private void testNonConnectedTargets(Predicate<Entry<TargetNode, UUID>> predicate) {
ForkJoinPool.commonPool()
.execute(
() -> {
Map<TargetNode, UUID> copy = new HashMap<>(nonConnectableTargets);
for (var entry : copy.entrySet()) {
try {
if (predicate.test(entry)) {
nonConnectableTargets.remove(entry.getKey());
UUID id = entry.getValue();
PluginInfo plugin = getById(id).orElseThrow();
EnvironmentNode original =
gson.fromJson(
plugin.getSubtree(), EnvironmentNode.class);
update(id, original.getChildren());
}
} catch (JsonSyntaxException e) {
throw new RuntimeException(e);
}
}
});
}

@Override
public void stop() {
getVertx().cancelTimer(timerId);
getVertx().cancelTimer(pluginPruneTimerId);
getVertx().cancelTimer(targetRetryTimeId);
}

private CompositeFuture pingPrune() {
Expand Down Expand Up @@ -315,17 +362,12 @@ private List<AbstractNode> modifyChildrenWithJvmIds(
ServiceRef ref = ((TargetNode) child).getTarget();
try {
ref = jvmIdHelper.get().resolveId(ref);
child = new TargetNode(child.getNodeType(), ref, child.getLabels());
} catch (Exception e) {
// if Exception is of SSL or JMX Auth, ignore warning and use null jvmId
if (!(AbstractAuthenticatedRequestHandler.isJmxAuthFailure(e)
|| AbstractAuthenticatedRequestHandler.isJmxSslFailure(e))) {
logger.info("Ignoring target node [{}]", child.getName());
continue;
}
logger.info("Update node [{}] with null jvmId", child.getName());
targetsToUpdate.putIfAbsent((TargetNode) child, id);
logger.info(e);
nonConnectableTargets.putIfAbsent((TargetNode) child, id);
}
child = new TargetNode(child.getNodeType(), ref, child.getLabels());
modifiedChildren.add(child);
} else if (child instanceof EnvironmentNode) {
modifiedChildren.add(
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/cryostat/recordings/JvmIdHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ public class JvmIdHelper extends AbstractEventEmitter<JvmIdHelper.IdEvent, Strin
}

private boolean observe(ServiceRef sr) {
logger.info("Observing new target: {}", sr);
if (StringUtils.isBlank(sr.getJvmId())) {
return false;
}
Expand All @@ -133,6 +132,7 @@ public ServiceRef resolveId(ServiceRef sr) throws JvmIdGetException {
if (observe(sr)) {
return sr;
}
logger.info("Observing new target: {}", sr);
URI serviceUri = sr.getServiceUri();
String uriStr = serviceUri.toString();
try {
Expand Down
82 changes: 0 additions & 82 deletions src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import javax.inject.Singleton;

Expand All @@ -68,7 +67,6 @@
import io.cryostat.platform.internal.DefaultPlatformClient;
import io.cryostat.platform.internal.KubeApiPlatformClient;
import io.cryostat.recordings.JvmIdHelper;
import io.cryostat.recordings.JvmIdHelper.JvmIdGetException;
import io.cryostat.rules.MatchExpressionEvaluator;

import com.google.gson.Gson;
Expand Down Expand Up @@ -916,85 +914,5 @@ public PluginInfo answer(InvocationOnMock invocation)
new TargetDiscoveryEvent(EventKind.FOUND, updatedServiceRef3),
new TargetDiscoveryEvent(EventKind.MODIFIED, updatedServiceRef4)));
}

@Test
void testIgnoreNodesIfNonSSLAuthExceptions() throws Exception {
UUID id = UUID.randomUUID();

ServiceRef serviceRef1 =
new ServiceRef(
null,
URI.create("service:jmx:rmi:///jndi/rmi://localhost:1/jmxrmi"),
"serviceRef1");
ServiceRef serviceRef2 =
new ServiceRef(
null,
URI.create("service:jmx:rmi:///jndi/rmi://localhost:2/jmxrmi"),
"serviceRef2");
TargetNode target1 = new TargetNode(BaseNodeType.JVM, serviceRef1);
TargetNode target2 = new TargetNode(BaseNodeType.JVM, serviceRef2);

EnvironmentNode realm1 =
new EnvironmentNode("next", BaseNodeType.REALM, Map.of(), Set.of(target1));
EnvironmentNode realm2 =
new EnvironmentNode(
"next", BaseNodeType.REALM, Map.of(), Set.of(target1, target2));

PluginInfo prevPlugin =
new PluginInfo(
"test-realm", URI.create("http://example.com"), gson.toJson(realm1));

Mockito.when(dao.get(Mockito.eq(id))).thenReturn(Optional.of(prevPlugin));
Mockito.when(dao.update(Mockito.any(UUID.class), Mockito.any(Collection.class)))
.thenAnswer(
new Answer<PluginInfo>() {
@Override
public PluginInfo answer(InvocationOnMock invocation)
throws Throwable {
List<AbstractNode> subtree = invocation.getArgument(1);
EnvironmentNode next =
new EnvironmentNode(
"next", BaseNodeType.REALM, Map.of(), subtree);
return new PluginInfo(
"test-realm",
URI.create("http://example.com"),
gson.toJson(next));
}
});
JvmIdGetException jige = Mockito.mock(JvmIdGetException.class);
ExecutionException ex = Mockito.mock(ExecutionException.class);
Mockito.when(jige.getCause()).thenReturn(ex);
Mockito.when(ex.getCause()).thenReturn(new SecurityException("test"));

Mockito.when(jvmIdHelper.resolveId(Mockito.any(ServiceRef.class)))
.thenThrow(jige)
.thenReturn(
new ServiceRef(
"serviceRef2", serviceRef2.getServiceUri(), "serviceRef2"));

var updatedSubtree = storage.update(id, List.of(realm2));
MatcherAssert.assertThat(updatedSubtree, Matchers.notNullValue());
MatcherAssert.assertThat(updatedSubtree, Matchers.hasSize(1));
for (AbstractNode node : updatedSubtree) {
MatcherAssert.assertThat(node, Matchers.instanceOf(EnvironmentNode.class));
EnvironmentNode env = (EnvironmentNode) node;
MatcherAssert.assertThat(env.getChildren(), Matchers.hasSize(2));
for (AbstractNode nested : env.getChildren()) {
if (nested instanceof TargetNode) {
TargetNode target = (TargetNode) nested;
MatcherAssert.assertThat(target, Matchers.instanceOf(TargetNode.class));
if (target.getTarget().getAlias().get().equals("serviceRef1")) {
MatcherAssert.assertThat(
target.getTarget().getJvmId(), Matchers.nullValue());
} else if (target.getTarget().getAlias().get().equals("serviceRef2")) {
MatcherAssert.assertThat(
target.getTarget().getJvmId(), Matchers.equalTo("serviceRef2"));
} else {
throw new IllegalStateException("Unexpected alias");
}
}
}
}
}
}
}
2 changes: 1 addition & 1 deletion src/test/java/itest/bases/ExternalTargetsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

public abstract class ExternalTargetsTest extends StandardSelfTest {

protected static final String FIB_DEMO_IMAGESPEC = "quay.io/andrewazores/vertx-fib-demo:0.8.0";
protected static final String FIB_DEMO_IMAGESPEC = "quay.io/andrewazores/vertx-fib-demo:0.13.0";

static final int DISCOVERY_POLL_PERIOD_MS =
Integer.parseInt(System.getProperty("cryostat.itest.jdp.poll.period", "2500"));
Expand Down

0 comments on commit 07238cc

Please sign in to comment.