Skip to content

Commit

Permalink
Merge pull request #73 from golemfactory/bugfix/JST-881/task-retrying
Browse files Browse the repository at this point in the history
fix(task): fixed the issue where the default startup timeout was producing agreements and activities on many providers
  • Loading branch information
mgordel authored May 10, 2024
2 parents baef766 + e9453a7 commit cb571c6
Show file tree
Hide file tree
Showing 13 changed files with 410 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,54 @@ import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor";
const executor = await TaskExecutor.create({
// What do you want to run
package: "golem/alpine:latest",
yagnaOptions: { apiKey: "try_golem" },
// yagnaOptions: { apiKey: "try_golem" },
logger: pinoPrettyLogger(),
budget: 0.5,
maxParallelTasks: 1,
});

// the example will run a task 4 times, in sequence (as maxParallelTasks is 1)
for (const i of [1, 2, 3, 4]) {
await executor.run(async (ctx) => {
// each task will spawn a script that generates a sequence of 5 pairs of messages sent to stdout and stderr separated by 1 sec delay

// the command generating the sequence is saved to script.sh file
await ctx.run(
`echo 'counter=0; while [ $counter -lt 5 ]; do ls -ls ./script.sh non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
);
// permissions are modified to be able to run the script
await ctx.run("chmod 700 ./script.sh");

// script is run and stream results, stdout and stderr are processed
let remoteProcess = await ctx.runAndStream("/bin/sh ./script.sh");

remoteProcess.stdout.on("data", (data) => console.log(`iteration: ${i}:`, "stdout>", data));
remoteProcess.stderr.on("data", (data) => console.error(`iteration: ${i}:`, "stderr>", data));

// For odd tasks, we set streaming timeout to 10 secs,
// the script will end normally, for even tasks we will exit the run method after 3 secs.
// The exit caused by timeout will terminate the activity on a provider,
// therefore the user cannot run another command on the provider.
// Task executor will run the next task on another provider.

const timeout = i % 2 === 0 ? 3_000 : 10_000;
const finalResult = await remoteProcess.waitForExit(timeout).catch(async (e) => {
console.log(`Iteration: ${i} Error: ${e.message}, Provider: ${e.provider.name}`);
ctx
.run("ls -l")
.catch((e) =>
console.log("Running command after normal runAndStream exit is NOT possible, you will get an error:\n", e),
);
});
if (finalResult) {
// if the spawn exited without timeout, the provider is still available
console.log(`Iteration: ${i} results: ${finalResult?.result}. Provider: ${ctx.provider.name}`);

console.log("Running command after normal runAndStream exit is possible:", (await ctx.run("ls -l")).stdout);
}
});
await executor
.run(async (ctx) => {
// each task will spawn a script that generates a sequence of 5 pairs of messages sent to stdout and stderr separated by 1 sec delay

// the command generating the sequence is saved to script.sh file
await ctx.run(
`echo 'counter=0; while [ $counter -lt 5 ]; do ls -ls ./script.sh non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
);
// permissions are modified to be able to run the script
await ctx.run("chmod 700 ./script.sh");

// script is run and stream results, stdout and stderr are processed
let remoteProcess = await ctx.runAndStream("/bin/sh ./script.sh");

remoteProcess.stdout.on("data", (data) => console.log(`iteration: ${i}:`, "stdout>", data));
remoteProcess.stderr.on("data", (data) => console.error(`iteration: ${i}:`, "stderr>", data));

// For odd tasks, we set streaming timeout to 10 secs,
// the script will end normally, for even tasks we will exit the run method after 3 secs.
// The exit caused by timeout will terminate the activity on a provider,
// therefore the user cannot run another command on the provider.
// Task executor will run the next task on another provider.

const timeout = i % 2 === 0 ? 3_000 : 10_000;
const finalResult = await remoteProcess.waitForExit(timeout).catch(async (e) => {
console.log(`Iteration: ${i} Error: ${e.message}, Provider: ${e.provider.name}`);
ctx
.run("ls -l")
.catch((e) =>
console.log("Running command after normal runAndStream exit is NOT possible, you will get an error:\n", e),
);
});
if (finalResult) {
// if the spawn exited without timeout, the provider is still available
console.log(`Iteration: ${i} results: ${finalResult?.result}. Provider: ${ctx.provider.name}`);

console.log("Running command after normal runAndStream exit is possible:", (await ctx.run("ls -l")).stdout);
}
})
.catch((error) => console.error("Execution of task failed due to error.", error));
}

