From d454e11c38280e718dcae6ae1118ae4399d6a092 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 10:13:41 -0400 Subject: [PATCH] decouple target JVM ID retrieval from persistence, handle in separate worker threads, attempt reconnections periodically --- .../cryostat/discovery/KubeApiDiscovery.java | 28 ++- .../java/io/cryostat/rules/RuleService.java | 5 + src/main/java/io/cryostat/targets/Target.java | 89 +--------- .../targets/TargetJvmIdUpdateJob.java | 83 +++++++++ .../targets/TargetJvmIdUpdateService.java | 165 ++++++++++++++++++ 5 files changed, 271 insertions(+), 99 deletions(-) create mode 100644 src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java create mode 100644 src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 8bbf10cae..685dbd92c 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -352,24 +352,20 @@ public void handleEndpointEvent(EndpointDiscoveryEvent evt) { DiscoveryNode.environment( namespace, KubeDiscoveryNodeType.NAMESPACE)); - try { - if (evt.eventKind == EventKind.FOUND) { - buildOwnerChain(nsNode, evt.target, evt.objRef); - } else { - pruneOwnerChain(nsNode, evt.target); - } + if (evt.eventKind == EventKind.FOUND) { + buildOwnerChain(nsNode, evt.target, evt.objRef); + } else { + pruneOwnerChain(nsNode, evt.target); + } - if (!nsNode.hasChildren()) { - realm.children.remove(nsNode); - nsNode.parent = null; - } else if (!realm.children.contains(nsNode)) { - realm.children.add(nsNode); - nsNode.parent = realm; - } - realm.persist(); - } catch (Exception e) { - logger.warn("Endpoint handler exception", e); + if (!nsNode.hasChildren()) { + realm.children.remove(nsNode); + nsNode.parent = null; + } else if (!realm.children.contains(nsNode)) { + realm.children.add(nsNode); + nsNode.parent = realm; } + realm.persist(); } private void notify(NamespaceQueryEvent evt) { diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index 164c53cdb..d080fe5f7 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -40,6 +40,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.infrastructure.Infrastructure; @@ -86,6 +87,10 @@ void onStart(@Observes StartupEvent ev) { .forEach(this::applyRuleToMatchingTargets)); } + void onStop(@Observes ShutdownEvent evt) throws SchedulerException { + quartz.shutdown(); + } + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) void onMessage(TargetDiscovery event) { switch (event.kind()) { diff --git a/src/main/java/io/cryostat/targets/Target.java b/src/main/java/io/cryostat/targets/Target.java index 9bc733033..d722f7825 100644 --- a/src/main/java/io/cryostat/targets/Target.java +++ b/src/main/java/io/cryostat/targets/Target.java @@ -18,7 +18,6 @@ import java.net.URI; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -28,17 +27,14 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; -import io.cryostat.ConfigProperties; -import io.cryostat.core.net.JFRConnection; -import io.cryostat.credentials.Credential; import io.cryostat.discovery.DiscoveryNode; -import io.cryostat.expressions.MatchExpressionEvaluator; -import io.cryostat.libcryostat.JvmIdentifier; import io.cryostat.recordings.ActiveRecording; -import io.cryostat.recordings.RecordingHelper; import io.cryostat.ws.MessagingServer; import io.cryostat.ws.Notification; @@ -46,7 +42,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.hibernate.orm.panache.PanacheEntity; -import io.quarkus.vertx.ConsumeEvent; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -63,15 +58,12 @@ import jakarta.persistence.PostRemove; import jakarta.persistence.PostUpdate; import jakarta.persistence.PrePersist; -import jakarta.transaction.Transactional; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import org.apache.commons.lang3.StringUtils; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.hibernate.annotations.JdbcTypeCode; import org.hibernate.type.SqlTypes; import org.jboss.logging.Logger; -import org.projectnessie.cel.tools.ScriptException; @Entity @EntityListeners(Target.Listener.class) @@ -291,53 +283,8 @@ static class Listener { @Inject Logger logger; @Inject EventBus bus; - @Inject TargetConnectionManager connectionManager; - @Inject RecordingHelper recordingHelper; - @Inject MatchExpressionEvaluator matchExpressionEvaluator; - - @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) - Duration timeout; - - @Transactional - @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) - void onMessage(TargetDiscovery event) { - var target = Target.find("id", event.serviceRef().id).singleResultOptional(); - switch (event.kind()) { - case LOST: - // this should already be handled by the cascading deletion of the Target - // TODO verify this - break; - case FOUND: - target.ifPresent(recordingHelper::listActiveRecordings); - break; - case MODIFIED: - target.ifPresent(recordingHelper::listActiveRecordings); - break; - default: - // no-op - break; - } - } - - @ConsumeEvent(value = Credential.CREDENTIALS_STORED, blocking = true) - @Transactional - void updateCredential(Credential credential) { - Target.stream("#Target.unconnected") - .forEach( - t -> { - try { - if (matchExpressionEvaluator.applies( - credential.matchExpression, t)) { - updateTargetJvmId(t, credential); - t.persist(); - } - } catch (ScriptException e) { - logger.error(e); - } catch (Exception e) { - logger.warn(e); - } - }); - } + ScheduledExecutorService scheduler = + Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); @PrePersist void prePersist(Target target) { @@ -358,35 +305,11 @@ void prePersist(Target target) { if (target.activeRecordings == null) { target.activeRecordings = new ArrayList<>(); } - - try { - if (StringUtils.isBlank(target.jvmId)) { - updateTargetJvmId(target, null); - } - } catch (Exception e) { - logger.warn(e); - } - } - - private void updateTargetJvmId(Target t, Credential credential) { - try { - t.jvmId = - connectionManager - .executeDirect( - t, - Optional.ofNullable(credential), - JFRConnection::getJvmIdentifier) - .map(JvmIdentifier::getHash) - .await() - .atMost(timeout); - } catch (Exception e) { - logger.error(e); - } } @PostPersist void postPersist(Target target) { - notify(EventKind.FOUND, target); + scheduler.schedule(() -> notify(EventKind.FOUND, target), 1, TimeUnit.SECONDS); } @PostUpdate diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java new file mode 100644 index 000000000..0bac9462c --- /dev/null +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java @@ -0,0 +1,83 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.targets; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; + +import io.cryostat.ConfigProperties; +import io.cryostat.core.net.JFRConnection; +import io.cryostat.libcryostat.JvmIdentifier; + +import io.quarkus.narayana.jta.QuarkusTransaction; +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +public class TargetJvmIdUpdateJob implements Job { + + @Inject Logger logger; + @Inject TargetConnectionManager connectionManager; + ExecutorService executor = ForkJoinPool.commonPool(); + + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration connectionTimeout; + + @Override + @Transactional + public void execute(JobExecutionContext context) throws JobExecutionException { + Target.stream("#Target.unconnected") + .forEach( + t -> { + executor.submit( + () -> { + try { + updateTargetJvmId(t.id); + } catch (Exception e) { + logger.warn(e); + } + }); + }); + } + + private void updateTargetJvmId(long id) { + QuarkusTransaction.requiringNew() + .run( + () -> { + try { + Target target = Target.getTargetById(id); + target.jvmId = + connectionManager + .executeDirect( + target, + Optional.empty(), + JFRConnection::getJvmIdentifier) + .map(JvmIdentifier::getHash) + .await() + .atMost(connectionTimeout); + target.persist(); + } catch (Exception e) { + logger.error(e); + } + }); + } +} diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java new file mode 100644 index 000000000..ce41141df --- /dev/null +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -0,0 +1,165 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.targets; + +import java.time.Duration; +import java.time.Instant; +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; + +import io.cryostat.ConfigProperties; +import io.cryostat.core.net.JFRConnection; +import io.cryostat.credentials.Credential; +import io.cryostat.expressions.MatchExpressionEvaluator; +import io.cryostat.libcryostat.JvmIdentifier; +import io.cryostat.recordings.RecordingHelper; +import io.cryostat.targets.Target.TargetDiscovery; + +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import io.quarkus.vertx.ConsumeEvent; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.transaction.Transactional; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import org.projectnessie.cel.tools.ScriptException; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +@ApplicationScoped +public class TargetJvmIdUpdateService { + + @Inject Logger logger; + @Inject TargetConnectionManager connectionManager; + @Inject RecordingHelper recordingHelper; + @Inject EntityManager entityManager; + @Inject MatchExpressionEvaluator matchExpressionEvaluator; + @Inject Scheduler scheduler; + + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration connectionTimeout; + + private final List jobs = new CopyOnWriteArrayList<>(); + + void onStart(@Observes StartupEvent evt) { + logger.tracev("{0} started", getClass().getName()); + + JobDetail jobDetail = JobBuilder.newJob(TargetJvmIdUpdateJob.class).build(); + + if (jobs.contains(jobDetail.getKey())) { + return; + } + + Trigger trigger = + TriggerBuilder.newTrigger() + .withSchedule( + SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds(30) + .repeatForever() + .withMisfireHandlingInstructionNowWithExistingCount()) + .startAt(Date.from(Instant.now().plusSeconds(30))) + .build(); + try { + scheduler.scheduleJob(jobDetail, trigger); + } catch (SchedulerException e) { + logger.errorv(e, "Failed to schedule JVM ID updater job"); + } + jobs.add(jobDetail.getKey()); + } + + void onStop(@Observes ShutdownEvent evt) throws SchedulerException { + scheduler.shutdown(); + } + + @Transactional + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) + void onMessage(TargetDiscovery event) { + var target = Target.find("id", event.serviceRef().id).singleResultOptional(); + switch (event.kind()) { + case LOST: + // this should already be handled by the cascading deletion of the Target + // TODO verify this + break; + case MODIFIED: + // fall-through + case FOUND: + target.ifPresent( + t -> { + try { + logger.debugv("Updating JVM ID for {0} ({1})", t.connectUrl, t.id); + if (StringUtils.isBlank(t.jvmId)) { + updateTargetJvmId(t, null); + } + } catch (Exception e) { + logger.warn(e); + } + }); + target.ifPresent(recordingHelper::listActiveRecordings); + break; + default: + // no-op + break; + } + } + + @ConsumeEvent(value = Credential.CREDENTIALS_STORED, blocking = true) + @Transactional + void updateCredential(Credential credential) { + Target.stream("#Target.unconnected") + .forEach( + t -> { + try { + if (matchExpressionEvaluator.applies( + credential.matchExpression, t)) { + updateTargetJvmId(t, credential); + } + } catch (ScriptException e) { + logger.error(e); + } catch (Exception e) { + logger.warn(e); + } + }); + } + + private void updateTargetJvmId(Target t, Credential credential) { + try { + t.jvmId = + connectionManager + .executeDirect( + t, + Optional.ofNullable(credential), + JFRConnection::getJvmIdentifier) + .map(JvmIdentifier::getHash) + .await() + .atMost(connectionTimeout); + t.persist(); + } catch (Exception e) { + logger.error(e); + } + } +}