Skip to content

Commit

Permalink
fix(api-headless-cms-bulk-actions): fix task execution for large entr…
Browse files Browse the repository at this point in the history
…ies list (#4283)

Co-authored-by: Bruno Zorić <[email protected]>
  • Loading branch information
leopuleo and brunozoric authored Sep 26, 2024
1 parent af4195f commit 4e1127e
Show file tree
Hide file tree
Showing 38 changed files with 463 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { IntrospectionField, IntrospectionInterfaceType } from "graphql";
import { useGraphQlHandler } from "~tests/context/useGraphQLHandler";
import { createBulkAction } from "~/plugins";
import { createMockModels, createPrivateMockModels } from "~tests/mocks";
import { createBulkActionEntriesTasks } from "~/tasks";

interface GraphQlType {
kind: Uppercase<string>;
Expand Down Expand Up @@ -36,7 +37,7 @@ const defaultBulkActionsEnumNames = [
describe("createBulkAction", () => {
it("should create GraphQL schema with default bulk actions ENUMS", async () => {
const { introspect } = useGraphQlHandler({
plugins: [...createMockModels()]
plugins: [...createMockModels(), createBulkActionEntriesTasks()]
});

const [result] = await introspect();
Expand All @@ -59,7 +60,7 @@ describe("createBulkAction", () => {

it("should NOT create bulk actions ENUMS in case of a private model", async () => {
const { introspect } = useGraphQlHandler({
plugins: [...createPrivateMockModels()]
plugins: [...createPrivateMockModels(), createBulkActionEntriesTasks()]
});

const [result] = await introspect();
Expand All @@ -79,6 +80,7 @@ describe("createBulkAction", () => {
const { introspect } = useGraphQlHandler({
plugins: [
...createMockModels(),
createBulkActionEntriesTasks(),
createBulkAction({
name: "print",
dataLoader,
Expand Down
9 changes: 3 additions & 6 deletions packages/api-headless-cms-bulk-actions/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import { createTasks } from "~/tasks";
import { createHandlers } from "~/handlers";
import { createDefaultGraphQL } from "~/plugins";

export * from "./abstractions";
export * from "./handlers";
export * from "./useCases";
export * from "./plugins";
export * from "./tasks";

export const createHcmsBulkActions = () => [
createTasks(),
createHandlers(),
createDefaultGraphQL()
];
export const createHcmsBulkActions = () => [createHandlers(), createDefaultGraphQL()];
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export interface CreateBulkActionConfig {
dataLoader: (context: HcmsBulkActionsContext) => IListEntries;
dataProcessor: (context: HcmsBulkActionsContext) => IProcessEntry;
modelIds?: string[];
batchSize?: number;
}

function toPascalCase(str: string) {
Expand All @@ -31,7 +32,8 @@ export const createBulkAction = (config: CreateBulkActionConfig) => {
createBulkActionTasks({
name,
dataLoader: config.dataLoader,
dataProcessor: config.dataProcessor
dataProcessor: config.dataProcessor,
batchSize: config.batchSize
}),
createBulkActionGraphQL({
name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createPrivateTaskDefinition } from "@webiny/tasks";
import { createPrivateTaskDefinition, ITask } from "@webiny/tasks";
import { IListEntries, IProcessEntry } from "~/abstractions";
import {
ChildTasksCleanup,
Expand All @@ -7,6 +7,7 @@ import {
ProcessTasksByModel
} from "~/useCases/internals";
import {
BulkActionOperationByModelAction,
HcmsBulkActionsContext,
IBulkActionOperationByModelInput,
IBulkActionOperationByModelOutput,
Expand All @@ -18,17 +19,22 @@ export interface CreateBackgroundTasksConfig {
name: string;
dataLoader: (context: HcmsBulkActionsContext) => IListEntries;
dataProcessor: (context: HcmsBulkActionsContext) => IProcessEntry;
batchSize?: number;
}

const BATCH_SIZE = 100; // Default number of entries to fetch in each batch

class BulkActionTasks {
private readonly name: string;
private readonly dataLoader: (context: HcmsBulkActionsContext) => IListEntries;
private readonly dataProcessor: (context: HcmsBulkActionsContext) => IProcessEntry;
private readonly batchSize: number;

constructor(config: CreateBackgroundTasksConfig) {
this.name = config.name;
this.dataLoader = config.dataLoader;
this.dataProcessor = config.dataProcessor;
this.batchSize = config.batchSize || BATCH_SIZE;
}

public createListTaskDefinition() {
Expand All @@ -40,6 +46,7 @@ class BulkActionTasks {
id: this.createListTaskDefinitionName(),
title: `Headless CMS: list "${this.name}" entries by model`,
maxIterations: 500,
disableDatabaseLogs: true,
run: async params => {
const { response, input, context } = params;

Expand All @@ -48,35 +55,49 @@ class BulkActionTasks {
return response.error(`Missing "modelId" in the input.`);
}

if (input.processing) {
const processTasks = new ProcessTasksByModel(
this.createProcessTaskDefinitionName()
);
return await processTasks.execute(params);
}
const action = this.getCurrentAction(input);

const createTasks = new CreateTasksByModel(
this.createProcessTaskDefinitionName(),
this.dataLoader(context)
);
return await createTasks.execute(params);
switch (action) {
case BulkActionOperationByModelAction.PROCESS_SUBTASKS: {
const processTasks = new ProcessTasksByModel(
this.createProcessTaskDefinitionName()
);
return await processTasks.execute(params);
}
case BulkActionOperationByModelAction.CREATE_SUBTASKS:
case BulkActionOperationByModelAction.CHECK_MORE_SUBTASKS: {
const createTasks = new CreateTasksByModel(
this.createProcessTaskDefinitionName(),
this.dataLoader(context),
this.batchSize
);
return await createTasks.execute(params);
}
case BulkActionOperationByModelAction.END_TASK: {
return response.done(
`Task done: task "${this.createProcessTaskDefinitionName()}" has been successfully processed for entries from "${
input.modelId
}" model.`
);
}
default:
return response.error(`Unknown action: ${action}`);
}
} catch (ex) {
return response.error(ex.message ?? "Error while executing list task");
}
},
onDone: async ({ context, task }) => {
/**
* We want to clean all child tasks and logs, which have no errors.
*/
const childTasksCleanup = new ChildTasksCleanup();
try {
await childTasksCleanup.execute({
context,
task
});
} catch (ex) {
console.error("Error while cleaning list child tasks.", ex);
}
await this.onCreateListTaskDefinitionFinish(context, task, "done");
},
onError: async ({ context, task }) => {
await this.onCreateListTaskDefinitionFinish(context, task, "error");
},
onAbort: async ({ context, task }) => {
await this.onCreateListTaskDefinitionFinish(context, task, "abort");
},
onMaxIterations: async ({ context, task }) => {
await this.onCreateListTaskDefinitionFinish(context, task, "maxIterations");
}
});
}
Expand All @@ -90,6 +111,7 @@ class BulkActionTasks {
id: this.createProcessTaskDefinitionName(),
title: `Headless CMS: process "${this.name}" entries`,
maxIterations: 2,
disableDatabaseLogs: true,
run: async params => {
const { response, context } = params;

Expand All @@ -99,20 +121,6 @@ class BulkActionTasks {
} catch (ex) {
return response.error(ex.message ?? "Error while executing process task");
}
},
onDone: async ({ context, task }) => {
/**
* We want to clean all child tasks and logs, which have no errors.
*/
const childTasksCleanup = new ChildTasksCleanup();
try {
await childTasksCleanup.execute({
context,
task
});
} catch (ex) {
console.error("Error while cleaning process child tasks.", ex);
}
}
});
}
Expand All @@ -124,6 +132,32 @@ class BulkActionTasks {
private createProcessTaskDefinitionName() {
return `hcmsBulkProcess${this.name}Entries`;
}

private getCurrentAction(input: IBulkActionOperationByModelInput) {
return input.action ?? BulkActionOperationByModelAction.CREATE_SUBTASKS;
}

private async onCreateListTaskDefinitionFinish(
context: HcmsBulkActionsContext,
task: ITask,
cause: string
) {
/**
* We want to clean all child tasks and logs, which have no errors.
*/
const childTasksCleanup = new ChildTasksCleanup();
try {
await childTasksCleanup.execute({
context,
task
});
} catch (ex) {
console.error(
`Error while cleaning "${this.createListTaskDefinitionName()} child tasks - ${cause}."`,
ex
);
}
}
}

export const createBulkActionTasks = (config: CreateBackgroundTasksConfig) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
createDeleteEntry,
createListDeletedEntries,
createListLatestEntries,
createListNotPublishedEntries,
createListPublishedEntries,
createMoveEntryToFolder,
createMoveEntryToTrash,
Expand Down Expand Up @@ -30,7 +31,7 @@ export const createBulkActionEntriesTasks = () => {
}),
createBulkAction({
name: "publish",
dataLoader: createListLatestEntries,
dataLoader: createListNotPublishedEntries,
dataProcessor: createPublishEntry
}),
createBulkAction({
Expand Down
8 changes: 2 additions & 6 deletions packages/api-headless-cms-bulk-actions/src/tasks/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
import { createBulkActionEntriesTasks } from "./createBulkActionEntriesTasks";
import { createEmptyTrashBinsTask } from "./createEmptyTrashBinsTask";

export const createTasks = () => {
return [createBulkActionEntriesTasks(), createEmptyTrashBinsTask()];
};
export * from "./createBulkActionEntriesTasks";
export * from "./createEmptyTrashBinsTask";
13 changes: 9 additions & 4 deletions packages/api-headless-cms-bulk-actions/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@ export type IBulkActionOperationTaskParams = ITaskRunParams<
* Bulk Action Operation By Model
*/

export enum BulkActionOperationByModelAction {
CREATE_SUBTASKS = "CREATE_SUBTASKS",
CHECK_MORE_SUBTASKS = "CHECK_MORE_SUBTASKS",
PROCESS_SUBTASKS = "PROCESS_SUBTASKS",
END_TASK = "END_TASK"
}

export interface IBulkActionOperationByModelInput {
modelId: string;
identity?: SecurityIdentity;
where?: Record<string, any>;
search?: string;
data?: Record<string, any>;
after?: string | null;
currentBatch?: number;
processing?: boolean;
totalCount?: number;
data?: Record<string, any>;
action?: BulkActionOperationByModelAction;
}

export interface IBulkActionOperationByModelOutput extends ITaskResponseDoneResultOutput {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { CmsEntryListParams } from "@webiny/api-headless-cms/types";
import { IListEntries } from "~/abstractions";
import { HcmsBulkActionsContext } from "~/types";

class ListNotPublishedEntries implements IListEntries {
private readonly context: HcmsBulkActionsContext;

constructor(context: HcmsBulkActionsContext) {
this.context = context;
}

async execute(modelId: string, params: CmsEntryListParams) {
const model = await this.context.cms.getModel(modelId);

if (!model) {
throw new Error(`Model with ${modelId} not found!`);
}

const [entries, meta] = await this.context.cms.listLatestEntries(model, {
...params,
where: {
...params.where,
status_not: "published"
}
});

return {
entries,
meta
};
}
}

export const createListNotPublishedEntries = (context: HcmsBulkActionsContext) => {
return new ListNotPublishedEntries(context);
};
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from "./DeleteEntry";
export * from "./ListDeletedEntries";
export * from "./ListNotPublishedEntries";
export * from "./ListLatestEntries";
export * from "./ListPublishedEntries";
export * from "./MoveEntryToFolder";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ITask, Context, TaskLogItemType } from "@webiny/tasks";
import { IUseCase } from "~/abstractions";
import { HcmsBulkActionsContext } from "~/types";

export interface IChildTasksCleanupExecuteParams {
context: Context;
Expand Down Expand Up @@ -35,6 +36,13 @@ export class ChildTasksCleanup implements IUseCase<IChildTasksCleanupExecutePara
limit: 10000
});

/**
* No logs found. Proceed with deleting the child tasks.
*/
if (childLogs.length === 0) {
await this.deleteTasks(context, childTaskIdList);
}

const deletedChildTaskLogIdList: string[] = [];
/**
* First, we need to remove all the logs which have no errors.
Expand All @@ -52,8 +60,15 @@ export class ChildTasksCleanup implements IUseCase<IChildTasksCleanupExecutePara
/**
* Now we can remove the tasks.
*/
for (const childTaskId of deletedChildTaskLogIdList) {
await context.tasks.deleteTask(childTaskId);
await this.deleteTasks(context, deletedChildTaskLogIdList);
}

/**
* Helper method to delete tasks by ID.
*/
private async deleteTasks(context: HcmsBulkActionsContext, taskIds: string[]): Promise<void> {
for (const taskId of taskIds) {
await context.tasks.deleteTask(taskId);
}
}
}
Loading

0 comments on commit 4e1127e

Please sign in to comment.