diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index 890c7daf7a111..80109e0624145 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -188,6 +188,40 @@ describe('TaskManagerPlugin', () => { }); describe('stop', () => { + test('should stop task polling lifecycle if it is defined', async () => { + const pluginInitializerContext = coreMock.createPluginInitializerContext( + pluginInitializerContextParams + ); + pluginInitializerContext.node.roles.backgroundTasks = true; + const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext); + taskManagerPlugin.setup(coreMock.createSetup(), { usageCollection: undefined }); + taskManagerPlugin.start(coreStart, { + cloud: cloudMock.createStart(), + }); + + expect(TaskPollingLifecycle as jest.Mock).toHaveBeenCalledTimes(1); + + await taskManagerPlugin.stop(); + + expect(mockTaskPollingLifecycle.stop).toHaveBeenCalled(); + }); + + test('should not call stop task polling lifecycle if it is not defined', async () => { + const pluginInitializerContext = coreMock.createPluginInitializerContext( + pluginInitializerContextParams + ); + pluginInitializerContext.node.roles.backgroundTasks = false; + const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext); + taskManagerPlugin.setup(coreMock.createSetup(), { usageCollection: undefined }); + taskManagerPlugin.start(coreStart, { + cloud: cloudMock.createStart(), + }); + + await taskManagerPlugin.stop(); + + expect(mockTaskPollingLifecycle.stop).not.toHaveBeenCalled(); + }); + test('should remove the current from discovery service', async () => { const pluginInitializerContext = coreMock.createPluginInitializerContext( pluginInitializerContextParams diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 56f73ed1cc6c3..45960195be216 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -409,6 +409,11 @@ export class TaskManagerPlugin } public async stop() { + // Stop polling for tasks + if (this.taskPollingLifecycle) { + this.taskPollingLifecycle.stop(); + } + if (this.kibanaDiscoveryService?.isStarted()) { this.kibanaDiscoveryService.stop(); try { diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.ts b/x-pack/plugins/task_manager/server/polling/task_poller.ts index 7db6f65bedb7b..d61f417d40805 100644 --- a/x-pack/plugins/task_manager/server/polling/task_poller.ts +++ b/x-pack/plugins/task_manager/server/polling/task_poller.ts @@ -27,6 +27,12 @@ interface Opts { work: WorkFn; } +export interface TaskPoller { + start: () => void; + stop: () => void; + events$: Observable>>; +} + /** * constructs a new TaskPoller stream, which emits events on demand and on a scheduled interval, waiting for capacity to be available before emitting more events. * @@ -45,11 +51,7 @@ export function createTaskPoller({ pollIntervalDelay$, getCapacity, work, -}: Opts): { - start: () => void; - stop: () => void; - events$: Observable>>; -} { +}: Opts): TaskPoller { const hasCapacity = () => getCapacity() > 0; let running: boolean = false; let timeoutId: NodeJS.Timeout | null = null; diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts index d970b2d3a90c7..004729ca2b122 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts @@ -12,6 +12,7 @@ export const taskPollingLifecycleMock = { create(opts: { isStarted?: boolean; events$?: Observable }) { return { attemptToRun: jest.fn(), + stop: jest.fn(), get isStarted() { return opts.isStarted ?? true; }, diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index ce874833b5c38..6e3b7416ad787 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -223,6 +223,29 @@ describe('TaskPollingLifecycle', () => { expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled(); }); + test('stops polling if stop() is called', () => { + const elasticsearchAndSOAvailability$ = new Subject(); + const pollingLifecycle = new TaskPollingLifecycle({ + elasticsearchAndSOAvailability$, + ...taskManagerOpts, + config: { + ...taskManagerOpts.config, + poll_interval: 100, + }, + }); + + expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(0); + elasticsearchAndSOAvailability$.next(true); + + clock.tick(50); + expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1); + + pollingLifecycle.stop(); + + clock.tick(100); + expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1); + }); + test('restarts polling once the ES and SavedObjects services become available again', () => { const elasticsearchAndSOAvailability$ = new Subject(); new TaskPollingLifecycle({ diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 001886e03183e..7d8be75c2330c 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -44,6 +44,7 @@ import { delayOnClaimConflicts } from './polling'; import { TaskClaiming } from './queries/task_claiming'; import { ClaimOwnershipResult } from './task_claimers'; import { TaskPartitioner } from './lib/task_partitioner'; +import { TaskPoller } from './polling/task_poller'; const MAX_BUFFER_OPERATIONS = 100; @@ -86,7 +87,10 @@ export class TaskPollingLifecycle implements ITaskEventEmitter; + public pool: TaskPool; + // all task related events (task claimed, task marked as running, etc.) are emitted through events$ private events$ = new Subject(); @@ -170,7 +174,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay))))); } - const poller = createTaskPoller({ + this.poller = createTaskPoller({ logger, initialPollInterval: pollInterval, pollInterval$: pollIntervalConfiguration$, @@ -192,17 +196,17 @@ export class TaskPollingLifecycle implements ITaskEventEmitter { if (areESAndSOAvailable) { // start polling for work - poller.start(); + this.poller.start(); } else if (!areESAndSOAvailable) { this.logger.info( `Stopping the task poller because Elasticsearch and/or saved-objects service became unavailable` ); - poller.stop(); + this.poller.stop(); this.pool.cancelRunningTasks(); } }); @@ -212,6 +216,10 @@ export class TaskPollingLifecycle implements ITaskEventEmitter { this.events$.next(event); };