Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(registration): discovery plugin registration bugfixes and refactor #193

Merged
merged 4 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 155 additions & 17 deletions src/main/java/io/cryostat/agent/CryostatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;

Expand All @@ -39,8 +42,10 @@
import io.cryostat.agent.model.RegistrationInfo;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.http.HttpHeaders;
Expand Down Expand Up @@ -114,7 +119,8 @@ public CompletableFuture<Boolean> checkRegistration(PluginInfo pluginInfo) {
return supply(req, (res) -> logResponse(req, res)).thenApply(this::isOkStatus);
}

public CompletableFuture<PluginInfo> register(PluginInfo pluginInfo, URI callback) {
public CompletableFuture<PluginInfo> register(
int credentialId, PluginInfo pluginInfo, URI callback) {
try {
RegistrationInfo registrationInfo =
new RegistrationInfo(
Expand All @@ -126,7 +132,20 @@ public CompletableFuture<PluginInfo> register(PluginInfo pluginInfo, URI callbac
mapper.writeValueAsString(registrationInfo),
ContentType.APPLICATION_JSON));
return supply(req, (res) -> logResponse(req, res))
.thenApply(res -> assertOkStatus(req, res))
.handle(
(res, t) -> {
if (t != null) {
throw new CompletionException(t);
}
if (!isOkStatus(res)) {
try {
deleteCredentials(credentialId).get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to delete previous credentials", e);
}
}
return assertOkStatus(req, res);
})
.thenApply(
res -> {
try (InputStream is = res.getEntity().getContent()) {
Expand Down Expand Up @@ -155,22 +174,82 @@ public CompletableFuture<PluginInfo> register(PluginInfo pluginInfo, URI callbac
public CompletableFuture<Integer> submitCredentialsIfRequired(
int prevId, Credentials credentials, URI callback) {
if (prevId < 0) {
return submitCredentials(credentials, callback);
return queryExistingCredentials(callback)
.thenCompose(
id -> {
if (id >= 0) {
return CompletableFuture.completedFuture(id);
}
return submitCredentials(prevId, credentials, callback);
});
}
HttpGet req = new HttpGet(baseUri.resolve(CREDENTIALS_API_PATH + "/" + prevId));
log.info("{}", req);
return supply(req, (res) -> logResponse(req, res))
.thenApply(this::isOkStatus)
.handle(
(v, t) -> {
if (t != null) {
log.error("Failed to get credentials with ID " + prevId, t);
throw new CompletionException(t);
}
return isOkStatus(v);
})
.thenCompose(
exists -> {
if (exists) {
return CompletableFuture.completedFuture(prevId);
}
return submitCredentials(credentials, callback);
return submitCredentials(prevId, credentials, callback);
});
}

private CompletableFuture<Integer> submitCredentials(Credentials credentials, URI callback) {
private CompletableFuture<Integer> queryExistingCredentials(URI callback) {
HttpGet req = new HttpGet(baseUri.resolve(CREDENTIALS_API_PATH));
log.info("{}", req);
return supply(req, (res) -> logResponse(req, res))
.handle(
(res, t) -> {
if (t != null) {
log.error("Failed to get credentials", t);
throw new CompletionException(t);
}
return assertOkStatus(req, res);
})
.thenApply(
res -> {
try (InputStream is = res.getEntity().getContent()) {
return mapper.readValue(is, ObjectNode.class);
} catch (IOException e) {
log.error("Unable to parse response as JSON", e);
throw new RegistrationException(e);
}
})
.thenApply(
node -> {
try {
return mapper.readValue(
node.get("data").get("result").toString(),
new TypeReference<List<StoredCredential>>() {});
} catch (IOException e) {
log.error("Unable to parse response as JSON", e);
throw new RegistrationException(e);
}
})
.thenApply(
l ->
l.stream()
.filter(
sc ->
Objects.equals(
sc.matchExpression,
selfMatchExpression(callback)))
.map(sc -> sc.id)
.findFirst()
.orElse(-1));
}

private CompletableFuture<Integer> submitCredentials(
int prevId, Credentials credentials, URI callback) {
HttpPost req = new HttpPost(baseUri.resolve(CREDENTIALS_API_PATH));
MultipartEntityBuilder entityBuilder =
MultipartEntityBuilder.create()
Expand Down Expand Up @@ -198,10 +277,38 @@ private CompletableFuture<Integer> submitCredentials(Credentials credentials, UR
log.info("{}", req);
req.setEntity(entityBuilder.build());
return supply(req, (res) -> logResponse(req, res))
.thenApply(res -> assertOkStatus(req, res))
.thenApply(res -> res.getFirstHeader(HttpHeaders.LOCATION).getValue())
.thenApply(res -> res.substring(res.lastIndexOf('/') + 1, res.length()))
.thenApply(Integer::valueOf);
.thenApply(
res -> {
if (!isOkStatus(res)) {
try {
if (res.getStatusLine().getStatusCode() == 409) {
int queried = queryExistingCredentials(callback).get();
if (queried >= 0) {
return queried;
}
}
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to query for existing credentials", e);
}
try {
deleteCredentials(prevId).get();
} catch (InterruptedException | ExecutionException e) {
log.error(
"Failed to delete previous credentials with id "
+ prevId,
e);
throw new RegistrationException(e);
}
}
String location =
assertOkStatus(req, res)
.getFirstHeader(HttpHeaders.LOCATION)
.getValue();
String id =
location.substring(
location.lastIndexOf('/') + 1, location.length());
return Integer.valueOf(id);
});
}

public CompletableFuture<Void> deleteCredentials(int id) {
Expand All @@ -210,9 +317,7 @@ public CompletableFuture<Void> deleteCredentials(int id) {
}
HttpDelete req = new HttpDelete(baseUri.resolve(CREDENTIALS_API_PATH + "/" + id));
log.info("{}", req);
return supply(req, (res) -> logResponse(req, res))
.thenApply(res -> assertOkStatus(req, res))
.thenApply(res -> null);
return supply(req, (res) -> logResponse(req, res)).thenApply(res -> null);
}

public CompletableFuture<Void> deregister(PluginInfo pluginInfo) {
Expand Down Expand Up @@ -339,14 +444,15 @@ private CountingInputStream getRecordingInputStream(Path filePath) throws IOExce

private String selfMatchExpression(URI callback) {
return String.format(
"target.connectUrl == \"%s\" && target.jvmId == \"%s\" &&"
+ " target.annotations.platform[\"INSTANCE_ID\"] == \"%s\"",
callback, jvmId, instanceId);
"target.connectUrl == \"%s\" && target.annotations.platform[\"INSTANCE_ID\"] =="
+ " \"%s\"",
callback, instanceId);
}

private boolean isOkStatus(HttpResponse res) {
int sc = res.getStatusLine().getStatusCode();
return 200 <= sc && sc < 300;
// 2xx is OK, 3xx is redirect range so allow those too
return 200 <= sc && sc < 400;
}

private HttpResponse assertOkStatus(HttpRequestBase req, HttpResponse res) {
Expand All @@ -364,4 +470,36 @@ private HttpResponse assertOkStatus(HttpRequestBase req, HttpResponse res) {
}
return res;
}

@SuppressFBWarnings(
value = {
"URF_UNREAD_FIELD",
"UWF_UNWRITTEN_FIELD",
"UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"
})
public static class StoredCredential {

public int id;
public String matchExpression;

@Override
public int hashCode() {
return Objects.hash(id, matchExpression);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
StoredCredential other = (StoredCredential) obj;
return id == other.id && Objects.equals(matchExpression, other.matchExpression);
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/io/cryostat/agent/HttpException.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ public class HttpException extends RuntimeException {
"Unexpected non-OK status code %d on API path %s",
statusCode, uri.toString()));
}

HttpException(int statusCode, Throwable cause) {
super(String.format("HTTP %d", statusCode), cause);
}
}
34 changes: 22 additions & 12 deletions src/main/java/io/cryostat/agent/MainModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import io.cryostat.core.sys.FileSystem;
import io.cryostat.core.tui.ClientWriter;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import dagger.Lazy;
import dagger.Module;
import dagger.Provides;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.HttpClientBuilder;
Expand Down Expand Up @@ -92,17 +94,9 @@ public static WebServer provideWebServer(
@Named(ConfigModule.CRYOSTAT_AGENT_WEBSERVER_HOST) String host,
@Named(ConfigModule.CRYOSTAT_AGENT_WEBSERVER_PORT) int port,
@Named(ConfigModule.CRYOSTAT_AGENT_CALLBACK) URI callback,
Lazy<Registration> registration,
@Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_RETRY_MS) int registrationRetryMs) {
Lazy<Registration> registration) {
return new WebServer(
remoteContexts,
cryostat,
executor,
host,
port,
callback,
registration,
registrationRetryMs);
remoteContexts, cryostat, executor, host, port, callback, registration);
}

@Provides
Expand Down Expand Up @@ -170,7 +164,8 @@ public static HttpClient provideHttpClient(

@Provides
public static ObjectMapper provideObjectMapper() {
return new ObjectMapper();
return new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

@Provides
Expand Down Expand Up @@ -205,8 +200,23 @@ public static Registration provideRegistration(
@Named(ConfigModule.CRYOSTAT_AGENT_APP_JMX_PORT) int jmxPort,
@Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_RETRY_MS) int registrationRetryMs,
@Named(ConfigModule.CRYOSTAT_AGENT_REGISTRATION_CHECK_MS) int registrationCheckMs) {

Logger log = LoggerFactory.getLogger(Registration.class);
return new Registration(
executor,
Executors.newSingleThreadScheduledExecutor(
r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("cryostat-agent-registration");
t.setUncaughtExceptionHandler(
(thread, err) ->
log.error(
String.format(
"[%s] Uncaught exception: %s",
thread.getName(),
ExceptionUtils.getStackTrace(err))));
return t;
}),
cryostat,
callback,
webServer,
Expand Down
Loading
Loading