Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Do not delete topics if retention time is -1 #1135

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/_documentation/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ position: 14

#### How long will events be persisted for?

The default retention time in the project is set by the `retentionMs` value in `application.yml`, which is currently 2 days.
The default retention time in the project is set by the `retentionMs` value in `application.yml`, which is currently 2 days. If set to -1, no time limit is applied.

The service installation you're working with may have a different operational setting, and you should get in touch with the team operating that internal Nakadi service.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;

import org.apache.http.HttpStatus;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -200,7 +201,6 @@ public void whenUpdateRetentionTimeThenUpdateInKafkaAndDB() throws Exception {
final EventType eventType = NakadiTestUtils.createEventType();
IntStream.range(0, 15).forEach(x -> publishEvent(eventType.getName(), "{\"foo\":\"bar\"}"));
NakadiTestUtils.switchTimelineDefaultStorage(eventType);
NakadiTestUtils.switchTimelineDefaultStorage(eventType);

List<Map> timelines = NakadiTestUtils.listTimelines(eventType.getName());
final String cleanupTimeBeforeUpdate = (String) timelines.get(0).get("cleaned_up_at");
Expand Down Expand Up @@ -228,6 +228,36 @@ public void whenUpdateRetentionTimeThenUpdateInKafkaAndDB() throws Exception {
assertRetentionTime(newRetentionTime, eventType.getName());
}

@Test
public void whenUpdateRetentionTimeWithNegativeValueThenUpdateInKafkaAndDB() throws Exception {
final EventType eventType = NakadiTestUtils.createEventType();
IntStream.range(0, 15).forEach(x -> publishEvent(eventType.getName(), "{\"foo\":\"bar\"}"));
NakadiTestUtils.switchTimelineDefaultStorage(eventType);

List<Map> timelines = NakadiTestUtils.listTimelines(eventType.getName());
final String cleanupTimeBeforeUpdate = (String) timelines.get(0).get("cleaned_up_at");
Assert.assertNotNull("clean up time before update should not be null", cleanupTimeBeforeUpdate);
final Long defaultRetentionTime = 172800000L;
assertRetentionTime(defaultRetentionTime, eventType.getName());

final Long infiniteRetentionTime = -1L;
eventType.getOptions().setRetentionTime(infiniteRetentionTime);
final String updateBody = MAPPER.writer().writeValueAsString(eventType);
given().body(updateBody)
.header("accept", "application/json")
.contentType(JSON)
.put(ENDPOINT + "/" + eventType.getName())
.then()
.body(equalTo(""))
.statusCode(HttpStatus.SC_OK);

timelines = NakadiTestUtils.listTimelines(eventType.getName());
final String cleanupTimeAfterUpdate = (String) timelines.get(0).get("cleaned_up_at");
Assert.assertNull("clean up time after update should be null", cleanupTimeAfterUpdate);

assertRetentionTime(infiniteRetentionTime, eventType.getName());
}

@Test(timeout = 10000)
public void compactedEventTypeJourney() throws IOException {
// create event type with 'compact' cleanup_policy
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/org/zalando/nakadi/service/EventTypeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;

import org.everit.json.schema.Schema;
import org.everit.json.schema.SchemaException;
import org.everit.json.schema.loader.SchemaLoader;
Expand Down Expand Up @@ -485,14 +486,21 @@ private void updateTimelinesCleanup(final String eventType, final Long newRetent
throws InternalNakadiException, NoSuchEventTypeException {

if (newRetentionTime != null && !newRetentionTime.equals(oldRetentionTime)) {
final long retentionDiffMs = newRetentionTime - oldRetentionTime;
long retentionDiffMs = newRetentionTime - oldRetentionTime;
if (newRetentionTime < 0 || oldRetentionTime < 0) {
retentionDiffMs = newRetentionTime;
}
final List<Timeline> timelines = timelineService.getActiveTimelinesOrdered(eventType);

for (final Timeline timeline : timelines) {
if (timeline.getCleanedUpAt() != null) {
if (retentionDiffMs < 0) {
timeline.setCleanedUpAt(null);
} else if (timeline.getCleanedUpAt() != null) {
timeline.setCleanedUpAt(new Date(timeline.getCleanedUpAt().getTime() + retentionDiffMs));
timelineService.updateTimeline(timeline);
} else {
timeline.setCleanedUpAt(new Date(System.currentTimeMillis() + retentionDiffMs));
}
timelineService.updateTimeline(timeline);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,11 @@ private void scheduleTimelineCleanup(final Timeline timeline) throws Inconsisten
if (retentionTime == null) {
throw new InconsistentStateException("Event type should has information about its retention time");
}
final Date cleanupDate = new Date(System.currentTimeMillis() + retentionTime);

Date cleanupDate = null;
if (retentionTime >= 0) {
cleanupDate = new Date(System.currentTimeMillis() + retentionTime);
}
timeline.setCleanedUpAt(cleanupDate);
} catch (final InternalNakadiException | NoSuchEventTypeException e) {
throw new InconsistentStateException("Unexpected error occurred when scheduling timeline cleanup", e);
Expand Down