diff --git a/packages/api-headless-cms-import-export/src/crud/utils/makeSureModelsAreIdentical.ts b/packages/api-headless-cms-import-export/src/crud/utils/makeSureModelsAreIdentical.ts index f91cc8f8741..51cbbb5dbc8 100644 --- a/packages/api-headless-cms-import-export/src/crud/utils/makeSureModelsAreIdentical.ts +++ b/packages/api-headless-cms-import-export/src/crud/utils/makeSureModelsAreIdentical.ts @@ -66,7 +66,9 @@ export const makeSureModelsAreIdentical = (params: IMakeSureModelsAreIdenticalPa message: `Field "${value.field.fieldId}" not found in the model provided via the JSON data.`, code: "MODEL_FIELD_NOT_FOUND", data: { - ...value + field: value, + targetValues, + modelValues } }); } diff --git a/packages/api-headless-cms-import-export/src/tasks/domain/ImportFromUrlController.ts b/packages/api-headless-cms-import-export/src/tasks/domain/ImportFromUrlController.ts index 213ac2f1665..c6cc700a30f 100644 --- a/packages/api-headless-cms-import-export/src/tasks/domain/ImportFromUrlController.ts +++ b/packages/api-headless-cms-import-export/src/tasks/domain/ImportFromUrlController.ts @@ -1,7 +1,8 @@ import type { ITaskResponseResult, ITaskRunParams } from "@webiny/tasks"; -import type { +import { IImportFromUrlController, IImportFromUrlControllerInput, + IImportFromUrlControllerInputStepsStep, IImportFromUrlControllerOutput } from "~/tasks/domain/abstractions/ImportFromUrlController"; import { IImportFromUrlControllerInputStep } from "~/tasks/domain/abstractions/ImportFromUrlController"; @@ -10,7 +11,7 @@ import { ImportFromUrlControllerDownloadStep } from "~/tasks/domain/importFromUr import { ImportFromUrlControllerProcessEntriesStep } from "./importFromUrlControllerSteps/ImportFromUrlControllerProcessEntriesStep"; import { ImportFromUrlControllerProcessAssetsStep } from "./importFromUrlControllerSteps/ImportFromUrlControllerProcessAssetsStep"; -const getDefaultStepValues = () => { +const getDefaultStepValues = (): IImportFromUrlControllerInputStepsStep => { return { files: [], triggered: false, @@ -56,7 +57,7 @@ export class ImportFromUrlController< const downloadStep = steps[IImportFromUrlControllerInputStep.DOWNLOAD] || getDefaultStepValues(); - if (!downloadStep.done) { + if (!downloadStep.finished) { const step = new ImportFromUrlControllerDownloadStep(); return await step.execute(params); } else if (downloadStep.failed.length) { @@ -69,7 +70,7 @@ export class ImportFromUrlController< const processEntriesStep = steps[IImportFromUrlControllerInputStep.PROCESS_ENTRIES] || getDefaultStepValues(); - if (!processEntriesStep.done) { + if (!processEntriesStep.finished) { const step = new ImportFromUrlControllerProcessEntriesStep(); return await step.execute(params); } else if (processEntriesStep.failed.length) { @@ -82,7 +83,7 @@ export class ImportFromUrlController< const processAssetsStep = steps[IImportFromUrlControllerInputStep.PROCESS_ASSETS] || getDefaultStepValues(); - if (!processAssetsStep.done) { + if (!processAssetsStep.finished) { const step = new ImportFromUrlControllerProcessAssetsStep(); return await step.execute(params); } else if (processAssetsStep.failed.length) { diff --git a/packages/api-headless-cms-import-export/src/tasks/domain/abstractions/ImportFromUrlController.ts b/packages/api-headless-cms-import-export/src/tasks/domain/abstractions/ImportFromUrlController.ts index ea93fa7f0f3..21a0a88d46e 100644 --- a/packages/api-headless-cms-import-export/src/tasks/domain/abstractions/ImportFromUrlController.ts +++ b/packages/api-headless-cms-import-export/src/tasks/domain/abstractions/ImportFromUrlController.ts @@ -16,34 +16,20 @@ export enum IImportFromUrlControllerInputStep { PROCESS_ASSETS = "processAssets" } +export interface IImportFromUrlControllerInputStepsStep { + files: string[]; + triggered: boolean; + finished: boolean; + done: string[]; + failed: string[]; + invalid: string[]; + aborted: string[]; +} + export interface IImportFromUrlControllerInputSteps { - [IImportFromUrlControllerInputStep.DOWNLOAD]?: { - files: string[]; - triggered: boolean; - finished: boolean; - done: string[]; - failed: string[]; - invalid: string[]; - aborted: string[]; - }; - [IImportFromUrlControllerInputStep.PROCESS_ENTRIES]?: { - files: string[]; - triggered: boolean; - finished: boolean; - done: string[]; - failed: string[]; - invalid: string[]; - aborted: string[]; - }; - [IImportFromUrlControllerInputStep.PROCESS_ASSETS]?: { - files: string[]; - triggered: boolean; - finished: boolean; - done: string[]; - failed: string[]; - invalid: string[]; - aborted: string[]; - }; + [IImportFromUrlControllerInputStep.DOWNLOAD]?: IImportFromUrlControllerInputStepsStep; + [IImportFromUrlControllerInputStep.PROCESS_ENTRIES]?: IImportFromUrlControllerInputStepsStep; + [IImportFromUrlControllerInputStep.PROCESS_ASSETS]?: IImportFromUrlControllerInputStepsStep; } export interface IImportFromUrlControllerInput { diff --git a/packages/api-headless-cms-import-export/src/tasks/domain/importFromUrlControllerSteps/getChildTasks.ts b/packages/api-headless-cms-import-export/src/tasks/domain/importFromUrlControllerSteps/getChildTasks.ts index e37a7187778..45a2a2143ea 100644 --- a/packages/api-headless-cms-import-export/src/tasks/domain/importFromUrlControllerSteps/getChildTasks.ts +++ b/packages/api-headless-cms-import-export/src/tasks/domain/importFromUrlControllerSteps/getChildTasks.ts @@ -1,6 +1,7 @@ import type { ITask, ITaskResponseDoneResultOutput } from "@webiny/tasks"; import { TaskDataStatus } from "@webiny/tasks"; import type { Context } from "~/types"; +import { IStepFunctionServiceFetchResult } from "@webiny/tasks/service/StepFunctionServicePlugin"; export interface IGetChildTasksParams { context: Context; @@ -8,6 +9,31 @@ export interface IGetChildTasksParams { definition: string; } +const mapServiceStatusToTaskStatus = ( + task: ITask, + serviceInfo: IStepFunctionServiceFetchResult | null +) => { + if (!serviceInfo) { + console.log(`Service info is missing for task ${task.id} (${task.definitionId}).`); + return null; + } + if (serviceInfo.status === "RUNNING") { + return TaskDataStatus.RUNNING; + } else if (serviceInfo.status === "SUCCEEDED") { + return TaskDataStatus.SUCCESS; + } else if (serviceInfo.status === "FAILED") { + return TaskDataStatus.FAILED; + } else if (serviceInfo.status === "ABORTED") { + return TaskDataStatus.ABORTED; + } else if (serviceInfo.status === "TIMED_OUT" || serviceInfo.status === "PENDING_REDRIVE") { + console.log( + `Service status is ${serviceInfo.status} for task ${task.id} (${task.definitionId}).` + ); + return null; + } + return TaskDataStatus.PENDING; +}; + export const getChildTasks = async ({ context, task, @@ -24,14 +50,33 @@ export const getChildTasks = async ( where: { parentId: task.id, definitionId: definition - } + }, + limit: 100000 }); for (const task of items) { collection.push(task); + if ( task.taskStatus === TaskDataStatus.RUNNING || task.taskStatus === TaskDataStatus.PENDING ) { + /** + * We also need to check the actual status of the service. + * It can happen that the task is marked as running, but the service is not running. + */ + const serviceInfo = await context.tasks.fetchServiceInfo(task); + const status = mapServiceStatusToTaskStatus(task, serviceInfo); + + if (status === null || !serviceInfo) { + invalid.push(task.id); + continue; + } else if (status !== task.taskStatus) { + console.error( + `Status of the task is not same as the status of the service (task: ${task.taskStatus}, service: ${status} / ${serviceInfo.status}).` + ); + invalid.push(task.id); + continue; + } running.push(task.id); continue; } else if (task.taskStatus === TaskDataStatus.SUCCESS) { diff --git a/packages/tasks/src/context.ts b/packages/tasks/src/context.ts index 90cc88f38f4..bdb12e5b065 100644 --- a/packages/tasks/src/context.ts +++ b/packages/tasks/src/context.ts @@ -6,7 +6,7 @@ import { createDefinitionCrud } from "./crud/definition.tasks"; import { createServiceCrud } from "~/crud/service.tasks"; import { createTaskCrud } from "./crud/crud.tasks"; import { createTestingRunTask } from "~/tasks/testingRunTask"; -import { createTransportPlugins } from "~/crud/transport"; +import { createServicePlugins } from "~/service"; const createTasksCrud = () => { const plugin = new ContextPlugin(async context => { @@ -23,7 +23,7 @@ const createTasksCrud = () => { }; const createTasksContext = (): Plugin[] => { - return [...createTransportPlugins(), ...createTaskModel(), createTasksCrud()]; + return [...createServicePlugins(), ...createTaskModel(), createTasksCrud()]; }; export const createBackgroundTaskContext = (): Plugin[] => { diff --git a/packages/tasks/src/crud/service.tasks.ts b/packages/tasks/src/crud/service.tasks.ts index 1a393999eb6..5856cff9455 100644 --- a/packages/tasks/src/crud/service.tasks.ts +++ b/packages/tasks/src/crud/service.tasks.ts @@ -12,7 +12,8 @@ import type { } from "~/types"; import { TaskDataStatus, TaskLogItemType } from "~/types"; import { NotFoundError } from "@webiny/handler-graphql"; -import { createService } from "~/service/createService"; +import { createService } from "~/service"; +import { IStepFunctionServiceFetchResult } from "~/service/StepFunctionServicePlugin"; const MAX_DELAY_DAYS = 355; const MAX_DELAY_SECONDS = MAX_DELAY_DAYS * 24 * 60 * 60; @@ -101,7 +102,9 @@ export const createServiceCrud = (context: Context): ITasksContextServiceObject eventResponse: result }); }, - fetchServiceInfo: async (input: ITask | string) => { + fetchServiceInfo: async ( + input: ITask | string + ): Promise => { const task = typeof input === "object" ? input : await context.tasks.getTask(input); if (!task && typeof input === "string") { throw new NotFoundError(`Task "${input}" was not found!`); @@ -112,7 +115,7 @@ export const createServiceCrud = (context: Context): ITasksContextServiceObject } try { - return await service.fetch(task); + return (await service.fetch(task)) as IStepFunctionServiceFetchResult | null; } catch (ex) { console.log("Service fetch error."); console.error(ex); diff --git a/packages/tasks/src/plugins/TaskServicePlugin.ts b/packages/tasks/src/plugins/TaskServicePlugin.ts index 4808f516192..be0beaa223d 100644 --- a/packages/tasks/src/plugins/TaskServicePlugin.ts +++ b/packages/tasks/src/plugins/TaskServicePlugin.ts @@ -1,6 +1,5 @@ import { Plugin } from "@webiny/plugins"; import { Context, ITask } from "~/types"; -import { GenericRecord } from "@webiny/api/types"; export interface ITaskServiceCreatePluginParams { context: Context; @@ -10,16 +9,16 @@ export interface ITaskServiceCreatePluginParams { export type ITaskServiceTask = Pick; -export interface ITaskService { - send(task: ITaskServiceTask, delay: number): Promise; - fetch(task: ITask): Promise; +export interface ITaskService { + send(task: ITaskServiceTask, delay: number): Promise; + fetch(task: ITask): Promise; } export interface ITaskServicePluginParams { default?: boolean; } -export abstract class TaskServicePlugin extends Plugin { +export abstract class TaskServicePlugin extends Plugin { public static override readonly type: string = "tasks.taskService"; public readonly default: boolean; @@ -28,5 +27,5 @@ export abstract class TaskServicePlugin extends Plugin { this.default = !!params?.default; } - public abstract createService(params: ITaskServiceCreatePluginParams): ITaskService; + public abstract createService(params: ITaskServiceCreatePluginParams): ITaskService; } diff --git a/packages/tasks/src/crud/transport/EventBridgeEventTransportPlugin.ts b/packages/tasks/src/service/EventBridgeEventTransportPlugin.ts similarity index 97% rename from packages/tasks/src/crud/transport/EventBridgeEventTransportPlugin.ts rename to packages/tasks/src/service/EventBridgeEventTransportPlugin.ts index aafb20ebd69..76cdcd1be84 100644 --- a/packages/tasks/src/crud/transport/EventBridgeEventTransportPlugin.ts +++ b/packages/tasks/src/service/EventBridgeEventTransportPlugin.ts @@ -10,7 +10,7 @@ import { EventBridgeClient, PutEventsCommand } from "@webiny/aws-sdk/client-even import { WebinyError } from "@webiny/error"; import { GenericRecord } from "@webiny/api/types"; -class EventBridgeService implements ITaskService { +class EventBridgeService implements ITaskService { protected readonly context: Context; protected readonly getTenant: () => string; protected readonly getLocale: () => string; diff --git a/packages/tasks/src/crud/transport/StepFunctionServicePlugin.ts b/packages/tasks/src/service/StepFunctionServicePlugin.ts similarity index 91% rename from packages/tasks/src/crud/transport/StepFunctionServicePlugin.ts rename to packages/tasks/src/service/StepFunctionServicePlugin.ts index db606a6c843..de538b86948 100644 --- a/packages/tasks/src/crud/transport/StepFunctionServicePlugin.ts +++ b/packages/tasks/src/service/StepFunctionServicePlugin.ts @@ -4,9 +4,9 @@ import { ITaskServiceTask, TaskServicePlugin } from "~/plugins"; -import { GenericRecord } from "@webiny/api/types"; import { createStepFunctionClient, + DescribeExecutionCommandOutput, describeExecutionFactory, triggerStepFunctionFactory } from "@webiny/aws-sdk/client-sfn"; @@ -15,11 +15,13 @@ import { generateAlphaNumericId } from "@webiny/utils"; import { ServiceDiscovery } from "@webiny/api"; import { ITask } from "~/types"; +export type IStepFunctionServiceFetchResult = DescribeExecutionCommandOutput; + export interface IDetailWrapper { detail: T; } -class StepFunctionService implements ITaskService { +class StepFunctionService implements ITaskService { private readonly getTenant: () => string; private readonly getLocale: () => string; private readonly trigger: ReturnType; @@ -71,7 +73,7 @@ class StepFunctionService implements ITaskService { } } - public async fetch(task: ITask): Promise { + public async fetch(task: ITask): Promise { const executionArn = task.eventResponse?.executionArn; if (!executionArn) { console.error(`Execution ARN not found in task "${task.id}".`); @@ -81,7 +83,7 @@ class StepFunctionService implements ITaskService { const result = await this.get({ executionArn }); - return (result || null) as R; + return result || null; } catch (ex) { console.log("Could not get the execution details."); console.error(ex); @@ -90,7 +92,7 @@ class StepFunctionService implements ITaskService { } } -export class StepFunctionServicePlugin extends TaskServicePlugin { +export class StepFunctionServicePlugin extends TaskServicePlugin { public override name = "task.stepFunctionTriggerTransport"; public createService(params: ITaskServiceCreatePluginParams) { diff --git a/packages/tasks/src/crud/transport/index.ts b/packages/tasks/src/service/index.ts similarity index 78% rename from packages/tasks/src/crud/transport/index.ts rename to packages/tasks/src/service/index.ts index d6f4c691da0..1d6f503219b 100644 --- a/packages/tasks/src/crud/transport/index.ts +++ b/packages/tasks/src/service/index.ts @@ -1,9 +1,11 @@ import { EventBridgeEventTransportPlugin } from "./EventBridgeEventTransportPlugin"; import { StepFunctionServicePlugin } from "./StepFunctionServicePlugin"; -export const createTransportPlugins = () => { +export const createServicePlugins = () => { return [ new StepFunctionServicePlugin({ default: true }), new EventBridgeEventTransportPlugin() ]; }; + +export * from "./createService"; diff --git a/packages/tasks/src/types.ts b/packages/tasks/src/types.ts index 04a38c8d2fc..cad417447a0 100644 --- a/packages/tasks/src/types.ts +++ b/packages/tasks/src/types.ts @@ -16,6 +16,7 @@ import { import { IIsCloseToTimeoutCallable, ITaskManagerStore } from "./runner/abstractions"; import { SecurityPermission } from "@webiny/api-security/types"; import { GenericRecord } from "@webiny/api/types"; +import { IStepFunctionServiceFetchResult } from "~/service/StepFunctionServicePlugin"; export * from "./handler/types"; export * from "./response/abstractions"; @@ -303,7 +304,9 @@ export interface ITasksContextServiceObject { >( params: ITaskAbortParams ) => Promise>; - fetchServiceInfo: (input: ITask | string) => Promise; + fetchServiceInfo: ( + input: ITask | string + ) => Promise; } export interface ITasksContextObject