feat(scheduler): ✨ Adds task_id_function to TaskScheduler #1212

Open: wants to merge 1 commit into base: master

natephysics
Contributor

Related Issue \ discussion

#1211
https://clearml.slack.com/archives/CTK20V944/p1708447659999379?thread_ts=1708445057.172119&cid=CTK20V944

Patch Description

Adds a new parameter, task_id_function, to TaskScheduler.add_task(). It takes a callable that is expected to return a task_id. The callable runs at execution time (when the task scheduler would normally launch the scheduled task), and the task_id it returns is combined with the other parameters passed to .add_task() to form the scheduled task.

Motivation

Why is this useful? There are many reasons, but the biggest one is that it gives users much more control over which tasks the task scheduler runs. Currently, as far as I can tell, the task scheduler cannot run the most recent task (at runtime) from a given project with a specific tag. I can use the schedule_function parameter and write a function that finds and runs the task, but then I lose the core advantages of .add_task(): there is no way to specify queues, task_parameters, or task_overrides. Naturally, I could wrap all of that into the function called by schedule_function, but at that point I'm basically writing my own scheduler. This also lets you do some preprocessing before returning the task_id, for example cleaning up old tasks.

Testing Instructions

Define a function that returns a task_id by whatever means:

def yourfunction():
    return 'some_task_id_string'

With the task scheduler, call .add_task(..., schedule_task_id=None, task_id_function=yourfunction, ...). When the task is scheduled to execute, the function should run, and the task_id it returns is used as if it had originally been passed to schedule_task_id.

@jkhenning
Member

Thanks @natephysics, we'll take a look at it soon!

Collaborator

@eugen-ajechiloae-clearml eugen-ajechiloae-clearml left a comment

Thank you for the contribution. I left a few comments.

@@ -60,11 +65,12 @@ def run(self, task_id):
         self._executed_instances.append(str(task_id))
 
     def get_resolved_target_project(self):
-        if not self.base_task_id or not self.target_project:
+        if not self.base_task_id or not self.target_project or not self.base_task_id_function:
Collaborator

As we either have base_task_id or base_task_id_function, this will always evaluate to true. The condition should be: if (not self.base_task_id and not self.base_task_id_function) or not self.target_project:.
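
The difference between the two conditions can be checked with a small truth-table sketch. The helper names below are hypothetical, and the attributes are passed as plain arguments rather than read from self; each helper returns True when the early-exit branch would be taken.

```python
def original_skips(base_task_id, target_project, base_task_id_function):
    # Condition as written in the diff under review.
    return bool(
        not base_task_id or not target_project or not base_task_id_function
    )


def suggested_skips(base_task_id, target_project, base_task_id_function):
    # Condition proposed in the review comment.
    return bool(
        (not base_task_id and not base_task_id_function) or not target_project
    )


def some_function():
    return "abc123"


# Exactly one of base_task_id / base_task_id_function is set in practice.
cases = [
    ("abc123", "my_project", None),
    (None, "my_project", some_function),
]
for case in cases:
    print(original_skips(*case), suggested_skips(*case))
```

For both realistic cases the original condition takes the early exit even though a target project and a task source are present, while the suggested condition does not.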

if self.base_task_id_function:
    # validate retrieval of a valid task id
    try:
        base_task_id = self.base_task_id_function()
Collaborator

We should cache the result of self.base_task_id_function so that it is not called twice.
self.get_task_id is called in this function, and it is called again before running the job itself.
If self.base_task_id_function has side effects, such as cloning a task, those effects will happen twice, so one might end up with multiple clones, for example.
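
One way to get this behavior is to memoize the resolved ID on the job object. The sketch below is a simplified, hypothetical stand-in and not the actual class in clearml/automation/scheduler.py; the names ScheduledJobSketch, _resolved_task_id, and clear_resolved_task_id are assumptions made for illustration.

```python
from typing import Callable, Optional


class ScheduledJobSketch:
    """Hypothetical, simplified stand-in for a scheduled job entry."""

    def __init__(
        self,
        base_task_id: Optional[str] = None,
        base_task_id_function: Optional[Callable[[], str]] = None,
    ):
        if (base_task_id is None) == (base_task_id_function is None):
            raise ValueError(
                "provide exactly one of base_task_id or base_task_id_function"
            )
        self.base_task_id = base_task_id
        self.base_task_id_function = base_task_id_function
        self._resolved_task_id: Optional[str] = None

    def get_task_id(self) -> str:
        # A fixed ID needs no resolution.
        if self.base_task_id is not None:
            return self.base_task_id
        # Call the user function at most once per run and reuse the result,
        # so side effects such as cloning a task are not repeated.
        if self._resolved_task_id is None:
            self._resolved_task_id = self.base_task_id_function()
        return self._resolved_task_id

    def clear_resolved_task_id(self) -> None:
        # Call after a run completes so the next scheduled run resolves
        # a fresh ID instead of reusing the previous one.
        self._resolved_task_id = None
```

The cache has to be cleared between scheduled runs; otherwise a recurring schedule would keep launching the task that was resolved the first time.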

clearml/automation/scheduler.py
@eugen-ajechiloae-clearml
Collaborator

Also, can you please update https://github.com/allegroai/clearml/blob/master/examples/scheduler/cron_example.py to include a use of base_task_id_function?

@jkhenning
Member

@natephysics any update?

@natephysics
Contributor Author

natephysics commented Mar 10, 2024

Hi Jake.

Yes. I started implementing the suggested changes, but the point about not running the function multiple times was something I had overlooked. I fixed most of the issues, and in theory the function should only run once, but when I test it, it still appears to run twice. I tried to figure out exactly why this is the case but ran out of time. I still plan to finish it, but I have to finish up another project first.

I was thinking of splitting out the preprocessing and the function that returns a task ID in the worst case scenario. But I'd rather solve the problem directly if I could.
