Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(task): fixed the issue where the default startup timeout was producing agreements and activities on many providers #73

Merged
merged 19 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
08f9a27
Merge pull request #34 from golemfactory/beta
mgordel Mar 15, 2024
b4d2e89
Merge pull request #53 from golemfactory/beta
mgordel Mar 21, 2024
9ebb1b7
fix(queue): fixed adding existing task to the queue when retrying
mgordel May 7, 2024
d57d453
fix(service): added task state check to prevent double execution
mgordel May 8, 2024
02c2af3
chore: cleanup code
mgordel May 8, 2024
438ab59
fix(task): fixed taskStartupTimeout config parameter
mgordel May 9, 2024
184f2e8
fix(task): removed defaults for task timeout and task startup timeout
mgordel May 9, 2024
c9ffc29
fix(task): disabled task retry in case of timeout
mgordel May 9, 2024
9d79a6b
fix(task): task retry only in case of GolemWorkError
mgordel May 9, 2024
63edf60
test: fixed unit tests with retrying tasks
mgordel May 9, 2024
f58ef63
fix(service): added additional task status checks during execution
mgordel May 9, 2024
6f88bcd
chore: changed error message
mgordel May 9, 2024
d9531cd
docs: added error handling to docs-example
mgordel May 10, 2024
4ada9e6
fix(executor): start task service only when there is at least one con…
mgordel May 10, 2024
29dd5f9
feat(task): added option retryOnTimeout that will allow the user to o…
grisha87 May 10, 2024
059947c
refactor: removed unused default timeouts
mgordel May 10, 2024
af70f85
Merge remote-tracking branch 'origin/beta' into bugfix/JST-881/task-r…
mgordel May 10, 2024
f6d68a8
refactor(executor): moved checking proposal to taskService
mgordel May 10, 2024
e9453a7
test: fixed stats unit tests and updated golem-js to latest
mgordel May 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,54 @@ import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor";
const executor = await TaskExecutor.create({
// What do you want to run
package: "golem/alpine:latest",
yagnaOptions: { apiKey: "try_golem" },
// yagnaOptions: { apiKey: "try_golem" },
logger: pinoPrettyLogger(),
budget: 0.5,
maxParallelTasks: 1,
});

// the example will run a task 4 times, in sequence (as maxParallelTasks is 1)
for (const i of [1, 2, 3, 4]) {
await executor.run(async (ctx) => {
// each task will spawn a script that generates a sequence of 5 pairs of messages sent to stdout and stderr separated by 1 sec delay

// the command generating the sequence is saved to script.sh file
await ctx.run(
`echo 'counter=0; while [ $counter -lt 5 ]; do ls -ls ./script.sh non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
);
// permissions are modified to be able to run the script
await ctx.run("chmod 700 ./script.sh");

// script is run and stream results, stdout and stderr are processed
let remoteProcess = await ctx.runAndStream("/bin/sh ./script.sh");

remoteProcess.stdout.on("data", (data) => console.log(`iteration: ${i}:`, "stdout>", data));
remoteProcess.stderr.on("data", (data) => console.error(`iteration: ${i}:`, "stderr>", data));

// For odd tasks, we set streaming timeout to 10 secs,
// the script will end normally; for even tasks we will exit the run method after 3 secs.
// The exit caused by timeout will terminate the activity on a provider,
// therefore the user cannot run another command on the provider.
// Task executor will run the next task on another provider.

const timeout = i % 2 === 0 ? 3_000 : 10_000;
const finalResult = await remoteProcess.waitForExit(timeout).catch(async (e) => {
console.log(`Iteration: ${i} Error: ${e.message}, Provider: ${e.provider.name}`);
ctx
.run("ls -l")
.catch((e) =>
console.log("Running command after normal runAndStream exit is NOT possible, you will get an error:\n", e),
);
});
if (finalResult) {
// if the spawn exited without timeout, the provider is still available
console.log(`Iteration: ${i} results: ${finalResult?.result}. Provider: ${ctx.provider.name}`);

console.log("Running command after normal runAndStream exit is possible:", (await ctx.run("ls -l")).stdout);
}
});
await executor
.run(async (ctx) => {
// each task will spawn a script that generates a sequence of 5 pairs of messages sent to stdout and stderr separated by 1 sec delay

// the command generating the sequence is saved to script.sh file
await ctx.run(
`echo 'counter=0; while [ $counter -lt 5 ]; do ls -ls ./script.sh non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
);
// permissions are modified to be able to run the script
await ctx.run("chmod 700 ./script.sh");

// script is run and stream results, stdout and stderr are processed
let remoteProcess = await ctx.runAndStream("/bin/sh ./script.sh");

remoteProcess.stdout.on("data", (data) => console.log(`iteration: ${i}:`, "stdout>", data));
remoteProcess.stderr.on("data", (data) => console.error(`iteration: ${i}:`, "stderr>", data));

// For odd tasks, we set streaming timeout to 10 secs,
// the script will end normally; for even tasks we will exit the run method after 3 secs.
// The exit caused by timeout will terminate the activity on a provider,
// therefore the user cannot run another command on the provider.
// Task executor will run the next task on another provider.

const timeout = i % 2 === 0 ? 3_000 : 10_000;
const finalResult = await remoteProcess.waitForExit(timeout).catch(async (e) => {
console.log(`Iteration: ${i} Error: ${e.message}, Provider: ${e.provider.name}`);
ctx
.run("ls -l")
.catch((e) =>
console.log("Running command after normal runAndStream exit is NOT possible, you will get an error:\n", e),
);
});
if (finalResult) {
// if the spawn exited without timeout, the provider is still available
console.log(`Iteration: ${i} results: ${finalResult?.result}. Provider: ${ctx.provider.name}`);

console.log("Running command after normal runAndStream exit is possible:", (await ctx.run("ls -l")).stdout);
}
})
.catch((error) => console.error("Execution of task failed due to error.", error));
}

