diff --git a/examples/docs-examples/examples/composing-tasks/streams/stream-waitforexit.mjs b/examples/docs-examples/examples/composing-tasks/streams/stream-waitforexit.mjs index c4d0023..8d2c5ea 100644 --- a/examples/docs-examples/examples/composing-tasks/streams/stream-waitforexit.mjs +++ b/examples/docs-examples/examples/composing-tasks/streams/stream-waitforexit.mjs @@ -3,7 +3,7 @@ import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor"; const executor = await TaskExecutor.create({ // What do you want to run package: "golem/alpine:latest", - yagnaOptions: { apiKey: "try_golem" }, + // yagnaOptions: { apiKey: "try_golem" }, logger: pinoPrettyLogger(), budget: 0.5, maxParallelTasks: 1, @@ -11,44 +11,46 @@ const executor = await TaskExecutor.create({ // the example will run a tasks 4 times, in sequence (as maxParallelTasks is 1) for (const i of [1, 2, 3, 4]) { - await executor.run(async (ctx) => { - // each task will spawn a script that generates a sequence of 5 pairs of messages sent to stdout and stderr separated by 1 sec delay - - // the command generating the sequence is saved to script.sh file - await ctx.run( - `echo 'counter=0; while [ $counter -lt 5 ]; do ls -ls ./script.sh non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`, - ); - // permissions are modified to be able to run the script - await ctx.run("chmod 700 ./script.sh"); - - // script is run and stream results, stdout and stderr are processed - let remoteProcess = await ctx.runAndStream("/bin/sh ./script.sh"); - - remoteProcess.stdout.on("data", (data) => console.log(`iteration: ${i}:`, "stdout>", data)); - remoteProcess.stderr.on("data", (data) => console.error(`iteration: ${i}:`, "stderr>", data)); - - // For odd tasks, we set streaming timeout to 10 secs, - // the script will end normally, for equal tasks we will exit the run method after 3 secs. - // The exit caused by timeout will terminate the activity on a provider, - // therefore the user cannot run another command on the provider. - // Task executor will run the next task on another provider. - - const timeout = i % 2 === 0 ? 3_000 : 10_000; - const finalResult = await remoteProcess.waitForExit(timeout).catch(async (e) => { - console.log(`Iteration: ${i} Error: ${e.message}, Provider: ${e.provider.name}`); - ctx - .run("ls -l") - .catch((e) => - console.log("Running command after normal runAndStream exit is NOT possible, you will get an error:\n", e), - ); - }); - if (finalResult) { - // if the spawn exited without timeout, the provider is still available - console.log(`Iteration: ${i} results: ${finalResult?.result}. Provider: ${ctx.provider.name}`); - - console.log("Running command after normal runAndStream exit is possible:", (await ctx.run("ls -l")).stdout); - } - }); + await executor + .run(async (ctx) => { + // each task will spawn a script that generates a sequence of 5 pairs of messages sent to stdout and stderr separated by 1 sec delay + + // the command generating the sequence is saved to script.sh file + await ctx.run( + `echo 'counter=0; while [ $counter -lt 5 ]; do ls -ls ./script.sh non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`, + ); + // permissions are modified to be able to run the script + await ctx.run("chmod 700 ./script.sh"); + + // script is run and stream results, stdout and stderr are processed + let remoteProcess = await ctx.runAndStream("/bin/sh ./script.sh"); + + remoteProcess.stdout.on("data", (data) => console.log(`iteration: ${i}:`, "stdout>", data)); + remoteProcess.stderr.on("data", (data) => console.error(`iteration: ${i}:`, "stderr>", data)); + + // For odd tasks, we set streaming timeout to 10 secs, + // the script will end normally, for equal tasks we will exit the run method after 3 secs. + // The exit caused by timeout will terminate the activity on a provider, + // therefore the user cannot run another command on the provider. + // Task executor will run the next task on another provider. + + const timeout = i % 2 === 0 ? 3_000 : 10_000; + const finalResult = await remoteProcess.waitForExit(timeout).catch(async (e) => { + console.log(`Iteration: ${i} Error: ${e.message}, Provider: ${e.provider.name}`); + ctx + .run("ls -l") + .catch((e) => + console.log("Running command after normal runAndStream exit is NOT possible, you will get an error:\n", e), + ); + }); + if (finalResult) { + // if the spawn exited without timeout, the provider is still available + console.log(`Iteration: ${i} results: ${finalResult?.result}. Provider: ${ctx.provider.name}`); + + console.log("Running command after normal runAndStream exit is possible:", (await ctx.run("ls -l")).stdout); + } + }) + .catch((error) => console.error("Execution of task failed due to error.", error)); } await executor.shutdown(); diff --git a/package-lock.json b/package-lock.json index d866813..633b917 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,7 @@ "examples" ], "dependencies": { - "@golem-sdk/golem-js": "^2.4.2", + "@golem-sdk/golem-js": "^2.4.3", "eventemitter3": "^5.0.1" }, "devDependencies": { diff --git a/package.json b/package.json index d2323f1..96b0b28 100644 --- a/package.json +++ b/package.json @@ -63,7 +63,7 @@ "node": ">=18.0.0" }, "dependencies": { - "@golem-sdk/golem-js": "^2.4.2", + "@golem-sdk/golem-js": "^2.4.3", "eventemitter3": "^5.0.1" }, "devDependencies": { diff --git a/src/config.ts b/src/config.ts index ddd68ac..665625b 100644 --- a/src/config.ts +++ b/src/config.ts @@ -19,8 +19,7 @@ const DEFAULTS = Object.freeze({ basePath: "http://127.0.0.1:7465", maxParallelTasks: 5, maxTaskRetries: 3, - taskTimeout: 1000 * 60 * 5, // 5 min, - startupTaskTimeout: 1000 * 60 * 2, // 2 min, + taskRetryOnTimeout: false, enableLogging: true, startupTimeout: 1000 * 90, // 90 sec exitOnNoProposals: false, @@ -35,8 +34,8 @@ const DEFAULTS = Object.freeze({ export class ExecutorConfig { readonly package?: Package | string; readonly maxParallelTasks: number; - readonly taskTimeout: number; - readonly startupTaskTimeout: number; + readonly taskTimeout?: number; + readonly taskStartupTimeout?: number; readonly budget: number; readonly subnetTag: string; readonly networkIp?: string; @@ -48,6 +47,7 @@ export class ExecutorConfig { readonly startupTimeout: number; readonly exitOnNoProposals: boolean; readonly agreementMaxPoolSize: number; + readonly taskRetryOnTimeout: boolean; constructor(options: ExecutorOptions & TaskServiceOptions) { const processEnv = !isBrowser @@ -88,9 +88,10 @@ export class ExecutorConfig { }; this.budget = options.budget || DEFAULTS.budget; this.maxParallelTasks = options.maxParallelTasks || DEFAULTS.maxParallelTasks; - this.taskTimeout = options.taskTimeout || DEFAULTS.taskTimeout; - this.startupTaskTimeout = options.startupTimeout || DEFAULTS.startupTaskTimeout; + this.taskTimeout = options.taskTimeout; + this.taskStartupTimeout = options.taskStartupTimeout; this.subnetTag = options.subnetTag || processEnv.env?.YAGNA_SUBNET || DEFAULTS.subnetTag; + this.taskRetryOnTimeout = options.taskRetryOnTimeout ?? DEFAULTS.taskRetryOnTimeout; this.networkIp = options.networkIp; this.logger = (() => { const isLoggingEnabled = options.enableLogging ?? DEFAULTS.enableLogging; @@ -117,18 +118,18 @@ export class ExecutorConfig { export class TaskConfig extends ActivityConfig { public readonly maxParallelTasks: number; public readonly taskRunningInterval: number; - public readonly taskTimeout: number; + public readonly taskTimeout?: number; public readonly activityStateCheckingInterval: number; public readonly activityPreparingTimeout: number; public readonly storageProvider?: StorageProvider; public readonly logger: Logger; constructor(options?: TaskServiceOptions) { - const activityExecuteTimeout = options?.activityExecuteTimeout || options?.taskTimeout || DEFAULTS.taskTimeout; + const activityExecuteTimeout = options?.activityExecuteTimeout || options?.taskTimeout; super({ ...options, activityExecuteTimeout }); this.maxParallelTasks = options?.maxParallelTasks || DEFAULTS.maxParallelTasks; this.taskRunningInterval = options?.taskRunningInterval || DEFAULTS.taskRunningInterval; - this.taskTimeout = options?.taskTimeout || DEFAULTS.taskTimeout; + this.taskTimeout = options?.taskTimeout; this.activityStateCheckingInterval = options?.activityStateCheckingInterval || DEFAULTS.activityStateCheckingInterval; this.logger = options?.logger || defaultLogger("work", { disableAutoPrefix: true }); diff --git a/src/executor.spec.ts b/src/executor.spec.ts index 31d391c..1a718cc 100644 --- a/src/executor.spec.ts +++ b/src/executor.spec.ts @@ -24,6 +24,7 @@ import { // temporarily until the import in golem-js is fixed import { Allocation } from "@golem-sdk/golem-js/dist/payment"; import { PaymentConfig } from "@golem-sdk/golem-js/dist/payment/config"; +import { randomUUID } from "node:crypto"; interface PaymentServiceEvents { error: (err: Error) => void; } @@ -82,6 +83,7 @@ when(gftpStorageProviderMock.init()).thenResolve(anything()); when(gftpStorageProviderMock.close()).thenResolve(anything()); when(taskServiceMock.run()).thenResolve(anything()); when(taskServiceMock.end()).thenResolve(anything()); +when(taskMock.id).thenCall(randomUUID); when(taskMock.getActivity()).thenReturn(activity); when(taskMock.getDetails()).thenReturn({ activityId: "1", agreementId: "1", id: "1", retriesCount: 0 }); @@ -186,8 +188,7 @@ describe("Task Executor", () => { expect(Task).toHaveBeenCalledWith("1", worker, { activityReadySetupFunctions: [], maxRetries: 0, - timeout: 300000, - startupTimeout: 120000, + retryOnTimeout: false, }); await executor.shutdown(); }); @@ -209,8 +210,7 @@ describe("Task Executor", () => { expect(Task).toHaveBeenCalledWith("1", worker, { activityReadySetupFunctions: [], maxRetries: 0, - timeout: 300000, - startupTimeout: 120000, + retryOnTimeout: false, }); await executor.shutdown(); }); @@ -246,6 +246,7 @@ describe("Task Executor", () => { when(taskMock.isRejected()).thenReturn(false).thenReturn(true).thenReturn(false); when(taskMock.getResults()).thenReturn("result 1").thenReturn("result 2"); when(taskMock.getError()).thenReturn(new Error("error 1")); + when(taskMock.id).thenCall(randomUUID); // eslint-disable-next-line @typescript-eslint/no-explicit-any const executorShutdownSpy = jest.spyOn(executor as any, "doShutdown"); diff --git a/src/executor.ts b/src/executor.ts index 0b1edc6..23a848b 100644 --- a/src/executor.ts +++ b/src/executor.ts @@ -89,6 +89,8 @@ export type ExecutorOptions = { * Default is `false`. */ exitOnNoProposals?: boolean; + + taskRetryOnTimeout?: boolean; } & Omit & MarketOptions & PaymentOptions & @@ -231,6 +233,7 @@ export class TaskExecutor { this.yagna.getApi(), this.taskQueue, this.events, + this.marketService, this.agreementPoolService, this.paymentService, this.networkService, @@ -350,11 +353,11 @@ export class TaskExecutor { this.events.emit("end", Date.now()); } - /** - * @Deprecated This feature is no longer supported. It will be removed in the next release. - */ getStats() { - return []; + return { + ...this.statsService.getAll(), + retries: this.taskService.getRetryCount(), + }; } /** @@ -412,8 +415,9 @@ export class TaskExecutor { task = new Task((++this.lastTaskIndex).toString(), worker, { maxRetries: options?.maxRetries ?? this.options.maxTaskRetries, timeout: options?.timeout ?? this.options.taskTimeout, - startupTimeout: options?.startupTimeout ?? this.options.startupTaskTimeout, + startupTimeout: options?.startupTimeout ?? this.options.taskStartupTimeout, activityReadySetupFunctions: this.activityReadySetupFunctions, + retryOnTimeout: options?.retryOnTimeout ?? this.options.taskRetryOnTimeout, }); this.taskQueue.addToEnd(task); this.events.emit("taskQueued", task.getDetails()); diff --git a/src/queue.spec.ts b/src/queue.spec.ts index 0f2279b..a59cd88 100644 --- a/src/queue.spec.ts +++ b/src/queue.spec.ts @@ -5,6 +5,7 @@ import { instance, mock, reset, when } from "@johanblumenberg/ts-mockito"; describe("Task Queue", function () { let testQueue: TaskQueue; const taskMock = mock(Task); + const testWorker = async () => null; beforeEach(function () { testQueue = new TaskQueue(); reset(taskMock); @@ -12,12 +13,12 @@ describe("Task Queue", function () { }); describe("Adding", () => { it("should allow to add Task to the queue", () => { - const task = instance(taskMock); + const task = new Task("1", testWorker); testQueue.addToEnd(task); expect(testQueue.size).toEqual(1); }); it("should add new task on the end of the queue", () => { - const tasksToAdd = ["A", "B", "C"].map(() => instance(taskMock)); + const tasksToAdd = ["A", "B", "C"].map((id) => new Task(id, testWorker)); // Add tree different tasks to the queue tasksToAdd.forEach((task) => testQueue.addToEnd(task)); // Check if the order is the same @@ -27,7 +28,7 @@ describe("Task Queue", function () { }); }); it("should add task on the beginning of the queue", () => { - const tasksToAdd = ["A", "B", "C"].map(() => instance(taskMock)); + const tasksToAdd = ["A", "B", "C"].map((id) => new Task(id, testWorker)); // Add tree different tasks to the queue tasksToAdd.forEach((task) => testQueue.addToBegin(task)); // Reverse expectation and check @@ -41,6 +42,11 @@ describe("Task Queue", function () { when(taskMock.isQueueable()).thenReturn(false); expect(() => testQueue.addToEnd(task)).toThrow("You cannot add a task that is not in the correct state"); }); + it("should throws error if adding an existing task", () => { + const task = new Task("A", testWorker); + testQueue.addToEnd(task); + expect(() => testQueue.addToEnd(task)).toThrow("Task A has already been added to the queue"); + }); }); describe("Getting", () => { @@ -59,9 +65,9 @@ describe("Task Queue", function () { it("should return correct number of items in the queue ", () => { // Add 3 tasks to the queue - testQueue.addToEnd(instance(taskMock)); - testQueue.addToEnd(instance(taskMock)); - testQueue.addToEnd(instance(taskMock)); + testQueue.addToEnd(new Task("1", testWorker)); + testQueue.addToEnd(new Task("2", testWorker)); + testQueue.addToEnd(new Task("3", testWorker)); // Check if is eq 3 expect(testQueue.size).toEqual(3); // Get one @@ -78,5 +84,11 @@ describe("Task Queue", function () { // Check if still is eq 0 expect(testQueue.size).toEqual(0); }); + + it("should check if task belongs to the queue", () => { + const task = instance(taskMock); + testQueue.addToEnd(task); + expect(testQueue.has(task)).toEqual(true); + }); }); }); diff --git a/src/queue.ts b/src/queue.ts index f220166..2bc8bdf 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -5,6 +5,7 @@ import { GolemInternalError } from "@golem-sdk/golem-js"; * @internal */ export interface QueueableTask { + id: string; isQueueable(): boolean; } @@ -32,7 +33,16 @@ export class TaskQueue { return this.itemsStack.shift(); } + has(task: T) { + return this.itemsStack.some((t) => t.id === task.id); + } + private checkIfTaskIsEligibleForAdd(task: T) { - if (!task.isQueueable()) throw new GolemInternalError("You cannot add a task that is not in the correct state"); + if (!task.isQueueable()) { + throw new GolemInternalError("You cannot add a task that is not in the correct state"); + } + if (this.has(task)) { + throw new GolemInternalError(`Task ${task.id} has already been added to the queue`); + } } } diff --git a/src/service.spec.ts b/src/service.spec.ts index e08b2fe..b8482e1 100644 --- a/src/service.spec.ts +++ b/src/service.spec.ts @@ -5,11 +5,14 @@ import { ActivityStateEnum, Agreement, AgreementPoolService, + GolemWorkError, + MarketService, NetworkService, PaymentService, Result, ResultState, WorkContext, + WorkErrorCode, YagnaApi, } from "@golem-sdk/golem-js"; import { TaskService } from "./service"; @@ -21,10 +24,12 @@ import { sleep } from "./utils"; let queue: TaskQueue; const paymentServiceMock = mock(PaymentService); +const marketServiceMock = mock(MarketService); const agreementPoolServiceMock = mock(AgreementPoolService); const networkServiceMock = mock(NetworkService); const paymentService = instance(paymentServiceMock); const agreementPoolService = instance(agreementPoolServiceMock); +const marketService = instance(marketServiceMock); const networkService = instance(networkServiceMock); const yagnaApiMock = imock(); const yagnaApi = instance(yagnaApiMock); @@ -34,6 +39,7 @@ const activityMock = mock(Activity); const agreement = instance(agreementMock); const activity = instance(activityMock); when(agreementPoolServiceMock.getAgreement()).thenResolve(agreement); +when(marketServiceMock.getProposalsCount()).thenReturn({ confirmed: 1, initial: 1, rejected: 0 }); when(agreementPoolServiceMock.releaseAgreement(anything(), anything())).thenResolve(); when(activityMock.agreement).thenReturn(agreement); when(activityMock.getState()).thenResolve(ActivityStateEnum.Ready); @@ -63,10 +69,19 @@ describe("Task Service", () => { const cb = jest.fn(); events.on("taskStarted", cb); events.on("taskCompleted", cb); - const service = new TaskService(yagnaApi, queue, events, agreementPoolService, paymentService, networkService, { - taskRunningInterval: 10, - activityStateCheckingInterval: 10, - }); + const service = new TaskService( + yagnaApi, + queue, + events, + marketService, + agreementPoolService, + paymentService, + networkService, + { + taskRunningInterval: 10, + activityStateCheckingInterval: 10, + }, + ); service.run().catch((e) => console.error(e)); await sleep(200, true); expect(task.isFinished()).toEqual(true); @@ -85,11 +100,20 @@ describe("Task Service", () => { queue.addToEnd(task1); queue.addToEnd(task2); queue.addToEnd(task3); - const service = new TaskService(yagnaApi, queue, events, agreementPoolService, paymentService, networkService, { - taskRunningInterval: 10, - activityStateCheckingInterval: 10, - maxParallelTasks: 2, - }); + const service = new TaskService( + yagnaApi, + queue, + events, + marketService, + agreementPoolService, + paymentService, + networkService, + { + taskRunningInterval: 10, + activityStateCheckingInterval: 10, + maxParallelTasks: 2, + }, + ); service.run().catch((e) => console.error(e)); expect(task1.isQueued()).toEqual(true); expect(task2.isQueued()).toEqual(true); @@ -98,7 +122,7 @@ describe("Task Service", () => { await service.end(); }); - it("should retry task if it failed", async () => { + it("should retry task if a GolemWorkError occurred", async () => { const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); const task = new Task("1", worker, { maxRetries: 3 }); queue.addToEnd(task); @@ -110,11 +134,22 @@ describe("Task Service", () => { }); const cb = jest.fn(); events.on("taskRetried", cb); - when(activityMock.execute(anything(), false, undefined)).thenReject(new Error("Test error")); - const service = new TaskService(yagnaApi, queue, events, agreementPoolService, paymentService, networkService, { - taskRunningInterval: 10, - activityStateCheckingInterval: 10, - }); + when(activityMock.execute(anything(), false, undefined)).thenReject( + new GolemWorkError("Test error", WorkErrorCode.ScriptExecutionFailed), + ); + const service = new TaskService( + yagnaApi, + queue, + events, + marketService, + agreementPoolService, + paymentService, + networkService, + { + taskRunningInterval: 10, + activityStateCheckingInterval: 10, + }, + ); service.run().then(); await sleep(800, true); expect(cb).toHaveBeenCalledTimes(3); @@ -122,15 +157,57 @@ describe("Task Service", () => { await service.end(); }); + it("should not retry task if a TypeError occurred", async () => { + const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); + const task = new Task("1", worker, { maxRetries: 3 }); + queue.addToEnd(task); + const readable = new Readable({ + objectMode: true, + read() { + readable.destroy(new Error("Test error")); + }, + }); + const cb = jest.fn(); + events.on("taskRetried", cb); + when(activityMock.execute(anything(), false, undefined)).thenReject(new Error("Test error")); + const service = new TaskService( + yagnaApi, + queue, + events, + marketService, + agreementPoolService, + paymentService, + networkService, + { + taskRunningInterval: 10, + activityStateCheckingInterval: 10, + }, + ); + service.run().then(); + await sleep(800, true); + expect(cb).toHaveBeenCalledTimes(0); + expect(task.isRejected()).toEqual(true); + await service.end(); + }); + it("should not retry task if it failed and maxRetries is zero", async () => { const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); const task = new Task("1", worker, { maxRetries: 0 }); queue.addToEnd(task); when(activityMock.execute(anything(), false, undefined)).thenReject(new Error("Test error")); - const service = new TaskService(yagnaApi, queue, events, agreementPoolService, paymentService, networkService, { - taskRunningInterval: 10, - activityStateCheckingInterval: 10, - }); + const service = new TaskService( + yagnaApi, + queue, + events, + marketService, + agreementPoolService, + paymentService, + networkService, + { + taskRunningInterval: 10, + activityStateCheckingInterval: 10, + }, + ); service.run().catch((e) => console.error(e)); await sleep(200, true); expect(task.isRetry()).toEqual(false); @@ -150,11 +227,22 @@ describe("Task Service", () => { const task = new Task("1", worker, { maxRetries: 1 }); const cb = jest.fn(); events.on("taskRetried", cb); - when(activityMock.execute(anything(), false, undefined)).thenReject(new Error("Test error")); - const service = new TaskService(yagnaApi, queue, events, agreementPoolService, paymentService, networkService, { - taskRunningInterval: 10, - activityStateCheckingInterval: 10, - }); + when(activityMock.execute(anything(), false, undefined)).thenReject( + new GolemWorkError("Test error", WorkErrorCode.ScriptExecutionFailed), + ); + const service = new TaskService( + yagnaApi, + queue, + events, + marketService, + agreementPoolService, + paymentService, + networkService, + { + taskRunningInterval: 10, + activityStateCheckingInterval: 10, + }, + ); queue.addToEnd(task); service.run().catch((e) => console.error(e)); await sleep(500, true); @@ -174,11 +262,20 @@ describe("Task Service", () => { queue.addToEnd(task1); queue.addToEnd(task2); queue.addToEnd(task3); - const service = new TaskService(yagnaApi, queue, events, agreementPoolService, paymentService, networkService, { - taskRunningInterval: 10, - activityStateCheckingInterval: 10, - maxParallelTasks: 2, - }); + const service = new TaskService( + yagnaApi, + queue, + events, + marketService, + agreementPoolService, + paymentService, + networkService, + { + taskRunningInterval: 10, + activityStateCheckingInterval: 10, + maxParallelTasks: 2, + }, + ); const activitySetupDoneSpy = spy(service["activitySetupDone"]); service.run().then(); await sleep(500, true); diff --git a/src/service.ts b/src/service.ts index f7c4c51..e88ec36 100644 --- a/src/service.ts +++ b/src/service.ts @@ -13,6 +13,10 @@ import { NetworkService, Activity, ActivityOptions, + GolemWorkError, + GolemInternalError, + GolemTimeoutError, + MarketService, } from "@golem-sdk/golem-js"; import { TaskConfig } from "./config"; import { sleep } from "./utils"; @@ -40,11 +44,16 @@ export class TaskService { private isRunning = false; private logger: Logger; private options: TaskConfig; + private retryOnTimeout: boolean = true; + + /** To keep track of the stat */ + private retryCount = 0; constructor( private yagnaApi: YagnaApi, private tasksQueue: TaskQueue, private events: EventEmitter, + private marketService: MarketService, private agreementPoolService: AgreementPoolService, private paymentService: PaymentService, private networkService?: NetworkService, @@ -58,7 +67,8 @@ export class TaskService { this.isRunning = true; this.logger.info("Task Service has started"); while (this.isRunning) { - if (this.activeTasksCount >= this.options.maxParallelTasks) { + const proposalsCount = this.marketService.getProposalsCount(); + if (this.activeTasksCount >= this.options.maxParallelTasks || proposalsCount.confirmed === 0) { await sleep(this.options.taskRunningInterval, true); continue; } @@ -90,21 +100,42 @@ export class TaskService { .catch((error) => this.logger.warn(`Stopping activity failed`, { activityId: activity.id, error })), ), ); - this.logger.info("Task Service has been stopped"); + this.logger.info("Task Service has been stopped", { + stats: { + retryCount: this.retryCount, + }, + }); } private async startTask(task: Task) { - task.init(); - this.logger.debug(`Starting task`, { taskId: task.id, attempt: task.getRetriesCount() + 1 }); - ++this.activeTasksCount; + try { + task.init(); + this.logger.debug(`Starting task`, { taskId: task.id, attempt: task.getRetriesCount() + 1 }); + ++this.activeTasksCount; - const agreement = await this.agreementPoolService.getAgreement(); - let activity: Activity | undefined; - let networkNode: NetworkNode | undefined; + if (task.isFailed()) { + throw new GolemInternalError(`Execution of task ${task.id} aborted due to error. ${task.getError()}`); + } - try { + // TODO: This should be able to be canceled if the task state has changed + const agreement = await this.agreementPoolService.getAgreement(); this.startAcceptingAgreementPayments(agreement); - activity = await this.getOrCreateActivity(agreement); + + if (task.isFailed()) { + /** + * only in this case it is necessary to manually terminate the agreement, + * in other cases after the creation of the activity this will be done by the releaseTaskResources method + */ + await this.agreementPoolService + .releaseAgreement(agreement.id, false) + .catch((error) => + this.logger.error(`Releasing agreement failed`, { agreementId: activity.agreement.id, error }), + ); + throw new GolemInternalError(`Execution of task ${task.id} aborted due to error. ${task.getError()}`); + } + + let networkNode: NetworkNode | undefined; + const activity = await this.getOrCreateActivity(agreement); task.start(activity, networkNode); this.events.emit("taskStarted", task.getDetails()); this.logger.info(`Task started`, { @@ -129,16 +160,27 @@ export class TaskService { activityStateCheckingInterval: this.options.activityStateCheckingInterval, }); - await ctx.before(); + if (task.isFailed()) { + throw new GolemInternalError(`Execution of task ${task.id} aborted due to error. ${task.getError()}`); + } + await ctx.before(); if (activityReadySetupFunctions.length && !this.activitySetupDone.has(activity.id)) { this.activitySetupDone.add(activity.id); this.logger.debug(`Activity setup completed`, { activityId: activity.id }); } + + if (task.isFailed()) { + throw new GolemInternalError(`Execution of task ${task.id} aborted due to error. ${task.getError()}`); + } const results = await worker(ctx); task.stop(results); } catch (error) { - task.stop(undefined, error); + task.stop( + undefined, + error, + error instanceof GolemWorkError || (error instanceof GolemTimeoutError && this.retryOnTimeout), + ); } finally { --this.activeTasksCount; } @@ -177,7 +219,13 @@ export class TaskService { attempt: task.getRetriesCount(), reason, }); - this.tasksQueue.addToBegin(task); + if (!this.tasksQueue.has(task)) { + this.retryCount++; + this.tasksQueue.addToBegin(task); + this.logger.debug(`Task ${task.id} added to the queue`); + } else { + this.logger.warn(`Task ${task.id} has been already added to the queue`); + } } private async stopTask(task: Task) { @@ -229,4 +277,8 @@ export class TaskService { .catch((error) => this.logger.error(`Removing network node failed`, { nodeId: networkNode.id, error })); } } + + getRetryCount() { + return this.retryCount; + } } diff --git a/src/stats.ts b/src/stats.ts index 3777007..ce339ec 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -167,4 +167,16 @@ export class StatsService { private unsubscribeAllEvents() { this.listeners.forEach((listener, event) => this.events.removeListener(event, listener)); } + + getAll() { + return { + providers: this.providers.size, + agreements: this.agreements.size, + invoicesReceived: this.invoices.size, + invoicesPaid: this.payments.size, + invoicesUnpaid: this.invoices.size - this.payments.size, + invoicesMissing: this.agreements.size - this.invoices.size, + invoicePaymentRate: this.payments.size / this.agreements.size, + }; + } } diff --git a/src/task.spec.ts b/src/task.spec.ts index 6cc1dde..575ce3c 100644 --- a/src/task.spec.ts +++ b/src/task.spec.ts @@ -18,14 +18,21 @@ describe("Task", function () { it("should start task", () => { const task = new Task("1", worker); + task.init(); task.start(activity); expect(task.getState()).toEqual(TaskState.Pending); task.stop(); task.cleanup(); }); + it("should not start task that is not queued", () => { + const task = new Task("1", worker); + expect(() => task.start(activity)).toThrow("You cannot start a task that is not queued"); + }); + it("should complete task with results", () => { const task = new Task("1", worker); + task.init(); task.start(activity); const result = new Result({ index: 0, @@ -40,6 +47,7 @@ describe("Task", function () { it("should complete task with error", () => { const task = new Task("1", worker); + task.init(); task.start(activity); const error = new Error("test"); task.stop(undefined, error, false); @@ -48,29 +56,56 @@ describe("Task", function () { it("should retry task", () => { const task = new Task("1", worker); + task.init(); task.start(activity); const error = new Error("test"); task.stop(undefined, error, true); expect(task.getState()).toEqual(TaskState.Retry); }); - it("should stop the task with a timeout error if the task does not complete within the specified time", async () => { - const task = new Task("1", worker, { timeout: 1, maxRetries: 0 }); - task.start(activity); - await sleep(2, true); - expect(task.getError()).toEqual(new GolemTimeoutError("Task 1 timeout.")); - expect(task.getState() === TaskState.Rejected); + describe("task execution timeout", () => { + it("should stop the task with a timeout error if the task does not complete within the specified time", async () => { + const task = new Task("1", worker, { timeout: 1, maxRetries: 0 }); + task.init(); + task.start(activity); + await sleep(2, true); + expect(task.getError()).toEqual(new GolemTimeoutError("Task 1 timeout.")); + expect(task.getState() === TaskState.Rejected); + }); + + it("should retry the task if the retryOnTimeout is set to 'true'", async () => { + const task = new Task("1", worker, { timeout: 1, maxRetries: 1, retryOnTimeout: true }); + task.init(); + task.start(activity); + await sleep(2, true); + expect(task.getError()).toEqual(new GolemTimeoutError("Task 1 timeout.")); + expect(task.getState() === TaskState.Retry); + }); }); - it("should stop the task with a timeout error if the task does not started within the specified time", async () => { - const task = new Task("1", worker, { startupTimeout: 1, maxRetries: 0 }); - task.init(); - await sleep(2, true); - expect(task.getError()).toEqual( - new GolemTimeoutError( - "Task startup 1 timeout. Failed to sign an agreement with the provider within the specified time", - ), - ); - expect(task.getState() === TaskState.Rejected); + describe("task startup timeout", () => { + it("should stop the task with a timeout error if the task does not started within the specified time", async () => { + const task = new Task("1", worker, { startupTimeout: 1, maxRetries: 0 }); + task.init(); + await sleep(2, true); + expect(task.getError()).toEqual( + new GolemTimeoutError( + "Task 1 startup timeout. Failed to prepare the runtime environment within the specified time.", + ), + ); + expect(task.getState() === TaskState.Rejected); + }); + + it("should retry the task if the retryOnTimeout is set to 'true'", async () => { + const task = new Task("1", worker, { startupTimeout: 1, maxRetries: 1, retryOnTimeout: true }); + task.init(); + await sleep(2, true); + expect(task.getError()).toEqual( + new GolemTimeoutError( + "Task 1 startup timeout. Failed to prepare the runtime environment within the specified time.", + ), + ); + expect(task.getState() === TaskState.Retry); + }); }); }); diff --git a/src/task.ts b/src/task.ts index e971781..7408841 100644 --- a/src/task.ts +++ b/src/task.ts @@ -1,5 +1,12 @@ import { QueueableTask } from "./queue"; -import { Activity, GolemConfigError, GolemTimeoutError, NetworkNode, Worker } from "@golem-sdk/golem-js"; +import { + Activity, + GolemConfigError, + GolemInternalError, + GolemTimeoutError, + NetworkNode, + Worker, +} from "@golem-sdk/golem-js"; export interface ProviderInfo { name: string; @@ -19,10 +26,20 @@ export enum TaskState { export type TaskOptions = { /** maximum number of retries if task failed due to provider reason, default = 5 */ maxRetries?: number; + + /** + * Opt-in for retries of the tasks when the {@link TaskOptions.timeout} {@link TaskOptions.startupTimeout} are reached + * + * @default false + */ + retryOnTimeout?: boolean; + /** timeout in ms for task execution, measured for one attempt from start to stop, default = 300_000 (5min) */ timeout?: number; + /** timeout in ms for task startup, measured from initialization to start, default = 120_000 (2min) */ startupTimeout?: number; + /** array of setup functions to run on each activity */ activityReadySetupFunctions?: Worker[]; }; @@ -38,8 +55,6 @@ export type TaskDetails = { const DEFAULTS = { MAX_RETRIES: 5, - TIMEOUT: 1000 * 60 * 5, - STARTUP_TIMEOUT: 1000 * 60 * 2, }; /** @@ -52,11 +67,12 @@ export class Task implements QueueableTask { private results?: OutputType; private error?: Error; private retriesCount = 0; + private retryOnTimeout; private listeners = new Set<(state: TaskState) => void>(); private timeoutId?: NodeJS.Timeout; private startupTimeoutId?: NodeJS.Timeout; - private readonly timeout: number; - private readonly startupTimeout: number; + private readonly timeout?: number; + private readonly startupTimeout?: number; private readonly maxRetries: number; private readonly activityReadySetupFunctions: Worker[]; private activity?: Activity; @@ -67,15 +83,18 @@ export class Task implements QueueableTask { private worker: Worker, options?: TaskOptions, ) { - this.timeout = options?.timeout ?? DEFAULTS.TIMEOUT; - this.startupTimeout = options?.startupTimeout ?? DEFAULTS.STARTUP_TIMEOUT; + this.timeout = options?.timeout; + this.startupTimeout = options?.startupTimeout; this.maxRetries = options?.maxRetries ?? DEFAULTS.MAX_RETRIES; + + this.retryOnTimeout = options?.retryOnTimeout ?? false; + this.activityReadySetupFunctions = options?.activityReadySetupFunctions ?? []; + if (this.maxRetries < 0) { throw new GolemConfigError("The maxRetries parameter cannot be less than zero"); } } - onStateChange(listener: (state: TaskState) => void) { this.listeners.add(listener); } @@ -84,30 +103,36 @@ export class Task implements QueueableTask { this.listeners.clear(); } init() { - this.state = TaskState.Queued; - this.startupTimeoutId = setTimeout( - () => - this.stop( - undefined, - new GolemTimeoutError( - `Task startup ${this.id} timeout. Failed to sign an agreement with the provider within the specified time`, + this.updateState(TaskState.Queued); + if (this.startupTimeout) { + this.startupTimeoutId = setTimeout( + () => + this.stop( + undefined, + new GolemTimeoutError( + `Task ${this.id} startup timeout. Failed to prepare the runtime environment within the specified time.`, + ), + this.retryOnTimeout, ), - true, - ), - this.startupTimeout, - ); + this.startupTimeout, + ); + } } start(activity: Activity, networkNode?: NetworkNode) { - this.state = TaskState.Pending; + if (this.state !== TaskState.Queued) { + throw new GolemInternalError("You cannot start a task that is not queued"); + } + this.updateState(TaskState.Pending); clearTimeout(this.startupTimeoutId); this.activity = activity; this.networkNode = networkNode; - this.listeners.forEach((listener) => listener(this.state)); - this.timeoutId = setTimeout( - () => this.stop(undefined, new GolemTimeoutError(`Task ${this.id} timeout.`), true), - this.timeout, - ); + if (this.timeout) { + this.timeoutId = setTimeout( + () => this.stop(undefined, new GolemTimeoutError(`Task ${this.id} timeout.`), this.retryOnTimeout), + this.timeout, + ); + } } stop(results?: OutputType, error?: Error, retry = true) { if (this.isFinished() || this.isRetry()) { @@ -115,19 +140,19 @@ export class Task implements QueueableTask { } clearTimeout(this.timeoutId); clearTimeout(this.startupTimeoutId); + if (error) { this.error = error; if (retry && this.retriesCount < this.maxRetries) { - this.state = TaskState.Retry; + this.updateState(TaskState.Retry); ++this.retriesCount; } else { - this.state = TaskState.Rejected; + this.updateState(TaskState.Rejected); } } else { - this.state = TaskState.Done; + this.updateState(TaskState.Done); this.results = results; } - this.listeners.forEach((listener) => listener(this.state)); } isQueueable(): boolean { return this.state === TaskState.New || this.state === TaskState.Retry; @@ -180,6 +205,12 @@ export class Task implements QueueableTask { getState(): TaskState { return this.state; } + + private updateState(newSate: TaskState) { + this.state = newSate; + this.listeners.forEach((listener) => listener(this.state)); + } + getDetails(): TaskDetails { return { id: this.id,