From e2bc3275affe02f7066f7a6d0c395e95bf89d65d Mon Sep 17 00:00:00 2001 From: Max Neverov Date: Fri, 27 Dec 2019 15:46:58 +0100 Subject: [PATCH 1/2] Fix for issue 1121: do not delete topics if retention time is -1 --- docs/_documentation/faq.md | 2 +- .../nakadi/webservice/EventTypeAT.java | 32 ++++++++++++++++++- .../nakadi/service/EventTypeService.java | 18 ++++++++--- .../service/timeline/TimelineService.java | 6 +++- 4 files changed, 51 insertions(+), 7 deletions(-) diff --git a/docs/_documentation/faq.md b/docs/_documentation/faq.md index 46a0868fb9..338599da73 100644 --- a/docs/_documentation/faq.md +++ b/docs/_documentation/faq.md @@ -29,7 +29,7 @@ position: 14 #### How long will events be persisted for? -The default retention time in the project is set by the `retentionMs` value in `application.yml`, which is currently 2 days. +The default retention time in the project is set by the `retentionMs` value in `application.yml`, which is currently 2 days. If set to -1, no time limit is applied. The service installation you're working with may have a different operational setting, and you should get in touch with the team operating that internal Nakadi service. diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventTypeAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventTypeAT.java index 925749e044..36e7038850 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventTypeAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventTypeAT.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableList; + import org.apache.http.HttpStatus; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -200,7 +201,6 @@ public void whenUpdateRetentionTimeThenUpdateInKafkaAndDB() throws Exception { final EventType eventType = NakadiTestUtils.createEventType(); IntStream.range(0, 15).forEach(x -> publishEvent(eventType.getName(), "{\"foo\":\"bar\"}")); NakadiTestUtils.switchTimelineDefaultStorage(eventType); - NakadiTestUtils.switchTimelineDefaultStorage(eventType); List timelines = NakadiTestUtils.listTimelines(eventType.getName()); final String cleanupTimeBeforeUpdate = (String) timelines.get(0).get("cleaned_up_at"); @@ -228,6 +228,36 @@ public void whenUpdateRetentionTimeThenUpdateInKafkaAndDB() throws Exception { assertRetentionTime(newRetentionTime, eventType.getName()); } + @Test + public void whenUpdateRetentionTimeWithNegativeValueThenUpdateInKafkaAndDB() throws Exception { + final EventType eventType = NakadiTestUtils.createEventType(); + IntStream.range(0, 15).forEach(x -> publishEvent(eventType.getName(), "{\"foo\":\"bar\"}")); + NakadiTestUtils.switchTimelineDefaultStorage(eventType); + + List timelines = NakadiTestUtils.listTimelines(eventType.getName()); + final String cleanupTimeBeforeUpdate = (String) timelines.get(0).get("cleaned_up_at"); + Assert.assertNotNull("clean up time before update should not be null", cleanupTimeBeforeUpdate); + final Long defaultRetentionTime = 172800000L; + assertRetentionTime(defaultRetentionTime, eventType.getName()); + + final Long infiniteRetentionTime = -1L; + eventType.getOptions().setRetentionTime(infiniteRetentionTime); + final String updateBody = MAPPER.writer().writeValueAsString(eventType); + given().body(updateBody) + .header("accept", "application/json") + .contentType(JSON) + .put(ENDPOINT + "/" + eventType.getName()) + .then() + .body(equalTo("")) + .statusCode(HttpStatus.SC_OK); + + timelines = NakadiTestUtils.listTimelines(eventType.getName()); + final String cleanupTimeAfterUpdate = (String) timelines.get(0).get("cleaned_up_at"); + Assert.assertNull("clean up time after update should be null", cleanupTimeAfterUpdate); + + assertRetentionTime(infiniteRetentionTime, eventType.getName()); + } + @Test(timeout = 10000) public void compactedEventTypeJourney() throws IOException { // create event type with 'compact' cleanup_policy diff --git a/src/main/java/org/zalando/nakadi/service/EventTypeService.java b/src/main/java/org/zalando/nakadi/service/EventTypeService.java index 3179bd607b..a5559160d5 100644 --- a/src/main/java/org/zalando/nakadi/service/EventTypeService.java +++ b/src/main/java/org/zalando/nakadi/service/EventTypeService.java @@ -2,6 +2,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; + import org.everit.json.schema.Schema; import org.everit.json.schema.SchemaException; import org.everit.json.schema.loader.SchemaLoader; @@ -485,14 +486,23 @@ private void updateTimelinesCleanup(final String eventType, final Long newRetent throws InternalNakadiException, NoSuchEventTypeException { if (newRetentionTime != null && !newRetentionTime.equals(oldRetentionTime)) { - final long retentionDiffMs = newRetentionTime - oldRetentionTime; + long retentionDiffMs = newRetentionTime - oldRetentionTime; + if (newRetentionTime < 0 || oldRetentionTime < 0) { + retentionDiffMs = newRetentionTime; + } final List timelines = timelineService.getActiveTimelinesOrdered(eventType); for (final Timeline timeline : timelines) { - if (timeline.getCleanedUpAt() != null) { - timeline.setCleanedUpAt(new Date(timeline.getCleanedUpAt().getTime() + retentionDiffMs)); - timelineService.updateTimeline(timeline); + Date cleanedUpAt; + if (retentionDiffMs < 0) { + cleanedUpAt = null; + } else if (timeline.getCleanedUpAt() != null) { + cleanedUpAt = new Date(timeline.getCleanedUpAt().getTime() + retentionDiffMs); + } else { + cleanedUpAt = new Date(System.currentTimeMillis() + retentionDiffMs); } + timeline.setCleanedUpAt(cleanedUpAt); + timelineService.updateTimeline(timeline); } } } diff --git a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java index 8469d38a79..0b60a73c60 100644 --- a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java +++ b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java @@ -343,7 +343,11 @@ private void scheduleTimelineCleanup(final Timeline timeline) throws Inconsisten if (retentionTime == null) { throw new InconsistentStateException("Event type should has information about its retention time"); } - final Date cleanupDate = new Date(System.currentTimeMillis() + retentionTime); + + Date cleanupDate = null; + if (retentionTime >= 0) { + cleanupDate = new Date(System.currentTimeMillis() + retentionTime); + } timeline.setCleanedUpAt(cleanupDate); } catch (final InternalNakadiException | NoSuchEventTypeException e) { throw new InconsistentStateException("Unexpected error occurred when scheduling timeline cleanup", e); From 5033fb855f0b1734fd199df883325247087882b7 Mon Sep 17 00:00:00 2001 From: Max Neverov Date: Fri, 27 Dec 2019 20:46:23 +0100 Subject: [PATCH 2/2] Fix checkstyle --- .../java/org/zalando/nakadi/service/EventTypeService.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/EventTypeService.java b/src/main/java/org/zalando/nakadi/service/EventTypeService.java index a5559160d5..d84cb034df 100644 --- a/src/main/java/org/zalando/nakadi/service/EventTypeService.java +++ b/src/main/java/org/zalando/nakadi/service/EventTypeService.java @@ -493,15 +493,13 @@ private void updateTimelinesCleanup(final String eventType, final Long newRetent final List timelines = timelineService.getActiveTimelinesOrdered(eventType); for (final Timeline timeline : timelines) { - Date cleanedUpAt; if (retentionDiffMs < 0) { - cleanedUpAt = null; + timeline.setCleanedUpAt(null); } else if (timeline.getCleanedUpAt() != null) { - cleanedUpAt = new Date(timeline.getCleanedUpAt().getTime() + retentionDiffMs); + timeline.setCleanedUpAt(new Date(timeline.getCleanedUpAt().getTime() + retentionDiffMs)); } else { - cleanedUpAt = new Date(System.currentTimeMillis() + retentionDiffMs); + timeline.setCleanedUpAt(new Date(System.currentTimeMillis() + retentionDiffMs)); } - timeline.setCleanedUpAt(cleanedUpAt); timelineService.updateTimeline(timeline); } }