Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

instance termination plugin support #1134

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/main/java/org/zalando/nakadi/config/PluginsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,19 @@ public AuthorizationService authorizationService(@Value("${nakadi.plugins.authz.
throw new BeanCreationException("Can't create AuthorizationService " + factoryName, e);
}
}

@Bean
public TerminationService terminationService(@Value("${nakadi.plugins.termination.factory}") final String factoryName,
final SystemProperties systemProperties,
final DefaultResourceLoader loader) {
try {
LOGGER.info("Initialize per-resource termination service factory: " + factoryName);
final Class<TerminationServiceFactory> factoryClass =
(Class<TerminationServiceFactory>) loader.getClassLoader().loadClass(factoryName);
final TerminationServiceFactory factory = factoryClass.newInstance();
return factory.init(systemProperties);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new BeanCreationException("Can't create TerminationService " + factoryName, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.zalando.nakadi.controller;

import org.eclipse.jetty.http.HttpStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -12,8 +14,18 @@
@RequestMapping(value = "/health", produces = TEXT_PLAIN_VALUE)
public class HealthCheckController {

private final TerminationService terminationService;

@Autowired
public HealthCheckController(final TerminationService terminationService) {
this.terminationService = terminationService;
}

@RequestMapping(method = GET)
public ResponseEntity<String> healthCheck() {
if (terminationService.isTerminating()) {
return ResponseEntity.status(HttpStatus.IM_A_TEAPOT_418).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this meant to be used by Nakadi Proxy? When maintaining the list of healthy Nakadi instances, those who report 418 during health check will not be added to the final result but at the same time not marked as unhealthy?

}
return ok().body("OK");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.zalando.nakadi.plugin;

import org.zalando.nakadi.plugin.api.exceptions.PluginException;

public class DefaultTerminationService implements TerminationService {

public void register(final TerminationListener terminationRunnable) {
// skip implementation for the local setup
}

boolean isTerminating() throws PluginException {
return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.zalando.nakadi.plugin;

import org.zalando.nakadi.plugin.api.SystemProperties;

public class DefaultTerminationServiceFactory {

public DefaultTerminationService init(final SystemProperties systemProperties) {
return new DefaultTerminationService();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.plugin.api.exceptions.PluginException;
import org.zalando.nakadi.service.AuthorizationValidator;
import org.zalando.nakadi.service.BlacklistService;
import org.zalando.nakadi.service.CursorConverter;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class StreamingContext implements SubscriptionStreamer {
private final NakadiKpiPublisher kpiPublisher;
private final Span currentSpan;
private final String kpiDataStreamedEventType;
private final TerminationService terminationService;

private final long kpiCollectionFrequencyMs;

Expand Down Expand Up @@ -105,6 +107,7 @@ private StreamingContext(final Builder builder) {
this.kpiCollectionFrequencyMs = builder.kpiCollectionFrequencyMs;
this.streamMemoryLimitBytes = builder.streamMemoryLimitBytes;
this.currentSpan = builder.currentSpan;
this.terminationService = builder.terminationService;
}

public Span getCurrentSpan() {
Expand Down Expand Up @@ -166,7 +169,10 @@ public long getKpiCollectionFrequencyMs() {
@Override
public void stream() throws InterruptedException {
try (Closeable ignore = ShutdownHooks.addHook(this::onNodeShutdown)) { // bugfix ARUHA-485
terminationService.register(this::onInstanceTermination);
streamInternal(new StartingState());
} catch (final PluginException pe) {
log.error("Failed to register instance termination callback for subscription {}", getSubscription(), pe);
} catch (final IOException ex) {
log.error(
"Failed to delete shutdown hook for subscription {}. This method should not throw any exception",
Expand All @@ -175,6 +181,11 @@ public void stream() throws InterruptedException {
}
}

void onInstanceTermination() {
log.info("Instance is about to be terminated. Trying to terminate subscription gracefully");
switchState(new CleanupState(null));
}

void onNodeShutdown() {
log.info("Shutdown hook called. Trying to terminate subscription gracefully");
switchState(new CleanupState(null));
Expand Down Expand Up @@ -372,6 +383,7 @@ public static final class Builder {
private long kpiCollectionFrequencyMs;
private long streamMemoryLimitBytes;
private Span currentSpan;
private TerminationService terminationService;

public Builder setCurrentSpan(final Span span) {
this.currentSpan = span;
Expand Down Expand Up @@ -493,11 +505,15 @@ public Builder setKpiCollectionFrequencyMs(final long kpiCollectionFrequencyMs)
return this;
}

public Builder setTerminationService(final TerminationService terminationService) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that this is set also in the constructor. Is this used or needed?

this.terminationService = terminationService;
return this;
}

public StreamingContext build() {
return new StreamingContext(this);
}


}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public void onEnter() {
} finally {
try {
getContext().unregisterSession();

} finally {
switchState(StreamingContext.DEAD_STATE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ private void addToStreaming(final Partition partition,
LoggerFactory.getLogger(LogPathBuilder.build(
getContext().getSubscription().getId(), getSessionId(), String.valueOf(partition.getKey()))),
System.currentTimeMillis(), this.getContext().getParameters().batchTimespan
);
);

offsets.put(partition.getKey(), pd);
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ nakadi:
factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory
authz:
factory: org.zalando.nakadi.plugin.auth.DefaultAuthorizationServiceFactory
termination:
factory: org.zalando.nakadi.plugin.DefaultTerminationServiceFactory
event.max.bytes: 999000
timeline.wait.timeoutMs: 40000
subscription:
Expand Down