Skip to content

Commit

Permalink
decouple target JVM ID retrieval from persistence, handle in separate…
Browse files Browse the repository at this point in the history
… worker threads, attempt reconnections periodically
  • Loading branch information
andrewazores committed Oct 23, 2024
1 parent 2665826 commit d454e11
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 99 deletions.
28 changes: 12 additions & 16 deletions src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,24 +352,20 @@ public void handleEndpointEvent(EndpointDiscoveryEvent evt) {
DiscoveryNode.environment(
namespace, KubeDiscoveryNodeType.NAMESPACE));

try {
if (evt.eventKind == EventKind.FOUND) {
buildOwnerChain(nsNode, evt.target, evt.objRef);
} else {
pruneOwnerChain(nsNode, evt.target);
}
if (evt.eventKind == EventKind.FOUND) {
buildOwnerChain(nsNode, evt.target, evt.objRef);
} else {
pruneOwnerChain(nsNode, evt.target);
}

if (!nsNode.hasChildren()) {
realm.children.remove(nsNode);
nsNode.parent = null;
} else if (!realm.children.contains(nsNode)) {
realm.children.add(nsNode);
nsNode.parent = realm;
}
realm.persist();
} catch (Exception e) {
logger.warn("Endpoint handler exception", e);
if (!nsNode.hasChildren()) {
realm.children.remove(nsNode);
nsNode.parent = null;
} else if (!realm.children.contains(nsNode)) {
realm.children.add(nsNode);
nsNode.parent = realm;
}
realm.persist();
}

private void notify(NamespaceQueryEvent evt) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/cryostat/rules/RuleService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.infrastructure.Infrastructure;
Expand Down Expand Up @@ -86,6 +87,10 @@ void onStart(@Observes StartupEvent ev) {
.forEach(this::applyRuleToMatchingTargets));
}

void onStop(@Observes ShutdownEvent evt) throws SchedulerException {
quartz.shutdown();
}

@ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true)
void onMessage(TargetDiscovery event) {
switch (event.kind()) {
Expand Down
89 changes: 6 additions & 83 deletions src/main/java/io/cryostat/targets/Target.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -28,25 +27,21 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import io.cryostat.ConfigProperties;
import io.cryostat.core.net.JFRConnection;
import io.cryostat.credentials.Credential;
import io.cryostat.discovery.DiscoveryNode;
import io.cryostat.expressions.MatchExpressionEvaluator;
import io.cryostat.libcryostat.JvmIdentifier;
import io.cryostat.recordings.ActiveRecording;
import io.cryostat.recordings.RecordingHelper;
import io.cryostat.ws.MessagingServer;
import io.cryostat.ws.Notification;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.quarkus.hibernate.orm.panache.PanacheEntity;
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand All @@ -63,15 +58,12 @@
import jakarta.persistence.PostRemove;
import jakarta.persistence.PostUpdate;
import jakarta.persistence.PrePersist;
import jakarta.transaction.Transactional;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.hibernate.annotations.JdbcTypeCode;
import org.hibernate.type.SqlTypes;
import org.jboss.logging.Logger;
import org.projectnessie.cel.tools.ScriptException;

@Entity
@EntityListeners(Target.Listener.class)
Expand Down Expand Up @@ -291,53 +283,8 @@ static class Listener {

@Inject Logger logger;
@Inject EventBus bus;
@Inject TargetConnectionManager connectionManager;
@Inject RecordingHelper recordingHelper;
@Inject MatchExpressionEvaluator matchExpressionEvaluator;

@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Duration timeout;

@Transactional
@ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true)
void onMessage(TargetDiscovery event) {
var target = Target.<Target>find("id", event.serviceRef().id).singleResultOptional();
switch (event.kind()) {
case LOST:
// this should already be handled by the cascading deletion of the Target
// TODO verify this
break;
case FOUND:
target.ifPresent(recordingHelper::listActiveRecordings);
break;
case MODIFIED:
target.ifPresent(recordingHelper::listActiveRecordings);
break;
default:
// no-op
break;
}
}

@ConsumeEvent(value = Credential.CREDENTIALS_STORED, blocking = true)
@Transactional
void updateCredential(Credential credential) {
Target.<Target>stream("#Target.unconnected")
.forEach(
t -> {
try {
if (matchExpressionEvaluator.applies(
credential.matchExpression, t)) {
updateTargetJvmId(t, credential);
t.persist();
}
} catch (ScriptException e) {
logger.error(e);
} catch (Exception e) {
logger.warn(e);
}
});
}
ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

@PrePersist
void prePersist(Target target) {
Expand All @@ -358,35 +305,11 @@ void prePersist(Target target) {
if (target.activeRecordings == null) {
target.activeRecordings = new ArrayList<>();
}

try {
if (StringUtils.isBlank(target.jvmId)) {
updateTargetJvmId(target, null);
}
} catch (Exception e) {
logger.warn(e);
}
}

private void updateTargetJvmId(Target t, Credential credential) {
try {
t.jvmId =
connectionManager
.executeDirect(
t,
Optional.ofNullable(credential),
JFRConnection::getJvmIdentifier)
.map(JvmIdentifier::getHash)
.await()
.atMost(timeout);
} catch (Exception e) {
logger.error(e);
}
}

@PostPersist
void postPersist(Target target) {
notify(EventKind.FOUND, target);
scheduler.schedule(() -> notify(EventKind.FOUND, target), 1, TimeUnit.SECONDS);
}

@PostUpdate
Expand Down
83 changes: 83 additions & 0 deletions src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright The Cryostat Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cryostat.targets;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

import io.cryostat.ConfigProperties;
import io.cryostat.core.net.JFRConnection;
import io.cryostat.libcryostat.JvmIdentifier;

import io.quarkus.narayana.jta.QuarkusTransaction;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class TargetJvmIdUpdateJob implements Job {

@Inject Logger logger;
@Inject TargetConnectionManager connectionManager;
ExecutorService executor = ForkJoinPool.commonPool();

@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Duration connectionTimeout;

@Override
@Transactional
public void execute(JobExecutionContext context) throws JobExecutionException {
Target.<Target>stream("#Target.unconnected")
.forEach(
t -> {
executor.submit(
() -> {
try {
updateTargetJvmId(t.id);
} catch (Exception e) {
logger.warn(e);
}
});
});
}

private void updateTargetJvmId(long id) {
QuarkusTransaction.requiringNew()
.run(
() -> {
try {
Target target = Target.getTargetById(id);
target.jvmId =
connectionManager
.executeDirect(
target,
Optional.empty(),
JFRConnection::getJvmIdentifier)
.map(JvmIdentifier::getHash)
.await()
.atMost(connectionTimeout);
target.persist();
} catch (Exception e) {
logger.error(e);
}
});
}
}
Loading

0 comments on commit d454e11

Please sign in to comment.