From c41bac7ae3826d0e686ea2fbf10e651e51dffcad Mon Sep 17 00:00:00 2001 From: Luca Molteni Date: Thu, 6 Apr 2023 17:32:22 +0200 Subject: [PATCH] Unit test for ProcessorProvisioner Integration test for ProcessorProvisioner --- .../cos/fleetshard/api/ManagedProcessor.java | 8 + .../support/resources/Resources.java | 2 +- .../fleetshard/support/resources/Secrets.java | 13 + .../cos/fleetshard/sync/it/SyncResource.java | 11 + .../sync/it/ProcessorProvisionerTest.java | 273 ++++++++++++++++ .../sync/it/support/SyncTestSupport.java | 34 ++ .../cos/fleetshard/sync/FleetShardSync.java | 5 + .../sync/client/FleetShardClient.java | 141 +++++++-- .../reapers/NamespacesReaper.java | 7 +- .../ProcessorDeploymentProvisioner.java | 75 ++++- .../resources/ProcessorStatusExtractor.java | 91 ++++++ .../sync/resources/ProcessorStatusSync.java | 147 +++++++++ .../resources/ProcessorStatusSyncJob.java | 18 ++ .../resources/ProcessorStatusUpdater.java | 48 +++ .../sync/resources/ResourcePoll.java | 5 + .../src/main/resources/application.properties | 2 +- .../sync/connector/ConnectorTestSupport.java | 11 +- .../sync/processor/ProcessorTestSupport.java | 275 ++++++++++++++++ .../resources/ConnectorProvisionerTest.java | 12 +- .../resources/ProcessorProvisionerTest.java | 293 ++++++++++++++++++ .../crds/kustomization.yaml | 1 + .../crds/processors.cos.bf2.org-v1.yml | 199 ------------ 22 files changed, 1420 insertions(+), 251 deletions(-) create mode 100644 cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java create mode 100644 cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusExtractor.java create mode 100644 cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java create mode 100644 cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSyncJob.java create mode 100644 cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java create mode 100644 cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/processor/ProcessorTestSupport.java create mode 100644 cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ProcessorProvisionerTest.java delete mode 100644 etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/processors.cos.bf2.org-v1.yml diff --git a/cos-fleetshard-api/model/src/main/java/org/bf2/cos/fleetshard/api/ManagedProcessor.java b/cos-fleetshard-api/model/src/main/java/org/bf2/cos/fleetshard/api/ManagedProcessor.java index a04737f2..8b70d1c5 100644 --- a/cos-fleetshard-api/model/src/main/java/org/bf2/cos/fleetshard/api/ManagedProcessor.java +++ b/cos-fleetshard-api/model/src/main/java/org/bf2/cos/fleetshard/api/ManagedProcessor.java @@ -7,7 +7,15 @@ import io.fabric8.kubernetes.model.annotation.Group; import io.fabric8.kubernetes.model.annotation.ShortNames; import io.fabric8.kubernetes.model.annotation.Version; +import io.sundr.builder.annotations.Buildable; +import io.sundr.builder.annotations.BuildableReference; +import lombok.EqualsAndHashCode; +import lombok.ToString; +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder", refs = @BuildableReference(CustomResource.class), + editableEnabled = false) @Version(ManagedConnector.VERSION) @Group(ManagedConnector.GROUP) @ShortNames("mpsr") diff --git a/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Resources.java b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Resources.java index f214d44d..35c77d40 100644 --- a/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Resources.java +++ b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Resources.java @@ -55,7 +55,7 @@ public final class Resources { public static final String CONNECTOR_SECRET_DEPLOYMENT_SUFFIX = "-deploy"; public static final String CONNECTOR_CONFIGMAP_SUFFIX = "-configmap"; - public static final String PROCESSOR_PREFIX = "pcr-"; + public static final String PROCESSOR_PREFIX = "mpsr-"; public static final String LABEL_KCP_TARGET_CLUSTER_ID = "kcp.dev/cluster"; diff --git a/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Secrets.java b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Secrets.java index c5fdc056..5ab8dff4 100644 --- a/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Secrets.java +++ b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Secrets.java @@ -130,6 +130,19 @@ public static String generateConnectorSecretId(String id) { return answer; } + public static String generateProcessorSecretId(String id) { + String answer = id; + + if (!answer.startsWith(Resources.PROCESSOR_PREFIX)) { + answer = Resources.PROCESSOR_PREFIX + answer; + } + if (!answer.endsWith(Resources.CONNECTOR_SECRET_DEPLOYMENT_SUFFIX)) { + answer += Resources.CONNECTOR_SECRET_DEPLOYMENT_SUFFIX; + } + + return answer; + } + public static String generateSecretId(String id) { String answer = id; diff --git a/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java b/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java index 3b37d3e1..daf65de8 100644 --- a/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java +++ b/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java @@ -10,6 +10,7 @@ import org.bf2.cos.fleetshard.sync.housekeeping.reapers.AddonReaper; import org.bf2.cos.fleetshard.sync.resources.ConnectorDeploymentProvisioner; import org.bf2.cos.fleetshard.sync.resources.ConnectorNamespaceProvisioner; +import org.bf2.cos.fleetshard.sync.resources.ProcessorDeploymentProvisioner; import org.bf2.cos.fleetshard.sync.resources.ResourcePoll; @ApplicationScoped @@ -20,6 +21,8 @@ public class SyncResource { @Inject ConnectorDeploymentProvisioner deploymentProvisioner; @Inject + ProcessorDeploymentProvisioner processorDeploymentProvisioner; + @Inject AddonReaper addonReaper; @Inject ResourcePoll resourceSync; @@ -38,12 +41,20 @@ public void pollConnectors(Long gv) { deploymentProvisioner.poll(gv); } + @Path("/provisioner/processors") + @POST + @Consumes(MediaType.TEXT_PLAIN) + public void pollProcessors(Long gv) { + processorDeploymentProvisioner.poll(gv); + } + @Path("/provisioner/all") @POST @Consumes(MediaType.TEXT_PLAIN) public void poll() { namespaceProvisioner.poll(0); deploymentProvisioner.poll(0); + processorDeploymentProvisioner.poll(0); } @Path("/provisioner/sync") diff --git a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java new file mode 100644 index 00000000..12bcd0f1 --- /dev/null +++ b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java @@ -0,0 +1,273 @@ +package org.bf2.cos.fleetshard.sync.it; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import javax.ws.rs.core.MediaType; + +import org.bf2.cos.fleet.manager.model.KafkaConnectionSettings; +import org.bf2.cos.fleet.manager.model.ProcessorDesiredState; +import org.bf2.cos.fleet.manager.model.ServiceAccount; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.support.resources.Namespaces; +import org.bf2.cos.fleetshard.support.resources.Resources; +import org.bf2.cos.fleetshard.support.resources.Secrets; +import org.bf2.cos.fleetshard.sync.it.support.FleetManagerMockServer; +import org.bf2.cos.fleetshard.sync.it.support.FleetManagerTestInstance; +import org.bf2.cos.fleetshard.sync.it.support.SyncTestProfile; +import org.bf2.cos.fleetshard.sync.it.support.SyncTestSupport; +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.http.ContentTypeHeader; +import com.github.tomakehurst.wiremock.http.RequestMethod; + +import io.fabric8.kubernetes.api.model.Namespace; +import io.fabric8.kubernetes.api.model.Secret; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static io.restassured.RestAssured.given; +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; +import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson; +import static org.assertj.core.api.Assertions.assertThat; +import static org.bf2.cos.fleetshard.support.resources.Resources.uid; +import static org.bf2.cos.fleetshard.support.resources.Secrets.SECRET_ENTRY_SERVICE_ACCOUNT; +import static org.bf2.cos.fleetshard.support.resources.Secrets.toBase64; + +@QuarkusTest +@TestProfile(ProcessorProvisionerTest.Profile.class) +public class ProcessorProvisionerTest extends SyncTestSupport { + public static final String DEPLOYMENT_ID = uid(); + public static final String KAFKA_URL = "kafka.acme.com:2181"; + public static final String KAFKA_CLIENT_ID = uid(); + public static final String KAFKA_CLIENT_SECRET = toBase64(uid()); + + @FleetManagerTestInstance + FleetManagerMockServer server; + + @ConfigProperty(name = "cos.cluster.id") + String clusterId; + + @Test + void processorIsProvisioned() { + { + given() + .contentType(MediaType.TEXT_PLAIN) + .body(0L) + .post("/test/provisioner/namespaces"); + + Namespace ns1 = until( + () -> fleetShardClient.getNamespace(clusterId), + Objects::nonNull); + } + + { + // + // Deployment v1 + // + + given() + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.TEXT_PLAIN) + .body(0L) + .post("/test/provisioner/processors"); + + Secret s1 = until( + () -> fleetShardClient.getProcessorSecret(clusterId, DEPLOYMENT_ID), + item -> Objects.equals( + "1", + item.getMetadata().getLabels().get(Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION))); + + ManagedProcessor mp = until( + () -> fleetShardClient.getProcessor(clusterId, DEPLOYMENT_ID), + item -> { + return item.getSpec().getDeploymentResourceVersion() == 1L + && item.getSpec().getSecret() != null; + }); + + assertThat(s1).satisfies(item -> { + assertThat(item.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(mp.getSpec().getDeploymentId())); + + assertThatJson(Secrets.extract(item, SECRET_ENTRY_SERVICE_ACCOUNT)) + .isObject() + .containsEntry("client_id", KAFKA_CLIENT_ID) + .containsEntry("client_secret", KAFKA_CLIENT_SECRET); + + }); + + assertThat(mp.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX); + assertThat(mp.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL); + assertThat(mp.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName()); + + } + { + // + // Deployment v2 + // + + given() + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.TEXT_PLAIN) + .body(1L) + .post("/test/provisioner/processors"); + + // Secret s1 = until( + // () -> fleetShardClient.getSecret(clusterId, DEPLOYMENT_ID), + // item -> Objects.equals( + // "2", + // item.getMetadata().getLabels().get(Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION))); + + ManagedProcessor mc = until( + () -> fleetShardClient.getProcessor(clusterId, DEPLOYMENT_ID), + item -> { + return item.getSpec().getDeploymentResourceVersion() == 2L + && item.getSpec().getSecret() != null; + }); + + // assertThat(s1).satisfies(item -> { + // assertThat(item.getMetadata().getName()) + // .isEqualTo(Secrets.generateProcessorSecretId(mc.getSpec().getDeploymentId())); + // + // assertThatJson(Secrets.extract(item, SECRET_ENTRY_SERVICE_ACCOUNT)) + // .isObject() + // .containsEntry("client_id", KAFKA_CLIENT_ID) + // .containsEntry("client_secret", KAFKA_CLIENT_SECRET); + // + // assertThatJson(Secrets.extract(item, SECRET_ENTRY_PROCESSOR)) + // .inPath("processor") + // .isObject() + // .containsEntry("foo", "processor-bar"); + // assertThatJson(Secrets.extract(item, SECRET_ENTRY_PROCESSOR)) + // .inPath("kafka") + // .isObject() + // .containsEntry("topic", "kafka-bar"); + // + // }); + + assertThat(mc.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX); + assertThat(mc.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL); + // assertThat(mc.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName()); + } + } + + public static class Profile extends SyncTestProfile { + @Override + public Map getConfigOverrides() { + return Map.of( + "cos.cluster.id", getId(), + "test.namespace", Namespaces.generateNamespaceId(getId()), + "cos.namespace", Namespaces.generateNamespaceId(getId()), + "cos.resources.poll-interval", "disabled", + "cos.resources.resync-interval", "disabled", + "cos.resources.update-interval", "disabled", + "cos.metrics.recorder.tags.annotations[0]", "my.cos.bf2.org/processor-group", + "cos.metrics.recorder.tags.labels[0]", "cos.bf2.org/organization-id", + "cos.metrics.recorder.tags.labels[1]", "cos.bf2.org/pricing-tier"); + } + + @Override + public List testResources() { + return List.of( + new TestResourceEntry(FleetManagerTestResource.class)); + } + } + + public static class FleetManagerTestResource extends org.bf2.cos.fleetshard.sync.it.support.ControlPlaneTestResource { + @Override + protected void configure(FleetManagerMockServer server) { + final String clusterId = ConfigProvider.getConfig().getValue("cos.cluster.id", String.class); + + server.stubMatching( + RequestMethod.GET, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/namespaces", + resp -> { + JsonNode body = namespaceList( + namespace(clusterId, clusterId)); + + resp.withHeader(ContentTypeHeader.KEY, APPLICATION_JSON) + .withJsonBody(body); + }); + + server.stubMatching( + RequestMethod.PUT, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/deployments/.*/status", + () -> WireMock.ok()); + + server.stubMatching( + RequestMethod.GET, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/deployments", + req -> req.withQueryParam("gt_version", equalTo("0")), + resp -> { + JsonNode body = processorDeploymentList( + processorDeployment(DEPLOYMENT_ID, 1L, + depl -> { + depl.getMetadata().annotations(Map.of()); + }, + spec -> { + spec.namespaceId(clusterId); + spec.processorId("processor-1"); + spec.processorTypeId("processor-type-1"); + spec.processorResourceVersion(1L); + spec.kafka( + new KafkaConnectionSettings() + .url(KAFKA_URL)); + spec.serviceAccount( + new ServiceAccount() + .clientId(KAFKA_CLIENT_ID) + .clientSecret(KAFKA_CLIENT_SECRET)); + spec.shardMetadata(node(n -> { + n.withArray("operators").addObject() + .put("type", "camel-connector-operator") + .put("version", "[1.0.0,2.0.0)"); + })); + + spec.desiredState(ProcessorDesiredState.READY); + })); + + resp.withHeader(ContentTypeHeader.KEY, APPLICATION_JSON) + .withJsonBody(body); + }); + + server.stubMatching( + RequestMethod.GET, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/deployments", + req -> req.withQueryParam("gt_version", equalTo("1")), + resp -> { + JsonNode body = processorDeploymentList( + processorDeployment(DEPLOYMENT_ID, 2L, + depl -> { + depl.getMetadata().annotations(Map.of()); + }, + spec -> { + spec.namespaceId(clusterId); + spec.processorId("processor-1"); + spec.processorTypeId("processor-type-1"); + spec.processorResourceVersion(1L); + spec.kafka( + new KafkaConnectionSettings() + .url(KAFKA_URL)); + spec.serviceAccount( + new ServiceAccount() + .clientId(KAFKA_CLIENT_ID) + .clientSecret(KAFKA_CLIENT_SECRET)); + spec.shardMetadata(node(n -> { + n.withArray("operators").addObject() + .put("type", "camel-connector-operator") + .put("version", "[1.0.0,2.0.0)"); + })); + spec.desiredState(ProcessorDesiredState.READY); + })); + + resp.withHeader(ContentTypeHeader.KEY, APPLICATION_JSON) + .withJsonBody(body); + }); + } + } +} diff --git a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java index 2fe22694..9ff5b9e4 100644 --- a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java +++ b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java @@ -24,6 +24,9 @@ import org.bf2.cos.fleet.manager.model.ConnectorNamespaceStatus; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceTenant; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceTenantKind; +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentList; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentSpec; import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; import org.bf2.cos.fleetshard.sync.client.FleetShardClient; @@ -60,6 +63,19 @@ public static ObjectNode deploymentList(ConnectorDeployment... deployments) { return Serialization.jsonMapper().convertValue(items, ObjectNode.class); } + public static ObjectNode processorDeploymentList(ProcessorDeployment... deployments) { + var items = new ProcessorDeploymentList(); + items.page(1); + items.size(deployments.length); + items.total(deployments.length); + + for (ProcessorDeployment deployment : deployments) { + items.addItemsItem(deployment); + } + + return Serialization.jsonMapper().convertValue(items, ObjectNode.class); + } + public static ObjectNode namespaceList(ConnectorNamespaceDeployment... namespaces) { var items = new ConnectorNamespaceDeploymentList(); items.page(1); @@ -136,6 +152,24 @@ public static ConnectorDeployment deployment( return answer; } + public static ProcessorDeployment processorDeployment( + String name, + long revision, + Consumer deploymentConsumer, + Consumer deploymentSpecConsumer) { + + ProcessorDeployment answer = new ProcessorDeployment() + .kind("ProcessorDeployment") + .id(name) + .metadata(new ConnectorDeploymentAllOfMetadata().resourceVersion(revision)) + .spec(new ProcessorDeploymentSpec()); + + deploymentConsumer.accept(answer); + deploymentSpecConsumer.accept(answer.getSpec()); + + return answer; + } + public static JsonNode node(Consumer consumer) { ObjectNode answer = Serialization.jsonMapper().createObjectNode(); consumer.accept(answer); diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSync.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSync.java index 664bf920..0779860c 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSync.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSync.java @@ -11,6 +11,7 @@ import org.bf2.cos.fleetshard.sync.housekeeping.MetricsHousekeeper; import org.bf2.cos.fleetshard.sync.resources.ConnectorClusterStatusSync; import org.bf2.cos.fleetshard.sync.resources.ConnectorStatusSync; +import org.bf2.cos.fleetshard.sync.resources.ProcessorStatusSync; import org.bf2.cos.fleetshard.sync.resources.ResourcePoll; @ApplicationScoped @@ -26,6 +27,8 @@ public class FleetShardSync implements Service { @Inject ConnectorClusterStatusSync clusterStatusSync; @Inject + ProcessorStatusSync processorStatusSync; + @Inject Housekeeper housekeeping; @Inject MetricsHousekeeper metricsHousekeeping; @@ -55,12 +58,14 @@ public void startResourcesSync() throws Exception { resourceSync.start(); connectorStatusSync.start(); clusterStatusSync.start(); + processorStatusSync.start(); } public void stopResourcesSync() throws Exception { Resources.closeQuietly(resourceSync); Resources.closeQuietly(connectorStatusSync); Resources.closeQuietly(clusterStatusSync); + Resources.closeQuietly(processorStatusSync); } } diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java index 0b0a5ec1..d8015526 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java @@ -66,6 +66,10 @@ public void start() throws Exception { .inAnyNamespace() .withLabel(Resources.LABEL_CLUSTER_ID, getClusterId()) .inform(); + processorsInformer = kubernetesClient.resources(ManagedProcessor.class) + .inAnyNamespace() + .withLabel(Resources.LABEL_CLUSTER_ID, getClusterId()) + .inform(); operatorsInformer.stopped().whenComplete((unused, throwable) -> { if (throwable != null) { @@ -85,6 +89,12 @@ public void start() throws Exception { System.exit(-1); } }); + processorsInformer.stopped().whenComplete((unused, throwable) -> { + if (throwable != null) { + LOGGER.warn("Processor informer has stopped working, exiting", throwable); + System.exit(-1); + } + }); } @Override @@ -92,6 +102,7 @@ public void stop() throws Exception { Resources.closeQuietly(operatorsInformer); Resources.closeQuietly(namespaceInformers); Resources.closeQuietly(connectorsInformer); + Resources.closeQuietly(processorsInformer); } public String getClusterId() { @@ -109,6 +120,13 @@ public long getMaxDeploymentResourceRevision() { .orElse(0); } + public long getProcessorMaxDeploymentResourceRevision() { + return this.processorsInformer.getIndexer().list().stream() + .mapToLong(c -> c.getSpec().getDeploymentResourceVersion()) + .max() + .orElse(0); + } + public long getMaxNamespaceResourceRevision() { return this.namespaceInformers.getIndexer().list().stream() .mapToLong(c -> { @@ -198,7 +216,6 @@ public Optional getSecret(ConnectorDeployment deployment) { deployment.getId()); } - // TODO remove duplication here? public Optional getSecret(ProcessorDeployment deployment) { return getSecret( deployment.getSpec().getNamespaceId(), @@ -221,6 +238,14 @@ public Optional getSecret(String namespaceId, String deploymentId) { .get()); } + public Optional getProcessorSecret(String namespaceId, String deploymentId) { + return Optional.ofNullable( + kubernetesClient.secrets() + .inNamespace(generateNamespaceId(namespaceId)) + .withName(Secrets.generateProcessorSecretId(deploymentId)) + .get()); + } + // ************************************* // // Connectors @@ -235,17 +260,6 @@ public Boolean deleteConnector(ManagedConnector managedConnector) { .delete().isEmpty(); } - public Optional getProcessor(NamespacedName id) { - if (connectorsInformer == null) { - throw new IllegalStateException("Informer must be started before adding handlers"); - } - - final String key = Cache.namespaceKeyFunc(id.getNamespace(), id.getName()); - final ManagedProcessor val = processorsInformer.getIndexer().getByKey(key); - - return Optional.ofNullable(val); - } - public Optional getConnector(NamespacedName id) { if (connectorsInformer == null) { throw new IllegalStateException("Informer must be started before adding handlers"); @@ -263,12 +277,6 @@ public Optional getConnector(ConnectorDeployment deployment) { deployment.getId()); } - public Optional getProcessor(ProcessorDeployment deployment) { - return getProcessor( - deployment.getSpec().getNamespaceId(), - deployment.getId()); - } - public Optional getConnector(String namespaceId, String deploymentId) { if (connectorsInformer == null) { throw new IllegalStateException("Informer must be started before adding handlers"); @@ -280,17 +288,6 @@ public Optional getConnector(String namespaceId, String deploy return Optional.ofNullable(val); } - public Optional getProcessor(String namespaceId, String deploymentId) { - if (processorsInformer == null) { - throw new IllegalStateException("Informer must be started before adding handlers"); - } - - final String key = Cache.namespaceKeyFunc(generateNamespaceId(namespaceId), generateConnectorId(deploymentId)); - final ManagedProcessor val = processorsInformer.getIndexer().getByKey(key); - - return Optional.ofNullable(val); - } - public List getAllConnectors() { if (connectorsInformer == null) { throw new IllegalStateException("Informer must be started before adding handlers"); @@ -333,16 +330,94 @@ public ManagedConnector createConnector(ManagedConnector connector) { .createOrReplace(); } + public String generateConnectorId(String namespaceId) { + return Connectors.generateConnectorId(namespaceId); + } + + // ************************************* + // + // Processors + // + // ************************************* + + public Boolean deleteProcessor(ManagedProcessor managedProcessor) { + return !kubernetesClient.resources(ManagedProcessor.class) + .inNamespace(managedProcessor.getMetadata().getNamespace()) + .withName(managedProcessor.getMetadata().getName()) + .withPropagationPolicy(DeletionPropagation.FOREGROUND) + .delete().isEmpty(); + } + + public Optional getProcessor(NamespacedName id) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + final String key = Cache.namespaceKeyFunc(id.getNamespace(), id.getName()); + final ManagedProcessor val = processorsInformer.getIndexer().getByKey(key); + + return Optional.ofNullable(val); + } + + public Optional getProcessor(ProcessorDeployment deployment) { + return getProcessor( + deployment.getSpec().getNamespaceId(), + deployment.getId()); + } + + public Optional getProcessor(String namespaceId, String deploymentId) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + final String key = Cache.namespaceKeyFunc(generateNamespaceId(namespaceId), generateProcessorId(deploymentId)); + final ManagedProcessor val = processorsInformer.getIndexer().getByKey(key); + + return Optional.ofNullable(val); + } + + public List getAllProcessors() { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + return processorsInformer.getIndexer().list(); + } + + public List getProcessors(String namespace) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + return processorsInformer.getIndexer().byIndex(Cache.NAMESPACE_INDEX, namespace); + } + + public List getProcessors(Namespace namespace) { + return getProcessors(namespace.getMetadata().getName()); + } + + public void watchProcessors(Consumer handler) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + processorsInformer.addEventHandler(Informers.wrap(handler)); + } + + public void watchProcessors(ResourceEventHandler handler) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + processorsInformer.addEventHandler(handler); + } + public ManagedProcessor createProcessor(ManagedProcessor processor) { return kubernetesClient.resource(processor) .inNamespace(processor.getMetadata().getNamespace()) .createOrReplace(); } - public String generateConnectorId(String namespaceId) { - return Connectors.generateConnectorId(namespaceId); - } - public String generateProcessorId(String namespaceId) { return Processors.generateProcessorId(namespaceId); } diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/NamespacesReaper.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/NamespacesReaper.java index 17c39911..2d4f7129 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/NamespacesReaper.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/NamespacesReaper.java @@ -7,6 +7,7 @@ import javax.enterprise.context.ApplicationScoped; import org.bf2.cos.fleetshard.api.ManagedConnector; +import org.bf2.cos.fleetshard.api.ManagedProcessor; import org.bf2.cos.fleetshard.support.Service; import org.bf2.cos.fleetshard.support.resources.Namespaces; import org.bf2.cos.fleetshard.support.resources.Resources; @@ -73,14 +74,16 @@ private void doRun() { private void delete(Namespace ns) { Collection connectors = fleetShardClient.getConnectors(ns); + Collection processors = fleetShardClient.getProcessors(ns); try { - LOGGER.info("Deleting namespace: {} (id: {}, state: {}, expiration: {}, connectors {}", + LOGGER.info("Deleting namespace: {} (id: {}, state: {}, expiration: {}, connectors {}, processors {}", ns.getMetadata().getName(), Resources.getLabel(ns, Resources.LABEL_NAMESPACE_ID), Resources.getLabel(ns, Resources.LABEL_NAMESPACE_STATE), Resources.getAnnotation(ns, Resources.ANNOTATION_NAMESPACE_EXPIRATION), - connectors.size()); + connectors.size(), + processors.size()); fleetShardClient.deleteNamespace(ns); diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorDeploymentProvisioner.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorDeploymentProvisioner.java index 59825fb9..011bc779 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorDeploymentProvisioner.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorDeploymentProvisioner.java @@ -21,6 +21,7 @@ import org.bf2.cos.fleetshard.support.metrics.MetricsRecorder; import org.bf2.cos.fleetshard.support.resources.Processors; import org.bf2.cos.fleetshard.support.resources.Resources; +import org.bf2.cos.fleetshard.support.resources.Secrets; import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; import org.bf2.cos.fleetshard.sync.client.FleetManagerClient; import org.bf2.cos.fleetshard.sync.client.FleetShardClient; @@ -32,6 +33,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Secret; import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_CLUSTER_ID; import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_ID; @@ -122,11 +124,14 @@ public void provision(ProcessorDeployment deployment) { uow); ManagedProcessor processor = createProcessor(uow, deployment, null); + final Secret secret = createProcessorSecret(uow, deployment, processor); - LOGGER.info("CreateOrReplace - uow: {}, processor: {}/{}", + LOGGER.info("CreateOrReplace - uow: {}, processor: {}/{}, secret: {}/{}", uow, processor.getMetadata().getNamespace(), - processor.getMetadata().getName()); + processor.getMetadata().getName(), + secret.getMetadata().getNamespace(), + secret.getMetadata().getName()); } private ManagedProcessor createProcessor(String uow, ProcessorDeployment deployment, HasMetadata owner) { @@ -211,7 +216,7 @@ private ManagedProcessor createProcessor(String uow, ProcessorDeployment deploym operatorSelector.getType()); } - if (config != null) { + if (config != null && config.processors() != null) { config.processors().labels().forEach((k, v) -> { Resources.setLabel(processor, k, v); }); @@ -239,7 +244,6 @@ private ManagedProcessor createProcessor(String uow, ProcessorDeployment deploym processor.getSpec().setDeploymentResourceVersion(deployment.getMetadata().getResourceVersion()); processor.getSpec().setDesiredState(deployment.getSpec().getDesiredState().getValue()); processor.getSpec().setProcessorTypeId(deployment.getSpec().getProcessorTypeId()); - processor.getSpec().setDeploymentResourceVersion(deployment.getSpec().getProcessorResourceVersion()); KafkaConnectionSettings kafkaConnectionSettings = deployment.getSpec().getKafka(); if (kafkaConnectionSettings != null) { @@ -249,6 +253,7 @@ private ManagedProcessor createProcessor(String uow, ProcessorDeployment deploym } processor.getSpec().setOperatorSelector(operatorSelector); + processor.getSpec().setSecret(Secrets.generateProcessorSecretId(deployment.getId())); copyMetadata(deployment, processor); @@ -265,7 +270,69 @@ private ManagedProcessor createProcessor(String uow, ProcessorDeployment deploym } } + private Secret createProcessorSecret(String uow, ProcessorDeployment deployment, ManagedProcessor owner) { + Secret secret = fleetShard.getSecret(deployment) + .orElseGet(() -> { + LOGGER.info( + "Secret not found (cluster_id: {}, namespace_id: {}, processor_id: {}, deployment_id: {}, resource_version: {}), creating a new one", + fleetShard.getClusterId(), + deployment.getSpec().getNamespaceId(), + deployment.getSpec().getProcessorId(), + deployment.getId(), + deployment.getMetadata().getResourceVersion()); + + Secret answer = new Secret(); + answer.setMetadata(new ObjectMeta()); + answer.getMetadata().setNamespace(fleetShard.generateNamespaceId(deployment.getSpec().getNamespaceId())); + answer.getMetadata().setName(Secrets.generateProcessorSecretId(deployment.getId())); + + Resources.setLabels( + answer, + LABEL_CLUSTER_ID, fleetShard.getClusterId(), + LABEL_PROCESSOR_ID, deployment.getSpec().getProcessorId(), + LABEL_DEPLOYMENT_ID, deployment.getId(), + LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + deployment.getMetadata().getResourceVersion()); + + return answer; + }); + + Resources.setOwnerReferences( + secret, + owner); + + // add resource version to label + Resources.setLabel( + secret, + LABEL_DEPLOYMENT_RESOURCE_VERSION, + "" + deployment.getMetadata().getResourceVersion()); + + // add uow + Resources.setLabel( + secret, + LABEL_UOW, + uow); + + // copy operator type + Resources.setLabel( + secret, + LABEL_OPERATOR_TYPE, + owner.getMetadata().getLabels().get(LABEL_OPERATOR_TYPE)); + + Secrets.set(secret, Secrets.SECRET_ENTRY_SERVICE_ACCOUNT, deployment.getSpec().getServiceAccount()); + Secrets.set(secret, Secrets.SECRET_ENTRY_META, deployment.getSpec().getShardMetadata()); + + copyMetadata(deployment, secret); + + try { + return fleetShard.createSecret(secret); + } catch (Exception e) { + LOGGER.warn("", e); + throw e; + } + } + // TODO remove duplication here + // Used for the billing model so it might be not necessary as well private void copyMetadata(ProcessorDeployment deployment, HasMetadata target) { if (deployment.getMetadata() != null && deployment.getMetadata().getAnnotations() != null) { config.metrics().recorder().tags().labels() diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusExtractor.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusExtractor.java new file mode 100644 index 00000000..cbe71ee5 --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusExtractor.java @@ -0,0 +1,91 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import org.bf2.cos.fleet.manager.model.ConnectorDeploymentStatusOperators; +import org.bf2.cos.fleet.manager.model.ConnectorOperator; +import org.bf2.cos.fleet.manager.model.MetaV1Condition; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus; +import org.bf2.cos.fleet.manager.model.ProcessorState; +import org.bf2.cos.fleetshard.api.Conditions; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.api.Operator; + +import io.fabric8.kubernetes.api.model.Condition; + +import static org.bf2.cos.fleetshard.api.ManagedConnector.DESIRED_STATE_DELETED; +import static org.bf2.cos.fleetshard.api.ManagedConnector.DESIRED_STATE_STOPPED; +import static org.bf2.cos.fleetshard.api.ManagedConnector.DESIRED_STATE_UNASSIGNED; + +public class ProcessorStatusExtractor { + public static ProcessorDeploymentStatus extract(ManagedProcessor processor) { + ProcessorDeploymentStatus status = new ProcessorDeploymentStatus(); + + status.setResourceVersion(processor.getSpec().getDeploymentResourceVersion()); + + if (processor.getSpec().getOperatorSelector() == null || processor.getSpec().getOperatorSelector().getId() == null) { + status.setPhase(ProcessorState.FAILED); + status.addConditionsItem(new MetaV1Condition() + .type(Conditions.TYPE_READY) + .status(Conditions.STATUS_FALSE) + .message("No assignable operator") + .reason(Conditions.NO_ASSIGNABLE_OPERATOR_REASON) + .lastTransitionTime(Conditions.now())); + + return status; + } + + if (processor.getStatus() != null && processor.getStatus().getProcessorStatus() != null) { + status.setOperators( + new ConnectorDeploymentStatusOperators() + .assigned( + toConnectorOperator(processor.getStatus().getProcessorStatus().getAssignedOperator())) + .available( + toConnectorOperator(processor.getStatus().getProcessorStatus().getAvailableOperator()))); + + if (processor.getStatus().getProcessorStatus() != null) { + if (processor.getStatus().getProcessorStatus().getPhase() != null) { + status.setPhase(ProcessorState.fromValue( + processor.getStatus().getProcessorStatus().getPhase())); + } + if (processor.getStatus().getProcessorStatus().getConditions() != null) { + for (var cond : processor.getStatus().getProcessorStatus().getConditions()) { + status.addConditionsItem(toMetaV1Condition(cond)); + } + } + } + } + + if (status.getPhase() == null) { + status.setPhase(ProcessorState.PROVISIONING); + + if (DESIRED_STATE_DELETED.equals(processor.getSpec().getDesiredState())) { + status.setPhase(ProcessorState.DEPROVISIONING); + } else if (DESIRED_STATE_STOPPED.equals(processor.getSpec().getDesiredState())) { + status.setPhase(ProcessorState.DEPROVISIONING); + } else if (DESIRED_STATE_UNASSIGNED.equals(processor.getSpec().getDesiredState())) { + status.setPhase(ProcessorState.DEPROVISIONING); + } + } + + return status; + } + + public static ConnectorOperator toConnectorOperator(Operator operator) { + if (operator == null) { + return null; + } + + return new ConnectorOperator() + .id(operator.getId()) + .type(operator.getType()) + .version(operator.getVersion()); + } + + public static MetaV1Condition toMetaV1Condition(Condition condition) { + return new MetaV1Condition() + .type(condition.getType()) + .status(condition.getStatus()) + .message(condition.getMessage()) + .reason(condition.getReason()) + .lastTransitionTime(condition.getLastTransitionTime()); + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java new file mode 100644 index 00000000..75b43a93 --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java @@ -0,0 +1,147 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.Temporal; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.support.Service; +import org.bf2.cos.fleetshard.support.metrics.StaticMetricsRecorder; +import org.bf2.cos.fleetshard.support.resources.NamespacedName; +import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; +import org.bf2.cos.fleetshard.sync.FleetShardSyncScheduler; +import org.bf2.cos.fleetshard.sync.client.FleetShardClient; +import org.bf2.cos.fleetshard.sync.metrics.MetricsID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.micrometer.core.instrument.Counter; + +@ApplicationScoped +public class ProcessorStatusSync implements Service { + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusSync.class); + + public static final String JOB_ID = "cos.processors.status.sync"; + public static final String METRICS_SYNC = "processors.status.sync"; + public static final String METRICS_UPDATE = "processors.status.update"; + + @Inject + ProcessorStatusUpdater updater; + @Inject + FleetShardClient connectorClient; + @Inject + FleetShardSyncConfig config; + @Inject + FleetShardSyncScheduler scheduler; + + @Inject + @MetricsID(METRICS_SYNC) + StaticMetricsRecorder syncRecorder; + @Inject + @MetricsID(METRICS_SYNC + ".total") + Counter syncTotalRecorder; + + @Inject + @MetricsID(METRICS_UPDATE) + StaticMetricsRecorder updateRecorder; + @Inject + @MetricsID(METRICS_UPDATE + ".total") + Counter updateTotalRecorder; + + private volatile Instant lastResync; + private volatile Instant lastUpdate; + + private final ConcurrentMap processors = new ConcurrentHashMap<>(); + + @Override + public void start() throws Exception { + LOGGER.info("Starting processor status sync"); + + connectorClient.watchProcessors(new ResourceEventHandler<>() { + @Override + public void onAdd(ManagedProcessor connector) { + processors.put(NamespacedName.of(connector), Instant.now()); + } + + @Override + public void onUpdate(ManagedProcessor ignored, ManagedProcessor connector) { + processors.put(NamespacedName.of(connector), Instant.now()); + } + + @Override + public void onDelete(ManagedProcessor connector, boolean deletedFinalStateUnknown) { + processors.remove(NamespacedName.of(connector)); + } + }); + + scheduler.schedule( + JOB_ID, + ConnectorStatusSyncJob.class, + config.resources().updateInterval()); + } + + @Override + public void stop() { + scheduler.shutdownQuietly(JOB_ID); + } + + public void run() { + final Duration resyncInterval = config.resources().resyncInterval(); + final Instant now = Instant.now(); + final boolean resync = lastResync == null || greater(lastResync, now, resyncInterval); + + if (resync) { + syncRecorder.record(this::sync); + lastResync = now; + } else { + updateRecorder.record(this::update); + } + + lastUpdate = now; + } + + private void sync() { + int count = 0; + + try { + for (ManagedProcessor processor : connectorClient.getAllProcessors()) { + updater.update(processor); + + count++; + } + } finally { + if (count > 0) { + syncTotalRecorder.increment(count); + } + } + } + + private void update() { + int count = 0; + + try { + for (Map.Entry entry : processors.entrySet()) { + if (entry.getValue().isAfter(lastUpdate)) { + connectorClient.getProcessor(entry.getKey()).ifPresent(updater::update); + + count++; + } + } + } finally { + if (count > 0) { + updateTotalRecorder.increment(count); + } + } + } + + private static boolean greater(Temporal startInclusive, Temporal endExclusive, Duration interval) { + return Duration.between(startInclusive, endExclusive).compareTo(interval) >= 0; + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSyncJob.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSyncJob.java new file mode 100644 index 00000000..25687827 --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSyncJob.java @@ -0,0 +1,18 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import javax.inject.Inject; + +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; + +@DisallowConcurrentExecution +public class ProcessorStatusSyncJob implements Job { + @Inject + ProcessorStatusSync sync; + + @Override + public void execute(JobExecutionContext context) { + sync.run(); + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java new file mode 100644 index 00000000..4f1ceebd --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java @@ -0,0 +1,48 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.sync.client.FleetManagerClient; +import org.bf2.cos.fleetshard.sync.client.FleetManagerClientException; +import org.bf2.cos.fleetshard.sync.client.FleetShardClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class ProcessorStatusUpdater { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusUpdater.class); + @Inject + FleetManagerClient fleetManagerClient; + @Inject + FleetShardClient processorClient; + + public void update(ManagedProcessor processor) { + LOGGER.debug("Update processor status (name: {}, phase: {})", + processor.getMetadata().getName(), + processor.getStatus().getPhase()); + + try { + ProcessorDeploymentStatus processorDeploymentStatus = ProcessorStatusExtractor.extract(processor); + + fleetManagerClient.updateProcessorStatus(processor.getSpec().getClusterId(), + processor.getSpec().getDeploymentId(), + processorDeploymentStatus); + + } catch (FleetManagerClientException e) { + if (e.getStatusCode() == 410) { + LOGGER.info("Processor " + processor.getMetadata().getName() + " does not exists anymore, deleting it"); + if (processorClient.deleteProcessor(processor)) { + LOGGER.info("Processor " + processor.getMetadata().getName() + " deleted"); + } + } else { + LOGGER.warn("Error updating status of processor " + processor.getMetadata().getName(), e); + } + } catch (Exception e) { + LOGGER.warn("Error updating status of processor " + processor.getMetadata().getName(), e); + } + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java index d843bbd6..fa5719bb 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java @@ -31,6 +31,8 @@ public class ResourcePoll implements Service { @Inject ConnectorDeploymentProvisioner connectorsProvisioner; @Inject + ProcessorDeploymentProvisioner processorDeploymentProvisioner; + @Inject ConnectorNamespaceProvisioner namespaceProvisioner; @Inject @@ -76,6 +78,7 @@ public void run() { private void sync() { namespaceProvisioner.poll(BEGINNING); connectorsProvisioner.poll(BEGINNING); + processorDeploymentProvisioner.poll(BEGINNING); } private void poll() { @@ -83,5 +86,7 @@ private void poll() { connectorClient.getMaxNamespaceResourceRevision()); connectorsProvisioner.poll( connectorClient.getMaxDeploymentResourceRevision()); + processorDeploymentProvisioner.poll( + connectorClient.getProcessorMaxDeploymentResourceRevision()); } } diff --git a/cos-fleetshard-sync/src/main/resources/application.properties b/cos-fleetshard-sync/src/main/resources/application.properties index 822a3e3f..3e9150ab 100644 --- a/cos-fleetshard-sync/src/main/resources/application.properties +++ b/cos-fleetshard-sync/src/main/resources/application.properties @@ -1,5 +1,5 @@ quarkus.banner.enabled = false -quarkus.log.level = INFO +quarkus.log.level = DEBUG quarkus.log.console.format = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c] (%t) %s%e%n quarkus.ssl.native = true diff --git a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/connector/ConnectorTestSupport.java b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/connector/ConnectorTestSupport.java index 5b1977e7..e0581dbb 100644 --- a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/connector/ConnectorTestSupport.java +++ b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/connector/ConnectorTestSupport.java @@ -86,8 +86,8 @@ public static ManagedConnectorCluster createCluster() { .build(); } - public static ConnectorDeployment createDeployment(long deploymentRevision) { - return createDeployment( + public static ConnectorDeployment createConnectorDeployment(long deploymentRevision) { + return createConnectorDeployment( deploymentRevision, () -> { ObjectNode answer = Serialization.jsonMapper().createObjectNode(); @@ -110,13 +110,14 @@ public static ConnectorDeployment createDeployment(long deploymentRevision) { }); } - public static ConnectorDeployment createDeployment(long deploymentRevision, Consumer customizer) { - ConnectorDeployment answer = createDeployment(deploymentRevision); + public static ConnectorDeployment createConnectorDeployment(long deploymentRevision, + Consumer customizer) { + ConnectorDeployment answer = createConnectorDeployment(deploymentRevision); customizer.accept(answer); return answer; } - public static ConnectorDeployment createDeployment( + public static ConnectorDeployment createConnectorDeployment( long deploymentRevision, Supplier connectorSpec, Supplier connectorMeta) { diff --git a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/processor/ProcessorTestSupport.java b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/processor/ProcessorTestSupport.java new file mode 100644 index 00000000..091b1b95 --- /dev/null +++ b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/processor/ProcessorTestSupport.java @@ -0,0 +1,275 @@ +package org.bf2.cos.fleetshard.sync.processor; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.bf2.cos.fleet.manager.model.ConnectorDeploymentAllOfMetadata; +import org.bf2.cos.fleet.manager.model.KafkaConnectionSettings; +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentSpec; +import org.bf2.cos.fleet.manager.model.ProcessorDesiredState; +import org.bf2.cos.fleet.manager.model.ServiceAccount; +import org.bf2.cos.fleetshard.api.ManagedConnectorCluster; +import org.bf2.cos.fleetshard.api.ManagedConnectorClusterBuilder; +import org.bf2.cos.fleetshard.api.ManagedConnectorClusterSpecBuilder; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.support.resources.Clusters; +import org.bf2.cos.fleetshard.support.resources.Processors; +import org.bf2.cos.fleetshard.support.resources.Resources; +import org.bf2.cos.fleetshard.support.resources.Secrets; +import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; +import org.bf2.cos.fleetshard.sync.client.FleetManagerClient; +import org.bf2.cos.fleetshard.sync.client.FleetShardClient; +import org.mockito.Mockito; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.client.utils.Serialization; + +import static org.bf2.cos.fleetshard.support.resources.Resources.uid; +import static org.bf2.cos.fleetshard.support.resources.Secrets.toBase64; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +public final class ProcessorTestSupport { + + private ProcessorTestSupport() { + } + + public static Optional lookupProcessor( + Collection processors, + String clusterId, + ProcessorDeployment deployment) { + + return processors.stream().filter( + entry -> { + return Objects.equals(Processors.generateProcessorId(deployment.getId()), entry.getMetadata().getName()); + }).findFirst(); + } + + public static Optional lookupSecret( + Collection secrets, + String clusterId, + ProcessorDeployment deployment) { + + return secrets.stream().filter( + entry -> { + return Objects.equals(Secrets.generateProcessorSecretId(deployment.getId()), entry.getMetadata().getName()); + }).findFirst(); + } + + public static ManagedConnectorCluster createCluster() { + final String clusterId = uid(); + + return new ManagedConnectorClusterBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Clusters.CONNECTOR_CLUSTER_PREFIX + "-" + clusterId) + .addToLabels(Resources.LABEL_CLUSTER_ID, clusterId) + .build()) + .withSpec(new ManagedConnectorClusterSpecBuilder() + .withClusterId(clusterId) + .build()) + .build(); + } + + public static ProcessorDeployment createProcessorDeployment(long deploymentRevision) { + return createProcessorDeployment( + deploymentRevision, + () -> { + CamelConnectorMeta answer = new CamelConnectorMeta( + "sink", + "quay.io/mcs_dev/aws-s3-sink:0.0.1", + CamelConnectorOperator.of( + "camel-connector-operator", + "[1.0.0,2.0.0)"), + Map.of( + "kafka", "managed-kafka-source")); + + return Serialization.jsonMapper().convertValue(answer, JsonNode.class); + }); + } + + public static ProcessorDeployment createProcessorDeployment(long deploymentRevision, + Consumer customizer) { + ProcessorDeployment answer = createProcessorDeployment(deploymentRevision); + customizer.accept(answer); + return answer; + } + + public static ProcessorDeployment createProcessorDeployment( + long deploymentRevision, + Supplier connectorMeta) { + + final String namespaceId = "nid"; + final String deploymentId = "did"; + final String processorId = "pid"; + + return new ProcessorDeployment() + .kind("ProcessorDeployment") + .id(deploymentId) + .metadata(new ConnectorDeploymentAllOfMetadata() + .resourceVersion(deploymentRevision)) + .spec(new ProcessorDeploymentSpec() + .namespaceId(namespaceId) + .processorId(processorId) + .processorResourceVersion(1L) + .kafka(new KafkaConnectionSettings() + .url("kafka.acme.com:2181")) + .serviceAccount(new ServiceAccount() + .clientId(UUID.randomUUID().toString()) + .clientSecret(toBase64(UUID.randomUUID().toString()))) + .shardMetadata(connectorMeta.get()) + .desiredState(ProcessorDesiredState.READY)); + } + + public static FleetShardClient fleetShard( + String clusterId, + Collection processors, + Collection secrets) { + + Map allProcessors = processors.stream() + .collect(Collectors.toMap(e -> e.getMetadata().getName(), Function.identity())); + Map allSecrets = secrets.stream() + .collect(Collectors.toMap(e -> e.getMetadata().getName(), Function.identity())); + + FleetShardClient answer = Mockito.mock(FleetShardClient.class); + + when(answer.getClusterId()) + .thenAnswer(invocation -> clusterId); + + when(answer.getProcessor(any(ProcessorDeployment.class))) + .thenAnswer(invocation -> { + return lookupProcessor(allProcessors.values(), clusterId, invocation.getArgument(0)); + }); + + when(answer.getSecret(any(ProcessorDeployment.class))) + .thenAnswer(invocation -> { + return lookupSecret(allSecrets.values(), clusterId, invocation.getArgument(0)); + }); + + when(answer.createProcessor(any(ManagedProcessor.class))) + .thenAnswer(invocation -> { + var arg = invocation.getArgument(0, ManagedProcessor.class); + allProcessors.put(arg.getMetadata().getName(), arg); + return arg; + }); + when(answer.createSecret(any(Secret.class))) + .thenAnswer(invocation -> { + var arg = invocation.getArgument(0, Secret.class); + allSecrets.put(arg.getMetadata().getName(), arg); + return arg; + }); + + when(answer.getOrCreateManagedConnectorCluster()) + .thenAnswer(invocation -> { + return new ManagedConnectorClusterBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Clusters.CONNECTOR_CLUSTER_PREFIX + "-" + clusterId) + .addToLabels(Resources.LABEL_CLUSTER_ID, clusterId) + .build()) + .withSpec(new ManagedConnectorClusterSpecBuilder() + .withClusterId(clusterId) + .build()) + .build(); + }); + + return answer; + } + + public static FleetManagerClient fleetManagerClient() { + FleetManagerClient answer = Mockito.mock(FleetManagerClient.class); + return answer; + } + + public static FleetShardSyncConfig config() { + FleetShardSyncConfig answer = Mockito.mock(FleetShardSyncConfig.class); + when(answer.processors()).thenAnswer(invocation -> { + var processors = Mockito.mock(FleetShardSyncConfig.Processors.class); + when(processors.annotations()).thenReturn(Collections.emptyMap()); + when(processors.labels()).thenReturn(Collections.emptyMap()); + return processors; + }); + when(answer.imagePullSecretsName()).thenAnswer(invocation -> { + return "foo"; + }); + when(answer.namespace()).thenAnswer(invocation -> { + return "bar"; + }); + when(answer.metrics()).thenAnswer(invocation -> { + var metrics = Mockito.mock(FleetShardSyncConfig.Metrics.class); + when(metrics.baseName()).thenReturn("base"); + return metrics; + }); + when(answer.quota()).thenAnswer(invocation -> { + return Mockito.mock(FleetShardSyncConfig.Quota.class); + }); + when(answer.quota().defaultLimits()).thenAnswer(invocation -> { + return Mockito.mock(FleetShardSyncConfig.DefaultLimits.class); + }); + when(answer.quota().defaultRequest()).thenAnswer(invocation -> { + return Mockito.mock(FleetShardSyncConfig.DefaultRequest.class); + }); + + return answer; + } + + public static class CamelConnectorMeta { + + @JsonProperty("connector_type") + String connectorType; + + @JsonProperty("connector_image") + String connectorImage; + + @JsonProperty("operators") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + List operators; + + @JsonProperty("kamelets") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + Map kamelets; + + public CamelConnectorMeta( + String connectorType, + String connectorImage, + CamelConnectorOperator operator, + Map kamelets) { + + this.connectorType = connectorType; + this.connectorImage = connectorImage; + this.operators = List.of(operator); + this.kamelets = kamelets; + } + } + + public static class CamelConnectorOperator { + + @JsonProperty("type") + String type; + + @JsonProperty("version") + String version; + + public CamelConnectorOperator(String type, String version) { + this.type = type; + this.version = version; + } + + public static CamelConnectorOperator of(String type, String version) { + return new CamelConnectorOperator(type, version); + } + } +} diff --git a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ConnectorProvisionerTest.java b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ConnectorProvisionerTest.java index 9f2df8ad..db9bbd08 100644 --- a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ConnectorProvisionerTest.java +++ b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ConnectorProvisionerTest.java @@ -30,7 +30,7 @@ import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_ID; import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION; import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_UOW; -import static org.bf2.cos.fleetshard.sync.connector.ConnectorTestSupport.createDeployment; +import static org.bf2.cos.fleetshard.sync.connector.ConnectorTestSupport.createConnectorDeployment; import static org.mockito.Mockito.verify; public class ConnectorProvisionerTest { @@ -41,7 +41,7 @@ void createResources() { // // Given that no resources associated to the provided deployment exist // - final ConnectorDeployment deployment = createDeployment(0); + final ConnectorDeployment deployment = ConnectorTestSupport.createConnectorDeployment(0); final List connectors = List.of(); final List secrets = List.of(); @@ -132,7 +132,7 @@ void updateResources() { // // Given that the resources associated to the provided deployment exist // - final ConnectorDeployment oldDeployment = createDeployment(0); + final ConnectorDeployment oldDeployment = ConnectorTestSupport.createConnectorDeployment(0); final List connectors = List.of( new ManagedConnectorBuilder() @@ -168,7 +168,7 @@ void updateResources() { // // When deployment is updated // - final ConnectorDeployment newDeployment = createDeployment(0, d -> { + final ConnectorDeployment newDeployment = ConnectorTestSupport.createConnectorDeployment(0, d -> { d.getSpec().getKafka().setUrl("my-kafka.acme.com:218"); ((ObjectNode) d.getSpec().getConnectorSpec()).withObject("/connector").put("foo", "connector-baz"); ((ObjectNode) d.getSpec().getShardMetadata()).put("connector_image", "quay.io/mcs_dev/aws-s3-sink:0.1.0"); @@ -251,7 +251,7 @@ void updateAndCreateResources() { // // Given that the resources associated to the provided deployment exist // - final ConnectorDeployment oldDeployment = createDeployment(0); + final ConnectorDeployment oldDeployment = ConnectorTestSupport.createConnectorDeployment(0); final List connectors = List.of( new ManagedConnectorBuilder() @@ -287,7 +287,7 @@ void updateAndCreateResources() { // // When a change to the deployment happen that ends up with a new resource version // - final ConnectorDeployment newDeployment = createDeployment(0, d -> { + final ConnectorDeployment newDeployment = ConnectorTestSupport.createConnectorDeployment(0, d -> { d.getMetadata().setResourceVersion(1L); d.getSpec().getKafka().setUrl("my-kafka.acme.com:218"); ((ObjectNode) d.getSpec().getConnectorSpec()).withObject("/connector").put("foo", "connector-baz"); diff --git a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ProcessorProvisionerTest.java b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ProcessorProvisionerTest.java new file mode 100644 index 00000000..657e6347 --- /dev/null +++ b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ProcessorProvisionerTest.java @@ -0,0 +1,293 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import java.util.List; +import java.util.UUID; + +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; +import org.bf2.cos.fleet.manager.model.ServiceAccount; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.api.ManagedProcessorBuilder; +import org.bf2.cos.fleetshard.api.ManagedProcessorSpec; +import org.bf2.cos.fleetshard.support.client.EventClient; +import org.bf2.cos.fleetshard.support.metrics.MetricsRecorder; +import org.bf2.cos.fleetshard.support.resources.Processors; +import org.bf2.cos.fleetshard.support.resources.Secrets; +import org.bf2.cos.fleetshard.sync.processor.ProcessorTestSupport; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.api.model.SecretBuilder; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_CLUSTER_ID; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_ID; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_PROCESSOR_ID; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_UOW; +import static org.bf2.cos.fleetshard.sync.processor.ProcessorTestSupport.createProcessorDeployment; +import static org.mockito.Mockito.verify; + +public class ProcessorProvisionerTest { + private static final String CLUSTER_ID = UUID.randomUUID().toString(); + + @Test + void createResources() { + // + // Given that no resources associated to the provided deployment exist + // + final ProcessorDeployment deployment = createProcessorDeployment(0); + + final List processors = List.of(); + final List secrets = List.of(); + + final ArgumentCaptor sc = ArgumentCaptor.forClass(Secret.class); + final ArgumentCaptor mcc = ArgumentCaptor.forClass(ManagedProcessor.class); + + final ProcessorDeploymentProvisioner provisioner = new ProcessorDeploymentProvisioner(); + provisioner.config = ProcessorTestSupport.config(); + + provisioner.fleetShard = ProcessorTestSupport.fleetShard(CLUSTER_ID, processors, secrets); + provisioner.fleetManager = ProcessorTestSupport.fleetManagerClient(); + provisioner.eventClient = Mockito.mock(EventClient.class); + provisioner.recorder = Mockito.mock(MetricsRecorder.class); + + // + // When deployment is applied + // + provisioner.provision(deployment); + + verify(provisioner.fleetShard).createSecret(sc.capture()); + verify(provisioner.fleetShard).createProcessor(mcc.capture()); + + // + // Then resources must be created according to the deployment + // + assertThat(sc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(deployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, deployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, deployment.getId()) + .containsEntry(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + deployment.getMetadata().getResourceVersion()) + .containsKey(LABEL_UOW); + + }); + + assertThat(mcc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Processors.generateProcessorId(deployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, deployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, deployment.getId()) + .containsKey(LABEL_UOW); + + assertThat(val.getSpec()).satisfies(d -> { + assertThat(d.getSecret()).isEqualTo(sc.getValue().getMetadata().getName()); + assertThat(d.getKafka().getUrl()) + .isNotEmpty() + .isEqualTo(deployment.getSpec().getKafka().getUrl()); + }); + }); + } + + @Test + void updateResources() { + // + // Given that the resources associated to the provided deployment exist + // + final ProcessorDeployment oldDeployment = ProcessorTestSupport.createProcessorDeployment(0); + + final List processors = List.of( + new ManagedProcessorBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Processors.generateProcessorId(oldDeployment.getId())) + .addToLabels(LABEL_CLUSTER_ID, CLUSTER_ID) + .addToLabels(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .addToLabels(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .build()) + .withSpec(new ManagedProcessorSpec()) + .build()); + final List secrets = List.of( + new SecretBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Secrets.generateProcessorSecretId(oldDeployment.getId())) + .addToLabels(LABEL_CLUSTER_ID, CLUSTER_ID) + .addToLabels(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .addToLabels(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .addToLabels(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + oldDeployment.getMetadata().getResourceVersion()) + .build()) + .build()); + + final ProcessorDeploymentProvisioner provisioner = new ProcessorDeploymentProvisioner(); + provisioner.config = ProcessorTestSupport.config(); + provisioner.fleetShard = ProcessorTestSupport.fleetShard(CLUSTER_ID, processors, secrets); + provisioner.fleetManager = ProcessorTestSupport.fleetManagerClient(); + provisioner.eventClient = Mockito.mock(EventClient.class); + provisioner.recorder = Mockito.mock(MetricsRecorder.class); + + final ArgumentCaptor sc = ArgumentCaptor.forClass(Secret.class); + final ArgumentCaptor mcc = ArgumentCaptor.forClass(ManagedProcessor.class); + + // + // When deployment is updated + // + final ProcessorDeployment newDeployment = ProcessorTestSupport.createProcessorDeployment(0, d -> { + d.getSpec().getKafka().setUrl("my-kafka.acme.com:218"); + // TODO what is this in processsors? + // ((ObjectNode) d.getSpec()).withObject("/connector").put("foo", "connector-baz"); + }); + + provisioner.provision(newDeployment); + + verify(provisioner.fleetShard).createSecret(sc.capture()); + verify(provisioner.fleetShard).createProcessor(mcc.capture()); + + // + // Then the existing resources must be updated to reflect the changes made to the + // deployment. This scenario could happen when a resource on the connector cluster + // is amended outside the control of fleet manager (i.e. with kubectl) and in such + // case, the expected behavior is that the resource is re-set to the configuration + // from the fleet manager. + // + assertThat(sc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(oldDeployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, newDeployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, newDeployment.getId()) + .containsEntry(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + newDeployment.getMetadata().getResourceVersion()) + .containsKey(LABEL_UOW); + + var serviceAccountNode = Secrets.extract(val, Secrets.SECRET_ENTRY_SERVICE_ACCOUNT, ServiceAccount.class); + assertThat(serviceAccountNode.getClientSecret()) + .isEqualTo(newDeployment.getSpec().getServiceAccount().getClientSecret()); + assertThat(serviceAccountNode.getClientId()) + .isEqualTo(newDeployment.getSpec().getServiceAccount().getClientId()); + }); + + assertThat(mcc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Processors.generateProcessorId(oldDeployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .containsKey(LABEL_UOW); + + assertThat(val.getSpec()).satisfies(d -> { + assertThat(d.getDeploymentResourceVersion()).isEqualTo(oldDeployment.getMetadata().getResourceVersion()); + assertThat(d.getDeploymentResourceVersion()).isEqualTo(newDeployment.getMetadata().getResourceVersion()); + assertThat(d.getSecret()).isEqualTo(sc.getValue().getMetadata().getName()); + assertThat(d.getKafka().getUrl()) + .isNotEmpty() + .isEqualTo(newDeployment.getSpec().getKafka().getUrl()); + }); + }); + } + + @Test + void updateAndCreateResources() { + // + // Given that the resources associated to the provided deployment exist + // + final ProcessorDeployment oldDeployment = ProcessorTestSupport.createProcessorDeployment(0); + + final List processors = List.of( + new ManagedProcessorBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Processors.generateProcessorId(oldDeployment.getId())) + .addToLabels(LABEL_CLUSTER_ID, CLUSTER_ID) + .addToLabels(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .addToLabels(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .build()) + .withSpec(new ManagedProcessorSpec()) + .build()); + final List secrets = List.of( + new SecretBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Secrets.generateProcessorSecretId(oldDeployment.getId())) + .addToLabels(LABEL_CLUSTER_ID, CLUSTER_ID) + .addToLabels(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .addToLabels(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .addToLabels(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + oldDeployment.getMetadata().getResourceVersion()) + .build()) + .build()); + + final ProcessorDeploymentProvisioner provisioner = new ProcessorDeploymentProvisioner(); + provisioner.config = ProcessorTestSupport.config(); + provisioner.fleetShard = ProcessorTestSupport.fleetShard(CLUSTER_ID, processors, secrets); + provisioner.fleetManager = ProcessorTestSupport.fleetManagerClient(); + provisioner.eventClient = Mockito.mock(EventClient.class); + provisioner.recorder = Mockito.mock(MetricsRecorder.class); + + final ArgumentCaptor sc = ArgumentCaptor.forClass(Secret.class); + final ArgumentCaptor processorArgumentCaptor = ArgumentCaptor.forClass(ManagedProcessor.class); + + // + // When a change to the deployment happen that ends up with a new resource version + // + final ProcessorDeployment newDeployment = ProcessorTestSupport.createProcessorDeployment(0, d -> { + d.getMetadata().setResourceVersion(1L); + d.getSpec().getKafka().setUrl("my-kafka.acme.com:218"); + // TODO what is this in processors? + // ((ObjectNode) d.getSpec()).withObject("/connector").put("foo", "connector-baz"); + }); + + provisioner.provision(newDeployment); + + verify(provisioner.fleetShard).createSecret(sc.capture()); + verify(provisioner.fleetShard).createProcessor(processorArgumentCaptor.capture()); + + // + // Then the managed connector resource is expected to be updated to reflect the + // changes made to the deployment + // + assertThat(sc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(oldDeployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, newDeployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, newDeployment.getId()) + .containsEntry(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + newDeployment.getMetadata().getResourceVersion()) + .containsKey(LABEL_UOW); + + var serviceAccountNode = Secrets.extract(val, Secrets.SECRET_ENTRY_SERVICE_ACCOUNT, ServiceAccount.class); + assertThat(serviceAccountNode.getClientSecret()) + .isEqualTo(newDeployment.getSpec().getServiceAccount().getClientSecret()); + assertThat(serviceAccountNode.getClientId()) + .isEqualTo(newDeployment.getSpec().getServiceAccount().getClientId()); + }); + + assertThat(processorArgumentCaptor.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Processors.generateProcessorId(oldDeployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .containsKey(LABEL_UOW); + + assertThat(val.getSpec()).satisfies(d -> { + assertThat(d.getDeploymentResourceVersion()).isEqualTo(newDeployment.getMetadata().getResourceVersion()); + assertThat(d.getDeploymentResourceVersion()).isNotEqualTo(oldDeployment.getMetadata().getResourceVersion()); + assertThat(d.getSecret()).isEqualTo(sc.getValue().getMetadata().getName()); + assertThat(d.getKafka().getUrl()) + .isNotEmpty() + .isEqualTo(newDeployment.getSpec().getKafka().getUrl()); + }); + }); + } +} diff --git a/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml b/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml index 7807ef32..2f86155e 100644 --- a/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml +++ b/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml @@ -3,3 +3,4 @@ resources: - ./managedconnectorclusters.cos.bf2.org-v1.yml - ./managedconnectoroperators.cos.bf2.org-v1.yml - ./managedconnectors.cos.bf2.org-v1.yml + - ./managedprocessors.cos.bf2.org-v1.yml diff --git a/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/processors.cos.bf2.org-v1.yml b/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/processors.cos.bf2.org-v1.yml deleted file mode 100644 index 344b5e52..00000000 --- a/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/processors.cos.bf2.org-v1.yml +++ /dev/null @@ -1,199 +0,0 @@ -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition -metadata: - name: processors.cos.bf2.org -spec: - group: cos.bf2.org - names: - kind: Processor - plural: processors - shortNames: - - prc - singular: processor - scope: Namespaced - versions: - - additionalPrinterColumns: - - jsonPath: .spec.clusterId - name: CLUSTER_ID - type: string - - jsonPath: .spec.deploymentId - name: DEPLOYMENT_ID - type: string - - jsonPath: .status.phase - name: PHASE - type: string - - jsonPath: .spec.processorId - name: PROCESSOR_ID - type: string - - jsonPath: .spec.deployment.processorTypeId - name: PROCESSOR_TYPE_ID - type: string - - jsonPath: .status.deployment.processorTypeId - name: PROCESSOR_TYPE_ID - type: string - - jsonPath: .status.connectorStatus.phase - name: deployment_phase - type: string - name: v1alpha1 - schema: - openAPIV3Schema: - properties: - spec: - properties: - clusterId: - type: string - deployment: - properties: - configMapChecksum: - type: string - deploymentResourceVersion: - type: integer - desiredState: - type: string - kafka: - properties: - id: - type: string - url: - type: string - type: object - processorResourceVersion: - type: integer - processorTypeId: - type: string - schemaRegistry: - properties: - id: - type: string - url: - type: string - type: object - secret: - type: string - unitOfWork: - type: string - type: object - deploymentId: - type: string - operatorSelector: - properties: - id: - type: string - type: - type: string - version: - type: string - type: object - processorId: - type: string - type: object - status: - properties: - conditions: - items: - properties: - lastTransitionTime: - type: string - message: - type: string - observedGeneration: - type: integer - reason: - type: string - status: - type: string - type: - type: string - type: object - type: array - connectorStatus: - properties: - assignedOperator: - properties: - id: - type: string - type: - type: string - version: - type: string - type: object - availableOperator: - properties: - id: - type: string - type: - type: string - version: - type: string - type: object - conditions: - items: - properties: - lastTransitionTime: - type: string - message: - type: string - observedGeneration: - type: integer - reason: - type: string - status: - type: string - type: - type: string - type: object - type: array - phase: - type: string - type: object - deployment: - properties: - configMapChecksum: - type: string - deploymentResourceVersion: - type: integer - desiredState: - type: string - kafka: - properties: - id: - type: string - url: - type: string - type: object - processorResourceVersion: - type: integer - processorTypeId: - type: string - schemaRegistry: - properties: - id: - type: string - url: - type: string - type: object - secret: - type: string - unitOfWork: - type: string - type: object - phase: - enum: - - Initialization - - Augmentation - - Monitor - - Deleting - - Deleted - - Stopping - - Stopped - - Transferring - - Transferred - - Error - - id - type: string - type: object - type: object - served: true - storage: true - subresources: - status: {}