Skip to content

Commit

Permalink
runs: fix create run; update task send off
Browse files Browse the repository at this point in the history
  • Loading branch information
yashlamba committed May 29, 2024
1 parent 92438ab commit 1c513b2
Show file tree
Hide file tree
Showing 4 changed files with 457 additions and 14 deletions.
2 changes: 1 addition & 1 deletion invenio_jobs/services/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def from_job(cls, job):
name=job.title,
schedule=job.parsed_schedule,
kwargs={"kwargs": job.default_args},
task="invenio_jobs.tasks.execute_run", # TODO Make a constant/import
task="invenio_jobs.tasks.execute_run", # TODO Make a constant/import
options={"queue": job.default_queue},
last_run_at=(job.last_run and job.last_run.created),
)
Expand Down
23 changes: 16 additions & 7 deletions invenio_jobs/services/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

"""Service definitions."""

import uuid

import sqlalchemy as sa
from celery import current_app
from invenio_records_resources.services.base import LinksTemplate
Expand All @@ -16,9 +18,12 @@
from invenio_records_resources.services.uow import (
ModelCommitOp,
ModelDeleteOp,
TaskOp,
unit_of_work,
)

from invenio_jobs.tasks import execute_run

from ..models import Job, Run, RunStatusEnum
from ..proxies import current_jobs
from .errors import JobNotFoundError, RunNotFoundError, RunStatusChangeError
Expand Down Expand Up @@ -233,17 +238,21 @@ def create(self, identity, job_id, data, uow=None):

valid_data.setdefault("queue", job.default_queue)
run = Run(
id=str(uuid.uuid4()),
job=job,
args=job.default_args,
queue=job.default_queue,
task_id=uuid.uuid4(),
started_by_id=identity.id,
status=RunStatusEnum.QUEUED,
**valid_data,
)
uow.register(ModelCommitOp(run))

task = current_app.tasks.get(job.task)
# TODO how to pass data?
# if task:
# task.apply_async
uow.register(
TaskOp.for_async_apply(
execute_run,
kwargs={"run_id": run.id, "kwargs": run.args},
task_id=str(run.task_id),
)
)

return self.result_item(self, identity, run, links_tpl=self.links_item_tpl)

Expand Down
Loading

0 comments on commit 1c513b2

Please sign in to comment.