diff --git a/api/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java b/api/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java index 2317f830..59094075 100644 --- a/api/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java +++ b/api/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java @@ -100,31 +100,22 @@ public Single> apply(String namespace, @Valid @Body Schema .defaultIfEmpty(Optional.empty()) .flatMapSingle(latestSubjectOptional -> schemaService .register(ns, schema) - .flatMap(id -> schemaService - .getLatestSubject(ns, schema.getMetadata().getName()) - .map(Optional::of) - .defaultIfEmpty(Optional.empty()) - .flatMapSingle(registeredSchemaOptional -> { - if (registeredSchemaOptional.isEmpty()) { - return Single.error(new Exception(String.format("Cannot register schema %s", schema.getMetadata().getName()))); - } - - Schema registeredSchema = registeredSchemaOptional.get(); - ApplyStatus status; - - if (latestSubjectOptional.isEmpty()) { - status = ApplyStatus.created; - sendEventLog(schema.getKind(), registeredSchema.getMetadata(), status, null, registeredSchema.getSpec()); - } else if (registeredSchema.getSpec().getVersion() > latestSubjectOptional.get().getSpec().getVersion()) { - status = ApplyStatus.changed; - sendEventLog(schema.getKind(), registeredSchema.getMetadata(), status, - latestSubjectOptional.get().getSpec(), registeredSchema.getSpec()); - } else { - status = ApplyStatus.unchanged; - } - - return Single.just(formatHttpResponse(schema, status)); - }))); + .map(id -> { + ApplyStatus status; + + if (latestSubjectOptional.isEmpty()) { + status = ApplyStatus.created; + sendEventLog(schema.getKind(), schema.getMetadata(), status, null, schema.getSpec()); + } else if (!id.equals(latestSubjectOptional.get().getSpec().getId())) { + status = ApplyStatus.changed; + sendEventLog(schema.getKind(), schema.getMetadata(), status, latestSubjectOptional.get().getSpec(), + schema.getSpec()); + } else { + status = ApplyStatus.unchanged; + } + + return formatHttpResponse(schema, status); + })); }); } diff --git a/api/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java b/api/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java index aa4b7065..93f0acb2 100644 --- a/api/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java +++ b/api/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java @@ -69,9 +69,7 @@ void applyCreated() { when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Single.just(List.of())); - when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())) - .thenReturn(Maybe.empty()) - .thenReturn(Maybe.just(schema)); + when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Maybe.empty()); when(schemaService.register(namespace, schema)).thenReturn(Single.just(1)); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); @@ -92,15 +90,12 @@ void applyCreated() { void applyChanged() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); - Schema schemaV2 = buildSchema(); - schemaV2.getSpec().setVersion(2); when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Single.just(List.of())); when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())) - .thenReturn(Maybe.just(schema)) - .thenReturn(Maybe.just(schemaV2)); + .thenReturn(Maybe.just(schema)); when(schemaService.register(namespace, schema)).thenReturn(Single.just(2)); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); @@ -125,9 +120,7 @@ void applyUnchanged() { when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Single.just(List.of())); - when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())) - .thenReturn(Maybe.just(schema)) - .thenReturn(Maybe.just(schema)); + when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Maybe.just(schema)); when(schemaService.register(namespace, schema)).thenReturn(Single.just(1)); schemaController.apply("myNamespace", schema, false)