await executor.shutdown();
6 changes: 3 additions & 3 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const DEFAULTS = Object.freeze({
maxParallelTasks: 5,
maxTaskRetries: 3,
taskTimeout: 1000 * 60 * 5, // 5 min,
mgordel marked this conversation as resolved.
Show resolved Hide resolved
startupTaskTimeout: 1000 * 60 * 2, // 2 min,
taskStartupTimeout: 1000 * 60 * 2, // 2 min,
enableLogging: true,
startupTimeout: 1000 * 90, // 90 sec
exitOnNoProposals: false,
Expand All @@ -36,7 +36,7 @@ export class ExecutorConfig {
readonly package?: Package | string;
readonly maxParallelTasks: number;
readonly taskTimeout: number;
readonly startupTaskTimeout: number;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In one way that's a BC-breaking change (removal of the field). We will have to point this out in the release notes. Probably that will bump the version of the TE :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, this is just a change in the config file. The public ExecutorOptions remain unchanged as the correct taskStartupTimeout.

readonly taskStartupTimeout: number;
readonly budget: number;
readonly subnetTag: string;
readonly networkIp?: string;
Expand Down Expand Up @@ -89,7 +89,7 @@ export class ExecutorConfig {
this.budget = options.budget || DEFAULTS.budget;
this.maxParallelTasks = options.maxParallelTasks || DEFAULTS.maxParallelTasks;
this.taskTimeout = options.taskTimeout || DEFAULTS.taskTimeout;
this.startupTaskTimeout = options.startupTimeout || DEFAULTS.startupTaskTimeout;
this.taskStartupTimeout = options.taskStartupTimeout || DEFAULTS.taskStartupTimeout;
this.subnetTag = options.subnetTag || processEnv.env?.YAGNA_SUBNET || DEFAULTS.subnetTag;
this.networkIp = options.networkIp;
this.logger = (() => {
Expand Down
3 changes: 3 additions & 0 deletions src/executor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
// temporarily until the import in golem-js is fixed
import { Allocation } from "@golem-sdk/golem-js/dist/payment";
import { PaymentConfig } from "@golem-sdk/golem-js/dist/payment/config";
import { randomUUID } from "node:crypto";
interface PaymentServiceEvents {
error: (err: Error) => void;
}
Expand Down Expand Up @@ -82,6 +83,7 @@ when(gftpStorageProviderMock.init()).thenResolve(anything());
when(gftpStorageProviderMock.close()).thenResolve(anything());
when(taskServiceMock.run()).thenResolve(anything());
when(taskServiceMock.end()).thenResolve(anything());
when(taskMock.id).thenCall(randomUUID);
when(taskMock.getActivity()).thenReturn(activity);
when(taskMock.getDetails()).thenReturn({ activityId: "1", agreementId: "1", id: "1", retriesCount: 0 });

Expand Down Expand Up @@ -246,6 +248,7 @@ describe("Task Executor", () => {
when(taskMock.isRejected()).thenReturn(false).thenReturn(true).thenReturn(false);
when(taskMock.getResults()).thenReturn("result 1").thenReturn("result 2");
when(taskMock.getError()).thenReturn(new Error("error 1"));
when(taskMock.id).thenCall(randomUUID);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const executorShutdownSpy = jest.spyOn(executor as any, "doShutdown");

Expand Down
20 changes: 18 additions & 2 deletions src/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ export class TaskExecutor {
private configOptions: ExecutorOptions;
private isCanceled = false;
private startupTimeoutId?: NodeJS.Timeout;
private waitingForProposalIntervalId?: NodeJS.Timeout;
private yagna: Yagna;

/**
Expand Down Expand Up @@ -297,7 +298,9 @@ export class TaskExecutor {
}
});

this.taskService.run().catch((e) => this.handleCriticalError(e));
this.waitForAtLeastOneConfirmedProposal().then(() =>
this.taskService.run().catch((e) => this.handleCriticalError(e)),
);

if (isNode) this.installSignalHandlers();
// this.options.eventTarget.dispatchEvent(new Events.ComputationStarted());
Expand Down Expand Up @@ -338,6 +341,7 @@ export class TaskExecutor {
this.events.emit("beforeEnd", Date.now());
if (isNode) this.removeSignalHandlers();
clearTimeout(this.startupTimeoutId);
clearInterval(this.waitingForProposalIntervalId);
if (!this.configOptions.storageProvider) await this.storageProvider?.close();
await this.networkService?.end();
await Promise.all([this.taskService.end(), this.agreementPoolService.end(), this.marketService.end()]);
Expand Down Expand Up @@ -412,7 +416,7 @@ export class TaskExecutor {
task = new Task((++this.lastTaskIndex).toString(), worker, {
maxRetries: options?.maxRetries ?? this.options.maxTaskRetries,
timeout: options?.timeout ?? this.options.taskTimeout,
startupTimeout: options?.startupTimeout ?? this.options.startupTaskTimeout,
startupTimeout: options?.startupTimeout ?? this.options.taskStartupTimeout,
activityReadySetupFunctions: this.activityReadySetupFunctions,
});
this.taskQueue.addToEnd(task);
Expand Down Expand Up @@ -540,4 +544,16 @@ export class TaskExecutor {
}
}, this.options.startupTimeout);
}

private async waitForAtLeastOneConfirmedProposal() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a more elegant solution would be to move this logic to TaskService.run. You have a loop there already, you can check the marketService in that loop for proposals, and if there are none, sleep and don't take a task from the queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, it will be much simpler in TaskService. I moved this

return new Promise((res) => {
this.waitingForProposalIntervalId = setInterval(async () => {
const proposalsCount = this.marketService.getProposalsCount();
if (proposalsCount.confirmed > 0) {
clearInterval(this.waitingForProposalIntervalId);
res(true);
}
}, 2_000);
});
}
}
24 changes: 18 additions & 6 deletions src/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ import { instance, mock, reset, when } from "@johanblumenberg/ts-mockito";
describe("Task Queue", function () {
let testQueue: TaskQueue<Task>;
const taskMock = mock(Task);
const testWorker = async () => null;
beforeEach(function () {
testQueue = new TaskQueue<Task>();
reset(taskMock);
when(taskMock.isQueueable()).thenReturn(true);
});
describe("Adding", () => {
it("should allow to add Task to the queue", () => {
const task = instance(taskMock);
const task = new Task("1", testWorker);
testQueue.addToEnd(task);
expect(testQueue.size).toEqual(1);
});
it("should add new task on the end of the queue", () => {
const tasksToAdd = ["A", "B", "C"].map(() => instance(taskMock));
const tasksToAdd = ["A", "B", "C"].map((id) => new Task(id, testWorker));
// Add three different tasks to the queue
tasksToAdd.forEach((task) => testQueue.addToEnd(task));
// Check if the order is the same
Expand All @@ -27,7 +28,7 @@ describe("Task Queue", function () {
});
});
it("should add task on the beginning of the queue", () => {
const tasksToAdd = ["A", "B", "C"].map(() => instance(taskMock));
const tasksToAdd = ["A", "B", "C"].map((id) => new Task(id, testWorker));
// Add three different tasks to the queue
tasksToAdd.forEach((task) => testQueue.addToBegin(task));
// Reverse expectation and check
Expand All @@ -41,6 +42,11 @@ describe("Task Queue", function () {
when(taskMock.isQueueable()).thenReturn(false);
expect(() => testQueue.addToEnd(task)).toThrow("You cannot add a task that is not in the correct state");
});
it("should throws error if adding an existing task", () => {
const task = new Task("A", testWorker);
testQueue.addToEnd(task);
expect(() => testQueue.addToEnd(task)).toThrow("Task A has already been added to the queue");
});
});

describe("Getting", () => {
Expand All @@ -59,9 +65,9 @@ describe("Task Queue", function () {

it("should return correct number of items in the queue ", () => {
// Add 3 tasks to the queue
testQueue.addToEnd(instance(taskMock));
testQueue.addToEnd(instance(taskMock));
testQueue.addToEnd(instance(taskMock));
testQueue.addToEnd(new Task("1", testWorker));
testQueue.addToEnd(new Task("2", testWorker));
testQueue.addToEnd(new Task("3", testWorker));
// Check if is eq 3
expect(testQueue.size).toEqual(3);
// Get one
Expand All @@ -78,5 +84,11 @@ describe("Task Queue", function () {
// Check if still is eq 0
expect(testQueue.size).toEqual(0);
});

it("should check if task belongs to the queue", () => {
const task = instance(taskMock);
testQueue.addToEnd(task);
expect(testQueue.has(task)).toEqual(true);
});
});
});
12 changes: 11 additions & 1 deletion src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { GolemInternalError } from "@golem-sdk/golem-js";
* @internal
*/
export interface QueueableTask {
/** Identifier the queue compares to detect a task that has already been added. */
id: string;
/** Tells the queue whether the task may be added; the queue throws when this returns false. */
isQueueable(): boolean;
}

Expand Down Expand Up @@ -32,7 +33,16 @@ export class TaskQueue<T extends QueueableTask = Task> {
return this.itemsStack.shift();
}

/**
 * Checks whether a task with the same id is already present in the queue.
 *
 * @param task - task whose `id` is compared against the queued items
 * @returns `true` when any queued item has an id equal to `task.id`
 */
has(task: T) {
  // Membership is decided by id equality, not by object identity.
  const sharesId = (queued: T) => queued.id === task.id;
  return this.itemsStack.some(sharesId);
}

private checkIfTaskIsEligibleForAdd(task: T) {
if (!task.isQueueable()) throw new GolemInternalError("You cannot add a task that is not in the correct state");
if (!task.isQueueable()) {
throw new GolemInternalError("You cannot add a task that is not in the correct state");
}
if (this.has(task)) {
throw new GolemInternalError(`Task ${task.id} has already been added to the queue`);
}
}
}
36 changes: 33 additions & 3 deletions src/service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import {
ActivityStateEnum,
Agreement,
AgreementPoolService,
GolemWorkError,
NetworkService,
PaymentService,
Result,
ResultState,
WorkContext,
WorkErrorCode,
YagnaApi,
} from "@golem-sdk/golem-js";
import { TaskService } from "./service";
Expand Down Expand Up @@ -98,7 +100,7 @@ describe("Task Service", () => {
await service.end();
});

it("should retry task if it failed", async () => {
it("should retry task if a GolemWorkError occurred", async () => {
const worker = async (ctx: WorkContext) => ctx.run("some_shell_command");
const task = new Task("1", worker, { maxRetries: 3 });
queue.addToEnd(task);
Expand All @@ -110,7 +112,9 @@ describe("Task Service", () => {
});
const cb = jest.fn();
events.on("taskRetried", cb);
when(activityMock.execute(anything(), false, undefined)).thenReject(new Error("Test error"));
when(activityMock.execute(anything(), false, undefined)).thenReject(
new GolemWorkError("Test error", WorkErrorCode.ScriptExecutionFailed),
);
const service = new TaskService(yagnaApi, queue, events, agreementPoolService, paymentService, networkService, {
taskRunningInterval: 10,
activityStateCheckingInterval: 10,
Expand All @@ -122,6 +126,30 @@ describe("Task Service", () => {
await service.end();
});

it("should not retry task if a TypeError occurred", async () => {
const worker = async (ctx: WorkContext) => ctx.run("some_shell_command");
const task = new Task("1", worker, { maxRetries: 3 });
queue.addToEnd(task);
const readable = new Readable({
objectMode: true,
read() {
readable.destroy(new Error("Test error"));
},
});
const cb = jest.fn();
events.on("taskRetried", cb);
when(activityMock.execute(anything(), false, undefined)).thenReject(new Error("Test error"));
const service = new TaskService(yagnaApi, queue, events, agreementPoolService, paymentService, networkService, {
taskRunningInterval: 10,
activityStateCheckingInterval: 10,
});
service.run().then();
await sleep(800, true);
expect(cb).toHaveBeenCalledTimes(0);
expect(task.isRejected()).toEqual(true);
await service.end();
});

it("should not retry task if it failed and maxRetries is zero", async () => {
const worker = async (ctx: WorkContext) => ctx.run("some_shell_command");
const task = new Task("1", worker, { maxRetries: 0 });
Expand Down Expand Up @@ -150,7 +178,9 @@ describe("Task Service", () => {
const task = new Task("1", worker, { maxRetries: 1 });
const cb = jest.fn();
events.on("taskRetried", cb);
when(activityMock.execute(anything(), false, undefined)).thenReject(new Error("Test error"));
when(activityMock.execute(anything(), false, undefined)).thenReject(
new GolemWorkError("Test error", WorkErrorCode.ScriptExecutionFailed),
);
const service = new TaskService(yagnaApi, queue, events, agreementPoolService, paymentService, networkService, {
taskRunningInterval: 10,
activityStateCheckingInterval: 10,
Expand Down
Loading