
Commit

Added killing job related methods in populse_db and used black on whole project

sapetnioc committed Mar 20, 2024
1 parent 8cac1e9 · commit 19cd8b1
Showing 9 changed files with 735 additions and 605 deletions.
2 changes: 1 addition & 1 deletion capsul/database/__init__.py
@@ -117,7 +117,7 @@ def __exit__(self, exception_type, exception_value, exception_traceback):
     def workers_command(self, engine_id):
         db_config = dict(self.worker_database_config(self.engine_id))
         # fix db path in case it is different from the initial config
-        # (happens if path == '')
+        # (happens if path == "")
         db_config["path"] = self.path
         db_config = json.dumps(db_config, separators=(",", ":"))
         workers_command = []
21 changes: 20 additions & 1 deletion capsul/database/populse_db.py
@@ -59,6 +59,7 @@
"execution_id": [str, {"primary_key": True}],
"job_id": [str, {"primary_key": True}],
"job": dict,
"killed": bool,
}
],
},
@@ -311,7 +312,10 @@ def store_execution(
             dispose=False,
         )
         for job in jobs:
-            db.capsul_job[engine_id, execution_id, job["uuid"]] = {"job": job}
+            db.capsul_job[engine_id, execution_id, job["uuid"]] = {
+                "job": job,
+                "killed": False,
+            }
         return execution_id

     def execution_context_json(self, engine_id, execution_id):
@@ -491,6 +495,21 @@ def job_json(self, engine_id, execution_id, job_id):
         with self.storage.data() as db:
             return db.capsul_job[engine_id, execution_id, job_id].job.get()

+    def kill_jobs(self, engine_id, execution_id, job_ids=None):
+        """Request killing of jobs"""
+        # We just set a boolean flag associated with the jobs to be
+        # killed. Workers will poll for it while jobs are running, and
+        # react accordingly.
+        with self.storage.data(write=True) as db:
+            if job_ids is None:
+                job_ids = db.capsul_execution[engine_id, execution_id].ongoing.get()
+            for job_id in job_ids:
+                db.capsul_job[engine_id, execution_id, job_id].killed = True
+
+    def job_kill_requested(self, engine_id, execution_id, job_id):
+        with self.storage.data(write=True) as db:
+            return db.capsul_job[engine_id, execution_id, job_id].killed.get()
+
     def execution_report_json(self, engine_id, execution_id):
         if os.path.exists(self.path):
             with self.storage.data() as db:
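For readers of this commit, the two new methods are meant to be used from two sides: a controller process calls kill_jobs() to raise the flag, and the worker running a job periodically calls job_kill_requested() and stops the job when the flag turns True. The loop below is only an illustrative sketch of that polling pattern, not code from this repository: the database handle db, the run_job_with_kill_polling name, the subprocess-based job, the timeout, and the one-second poll period are all assumptions.

import subprocess
import time


def run_job_with_kill_polling(db, engine_id, execution_id, job_id, command):
    """Sketch of a worker loop honouring the new 'killed' flag.

    `db` is assumed to expose the job_kill_requested() method added in
    this commit; everything else here is hypothetical.
    """
    process = subprocess.Popen(command)
    while process.poll() is None:
        # kill_jobs() sets the flag from another process; we only read it here.
        if db.job_kill_requested(engine_id, execution_id, job_id):
            process.terminate()  # politely ask the job to stop
            try:
                process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                process.kill()  # force-kill if it does not exit in time
            break
        time.sleep(1)  # arbitrary poll period for this sketch
    return process.wait()

On the controller side, calling kill_jobs(engine_id, execution_id) flags every ongoing job of the execution (job_ids defaults to the execution's ongoing list), while passing an explicit job_ids sequence targets specific jobs. The remaining hunk below, from a file whose name is not shown here, is a pure black reformatting change.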
@@ -73,9 +73,7 @@ def run(self):
             # Item content is a string or buffer
             item_content = [x.replace("\n", "") for x in fo.readlines()]
             for string_content in item_content:
-                new_content.append(
-                    str(string_content), source=self.content
-                )
+                new_content.append(str(string_content), source=self.content)
             fo.close()
         except MyError as e:
             item_content = "Can't open the resource file " "'{0}'".format(