-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(task): fixed the issue where the default startup timeout was producing agreements and activities on many providers #73
Conversation
Beta -> master
Beta -> master
I did test the implementation and there are few issues present: Test code: import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";
(async function main() {
const executor = await TaskExecutor.create({
package: "golem/alpine:latest",
logger: pinoPrettyLogger({
level: "info",
}),
maxTaskRetries: 3,
// taskStartupTimeout: 100, // ms - FIXME, THIS DOESN'T WORK - not used by TE
startupTimeout: 100, // ms
maxParallelTasks: 100, // ms
});
try {
await Promise.allSettled(
[1].map((val) =>
executor.run((ctx) => ctx.run(`echo 'Hello World - ${val}'`)).then((res) => console.log(res.stdout)),
),
);
} catch (error) {
console.error("Computation failed:", error);
} finally {
await executor.shutdown();
}
})(); Output:
Actual resultThe task timeout was reached pretty fast (100ms). We poked 4 providers in total, generating 4 invoices as a consequence. If these providers had a starting price set, the payments would be made even if we didn't schedule any work (we didn't create an activity at all because of the bail-out). Expected resultThese invoices shouldn't be generated at all. In general, the expected results described in JST-881 are not achieved with this implementation. Additional issuesWhen testing, I found out that:
What's more, increasing the Code: const executor = await TaskExecutor.create({
package: "golem/alpine:latest",
logger: pinoPrettyLogger({
level: "info",
}),
maxTaskRetries: 3,
// taskStartupTimeout: 100, // ms - FIXME, THIS DOESN'T WORK - not used by TE
startupTimeout: 2000, // ms
maxParallelTasks: 5, // ms
}); Output:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per previous comment.
It has been fixed:
|
@@ -36,7 +36,7 @@ export class ExecutorConfig { | |||
readonly package?: Package | string; | |||
readonly maxParallelTasks: number; | |||
readonly taskTimeout: number; | |||
readonly startupTaskTimeout: number; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In one way that's a BC-breaking change (removal of the field). We will have to point this out in the release notes. Probably that will bump the version of the TE :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, this is just a change in the config
file. The public ExecutorOptions
remain unchanged as the correct taskStartupTimeout
.
src/executor.ts
Outdated
@@ -540,4 +544,16 @@ export class TaskExecutor { | |||
} | |||
}, this.options.startupTimeout); | |||
} | |||
|
|||
private async waitForAtLeastOneConfirmedProposal() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a more elegant solution would be to move this logic to TaskService.run
. You have a loop there already, you can check the marketService
in that loop for proposals, and if there are none, sleep and don't take a task from the queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, it will be much simpler in TaskService
. I moved this
…pt-in for retries
Results after A/B testing:
Test10x runs which execute 20 tasks on the network import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";
type TestConfig = {
name: string;
tasks: number;
maxParallelTasks: number;
maxTaskRetries: number;
taskStartupTimeoutSec: number | null;
taskTimeoutSec: number | null;
taskRetryOnTimeout: boolean | null;
useReputation: boolean | null;
};
/**
* Configs:
*
* - maxParallelTasks, 5
* - maxTaskRetries: 3
* - taskStartupTimeout: undefined, 5s, 15s, 30s, 60s, 90s, 120s
* - taskTimeout: undefined, 5s, 15s, 30s, 60s, 90s, 120s
* - taskRetryOnTimeout: false, true
* - use reputation
*
* Metrics:
*
* - number of tasks
* - number of retries
* - number of successful
* - number of failed
* - number of providers
* - number of agreements
* - number of paid invoices
* - number of unpaid invoices
* - total execution time
*/
async function testRun(config: TestConfig) {
const {
name,
tasks,
maxParallelTasks,
maxTaskRetries,
taskStartupTimeoutSec,
taskTimeoutSec,
taskRetryOnTimeout,
useReputation,
} = config;
const stats = {
tasks,
retries: 0,
successful: 0,
failed: 0,
successRate: 0,
providers: 0,
agreements: 0,
invoicesReceived: 0,
invoicesPaid: 0,
invoicesUnpaid: 0,
invoicesMissing: 0,
invoicePaymentRate: 0,
};
const timeStartMs = Date.now();
const executor = await TaskExecutor.create({
package: "golem/alpine:latest",
logger: pinoPrettyLogger({
level: "silent",
}),
maxTaskRetries,
taskStartupTimeout: taskStartupTimeoutSec ? taskStartupTimeoutSec * 1000 : undefined,
taskTimeout: taskTimeoutSec ? taskTimeoutSec * 1000 : undefined,
taskRetryOnTimeout,
maxParallelTasks,
});
try {
const results = await Promise.allSettled(
new Array(tasks)
.fill(0)
.map((val, index) =>
executor.run((ctx) => ctx.run(`echo 'Hello World - ${index}.${val}'`)).then((res) => res.stdout),
),
);
stats.successful = results.filter((p) => p.status === "fulfilled").length;
stats.failed = results.filter((p) => p.status === "rejected").length;
stats.successRate = stats.successful / (stats.successful + stats.failed);
} catch (error) {
console.error("Computation failed:", error);
} finally {
await executor.shutdown();
}
const timeStopMs = Date.now();
const testDurationSec = (timeStopMs - timeStartMs) / 1000;
const teStats = executor.getStats();
stats.retries = teStats.retries;
stats.providers = teStats.providers;
stats.agreements = teStats.agreements;
stats.invoicesReceived = teStats.invoicesReceived;
stats.invoicesUnpaid = teStats.invoicesUnpaid;
stats.invoicesPaid = teStats.invoicesPaid;
stats.invoicesMissing = teStats.invoicesMissing;
stats.invoicePaymentRate = teStats.invoicePaymentRate;
return {
...config,
...stats,
testDurationSec,
};
}
(async () => {
const variants: TestConfig[] = [
{
name: "New defaults - no timeout and timeout retries",
tasks: 20,
taskTimeoutSec: null,
taskStartupTimeoutSec: null,
maxParallelTasks: 5,
maxTaskRetries: 3,
taskRetryOnTimeout: false,
useReputation: null,
},
{
name: "Short (10s) timeouts without retries",
tasks: 20,
taskTimeoutSec: 10,
taskStartupTimeoutSec: 10,
maxParallelTasks: 5,
maxTaskRetries: 3,
taskRetryOnTimeout: false,
useReputation: null,
},
{
name: "Short (10s) timeouts with retries",
tasks: 20,
taskTimeoutSec: 10,
taskStartupTimeoutSec: 10,
maxParallelTasks: 5,
maxTaskRetries: 3,
taskRetryOnTimeout: true,
useReputation: null,
},
{
name: "Old default timeouts (2m + 5m) timeouts without retries",
tasks: 20,
taskTimeoutSec: 5 * 60,
taskStartupTimeoutSec: 2 * 60,
maxParallelTasks: 5,
maxTaskRetries: 3,
taskRetryOnTimeout: false,
useReputation: null,
},
{
name: "Old default timeouts (2m + 5m) timeouts with retries",
tasks: 20,
taskTimeoutSec: 5 * 60,
taskStartupTimeoutSec: 2 * 60,
maxParallelTasks: 5,
maxTaskRetries: 3,
taskRetryOnTimeout: true,
useReputation: null,
},
];
const attempts = 10;
for (const variant of variants) {
const summary: Awaited<ReturnType<typeof testRun>>[] = [];
for (let i = 0; i < attempts; i++) {
console.log("Running attempt", i + 1, variant.name);
const report = await testRun(variant);
console.log("Test Run Report");
console.table(report);
summary.push(report);
}
console.log("Test Summary:", variant.name);
console.table(summary);
}
})().catch(console.error); ResultsAverage test duration
Note: The new implementation required 0.5s per task more to complete. Average number of agreements created
Note: The new implementation produced 50% fewer agreements and invoices. Task success rate
Note: The new implementation has a better success rate as it's not so aggressive with retries. Payment rate
Note: The new implementation has a slightly better invoice payment rate. This mostly results from the fact that it produces 50% fewer invoices so the probability for failure decreases. |
This pr includes fixes to add additional checks when adding a task to the queue. Also, when starting a task in the service, task status is checked before starting.