Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sbose2k21 committed Oct 8, 2024
1 parent 837e9d3 commit d6e5036
Showing 1 changed file with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,19 @@ public void start(final Buffer<Record<Event>> buffer) {
public void shutDown() {
LOG.info("Stop request received for Kinesis Source");

if (scheduler == null) {
LOG.info("Scheduler not initialized!!");
return;
}

Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
LOG.info("Waiting up to {} seconds for shutdown to complete.", GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS);
try {
gracefulShutdownFuture.get(GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
LOG.error("Exception while executing kinesis consumer graceful shutdown, doing force shutdown", ex);
scheduler.shutdown();
if (scheduler != null) {
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
LOG.info("Waiting up to {} seconds for shutdown to complete.", GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS);
try {
gracefulShutdownFuture.get(GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
LOG.error("Exception while executing kinesis consumer graceful shutdown, doing force shutdown", ex);
scheduler.shutdown();
}
LOG.info("Completed, shutting down now.");
} else {
LOG.info("The Kinesis Scheduler was not initialized.");
}
LOG.info("Completed, shutting down now.");
}

public Scheduler getScheduler(final Buffer<Record<Event>> buffer) {
Expand Down

0 comments on commit d6e5036

Please sign in to comment.