From 7625f5f78b674127ed4e0492aa0a2afbd5af4515 Mon Sep 17 00:00:00 2001 From: Luca Molteni Date: Fri, 7 Apr 2023 15:58:58 +0200 Subject: [PATCH] A few more needed classes copied from Connector --- .../cos/fleetshard/sync/it/SyncResource.java | 11 + .../sync/it/ProcessorProvisionerTest.java | 329 ++++++++++++++++++ .../sync/it/support/SyncTestSupport.java | 34 ++ .../sync/client/FleetShardClient.java | 46 ++- .../sync/resources/ProcessorStatusSync.java | 146 ++++++++ .../resources/ProcessorStatusUpdater.java | 148 ++++++++ .../sync/resources/ResourcePoll.java | 5 + .../src/main/resources/application.properties | 2 +- .../crds/kustomization.yaml | 1 + 9 files changed, 720 insertions(+), 2 deletions(-) create mode 100644 cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java create mode 100644 cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java create mode 100644 cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java diff --git a/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java b/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java index 3b37d3e1..daf65de8 100644 --- a/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java +++ b/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java @@ -10,6 +10,7 @@ import org.bf2.cos.fleetshard.sync.housekeeping.reapers.AddonReaper; import org.bf2.cos.fleetshard.sync.resources.ConnectorDeploymentProvisioner; import org.bf2.cos.fleetshard.sync.resources.ConnectorNamespaceProvisioner; +import org.bf2.cos.fleetshard.sync.resources.ProcessorDeploymentProvisioner; import org.bf2.cos.fleetshard.sync.resources.ResourcePoll; @ApplicationScoped @@ -20,6 +21,8 @@ public class SyncResource { @Inject ConnectorDeploymentProvisioner deploymentProvisioner; @Inject + ProcessorDeploymentProvisioner processorDeploymentProvisioner; + @Inject AddonReaper addonReaper; @Inject ResourcePoll resourceSync; @@ -38,12 +41,20 @@ public void pollConnectors(Long gv) { deploymentProvisioner.poll(gv); } + @Path("/provisioner/processors") + @POST + @Consumes(MediaType.TEXT_PLAIN) + public void pollProcessors(Long gv) { + processorDeploymentProvisioner.poll(gv); + } + @Path("/provisioner/all") @POST @Consumes(MediaType.TEXT_PLAIN) public void poll() { namespaceProvisioner.poll(0); deploymentProvisioner.poll(0); + processorDeploymentProvisioner.poll(0); } @Path("/provisioner/sync") diff --git a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java new file mode 100644 index 00000000..73b51388 --- /dev/null +++ b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java @@ -0,0 +1,329 @@ +package org.bf2.cos.fleetshard.sync.it; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import javax.ws.rs.core.MediaType; + +import org.bf2.cos.fleet.manager.model.KafkaConnectionSettings; +import org.bf2.cos.fleet.manager.model.ProcessorDesiredState; +import org.bf2.cos.fleet.manager.model.ServiceAccount; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.support.resources.Namespaces; +import org.bf2.cos.fleetshard.support.resources.Resources; +import org.bf2.cos.fleetshard.support.resources.Secrets; +import org.bf2.cos.fleetshard.sync.it.support.FleetManagerMockServer; +import org.bf2.cos.fleetshard.sync.it.support.FleetManagerTestInstance; +import org.bf2.cos.fleetshard.sync.it.support.SyncTestProfile; +import org.bf2.cos.fleetshard.sync.it.support.SyncTestSupport; +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.http.ContentTypeHeader; +import com.github.tomakehurst.wiremock.http.RequestMethod; + +import io.fabric8.kubernetes.api.model.Namespace; +import io.fabric8.kubernetes.api.model.Secret; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static io.restassured.RestAssured.given; +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; +import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson; +import static org.assertj.core.api.Assertions.assertThat; +import static org.bf2.cos.fleetshard.support.resources.Resources.uid; +import static org.bf2.cos.fleetshard.support.resources.Secrets.SECRET_ENTRY_META; +import static org.bf2.cos.fleetshard.support.resources.Secrets.SECRET_ENTRY_PROCESSOR; +import static org.bf2.cos.fleetshard.support.resources.Secrets.SECRET_ENTRY_SERVICE_ACCOUNT; +import static org.bf2.cos.fleetshard.support.resources.Secrets.toBase64; + +@QuarkusTest +@TestProfile(ProcessorProvisionerTest.Profile.class) +public class ProcessorProvisionerTest extends SyncTestSupport { + public static final String DEPLOYMENT_ID = uid(); + public static final String KAFKA_URL = "kafka.acme.com:2181"; + public static final String KAFKA_CLIENT_ID = uid(); + public static final String KAFKA_CLIENT_SECRET = toBase64(uid()); + + @FleetManagerTestInstance + FleetManagerMockServer server; + + @ConfigProperty(name = "cos.cluster.id") + String clusterId; + + @Test + void processorIsProvisioned() { + { + given() + .contentType(MediaType.TEXT_PLAIN) + .body(0L) + .post("/test/provisioner/namespaces"); + + Namespace ns1 = until( + () -> fleetShardClient.getNamespace(clusterId), + Objects::nonNull); + } + + { + // + // Deployment v1 + // + + given() + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.TEXT_PLAIN) + .body(0L) + .post("/test/provisioner/processors"); + + Secret s1 = until( + () -> fleetShardClient.getProcessorSecret(clusterId, DEPLOYMENT_ID), + item -> Objects.equals( + "1", + item.getMetadata().getLabels().get(Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION))); + + ManagedProcessor mc = until( + () -> fleetShardClient.getProcessor(clusterId, DEPLOYMENT_ID), + item -> { + return item.getSpec().getDeploymentResourceVersion() == 1L + && item.getSpec().getSecret() != null; + }); + + assertThat(s1).satisfies(item -> { + assertThat(item.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(mc.getSpec().getDeploymentId())); + + assertThatJson(Secrets.extract(item, SECRET_ENTRY_SERVICE_ACCOUNT)) + .isObject() + .containsEntry("client_id", KAFKA_CLIENT_ID) + .containsEntry("client_secret", KAFKA_CLIENT_SECRET); + + assertThatJson(Secrets.extract(item, SECRET_ENTRY_PROCESSOR)) + .inPath("processor") + .isObject() + .containsEntry("foo", "processor-foo"); + assertThatJson(Secrets.extract(item, SECRET_ENTRY_PROCESSOR)) + .inPath("kafka") + .isObject() + .containsEntry("topic", "kafka-foo"); + + assertThatJson(Secrets.extract(item, SECRET_ENTRY_META)) + .isObject() + .containsEntry("processor_type", "sink"); + assertThatJson(Secrets.extract(item, SECRET_ENTRY_META)) + .isObject() + .containsEntry("processor_image", "quay.io/mcs_dev/aws-s3-sink:0.0.1"); + + assertThat(s1.getMetadata().getLabels()) + .containsEntry("cos.bf2.org/organization-id", "20000000") + .containsEntry("cos.bf2.org/pricing-tier", "essential"); + assertThat(s1.getMetadata().getAnnotations()) + .containsEntry("my.cos.bf2.org/processor-group", "baz"); + }); + + assertThat(mc.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX); + assertThat(mc.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL); + assertThat(mc.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName()); + + assertThat(mc.getMetadata().getLabels()) + .containsEntry("cos.bf2.org/organization-id", "20000000") + .containsEntry("cos.bf2.org/pricing-tier", "essential"); + assertThat(mc.getMetadata().getAnnotations()) + .containsEntry("my.cos.bf2.org/processor-group", "baz"); + + } + { + // + // Deployment v2 + // + + given() + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.TEXT_PLAIN) + .body(1L) + .post("/test/provisioner/processors"); + + Secret s1 = until( + () -> fleetShardClient.getSecret(clusterId, DEPLOYMENT_ID), + item -> Objects.equals( + "2", + item.getMetadata().getLabels().get(Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION))); + + ManagedProcessor mc = until( + () -> fleetShardClient.getProcessor(clusterId, DEPLOYMENT_ID), + item -> { + return item.getSpec().getDeploymentResourceVersion() == 2L + && item.getSpec().getSecret() != null; + }); + + assertThat(s1).satisfies(item -> { + assertThat(item.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(mc.getSpec().getDeploymentId())); + + assertThatJson(Secrets.extract(item, SECRET_ENTRY_SERVICE_ACCOUNT)) + .isObject() + .containsEntry("client_id", KAFKA_CLIENT_ID) + .containsEntry("client_secret", KAFKA_CLIENT_SECRET); + + assertThatJson(Secrets.extract(item, SECRET_ENTRY_PROCESSOR)) + .inPath("processor") + .isObject() + .containsEntry("foo", "processor-bar"); + assertThatJson(Secrets.extract(item, SECRET_ENTRY_PROCESSOR)) + .inPath("kafka") + .isObject() + .containsEntry("topic", "kafka-bar"); + + assertThatJson(Secrets.extract(item, SECRET_ENTRY_META)) + .isObject() + .containsEntry("processor_type", "sink"); + assertThatJson(Secrets.extract(item, SECRET_ENTRY_META)) + .isObject() + .containsEntry("processor_image", "quay.io/mcs_dev/aws-s3-sink:0.1.0"); + + assertThat(s1.getMetadata().getLabels()) + .containsEntry("cos.bf2.org/organization-id", "20000001") + .containsEntry("cos.bf2.org/pricing-tier", "essential"); + assertThat(s1.getMetadata().getAnnotations()) + .containsEntry("my.cos.bf2.org/processor-group", "baz"); + }); + + assertThat(mc.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX); + assertThat(mc.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL); + assertThat(mc.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName()); + + assertThat(mc.getMetadata().getLabels()) + .containsEntry("cos.bf2.org/organization-id", "20000001") + .containsEntry("cos.bf2.org/pricing-tier", "essential"); + assertThat(mc.getMetadata().getAnnotations()) + .containsEntry("my.cos.bf2.org/processor-group", "baz"); + } + } + + public static class Profile extends SyncTestProfile { + @Override + public Map getConfigOverrides() { + return Map.of( + "cos.cluster.id", getId(), + "test.namespace", Namespaces.generateNamespaceId(getId()), + "cos.namespace", Namespaces.generateNamespaceId(getId()), + "cos.resources.poll-interval", "disabled", + "cos.resources.resync-interval", "disabled", + "cos.resources.update-interval", "disabled", + "cos.metrics.recorder.tags.annotations[0]", "my.cos.bf2.org/processor-group", + "cos.metrics.recorder.tags.labels[0]", "cos.bf2.org/organization-id", + "cos.metrics.recorder.tags.labels[1]", "cos.bf2.org/pricing-tier"); + } + + @Override + public List testResources() { + return List.of( + new TestResourceEntry(FleetManagerTestResource.class)); + } + } + + public static class FleetManagerTestResource extends org.bf2.cos.fleetshard.sync.it.support.ControlPlaneTestResource { + @Override + protected void configure(FleetManagerMockServer server) { + final String clusterId = ConfigProvider.getConfig().getValue("cos.cluster.id", String.class); + + server.stubMatching( + RequestMethod.GET, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/namespaces", + resp -> { + JsonNode body = namespaceList( + namespace(clusterId, clusterId)); + + resp.withHeader(ContentTypeHeader.KEY, APPLICATION_JSON) + .withJsonBody(body); + }); + + server.stubMatching( + RequestMethod.PUT, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/deployments/.*/status", + () -> WireMock.ok()); + + server.stubMatching( + RequestMethod.GET, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/deployments", + req -> req.withQueryParam("gt_version", equalTo("0")), + resp -> { + JsonNode body = processorDeploymentList( + processorDeployment(DEPLOYMENT_ID, 1L, + depl -> { + depl.getMetadata().annotations(Map.of( + "my.cos.bf2.org/processor-group", "baz", + "cos.bf2.org/organization-id", "20000000", + "cos.bf2.org/pricing-tier", "essential")); + }, + spec -> { + spec.namespaceId(clusterId); + spec.processorId("processor-1"); + spec.processorTypeId("processor-type-1"); + spec.processorResourceVersion(1L); + spec.kafka( + new KafkaConnectionSettings() + .url(KAFKA_URL)); + spec.serviceAccount( + new ServiceAccount() + .clientId(KAFKA_CLIENT_ID) + .clientSecret(KAFKA_CLIENT_SECRET)); + spec.shardMetadata(node(n -> { + n.withArray("operators").addObject() + .put("type", "camel-processor-operator") + .put("version", "[1.0.0,2.0.0)"); + })); + + spec.desiredState(ProcessorDesiredState.READY); + })); + + resp.withHeader(ContentTypeHeader.KEY, APPLICATION_JSON) + .withJsonBody(body); + }); + + server.stubMatching( + RequestMethod.GET, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/deployments", + req -> req.withQueryParam("gt_version", equalTo("1")), + resp -> { + JsonNode body = processorDeploymentList( + processorDeployment(DEPLOYMENT_ID, 2L, + depl -> { + depl.getMetadata().annotations(Map.of( + "my.cos.bf2.org/processor-group", "baz", + "cos.bf2.org/organization-id", "20000001", + "cos.bf2.org/pricing-tier", "essential") + ); + }, + spec -> { + spec.namespaceId(clusterId); + spec.processorId("processor-1"); + spec.processorTypeId("processor-type-1"); + spec.processorResourceVersion(1L); + spec.kafka( + new KafkaConnectionSettings() + .url(KAFKA_URL)); + spec.serviceAccount( + new ServiceAccount() + .clientId(KAFKA_CLIENT_ID) + .clientSecret(KAFKA_CLIENT_SECRET)); + spec.shardMetadata(node(n -> { + n.put("processor_type", "sink"); + n.put("processor_image", "quay.io/mcs_dev/aws-s3-sink:0.1.0"); + n.withArray("operators").addObject() + .put("type", "camel-processor-operator") + .put("version", "[1.0.0,2.0.0)"); + })); + spec.desiredState(ProcessorDesiredState.READY); + })); + + resp.withHeader(ContentTypeHeader.KEY, APPLICATION_JSON) + .withJsonBody(body); + }); + } + } +} diff --git a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java index 2fe22694..9ff5b9e4 100644 --- a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java +++ b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java @@ -24,6 +24,9 @@ import org.bf2.cos.fleet.manager.model.ConnectorNamespaceStatus; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceTenant; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceTenantKind; +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentList; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentSpec; import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; import org.bf2.cos.fleetshard.sync.client.FleetShardClient; @@ -60,6 +63,19 @@ public static ObjectNode deploymentList(ConnectorDeployment... deployments) { return Serialization.jsonMapper().convertValue(items, ObjectNode.class); } + public static ObjectNode processorDeploymentList(ProcessorDeployment... deployments) { + var items = new ProcessorDeploymentList(); + items.page(1); + items.size(deployments.length); + items.total(deployments.length); + + for (ProcessorDeployment deployment : deployments) { + items.addItemsItem(deployment); + } + + return Serialization.jsonMapper().convertValue(items, ObjectNode.class); + } + public static ObjectNode namespaceList(ConnectorNamespaceDeployment... namespaces) { var items = new ConnectorNamespaceDeploymentList(); items.page(1); @@ -136,6 +152,24 @@ public static ConnectorDeployment deployment( return answer; } + public static ProcessorDeployment processorDeployment( + String name, + long revision, + Consumer deploymentConsumer, + Consumer deploymentSpecConsumer) { + + ProcessorDeployment answer = new ProcessorDeployment() + .kind("ProcessorDeployment") + .id(name) + .metadata(new ConnectorDeploymentAllOfMetadata().resourceVersion(revision)) + .spec(new ProcessorDeploymentSpec()); + + deploymentConsumer.accept(answer); + deploymentSpecConsumer.accept(answer.getSpec()); + + return answer; + } + public static JsonNode node(Consumer consumer) { ObjectNode answer = Serialization.jsonMapper().createObjectNode(); consumer.accept(answer); diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java index ec788ca3..40c66f9d 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java @@ -66,6 +66,10 @@ public void start() throws Exception { .inAnyNamespace() .withLabel(Resources.LABEL_CLUSTER_ID, getClusterId()) .inform(); + processorsInformer = kubernetesClient.resources(ManagedProcessor.class) + .inAnyNamespace() + .withLabel(Resources.LABEL_CLUSTER_ID, getClusterId()) + .inform(); operatorsInformer.stopped().whenComplete((unused, throwable) -> { if (throwable != null) { @@ -85,6 +89,12 @@ public void start() throws Exception { System.exit(-1); } }); + processorsInformer.stopped().whenComplete((unused, throwable) -> { + if (throwable != null) { + LOGGER.warn("Processor informer has stopped working, exiting", throwable); + System.exit(-1); + } + }); } @Override @@ -92,6 +102,7 @@ public void stop() throws Exception { Resources.closeQuietly(operatorsInformer); Resources.closeQuietly(namespaceInformers); Resources.closeQuietly(connectorsInformer); + Resources.closeQuietly(processorsInformer); } public String getClusterId() { @@ -109,6 +120,13 @@ public long getMaxDeploymentResourceRevision() { .orElse(0); } + public long getProcessorMaxDeploymentResourceRevision() { + return this.processorsInformer.getIndexer().list().stream() + .mapToLong(c -> c.getSpec().getDeploymentResourceVersion()) + .max() + .orElse(0); + } + public long getMaxNamespaceResourceRevision() { return this.namespaceInformers.getIndexer().list().stream() .mapToLong(c -> { @@ -220,6 +238,15 @@ public Optional getSecret(String namespaceId, String deploymentId) { .get()); } + + public Optional getProcessorSecret(String namespaceId, String deploymentId) { + return Optional.ofNullable( + kubernetesClient.secrets() + .inNamespace(generateNamespaceId(namespaceId)) + .withName(Secrets.generateProcessorSecretId(deploymentId)) + .get()); + } + // ************************************* // // Connectors @@ -235,7 +262,7 @@ public Boolean deleteConnector(ManagedConnector managedConnector) { } public Optional getProcessor(NamespacedName id) { - if (connectorsInformer == null) { + if (processorsInformer == null) { throw new IllegalStateException("Informer must be started before adding handlers"); } @@ -298,6 +325,15 @@ public List getAllConnectors() { return connectorsInformer.getIndexer().list(); } + + public List getAllProcessors() { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + return processorsInformer.getIndexer().list(); + } + public List getConnectors(String namespace) { if (connectorsInformer == null) { throw new IllegalStateException("Informer must be started before adding handlers"); @@ -326,6 +362,14 @@ public void watchConnectors(ResourceEventHandler handler) { connectorsInformer.addEventHandler(handler); } + public void watchProcessor(ResourceEventHandler handler) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + processorsInformer.addEventHandler(handler); + } + public ManagedConnector createConnector(ManagedConnector connector) { return kubernetesClient.resource(connector) .inNamespace(connector.getMetadata().getNamespace()) diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java new file mode 100644 index 00000000..8af21202 --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java @@ -0,0 +1,146 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.Temporal; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.micrometer.core.instrument.Counter; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.support.Service; +import org.bf2.cos.fleetshard.support.metrics.StaticMetricsRecorder; +import org.bf2.cos.fleetshard.support.resources.NamespacedName; +import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; +import org.bf2.cos.fleetshard.sync.FleetShardSyncScheduler; +import org.bf2.cos.fleetshard.sync.client.FleetShardClient; +import org.bf2.cos.fleetshard.sync.metrics.MetricsID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class ProcessorStatusSync implements Service { + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusSync.class); + + public static final String JOB_ID = "cos.processors.status.sync"; + public static final String METRICS_SYNC = "processors.status.sync"; + public static final String METRICS_UPDATE = "processors.status.update"; + + @Inject + ProcessorStatusUpdater updater; + @Inject + FleetShardClient connectorClient; + @Inject + FleetShardSyncConfig config; + @Inject + FleetShardSyncScheduler scheduler; + + @Inject + @MetricsID(METRICS_SYNC) + StaticMetricsRecorder syncRecorder; + @Inject + @MetricsID(METRICS_SYNC + ".total") + Counter syncTotalRecorder; + + @Inject + @MetricsID(METRICS_UPDATE) + StaticMetricsRecorder updateRecorder; + @Inject + @MetricsID(METRICS_UPDATE + ".total") + Counter updateTotalRecorder; + + private volatile Instant lastResync; + private volatile Instant lastUpdate; + + private final ConcurrentMap processors = new ConcurrentHashMap<>(); + + @Override + public void start() throws Exception { + LOGGER.info("Starting processor status sync"); + + connectorClient.watchProcessor(new ResourceEventHandler<>() { + @Override + public void onAdd(ManagedProcessor connector) { + processors.put(NamespacedName.of(connector), Instant.now()); + } + + @Override + public void onUpdate(ManagedProcessor ignored, ManagedProcessor connector) { + processors.put(NamespacedName.of(connector), Instant.now()); + } + + @Override + public void onDelete(ManagedProcessor connector, boolean deletedFinalStateUnknown) { + processors.remove(NamespacedName.of(connector)); + } + }); + + scheduler.schedule( + JOB_ID, + ConnectorStatusSyncJob.class, + config.resources().updateInterval()); + } + + @Override + public void stop() { + scheduler.shutdownQuietly(JOB_ID); + } + + public void run() { + final Duration resyncInterval = config.resources().resyncInterval(); + final Instant now = Instant.now(); + final boolean resync = lastResync == null || greater(lastResync, now, resyncInterval); + + if (resync) { + syncRecorder.record(this::sync); + lastResync = now; + } else { + updateRecorder.record(this::update); + } + + lastUpdate = now; + } + + private void sync() { + int count = 0; + + try { + for (ManagedProcessor processor : connectorClient.getAllProcessors()) { + updater.update(processor); + + count++; + } + } finally { + if (count > 0) { + syncTotalRecorder.increment(count); + } + } + } + + private void update() { + int count = 0; + + try { + for (Map.Entry entry : processors.entrySet()) { + if (entry.getValue().isAfter(lastUpdate)) { + connectorClient.getProcessor(entry.getKey()).ifPresent(updater::update); + + count++; + } + } + } finally { + if (count > 0) { + updateTotalRecorder.increment(count); + } + } + } + + private static boolean greater(Temporal startInclusive, Temporal endExclusive, Duration interval) { + return Duration.between(startInclusive, endExclusive).compareTo(interval) >= 0; + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java new file mode 100644 index 00000000..a2756f91 --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java @@ -0,0 +1,148 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import java.time.Instant; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.support.metrics.MetricsSupport; +import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; +import org.bf2.cos.fleetshard.sync.client.FleetManagerClient; +import org.bf2.cos.fleetshard.sync.client.FleetManagerClientException; +import org.bf2.cos.fleetshard.sync.client.FleetShardClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class ProcessorStatusUpdater { + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusUpdater.class); + + public static final String CONNECTOR_STATE = "processor.state"; + public static final String CONNECTOR_STATE_COUNT = "processor.state.count"; + + static final int CONNECTOR_STATE_READY = 1; + static final int CONNECTOR_STATE_FAILED = 2; + static final int CONNECTOR_STATE_DELETED = 3; + static final int CONNECTOR_STATE_STOPPED = 4; + static final int CONNECTOR_STATE_IN_PROCESS = 5; + + @Inject + FleetManagerClient fleetManagerClient; + @Inject + FleetShardClient connectorClient; + @Inject + MeterRegistry registry; + @Inject + FleetShardSyncConfig config; + + public void update(ManagedProcessor processor) { + LOGGER.debug("Update processor status (name: {}, phase: {})", + processor.getMetadata().getName(), + processor.getStatus().getPhase()); + +// try { +// ProcessorDeploymentStatus ProcessorDeploymentStatus = Process.extract(processor); + +// fleetManagerClient.updateConnectorStatus(processor, ProcessorDeploymentStatus); + +// LOGGER.debug("Updating Connector status metrics (Connector_id: {}, state: {})", +// processor.get(), ProcessorDeploymentStatus.getPhase()); +// +// switch (ProcessorDeploymentStatus.getPhase()) { +// case READY: +// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_READY); +// break; +// case FAILED: +// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_FAILED); +// break; +// case DELETED: +// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_DELETED); +// break; +// case STOPPED: +// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_STOPPED); +// break; +// default: +// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_IN_PROCESS); +// break; +// } + +// } catch (FleetManagerClientException e) { +// if (e.getStatusCode() == 410) { +// LOGGER.info("Connector " + processor.getMetadata().getName() + " does not exists anymore, deleting it"); +// if (connectorClient.deleteConnector(processor)) { +// LOGGER.info("Connector " + processor.getMetadata().getName() + " deleted"); +// } +// } else { +// LOGGER.warn("Error updating status of connector " + processor.getMetadata().getName(), e); +// } +// } catch (Exception e) { +// LOGGER.warn("Error updating status of connector " + processor.getMetadata().getName(), e); +// } + } + + /* + * Expose a Gauge metric "cos_fleetshard_sync_connector_state" which reveals the current connector state. + * Metric value of 1 implies that the connector is in Ready state. Similarly, 2 -> Failed, 3 -> Deleted, + * 4 -> Stopped, 5 -> In Process + * Also exposing a Counter metrics "cos_fleetshard_sync_connector_state_count_total" which reveals each + * state count for the connector + */ + private void measure(ManagedProcessor processor, ProcessorDeploymentStatus ProcessorDeploymentStatus, int connectorState) { + + List tags = MetricsSupport.tags(config.metrics().recorder(), processor); + tags.add(Tag.of("cos.processor.id", processor.getSpec().getProcessorId())); + tags.add(Tag.of("cos.deployment.id", processor.getSpec().getDeploymentId())); + tags.add(Tag.of("cos.namespace", processor.getMetadata().getNamespace())); + +// String connectorResourceVersion = String.valueOf(processor.getSpec().getDeployment().getConnectorResourceVersion()); + + Gauge gauge = registry.find(config.metrics().baseName() + "." + CONNECTOR_STATE).tags(tags).gauge(); + + if (gauge != null) { + registry.remove(gauge); + } + + Gauge.builder(config.metrics().baseName() + "." + CONNECTOR_STATE, () -> new AtomicInteger(connectorState)) + .tags(tags) + .register(registry); + + // Adding deletion timestamp for metric housekeeping + if (CONNECTOR_STATE_DELETED == connectorState) { + LOGGER.info("Adding current timestamp to the deleted connector"); + tags.add(Tag.of("deletion_timestamp", Instant.now().toString())); + } + + Counter.builder(config.metrics().baseName() + "." + CONNECTOR_STATE_COUNT) + .tags(tags) + .tag("cos.connector.state", ProcessorDeploymentStatus.getPhase().getValue()) +// .tag("cos.connector.resourceversion", connectorResourceVersion) + .register(registry) + .increment(); + + if (CONNECTOR_STATE_FAILED == connectorState) { + Counter counter = registry.find(config.metrics().baseName() + "." + CONNECTOR_STATE_COUNT) + .tags(tags).tag("cos.connector.state", "ready") + .tagKeys("cos.connector.resourceversion").counter(); + + if (counter != null && counter.count() != 0) { + + // Exposing a new state "failed_but_ready" when a connector has already started but now failing + Counter.builder(config.metrics().baseName() + "." + CONNECTOR_STATE_COUNT) + .tags(tags) + .tag("cos.connector.state", "failed_but_ready") +// .tag("cos.connector.resourceversion", connectorResourceVersion) + .register(registry) + .increment(); + } + } + + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java index d843bbd6..fa5719bb 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java @@ -31,6 +31,8 @@ public class ResourcePoll implements Service { @Inject ConnectorDeploymentProvisioner connectorsProvisioner; @Inject + ProcessorDeploymentProvisioner processorDeploymentProvisioner; + @Inject ConnectorNamespaceProvisioner namespaceProvisioner; @Inject @@ -76,6 +78,7 @@ public void run() { private void sync() { namespaceProvisioner.poll(BEGINNING); connectorsProvisioner.poll(BEGINNING); + processorDeploymentProvisioner.poll(BEGINNING); } private void poll() { @@ -83,5 +86,7 @@ private void poll() { connectorClient.getMaxNamespaceResourceRevision()); connectorsProvisioner.poll( connectorClient.getMaxDeploymentResourceRevision()); + processorDeploymentProvisioner.poll( + connectorClient.getProcessorMaxDeploymentResourceRevision()); } } diff --git a/cos-fleetshard-sync/src/main/resources/application.properties b/cos-fleetshard-sync/src/main/resources/application.properties index 822a3e3f..3e9150ab 100644 --- a/cos-fleetshard-sync/src/main/resources/application.properties +++ b/cos-fleetshard-sync/src/main/resources/application.properties @@ -1,5 +1,5 @@ quarkus.banner.enabled = false -quarkus.log.level = INFO +quarkus.log.level = DEBUG quarkus.log.console.format = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c] (%t) %s%e%n quarkus.ssl.native = true diff --git a/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml b/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml index 7807ef32..2f86155e 100644 --- a/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml +++ b/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml @@ -3,3 +3,4 @@ resources: - ./managedconnectorclusters.cos.bf2.org-v1.yml - ./managedconnectoroperators.cos.bf2.org-v1.yml - ./managedconnectors.cos.bf2.org-v1.yml + - ./managedprocessors.cos.bf2.org-v1.yml