From aa5ab4eac5034e69aa9f991a155414fc2794eaca Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 24 Apr 2024 13:39:57 -0400 Subject: [PATCH] fix(transaction): ensure transactions occur in correct context/thread (#396) --- .../expressions/MatchExpressions.java | 6 +- .../io/cryostat/graphql/ActiveRecordings.java | 63 +++---- .../cryostat/graphql/ArchivedRecordings.java | 1 + .../io/cryostat/graphql/EnvironmentNodes.java | 2 + .../java/io/cryostat/graphql/RootNode.java | 3 + .../java/io/cryostat/graphql/TargetNodes.java | 16 +- .../cryostat/recordings/RecordingHelper.java | 162 +++++++++--------- .../io/cryostat/recordings/Recordings.java | 7 +- .../java/io/cryostat/reports/Reports.java | 7 +- src/main/resources/application.properties | 1 + 10 files changed, 137 insertions(+), 131 deletions(-) diff --git a/src/main/java/io/cryostat/expressions/MatchExpressions.java b/src/main/java/io/cryostat/expressions/MatchExpressions.java index 256413761..1055a4b57 100644 --- a/src/main/java/io/cryostat/expressions/MatchExpressions.java +++ b/src/main/java/io/cryostat/expressions/MatchExpressions.java @@ -29,7 +29,6 @@ import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; import jakarta.ws.rs.GET; -import jakarta.ws.rs.NotFoundException; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.core.Response; @@ -73,10 +72,7 @@ public Multi> list() { @RolesAllowed("read") @Blocking public MatchedExpression get(@RestPath long id) throws ScriptException { - MatchExpression expr = MatchExpression.findById(id); - if (expr == null) { - throw new NotFoundException(); - } + MatchExpression expr = MatchExpression.find("id", id).singleResult(); return targetMatcher.match(expr); } diff --git a/src/main/java/io/cryostat/graphql/ActiveRecordings.java b/src/main/java/io/cryostat/graphql/ActiveRecordings.java index 4f3f35985..2960b6924 100644 --- a/src/main/java/io/cryostat/graphql/ActiveRecordings.java +++ b/src/main/java/io/cryostat/graphql/ActiveRecordings.java @@ -26,6 +26,7 @@ import org.openjdk.jmc.common.unit.QuantityConversionException; +import io.cryostat.ConfigProperties; import io.cryostat.core.templates.Template; import io.cryostat.core.templates.TemplateType; import io.cryostat.discovery.DiscoveryNode; @@ -44,10 +45,10 @@ import io.smallrye.common.annotation.Blocking; import io.smallrye.graphql.api.Nullable; import io.smallrye.graphql.execution.ExecutionException; -import io.smallrye.mutiny.Uni; import jakarta.inject.Inject; import jakarta.transaction.Transactional; import jdk.jfr.RecordingState; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.graphql.Description; import org.eclipse.microprofile.graphql.GraphQLApi; import org.eclipse.microprofile.graphql.Mutation; @@ -61,6 +62,9 @@ public class ActiveRecordings { @Inject RecordingHelper recordingHelper; @Inject Logger logger; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + @Blocking @Transactional @Mutation @@ -226,53 +230,55 @@ public List createSnapshot(@NonNull DiscoveryNodeFilter nodes) @Blocking @Transactional @Description("Start a new Flight Recording on the specified Target") - public Uni doStartRecording( + public ActiveRecording doStartRecording( @Source Target target, @NonNull RecordingSettings recording) throws QuantityConversionException { - var fTarget = Target.findById(target.id); + var fTarget = Target.getTargetById(target.id); Template template = recordingHelper.getPreferredTemplate( fTarget, recording.template, TemplateType.valueOf(recording.templateType)); - return recordingHelper.startRecording( - fTarget, - Optional.ofNullable(recording.replace) - .map(RecordingReplace::valueOf) - .orElse(RecordingReplace.STOPPED), - template, - recording.asOptions(), - Optional.ofNullable(recording.metadata).map(s -> s.labels).orElse(Map.of())); + return recordingHelper + .startRecording( + fTarget, + Optional.ofNullable(recording.replace) + .map(RecordingReplace::valueOf) + .orElse(RecordingReplace.STOPPED), + template, + recording.asOptions(), + Optional.ofNullable(recording.metadata).map(s -> s.labels).orElse(Map.of())) + .await() + .atMost(timeout); } @Blocking @Transactional @Description("Create a new Flight Recorder Snapshot on the specified Target") - public Uni doSnapshot(@Source Target target) { - var fTarget = Target.findById(target.id); - return recordingHelper.createSnapshot(fTarget); + public ActiveRecording doSnapshot(@Source Target target) { + var fTarget = Target.getTargetById(target.id); + return recordingHelper.createSnapshot(fTarget).await().atMost(timeout); } @Blocking @Transactional @Description("Stop the specified Flight Recording") - public Uni doStop(@Source ActiveRecording recording) throws Exception { - var ar = ActiveRecording.findById(recording.id); - return recordingHelper.stopRecording(ar); + public ActiveRecording doStop(@Source ActiveRecording recording) throws Exception { + var ar = ActiveRecording.find("id", recording.id).singleResult(); + return recordingHelper.stopRecording(ar).await().atMost(timeout); } @Blocking @Transactional @Description("Delete the specified Flight Recording") - public Uni doDelete(@Source ActiveRecording recording) { - var ar = ActiveRecording.findById(recording.id); - return recordingHelper.deleteRecording(ar); + public ActiveRecording doDelete(@Source ActiveRecording recording) { + var ar = ActiveRecording.find("id", recording.id).singleResult(); + return recordingHelper.deleteRecording(ar).await().atMost(timeout); } @Blocking - @Transactional @Description("Archive the specified Flight Recording") - public Uni doArchive(@Source ActiveRecording recording) throws Exception { - var ar = ActiveRecording.findById(recording.id); - return Uni.createFrom().item(recordingHelper.archiveRecording(ar, null, null)); + public ArchivedRecording doArchive(@Source ActiveRecording recording) throws Exception { + var ar = ActiveRecording.find("id", recording.id).singleResult(); + return recordingHelper.archiveRecording(ar, null, null); } public TargetNodes.ActiveRecordings active( @@ -319,14 +325,9 @@ public RecordingOptions asOptions() { @Blocking @Transactional @Description("Updates the metadata labels for an existing Flight Recording.") - public Uni doPutMetadata( + public ActiveRecording doPutMetadata( @Source ActiveRecording recording, MetadataLabels metadataInput) { - return Uni.createFrom() - .item( - () -> { - return recordingHelper.updateRecordingMetadata( - recording.id, metadataInput.getLabels()); - }); + return recordingHelper.updateRecordingMetadata(recording.id, metadataInput.getLabels()); } @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") diff --git a/src/main/java/io/cryostat/graphql/ArchivedRecordings.java b/src/main/java/io/cryostat/graphql/ArchivedRecordings.java index e62a40a5e..171b355de 100644 --- a/src/main/java/io/cryostat/graphql/ArchivedRecordings.java +++ b/src/main/java/io/cryostat/graphql/ArchivedRecordings.java @@ -78,6 +78,7 @@ public ArchivedRecording doDelete(@Source ArchivedRecording recording) { return recording; } + @Blocking @NonNull public ArchivedRecording doPutMetadata( @Source ArchivedRecording recording, MetadataLabels metadataInput) { diff --git a/src/main/java/io/cryostat/graphql/EnvironmentNodes.java b/src/main/java/io/cryostat/graphql/EnvironmentNodes.java index d70859989..c5615312a 100644 --- a/src/main/java/io/cryostat/graphql/EnvironmentNodes.java +++ b/src/main/java/io/cryostat/graphql/EnvironmentNodes.java @@ -20,6 +20,7 @@ import io.cryostat.discovery.DiscoveryNode; import io.cryostat.graphql.RootNode.DiscoveryNodeFilter; +import io.smallrye.common.annotation.Blocking; import io.smallrye.graphql.api.Nullable; import org.eclipse.microprofile.graphql.Description; import org.eclipse.microprofile.graphql.GraphQLApi; @@ -28,6 +29,7 @@ @GraphQLApi public class EnvironmentNodes { + @Blocking @Query("environmentNodes") @Description("Get all environment nodes in the discovery tree with optional filtering") public List environmentNodes(@Nullable DiscoveryNodeFilter filter) { diff --git a/src/main/java/io/cryostat/graphql/RootNode.java b/src/main/java/io/cryostat/graphql/RootNode.java index 90b76fcb4..3a1633667 100644 --- a/src/main/java/io/cryostat/graphql/RootNode.java +++ b/src/main/java/io/cryostat/graphql/RootNode.java @@ -24,6 +24,7 @@ import io.cryostat.graphql.matchers.LabelSelectorMatcher; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.smallrye.common.annotation.Blocking; import io.smallrye.graphql.api.Nullable; import org.eclipse.microprofile.graphql.Description; import org.eclipse.microprofile.graphql.GraphQLApi; @@ -33,12 +34,14 @@ @GraphQLApi public class RootNode { + @Blocking @Query("rootNode") @Description("Get the root target discovery node") public DiscoveryNode getRootNode() { return DiscoveryNode.getUniverse(); } + @Blocking @Description( "Get target nodes that are descendants of this node. That is, get the set of leaf nodes" + " from anywhere below this node's subtree.") diff --git a/src/main/java/io/cryostat/graphql/TargetNodes.java b/src/main/java/io/cryostat/graphql/TargetNodes.java index dfe4907a0..7c77c68a8 100644 --- a/src/main/java/io/cryostat/graphql/TargetNodes.java +++ b/src/main/java/io/cryostat/graphql/TargetNodes.java @@ -36,8 +36,8 @@ import io.smallrye.common.annotation.Blocking; import io.smallrye.graphql.api.Context; import io.smallrye.graphql.api.Nullable; -import io.smallrye.mutiny.Uni; import jakarta.inject.Inject; +import jakarta.transaction.Transactional; import org.apache.commons.lang3.StringUtils; import org.eclipse.microprofile.graphql.Description; import org.eclipse.microprofile.graphql.GraphQLApi; @@ -75,9 +75,10 @@ public List getTargetNodes(DiscoveryNodeFilter filter) { // } @Blocking + @Transactional public ActiveRecordings activeRecordings( @Source Target target, @Nullable ActiveRecordingsFilter filter) { - var fTarget = Target.findById(target.id); + var fTarget = Target.getTargetById(target.id); var recordings = new ActiveRecordings(); if (StringUtils.isNotBlank(fTarget.jvmId)) { recordings.data = @@ -92,7 +93,7 @@ public ActiveRecordings activeRecordings( @Blocking public ArchivedRecordings archivedRecordings( @Source Target target, @Nullable ArchivedRecordingsFilter filter) { - var fTarget = Target.findById(target.id); + var fTarget = Target.getTargetById(target.id); var recordings = new ArchivedRecordings(); if (StringUtils.isNotBlank(fTarget.jvmId)) { recordings.data = @@ -105,9 +106,10 @@ public ArchivedRecordings archivedRecordings( } @Blocking + @Transactional @Description("Get the active and archived recordings belonging to this target") public Recordings recordings(@Source Target target, Context context) { - var fTarget = Target.findById(target.id); + var fTarget = Target.getTargetById(target.id); var recordings = new Recordings(); if (StringUtils.isBlank(fTarget.jvmId)) { return recordings; @@ -133,9 +135,9 @@ public Recordings recordings(@Source Target target, Context context) { @Blocking @Description("Get live MBean metrics snapshot from the specified Target") - public Uni mbeanMetrics(@Source Target target) { - var fTarget = Target.findById(target.id); - return connectionManager.executeConnectedTaskUni(fTarget, JFRConnection::getMBeanMetrics); + public MBeanMetrics mbeanMetrics(@Source Target target) { + var fTarget = Target.getTargetById(target.id); + return connectionManager.executeConnectedTask(fTarget, JFRConnection::getMBeanMetrics); } @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index 1a615128a..ac6dea3f1 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -106,6 +106,7 @@ import org.quartz.JobExecutionException; import org.quartz.JobKey; import org.quartz.Scheduler; +import org.quartz.SchedulerException; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import software.amazon.awssdk.core.exception.SdkClientException; @@ -305,87 +306,92 @@ public Uni startRecording( Map rawLabels) throws QuantityConversionException { String recordingName = options.name(); - return connectionManager.executeConnectedTaskUni( - target, - conn -> { - RecordingOptionsBuilder optionsBuilder = - recordingOptionsBuilderFactory.create(target).name(options.name()); - if (options.duration().isPresent()) { - optionsBuilder = - optionsBuilder.duration( - TimeUnit.SECONDS.toMillis(options.duration().get())); - } - if (options.toDisk().isPresent()) { - optionsBuilder = optionsBuilder.toDisk(options.toDisk().get()); - } - if (options.maxAge().isPresent()) { - optionsBuilder = optionsBuilder.maxAge(options.maxAge().get()); - } - if (options.maxSize().isPresent()) { - optionsBuilder = optionsBuilder.maxSize(options.maxSize().get()); - } - IConstrainedMap recordingOptions = optionsBuilder.build(); - getDescriptorByName(conn, options.name()) - .ifPresent( - previous -> { - RecordingState previousState = mapState(previous); - boolean restart = - shouldRestartRecording( - replace, previousState, options.name()); - if (!restart) { - throw new EntityExistsException( - "Recording", options.name()); - } - listActiveRecordings(target).stream() - .filter(r -> r.name.equals(recordingName)) - .forEach(this::deleteRecording); - }); - - IRecordingDescriptor desc = - conn.getService() + + RecordingState previousState = + connectionManager.executeConnectedTask( + target, + conn -> + getDescriptorByName(conn, recordingName) + .map(this::mapState) + .orElse(null)); + boolean restart = + previousState == null + || shouldRestartRecording(replace, previousState, recordingName); + if (!restart) { + throw new EntityExistsException("Recording", recordingName); + } + listActiveRecordings(target).stream() + .filter(r -> r.name.equals(recordingName)) + .forEach(this::deleteRecording); + var desc = + connectionManager.executeConnectedTask( + target, + conn -> { + RecordingOptionsBuilder optionsBuilder = + recordingOptionsBuilderFactory + .create(target) + .name(recordingName); + if (options.duration().isPresent()) { + optionsBuilder = + optionsBuilder.duration( + TimeUnit.SECONDS.toMillis( + options.duration().get())); + } + if (options.toDisk().isPresent()) { + optionsBuilder = optionsBuilder.toDisk(options.toDisk().get()); + } + if (options.maxAge().isPresent()) { + optionsBuilder = optionsBuilder.maxAge(options.maxAge().get()); + } + if (options.maxSize().isPresent()) { + optionsBuilder = optionsBuilder.maxSize(options.maxSize().get()); + } + IConstrainedMap recordingOptions = optionsBuilder.build(); + + return conn.getService() .start( recordingOptions, template.getName(), template.getType()); + }); - Map labels = new HashMap<>(rawLabels); - labels.put("template.name", template.getName()); - labels.put("template.type", template.getType().toString()); - Metadata meta = new Metadata(labels); - - ActiveRecording recording = ActiveRecording.from(target, desc, meta); - recording.persist(); - - target.activeRecordings.add(recording); - target.persist(); + Map labels = new HashMap<>(rawLabels); + labels.put("template.name", template.getName()); + labels.put("template.type", template.getType().toString()); + Metadata meta = new Metadata(labels); + + ActiveRecording recording = ActiveRecording.from(target, desc, meta); + recording.persist(); + + target.activeRecordings.add(recording); + target.persist(); + + if (!recording.continuous) { + JobDetail jobDetail = + JobBuilder.newJob(StopRecordingJob.class) + .withIdentity(recording.name, target.jvmId) + .build(); + if (!jobs.contains(jobDetail.getKey())) { + Map data = jobDetail.getJobDataMap(); + data.put("recordingId", recording.id); + data.put("archive", options.archiveOnStop().orElse(false)); + Trigger trigger = + TriggerBuilder.newTrigger() + .withIdentity(recording.name, target.jvmId) + .usingJobData(jobDetail.getJobDataMap()) + .startAt(new Date(System.currentTimeMillis() + recording.duration)) + .build(); + try { + scheduler.scheduleJob(jobDetail, trigger); + } catch (SchedulerException e) { + logger.warn(e); + } + } + } - if (!recording.continuous) { - JobDetail jobDetail = - JobBuilder.newJob(StopRecordingJob.class) - .withIdentity(recording.name, target.jvmId) - .build(); - if (!jobs.contains(jobDetail.getKey())) { - Map data = jobDetail.getJobDataMap(); - data.put("recordingId", recording.id); - data.put("archive", options.archiveOnStop().orElse(false)); - Trigger trigger = - TriggerBuilder.newTrigger() - .withIdentity(recording.name, target.jvmId) - .usingJobData(jobDetail.getJobDataMap()) - .startAt( - new Date( - System.currentTimeMillis() - + recording.duration)) - .build(); - scheduler.scheduleJob(jobDetail, trigger); - } - } + logger.tracev("Started recording: {0} {1}", target.connectUrl, target.activeRecordings); - logger.tracev( - "Started recording: {0} {1}", - target.connectUrl, target.activeRecordings); - return recording; - }); + return Uni.createFrom().item(recording); } public Uni createSnapshot(Target target) { @@ -901,7 +907,7 @@ public InputStream getActiveInputStream(ActiveRecording recording) throws Except } public InputStream getActiveInputStream(long targetId, long remoteId) throws Exception { - var target = Target.findById(targetId); + var target = Target.getTargetById(targetId); var recording = target.getRecordingById(remoteId); var stream = remoteRecordingStreamFactory.open(recording); return stream; @@ -1063,11 +1069,7 @@ private Metadata taggingToMetadata(List tagSet) { public ActiveRecording updateRecordingMetadata( long recordingId, Map newLabels) { - ActiveRecording recording = ActiveRecording.findById(recordingId); - - if (recording == null) { - throw new NotFoundException("Recording not found for ID: " + recordingId); - } + ActiveRecording recording = ActiveRecording.find("id", recordingId).singleResult(); if (!recording.metadata.labels().equals(newLabels)) { Metadata updatedMetadata = new Metadata(newLabels); diff --git a/src/main/java/io/cryostat/recordings/Recordings.java b/src/main/java/io/cryostat/recordings/Recordings.java index ba1bf5965..3f4fb95fd 100644 --- a/src/main/java/io/cryostat/recordings/Recordings.java +++ b/src/main/java/io/cryostat/recordings/Recordings.java @@ -443,6 +443,7 @@ public Collection listFsArchives(@RestPath String jv } @GET + @Transactional @Path("/api/v3/targets/{id}/recordings") @RolesAllowed("read") public List listForTarget(@RestPath long id) throws Exception { @@ -763,6 +764,7 @@ public void deleteArchivedRecording(@RestPath String jvmId, @RestPath String fil @POST @Blocking + @Transactional @Path("/api/v1/targets/{connectUrl}/recordings/{recordingName}/upload") @RolesAllowed("write") public Response uploadActiveToGrafanaV1( @@ -947,10 +949,7 @@ public Map patchRecordingOptions( @Path("/api/v3/activedownload/{id}") @RolesAllowed("read") public Response handleActiveDownload(@RestPath long id) throws Exception { - ActiveRecording recording = ActiveRecording.findById(id); - if (recording == null) { - throw new NotFoundException(); - } + ActiveRecording recording = ActiveRecording.find("id", id).singleResult(); if (!transientArchivesEnabled) { return Response.status(RestResponse.Status.OK) .header( diff --git a/src/main/java/io/cryostat/reports/Reports.java b/src/main/java/io/cryostat/reports/Reports.java index da110a52e..bdf20909f 100644 --- a/src/main/java/io/cryostat/reports/Reports.java +++ b/src/main/java/io/cryostat/reports/Reports.java @@ -31,6 +31,7 @@ import jakarta.annotation.security.RolesAllowed; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import jakarta.transaction.Transactional; import jakarta.ws.rs.ClientErrorException; import jakarta.ws.rs.GET; import jakarta.ws.rs.NotFoundException; @@ -109,6 +110,7 @@ public Uni> get(@RestPath String encodedKey) { @GET @Blocking + @Transactional @Path("/api/v1/targets/{targetId}/reports/{recordingName}") @Produces({MediaType.APPLICATION_JSON}) @RolesAllowed("read") @@ -137,10 +139,7 @@ public Response getActiveV1(@RestPath String targetId, @RestPath String recordin @Deprecated(since = "3.0", forRemoval = true) public Uni> getActive( @RestPath long targetId, @RestPath long recordingId) throws Exception { - var target = Target.findById(targetId); - if (target == null) { - throw new NotFoundException(); - } + var target = Target.getTargetById(targetId); var recording = target.getRecordingById(recordingId); if (recording == null) { throw new NotFoundException(); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 695c79032..f27a26dcd 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -58,6 +58,7 @@ quarkus.smallrye-openapi.info-license-name=Apache 2.0 quarkus.smallrye-openapi.info-license-url=https://github.com/cryostatio/cryostat3/blob/main/LICENSE quarkus.smallrye-graphql.events.enabled=true +quarkus.smallrye-graphql.nonblocking.enabled=false quarkus.smallrye-graphql.root-path=/api/v3/graphql quarkus.smallrye-graphql.http.get.enabled=true quarkus.smallrye-graphql.print-data-fetcher-exception=true