await executor.shutdown();
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"node": ">=18.0.0"
},
"dependencies": {
"@golem-sdk/golem-js": "^2.4.2",
"@golem-sdk/golem-js": "^2.4.3",
"eventemitter3": "^5.0.1"
},
"devDependencies": {
Expand Down
19 changes: 10 additions & 9 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ const DEFAULTS = Object.freeze({
basePath: "http://127.0.0.1:7465",
maxParallelTasks: 5,
maxTaskRetries: 3,
taskTimeout: 1000 * 60 * 5, // 5 min,
startupTaskTimeout: 1000 * 60 * 2, // 2 min,
taskRetryOnTimeout: false,
enableLogging: true,
startupTimeout: 1000 * 90, // 90 sec
exitOnNoProposals: false,
Expand All @@ -35,8 +34,8 @@ const DEFAULTS = Object.freeze({
export class ExecutorConfig {
readonly package?: Package | string;
readonly maxParallelTasks: number;
readonly taskTimeout: number;
readonly startupTaskTimeout: number;
readonly taskTimeout?: number;
readonly taskStartupTimeout?: number;
readonly budget: number;
readonly subnetTag: string;
readonly networkIp?: string;
Expand All @@ -48,6 +47,7 @@ export class ExecutorConfig {
readonly startupTimeout: number;
readonly exitOnNoProposals: boolean;
readonly agreementMaxPoolSize: number;
readonly taskRetryOnTimeout: boolean;

constructor(options: ExecutorOptions & TaskServiceOptions) {
const processEnv = !isBrowser
Expand Down Expand Up @@ -88,9 +88,10 @@ export class ExecutorConfig {
};
this.budget = options.budget || DEFAULTS.budget;
this.maxParallelTasks = options.maxParallelTasks || DEFAULTS.maxParallelTasks;
this.taskTimeout = options.taskTimeout || DEFAULTS.taskTimeout;
this.startupTaskTimeout = options.startupTimeout || DEFAULTS.startupTaskTimeout;
this.taskTimeout = options.taskTimeout;
this.taskStartupTimeout = options.taskStartupTimeout;
this.subnetTag = options.subnetTag || processEnv.env?.YAGNA_SUBNET || DEFAULTS.subnetTag;
this.taskRetryOnTimeout = options.taskRetryOnTimeout ?? DEFAULTS.taskRetryOnTimeout;
this.networkIp = options.networkIp;
this.logger = (() => {
const isLoggingEnabled = options.enableLogging ?? DEFAULTS.enableLogging;
Expand All @@ -117,18 +118,18 @@ export class ExecutorConfig {
export class TaskConfig extends ActivityConfig {
public readonly maxParallelTasks: number;
public readonly taskRunningInterval: number;
public readonly taskTimeout: number;
public readonly taskTimeout?: number;
public readonly activityStateCheckingInterval: number;
public readonly activityPreparingTimeout: number;
public readonly storageProvider?: StorageProvider;
public readonly logger: Logger;

constructor(options?: TaskServiceOptions) {
const activityExecuteTimeout = options?.activityExecuteTimeout || options?.taskTimeout || DEFAULTS.taskTimeout;
const activityExecuteTimeout = options?.activityExecuteTimeout || options?.taskTimeout;
super({ ...options, activityExecuteTimeout });
this.maxParallelTasks = options?.maxParallelTasks || DEFAULTS.maxParallelTasks;
this.taskRunningInterval = options?.taskRunningInterval || DEFAULTS.taskRunningInterval;
this.taskTimeout = options?.taskTimeout || DEFAULTS.taskTimeout;
this.taskTimeout = options?.taskTimeout;
this.activityStateCheckingInterval =
options?.activityStateCheckingInterval || DEFAULTS.activityStateCheckingInterval;
this.logger = options?.logger || defaultLogger("work", { disableAutoPrefix: true });
Expand Down
9 changes: 5 additions & 4 deletions src/executor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
// temporarily until the import in golem-js is fixed
import { Allocation } from "@golem-sdk/golem-js/dist/payment";
import { PaymentConfig } from "@golem-sdk/golem-js/dist/payment/config";
import { randomUUID } from "node:crypto";
interface PaymentServiceEvents {
error: (err: Error) => void;
}
Expand Down Expand Up @@ -82,6 +83,7 @@ when(gftpStorageProviderMock.init()).thenResolve(anything());
when(gftpStorageProviderMock.close()).thenResolve(anything());
when(taskServiceMock.run()).thenResolve(anything());
when(taskServiceMock.end()).thenResolve(anything());
when(taskMock.id).thenCall(randomUUID);
when(taskMock.getActivity()).thenReturn(activity);
when(taskMock.getDetails()).thenReturn({ activityId: "1", agreementId: "1", id: "1", retriesCount: 0 });

Expand Down Expand Up @@ -186,8 +188,7 @@ describe("Task Executor", () => {
expect(Task).toHaveBeenCalledWith("1", worker, {
activityReadySetupFunctions: [],
maxRetries: 0,
timeout: 300000,
startupTimeout: 120000,
retryOnTimeout: false,
});
await executor.shutdown();
});
Expand All @@ -209,8 +210,7 @@ describe("Task Executor", () => {
expect(Task).toHaveBeenCalledWith("1", worker, {
activityReadySetupFunctions: [],
maxRetries: 0,
timeout: 300000,
startupTimeout: 120000,
retryOnTimeout: false,
});
await executor.shutdown();
});
Expand Down Expand Up @@ -246,6 +246,7 @@ describe("Task Executor", () => {
when(taskMock.isRejected()).thenReturn(false).thenReturn(true).thenReturn(false);
when(taskMock.getResults()).thenReturn("result 1").thenReturn("result 2");
when(taskMock.getError()).thenReturn(new Error("error 1"));
when(taskMock.id).thenCall(randomUUID);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const executorShutdownSpy = jest.spyOn(executor as any, "doShutdown");

Expand Down
14 changes: 9 additions & 5 deletions src/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ export type ExecutorOptions = {
* Default is `false`.
*/
exitOnNoProposals?: boolean;

taskRetryOnTimeout?: boolean;
} & Omit<PackageOptions, "imageHash" | "imageTag"> &
MarketOptions &
PaymentOptions &
Expand Down Expand Up @@ -231,6 +233,7 @@ export class TaskExecutor {
this.yagna.getApi(),
this.taskQueue,
this.events,
this.marketService,
this.agreementPoolService,
this.paymentService,
this.networkService,
Expand Down Expand Up @@ -350,11 +353,11 @@ export class TaskExecutor {
this.events.emit("end", Date.now());
}

/**
* @Deprecated This feature is no longer supported. It will be removed in the next release.
*/
getStats() {
return [];
return {
...this.statsService.getAll(),
retries: this.taskService.getRetryCount(),
};
}

/**
Expand Down Expand Up @@ -412,8 +415,9 @@ export class TaskExecutor {
task = new Task((++this.lastTaskIndex).toString(), worker, {
maxRetries: options?.maxRetries ?? this.options.maxTaskRetries,
timeout: options?.timeout ?? this.options.taskTimeout,
startupTimeout: options?.startupTimeout ?? this.options.startupTaskTimeout,
startupTimeout: options?.startupTimeout ?? this.options.taskStartupTimeout,
activityReadySetupFunctions: this.activityReadySetupFunctions,
retryOnTimeout: options?.retryOnTimeout ?? this.options.taskRetryOnTimeout,
});
this.taskQueue.addToEnd(task);
this.events.emit("taskQueued", task.getDetails());
Expand Down
24 changes: 18 additions & 6 deletions src/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ import { instance, mock, reset, when } from "@johanblumenberg/ts-mockito";
describe("Task Queue", function () {
let testQueue: TaskQueue<Task>;
const taskMock = mock(Task);
const testWorker = async () => null;
beforeEach(function () {
testQueue = new TaskQueue<Task>();
reset(taskMock);
when(taskMock.isQueueable()).thenReturn(true);
});
describe("Adding", () => {
it("should allow to add Task to the queue", () => {
const task = instance(taskMock);
const task = new Task("1", testWorker);
testQueue.addToEnd(task);
expect(testQueue.size).toEqual(1);
});
it("should add new task on the end of the queue", () => {
const tasksToAdd = ["A", "B", "C"].map(() => instance(taskMock));
const tasksToAdd = ["A", "B", "C"].map((id) => new Task(id, testWorker));
// Add three different tasks to the queue
tasksToAdd.forEach((task) => testQueue.addToEnd(task));
// Check if the order is the same
Expand All @@ -27,7 +28,7 @@ describe("Task Queue", function () {
});
});
it("should add task on the beginning of the queue", () => {
const tasksToAdd = ["A", "B", "C"].map(() => instance(taskMock));
const tasksToAdd = ["A", "B", "C"].map((id) => new Task(id, testWorker));
// Add three different tasks to the queue
tasksToAdd.forEach((task) => testQueue.addToBegin(task));
// Reverse expectation and check
Expand All @@ -41,6 +42,11 @@ describe("Task Queue", function () {
when(taskMock.isQueueable()).thenReturn(false);
expect(() => testQueue.addToEnd(task)).toThrow("You cannot add a task that is not in the correct state");
});
it("should throws error if adding an existing task", () => {
const task = new Task("A", testWorker);
testQueue.addToEnd(task);
expect(() => testQueue.addToEnd(task)).toThrow("Task A has already been added to the queue");
});
});

describe("Getting", () => {
Expand All @@ -59,9 +65,9 @@ describe("Task Queue", function () {

it("should return correct number of items in the queue ", () => {
// Add 3 tasks to the queue
testQueue.addToEnd(instance(taskMock));
testQueue.addToEnd(instance(taskMock));
testQueue.addToEnd(instance(taskMock));
testQueue.addToEnd(new Task("1", testWorker));
testQueue.addToEnd(new Task("2", testWorker));
testQueue.addToEnd(new Task("3", testWorker));
// Check if is eq 3
expect(testQueue.size).toEqual(3);
// Get one
Expand All @@ -78,5 +84,11 @@ describe("Task Queue", function () {
// Check if still is eq 0
expect(testQueue.size).toEqual(0);
});

it("should check if task belongs to the queue", () => {
const task = instance(taskMock);
testQueue.addToEnd(task);
expect(testQueue.has(task)).toEqual(true);
});
});
});
12 changes: 11 additions & 1 deletion src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { GolemInternalError } from "@golem-sdk/golem-js";
* @internal
*/
export interface QueueableTask {
/** Identifier the queue compares to decide whether a task is already enqueued. */
id: string;
/** Reports whether the task may be added to a queue; the queue rejects the task when this returns `false`. */
isQueueable(): boolean;
}

Expand Down Expand Up @@ -32,7 +33,16 @@ export class TaskQueue<T extends QueueableTask = Task> {
return this.itemsStack.shift();
}

/**
 * Checks whether a task with the same `id` as the given task is currently held in the queue.
 *
 * @param task - Task whose `id` is looked up among the queued items
 * @returns `true` when some queued item has an equal `id`, otherwise `false`
 */
has(task: T) {
return this.itemsStack.some((t) => t.id === task.id);
}

/**
 * Validates that a task may be added to the queue.
 *
 * @param task - Task about to be added
 * @throws GolemInternalError when `task.isQueueable()` returns `false`, or when a task
 *   with the same `id` is already in the queue
 */
private checkIfTaskIsEligibleForAdd(task: T) {
if (!task.isQueueable()) throw new GolemInternalError("You cannot add a task that is not in the correct state");
if (!task.isQueueable()) {
throw new GolemInternalError("You cannot add a task that is not in the correct state");
}
// Reject duplicates: `has` matches queued items by `id`
if (this.has(task)) {
throw new GolemInternalError(`Task ${task.id} has already been added to the queue`);
}
}
}
Loading

0 comments on commit cb571c6

Please sign in to comment.