Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(discovery): discovery synchronization for stale lost targets #689

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f2aa3f6
fix(discovery): k8s discovery synchronization for stale lost targets
andrewazores Oct 9, 2024
ce53188
synchronize on lock while building/pruning owner chains to avoid dupl…
andrewazores Oct 9, 2024
4208c7d
do not perform node query in toString
andrewazores Oct 15, 2024
a49cccf
return existing node if already discovered
andrewazores Oct 16, 2024
39d75de
remove unnecessary locking
andrewazores Oct 17, 2024
04edac5
Revert "remove unnecessary locking"
andrewazores Oct 17, 2024
e14582d
start transaction for active recording list request
andrewazores Oct 17, 2024
f086691
cleanup
andrewazores Oct 17, 2024
1f140a4
cleanup
andrewazores Oct 17, 2024
ce4a438
rename
andrewazores Oct 17, 2024
43fa9a6
simplify transaction ordering and locking
andrewazores Oct 17, 2024
8165146
refactor
andrewazores Oct 17, 2024
a7b1005
force resync on existing period
andrewazores Oct 17, 2024
1937597
lower log level
andrewazores Oct 17, 2024
4bb7ba2
ensure full sync on startup
andrewazores Oct 17, 2024
3ea949e
periodically retry storage bucket creation
andrewazores Oct 17, 2024
32b9f96
ensure ordered queue processing of transactions, error handling
andrewazores Oct 22, 2024
5b412a2
fixup! ensure ordered queue processing of transactions, error handling
andrewazores Oct 22, 2024
6516b4b
decouple target JVM ID retrieval from persistence, handle in separate…
andrewazores Oct 23, 2024
ee3605e
ensure JVM ID is nulled if connection fails
andrewazores Oct 23, 2024
bcd6c9d
delay MODIFIED events same as FOUND events
andrewazores Oct 23, 2024
2589069
refactor to use quartz for scheduling delayed connection, and reuse l…
andrewazores Oct 23, 2024
4df0254
remove unused case
andrewazores Oct 23, 2024
63b876f
remove unnecessary job identity
andrewazores Oct 23, 2024
2e35062
handle nullable input data
andrewazores Oct 23, 2024
9b4a2d0
slower initial delay, periodic delay based on connection timeout
andrewazores Oct 23, 2024
d826832
unwrap exception handling so transactions can be rolled back
andrewazores Oct 23, 2024
62b9eda
cleanup
andrewazores Oct 23, 2024
0b3d552
handle single-target updates within existing transaction
andrewazores Oct 23, 2024
10fa20b
reduce delay
andrewazores Oct 23, 2024
e6da47d
skip update if jvmId already known
andrewazores Oct 23, 2024
35885ed
updates should continue even if JVM ID is already known, so that acti…
andrewazores Oct 24, 2024
675caaf
rename
andrewazores Oct 24, 2024
2d13dd8
rules ignore target discovery when JVM ID is still blank, act later a…
andrewazores Oct 24, 2024
3301825
handle updating JVM ID on credential change, null out JVM ID if updat…
andrewazores Oct 24, 2024
c2b8d3f
handle events in ordered serial fashion
andrewazores Oct 24, 2024
d747ede
use infrastructure pool instead of forkjoin
andrewazores Oct 24, 2024
ef99212
Revert "handle events in ordered serial fashion"
andrewazores Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/main/java/io/cryostat/StorageBuckets.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@
*/
package io.cryostat;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.cryostat.util.HttpStatusCodeIdentifier;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
Expand All @@ -30,7 +41,17 @@ public class StorageBuckets {
@Inject S3Client storage;
@Inject Logger logger;

@ConfigProperty(name = "storage.buckets.creation-retry.period")
Duration creationRetryPeriod;

private final Set<String> buckets = ConcurrentHashMap.newKeySet();
private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();

public void createIfNecessary(String bucket) {
buckets.add(bucket);
}

private boolean tryCreate(String bucket) {
boolean exists = false;
logger.debugv("Checking if storage bucket \"{0}\" exists ...", bucket);
try {
Expand All @@ -49,8 +70,27 @@ public void createIfNecessary(String bucket) {
storage.createBucket(CreateBucketRequest.builder().bucket(bucket).build());
logger.debugv("Storage bucket \"{0}\" created", bucket);
} catch (Exception e) {
logger.error(e);
logger.warn(e);
return false;
}
}
return true;
}

void onStart(@Observes StartupEvent evt) {
worker.scheduleAtFixedRate(
() -> {
var it = buckets.iterator();
while (it.hasNext()) {
if (tryCreate(it.next())) it.remove();
}
},
0,
creationRetryPeriod.toMillis(),
TimeUnit.MILLISECONDS);
}

void onStop(@Observes ShutdownEvent evt) {
worker.shutdown();
}
}
71 changes: 47 additions & 24 deletions src/main/java/io/cryostat/discovery/DiscoveryNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.fasterxml.jackson.annotation.JsonView;
import io.quarkus.hibernate.orm.panache.PanacheEntity;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.panache.common.Parameters;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -42,6 +43,8 @@
import jakarta.persistence.FetchType;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.NamedQueries;
import jakarta.persistence.NamedQuery;
import jakarta.persistence.OneToMany;
import jakarta.persistence.OneToOne;
import jakarta.persistence.PostPersist;
Expand All @@ -56,6 +59,11 @@

@Entity
@EntityListeners(DiscoveryNode.Listener.class)
@NamedQueries({
@NamedQuery(
name = "DiscoveryNode.byTypeWithName",
query = "from DiscoveryNode where nodeType = :nodeType and name = :name")
})
public class DiscoveryNode extends PanacheEntity {

public static final String NODE_TYPE = "nodeType";
Expand Down Expand Up @@ -129,33 +137,48 @@ public static List<DiscoveryNode> findAllByNodeType(NodeType nodeType) {
}

public static DiscoveryNode environment(String name, NodeType nodeType) {
return QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = name;
node.nodeType = nodeType.getKind();
node.labels = new HashMap<>();
node.children = new ArrayList<>();
node.target = null;
node.persist();
return node;
});
var kind = nodeType.getKind();
return DiscoveryNode.<DiscoveryNode>find(
"#DiscoveryNode.byTypeWithName",
Parameters.with("nodeType", kind).and("name", name))
.firstResultOptional()
.orElseGet(
() ->
QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = name;
node.nodeType = kind;
node.labels = new HashMap<>();
node.children = new ArrayList<>();
node.target = null;
node.persist();
return node;
}));
}

public static DiscoveryNode target(Target target, NodeType nodeType) {
return QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = target.connectUrl.toString();
node.nodeType = nodeType.getKind();
node.labels = new HashMap<>(target.labels);
node.children = null;
node.target = target;
node.persist();
return node;
});
var kind = nodeType.getKind();
var connectUrl = target.connectUrl.toString();
return DiscoveryNode.<DiscoveryNode>find(
"#DiscoveryNode.byTypeWithName",
Parameters.with("nodeType", kind).and("name", connectUrl))
.firstResultOptional()
.orElseGet(
() ->
QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = connectUrl;
node.nodeType = kind;
node.labels = new HashMap<>(target.labels);
node.children = null;
node.target = target;
node.persist();
return node;
}));
}

@Override
Expand Down
Loading
Loading