diff --git a/src/main/java/org/zalando/nakadi/config/PluginsConfig.java b/src/main/java/org/zalando/nakadi/config/PluginsConfig.java index d36bce176b..6ea5c7f020 100644 --- a/src/main/java/org/zalando/nakadi/config/PluginsConfig.java +++ b/src/main/java/org/zalando/nakadi/config/PluginsConfig.java @@ -54,4 +54,19 @@ public AuthorizationService authorizationService(@Value("${nakadi.plugins.authz. throw new BeanCreationException("Can't create AuthorizationService " + factoryName, e); } } + + @Bean + public TerminationService terminationService(@Value("${nakadi.plugins.termination.factory}") final String factoryName, + final SystemProperties systemProperties, + final DefaultResourceLoader loader) { + try { + LOGGER.info("Initialize per-resource termination service factory: " + factoryName); + final Class factoryClass = + (Class) loader.getClassLoader().loadClass(factoryName); + final TerminationServiceFactory factory = factoryClass.newInstance(); + return factory.init(systemProperties); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new BeanCreationException("Can't create TerminationService " + factoryName, e); + } + } } diff --git a/src/main/java/org/zalando/nakadi/controller/HealthCheckController.java b/src/main/java/org/zalando/nakadi/controller/HealthCheckController.java index d181a904c9..181c699346 100644 --- a/src/main/java/org/zalando/nakadi/controller/HealthCheckController.java +++ b/src/main/java/org/zalando/nakadi/controller/HealthCheckController.java @@ -1,5 +1,7 @@ package org.zalando.nakadi.controller; +import org.eclipse.jetty.http.HttpStatus; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -12,8 +14,18 @@ @RequestMapping(value = "/health", produces = TEXT_PLAIN_VALUE) public class HealthCheckController { + private final TerminationService terminationService; + + @Autowired + public HealthCheckController(final TerminationService terminationService) { + this.terminationService = terminationService; + } + @RequestMapping(method = GET) public ResponseEntity healthCheck() { + if (terminationService.isTerminating()) { + return ResponseEntity.status(HttpStatus.IM_A_TEAPOT_418).build(); + } return ok().body("OK"); } } diff --git a/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java new file mode 100644 index 0000000000..fe77984458 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java @@ -0,0 +1,15 @@ +package org.zalando.nakadi.plugin; + +import org.zalando.nakadi.plugin.api.exceptions.PluginException; + +public class DefaultTerminationService implements TerminationService { + + public void register(final TerminationListener terminationRunnable) { + // skip implementation for the local setup + } + + boolean isTerminating() throws PluginException { + return false; + } + +} diff --git a/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationServiceFactory.java b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationServiceFactory.java new file mode 100644 index 0000000000..5218e02328 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationServiceFactory.java @@ -0,0 +1,10 @@ +package org.zalando.nakadi.plugin; + +import org.zalando.nakadi.plugin.api.SystemProperties; + +public class DefaultTerminationServiceFactory { + + public DefaultTerminationService init(final SystemProperties systemProperties) { + return new DefaultTerminationService(); + } +} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 57369b7444..1e3b1bea3f 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -11,6 +11,7 @@ import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; +import org.zalando.nakadi.plugin.api.exceptions.PluginException; import org.zalando.nakadi.service.AuthorizationValidator; import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.service.CursorConverter; @@ -68,6 +69,7 @@ public class StreamingContext implements SubscriptionStreamer { private final NakadiKpiPublisher kpiPublisher; private final Span currentSpan; private final String kpiDataStreamedEventType; + private final TerminationService terminationService; private final long kpiCollectionFrequencyMs; @@ -105,6 +107,7 @@ private StreamingContext(final Builder builder) { this.kpiCollectionFrequencyMs = builder.kpiCollectionFrequencyMs; this.streamMemoryLimitBytes = builder.streamMemoryLimitBytes; this.currentSpan = builder.currentSpan; + this.terminationService = builder.terminationService; } public Span getCurrentSpan() { @@ -166,7 +169,10 @@ public long getKpiCollectionFrequencyMs() { @Override public void stream() throws InterruptedException { try (Closeable ignore = ShutdownHooks.addHook(this::onNodeShutdown)) { // bugfix ARUHA-485 + terminationService.register(this::onInstanceTermination); streamInternal(new StartingState()); + } catch (final PluginException pe) { + log.error("Failed to register instance termination callback for subscription {}", getSubscription(), pe); } catch (final IOException ex) { log.error( "Failed to delete shutdown hook for subscription {}. This method should not throw any exception", @@ -175,6 +181,11 @@ public void stream() throws InterruptedException { } } + void onInstanceTermination() { + log.info("Instance is about to be terminated. Trying to terminate subscription gracefully"); + switchState(new CleanupState(null)); + } + void onNodeShutdown() { log.info("Shutdown hook called. Trying to terminate subscription gracefully"); switchState(new CleanupState(null)); @@ -372,6 +383,7 @@ public static final class Builder { private long kpiCollectionFrequencyMs; private long streamMemoryLimitBytes; private Span currentSpan; + private TerminationService terminationService; public Builder setCurrentSpan(final Span span) { this.currentSpan = span; @@ -493,11 +505,15 @@ public Builder setKpiCollectionFrequencyMs(final long kpiCollectionFrequencyMs) return this; } + public Builder setTerminationService(final TerminationService terminationService) { + this.terminationService = terminationService; + return this; + } + public StreamingContext build() { return new StreamingContext(this); } - } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java index dab1a4ab14..bd43cf9c28 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java @@ -30,7 +30,6 @@ public void onEnter() { } finally { try { getContext().unregisterSession(); - } finally { switchState(StreamingContext.DEAD_STATE); } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index cb8d749170..c63701ad79 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -656,7 +656,7 @@ private void addToStreaming(final Partition partition, LoggerFactory.getLogger(LogPathBuilder.build( getContext().getSubscription().getId(), getSessionId(), String.valueOf(partition.getKey()))), System.currentTimeMillis(), this.getContext().getParameters().batchTimespan - ); + ); offsets.put(partition.getKey(), pd); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0fb0cafab2..936284b998 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -98,6 +98,8 @@ nakadi: factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory authz: factory: org.zalando.nakadi.plugin.auth.DefaultAuthorizationServiceFactory + termination: + factory: org.zalando.nakadi.plugin.DefaultTerminationServiceFactory event.max.bytes: 999000 timeline.wait.timeoutMs: 40000 subscription: