Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPD-408 Add jobid #22

Merged
merged 3 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def cleanup_env(self, *args, **kwargs):
Method which gets called after the run method has completed.
"""


class RunActor(AbstractRunActor):
"""
RunActor class with added stopping functionality.
Expand Down Expand Up @@ -201,8 +201,7 @@ def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=Non

logging.debug("Tasks executed: ", self.tasks_processed)

if (stop_function is not None and
stop_function(**stop_function_args)):
if (stop_function is not None and stop_function(**stop_function_args)):
break

# break if number of tasks processed is max set
Expand Down
6 changes: 3 additions & 3 deletions picas/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import traceback
from uuid import uuid4

from . import batchid
from . import jobid
from .util import merge_dicts, seconds


Expand Down Expand Up @@ -202,7 +202,7 @@ def __init__(self, task=None):
def lock(self):
"""Function which modifies the task such that it is locked."""
self.doc['lock'] = seconds()
batchid.add_batch_management_id(self.doc)
jobid.add_job_id(self.doc)
return self._update_hostname()

def done(self):
Expand Down Expand Up @@ -253,7 +253,7 @@ def scrub(self):
self.doc['scrub_count'] += 1
self.doc['done'] = 0
self.doc['lock'] = 0
batchid.remove_batch_management_id(self.doc)
jobid.remove_job_id(self.doc)
return self._update_hostname()

def error(self, msg=None, exception=None):
Expand Down
1 change: 0 additions & 1 deletion picas/executers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def execute(args, shell=False):
return (proc, proc.returncode, stdout, stderr)



def execute_old(cmd):
"""Helper function to execute an external application.
@param cmd: the command to be executed.
Expand Down
25 changes: 18 additions & 7 deletions picas/batchid.py → picas/jobid.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@
from os import environ


def add_batch_management_id(doc):
def add_job_id(doc):
"""
Add job number id of the batch system to a token/document
Adds information of the highest level of batch system,
Add job number id to a token/document. For batch jobs,
adds information of the highest level of batch system,
since job submision systems may be layered e.g:
A glite wms system makes underneath use of a cream system which makes use
of PBS. I such a case only the glite wms id instead of all of them.
of PBS. In such a case only the glite wms id instead of all of them.
"""
dirac_jobid = environ.get("DIRACJOBID")
slurm_jobid = environ.get("SLURM_JOB_ID")
wms_jobid = environ.get("GLITE_WMS_JOBID")
cream_jobid = environ.get("CREAM_JOBID")
pbs_jobid = environ.get("PBS_JOBID")
if wms_jobid is not None:

if slurm_jobid is not None:
doc["slurm_job_id"] = slurm_jobid
if dirac_jobid is not None:
doc["dirac_job_id"] = dirac_jobid
elif wms_jobid is not None:
if not wms_jobid.startswith("http"):
wms_jobid = None
doc["wms_job_id"] = wms_jobid
Expand All @@ -27,10 +34,14 @@ def add_batch_management_id(doc):
doc["pbs_job_id"] = pbs_jobid


def remove_batch_management_id(doc):
def remove_job_id(doc):
"""
removes all batch id from doc/token
removes all job id from doc/token
"""
if "slurm_job_id" in doc:
del doc["slurm_job_id"]
if "dirac_job_id" in doc:
del doc["dirac_job_id"]
if "wms_job_id" in doc:
del doc["wms_job_id"]
if "cream_job_id" in doc:
Expand Down
13 changes: 5 additions & 8 deletions picas/modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import socket
import time

from os import environ
from . import batchid
from . import jobid


class TokenModifier:
Expand Down Expand Up @@ -64,15 +63,13 @@ def lock(self, token):
@return: modified token.
"""

dirac_jobid = environ.get("DIRACJOBID")
lock_content = {
'hostname': socket.gethostname(),
'lock': int(time.time()),
'dirac_jobid': dirac_jobid
}

# try to include glite wms job id if present
batchid.add_batch_management_id(token)
# try to include job id if present
jobid.add_job_id(token)

token.update(lock_content)
return token
Expand All @@ -87,7 +84,7 @@ def unlock(self, token):
'hostname': socket.gethostname(),
'lock': 0
}
batchid.remove_batch_management_id(token)
jobid.remove_job_id(token)

token.update(lock_content)
return token
Expand Down Expand Up @@ -115,7 +112,7 @@ def unclose(self, token):
'done': 0
}
token.update(done_content)
batchid.remove_batch_management_id(token)
jobid.remove_job_id(token)
return token

def add_output(self, token, output):
Expand Down
36 changes: 36 additions & 0 deletions tests/test_jobid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import unittest
from os import environ

from picas.jobid import add_job_id, remove_job_id
from picas.documents import Document


class TestJobid(unittest.TestCase):

def setUp(self):
self.doc = Document()
self.env_vars = {
"DIRACJOBID": "dirac_job_id",
"SLURM_JOB_ID": "slurm_job_id",
"GLITE_WMS_JOBID": "wms_job_id",
"CREAM_JOBID": "cream_job_id",
"PBS_JOBID": "pbs_job_id"
}

def test_add_job_id(self):
""" Test if job_id is added to token"""
for test in self.env_vars:
lnauta marked this conversation as resolved.
Show resolved Hide resolved
environ[test] = "http/jobid"
add_job_id(self.doc)
self.assertTrue(self.doc[self.env_vars[test]] == "http/jobid")
environ.pop(test)
environ["GLITE_WMS_JOBID"] = "jobid"
add_job_id(self.doc)
self.assertTrue(self.doc["wms_job_id"] is None)

def test_remove_job_id(self):
""" Test if job_id is removed from token"""
for test in self.env_vars.values():
self.doc[test] = "jobid"
remove_job_id(self.doc)
self.assertRaises(KeyError, self.doc.__getitem__, test)
1 change: 0 additions & 1 deletion tests/test_modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def test_lock(self):
self.modifier.lock(self.token)
self.assertTrue(self.token['hostname'] != "")
self.assertTrue(self.token['lock'] > 0)
self.assertTrue(self.token['dirac_jobid'] is None)

def test_unlock(self):
self.modifier.unlock(self.token)
Expand Down