Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPD-410: stop functionality #16

Merged
merged 36 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c15f026
Add RunActor base and RunActor stopping child
lnauta Jul 19, 2024
ce1fd5c
Add Stop in Actor and example
lnauta Jul 19, 2024
3a3acf6
Add stop logic to run method
lnauta Jul 22, 2024
f8bde84
Fix time limit stop fn in actors
lnauta Jul 22, 2024
d1bd311
Add handler when picas is killed
lnauta Jul 25, 2024
2ca2246
Rename reset value
lnauta Jul 25, 2024
86fbb7a
Change executor and handler to gracefully kill token proc
lnauta Jul 25, 2024
d85c94d
Change executor and handler to gracefully kill token proc
lnauta Jul 25, 2024
e12ccd8
Add no resetting of token option
lnauta Jul 26, 2024
26b613d
Add unittests stopping code
lnauta Jul 29, 2024
e7f3fa0
Add handler tests
lnauta Jul 30, 2024
ec303b0
Add docstrings actor tests
lnauta Jul 30, 2024
04e6f8d
Add logger to actors
lnauta Jul 30, 2024
a1a5878
Fix import actors: subprocess
lnauta Jul 30, 2024
46f2251
Fix reset bug on final finished token
lnauta Jul 31, 2024
f2259e8
Fix stop fn failing on Nonetype
lnauta Aug 1, 2024
782d5f9
Fix reset token to use done instead of exit
lnauta Aug 1, 2024
d87cbbe
Improve exit code default
lnauta Aug 1, 2024
a252432
Update test log assertion
lnauta Aug 1, 2024
6244b8c
Add token reset fix for when running is ended
lnauta Aug 2, 2024
1922e84
Clean local example
lnauta Aug 2, 2024
b8eb4a7
Fix style
lnauta Aug 12, 2024
5284b8f
Fix style
lnauta Aug 12, 2024
feb96d4
Merge branch 'master' into SPD-410
lnauta Sep 3, 2024
9fff08c
Fix test return value
lnauta Sep 3, 2024
64dec5f
Clean test
lnauta Sep 3, 2024
610f99d
Update picas/actors.py
lnauta Sep 4, 2024
9237687
Add description to docstring actors
lnauta Sep 4, 2024
ebf1e8f
Fix execute proc fn
lnauta Sep 4, 2024
ee69025
Fix execute proc fn
lnauta Sep 4, 2024
5f426ad
Change actor names for backwards compatibility
lnauta Sep 6, 2024
b4f623e
Fix typo
lnauta Sep 12, 2024
4050ce0
Update examples/local-example.py
lnauta Sep 16, 2024
7584f1f
Update examples/local-example.py
lnauta Sep 16, 2024
8c4cd63
Update picas/executers.py
lnauta Sep 16, 2024
68f7898
Revert executers test back
lnauta Sep 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions examples/local-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,33 @@
Run main job (process_task.sh) with the input argument
When done, return the exit code to the token
Attach the logs to the token


'''

#python imports
import logging
import os
import time
import couchdb
import picasconfig

#picas imports
from picas.actors import RunActor
from picas.actors import RunActor, RunActorWithStop
lnauta marked this conversation as resolved.
Show resolved Hide resolved
from picas.clients import CouchDB
from picas.executers import execute
from picas.iterators import TaskViewIterator
from picas.iterators import EndlessViewIterator
from picas.modifiers import BasicTokenModifier
from picas.executers import execute
from picas.util import Timer

class ExampleActor(RunActor):
log = logging.getLogger(__name__)

class ExampleActor(RunActorWithStop):
lnauta marked this conversation as resolved.
Show resolved Hide resolved
"""
The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.
Feel free to adjust to whatever you need, a template can be found at: example-template.py
"""
def __init__(self, db, modifier, view="todo", **viewargs):
super(ExampleActor, self).__init__(db, view=view, **viewargs)
self.timer = Timer()
self.iterator = EndlessViewIterator(self.iterator, stop_callback=self.time_elapsed) # overwrite default iterator from super().init()
self.iterator = EndlessViewIterator(self.iterator)
self.modifier = modifier
self.client = db

Expand All @@ -52,12 +51,12 @@ def process_task(self, token):
# /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out
command = "/usr/bin/time -v ./process_task.sh " + "\"" +token['input'] + "\" " + token['_id'] + " 2> logs_" + str(token['_id']) + ".err 1> logs_" + str(token['_id']) + ".out"

out = execute(command,shell=True)
out = execute(command, shell=True)
self.subprocess = out[0]

## Get the job exit code in the token
token['exit_code'] = out[0]
# Get the job exit code and done in the token
token['exit_code'] = out[1]
token = self.modifier.close(token)
#self.client.db[token['_id']] = token # necessary?

# Attach logs in token
curdate = time.strftime("%d/%m/%Y_%H:%M:%S_")
Expand Down Expand Up @@ -93,7 +92,7 @@ def main():
# Create actor
actor = ExampleActor(client, modifier)
# Start work!
actor.run()
actor.run(max_tasks=2, stop_function=actor.time_elapsed, elapsed=11)
hailihu marked this conversation as resolved.
Show resolved Hide resolved

if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion examples/pushTokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def loadTokens(db):
'exit_code': ''
}
tokens.append(token)
i = i +1
i = i + 1
db.update(tokens)

def get_db():
Expand Down
210 changes: 168 additions & 42 deletions picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,91 +5,217 @@
@author: Jan Bot, Joris Borgdorff
"""

from couchdb.http import ResourceConflict
import logging
import signal
import subprocess

from .util import Timer
from .iterators import TaskViewIterator
from .picaslogger import picaslogger
from .iterators import TaskViewIterator, EndlessViewIterator

from couchdb.http import ResourceConflict

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

class RunActor:
lnauta marked this conversation as resolved.
Show resolved Hide resolved

"""Executor class to be overwritten in the client implementation.
class AbstractRunActor(object):
"""
Executor class to be overwritten in the client implementation.
"""

def __init__(self, db, iterator=None, view='todo', **view_params):
def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], **view_params):
"""
@param database: the database to get the tasks from.
@param token_reset_values: values to use in the token when PiCaS is terminated, defaults to values of 'todo' ([0,0])
"""
if db is None:
raise ValueError("Database must be initialized")
self.db = db
self.iterator = iterator
self.token_reset_values = token_reset_values

# current task is needed to reset it when PiCaS is killed
self.current_task = None
# the subprocess running the token code is necessary s.t. the handler can cleanly kill it
self.subprocess = None
self.tasks_processed = 0

self.iterator = iterator
if iterator is None:
self.iterator = TaskViewIterator(self.db, view, **view_params)
else:
self.iterator = iterator

def run(self, maxtime=None, avg_time_factor=0.0):
"""Run method of the actor, executes the application code by iterating
def _run(self, task):
    """
    Process a single task: the per-task unit of work used by the run method.

    Calls prepare_run, hands the task to process_task, persists the task
    to the database and finally calls cleanup_run.

    @param task: the task (token) to process
    """
    self.prepare_run()
    # remember the task so the signal handler can reset it if PiCaS is killed
    self.current_task = task

    try:
        self.process_task(task)
    except Exception as ex:
        # a failing task must not stop the actor: record the error on the task
        msg = "Exception {0} occurred during processing: {1}".format(type(ex), ex)
        task.error(msg, exception=ex)
        log.info(msg)

    # keep trying until the save succeeds
    saved = False
    while not saved:
        try:
            self.db.save(task)
        except ResourceConflict:
            # simply overwrite changes - model results are more important:
            # adopt the revision currently in the database and retry
            latest = self.db.get(task.id)
            task['_rev'] = latest.rev
        else:
            saved = True

    self.cleanup_run()
    self.tasks_processed += 1

def run(self):
lnauta marked this conversation as resolved.
Show resolved Hide resolved
"""
Run method of the actor, executes the application code by iterating
over the available tasks in CouchDB.
"""
time = Timer()
# The error handler for when SLURM (or other scheduler / user) kills PiCaS, to reset the
# token back to 'todo' state (or other state defined through the token_reset_values)
self.setup_handler()

self.time = Timer()
self.prepare_env()
try:
for task in self.iterator:
self.prepare_run()

try:
self.process_task(task)
except Exception as ex:
msg = f"Exception {type(ex)} occurred during processing: {ex}"
task.error(msg, exception=ex)
picaslogger.info(msg)

while True:
try:
self.db.save(task)
break
except ResourceConflict:
# simply overwrite changes - model results are more
# important
new_task = self.db.get(task.id)
task['_rev'] = new_task.rev

self.cleanup_run()
self.tasks_processed += 1

if maxtime is not None:
will_elapse = ((avg_time_factor + self.tasks_processed) *
time.elapsed() / self.tasks_processed)
if will_elapse > maxtime:
break
self._run(task)
self.current_task = None # set to None so the handler leaves the token alone when picas is killed
finally:
self.cleanup_env()

def handler(self, signum, frame):
    """
    Signal handler method. It sets the token's 'lock' and 'done' fields to the values
    passed to token_reset_values. This method ensures that when PiCaS is killed by the
    scheduler or user, it automatically resets the token that was being worked on back to some
    state (default: 'todo' state).

    The handler never returns: after cleaning up it raises SystemExit with exit code 0.

    @param signum: signal to listen to and act upon
    @param frame: stack frame, defaults to None, see https://docs.python.org/3/library/signal.html#signal.signal
    """
    log.info(f'PiCaS shutting down, called with signal {signum}')

    # gracefully kill the process running token code, it needs to stop before we update the token state
    if self.subprocess and self.subprocess.poll() is None:
        log.info('Terminating execution of token')
        self.subprocess.terminate()
        try:
            # wait 30 seconds for termination, value chosen to allow complex processes to stop
            self.subprocess.communicate(timeout=30)
        except subprocess.TimeoutExpired:
            log.info('Killing subprocess')
            self.subprocess.kill()
            self.subprocess.communicate()

    # update the token state, if the reset value is None, do nothing.
    if self.current_task and self.token_reset_values is not None:
        self.current_task['lock'] = self.token_reset_values[0]
        self.current_task['done'] = self.token_reset_values[1]
        self.db.save(self.current_task)

    self.cleanup_env()
    # Raise SystemExit directly instead of calling the `exit` builtin: `exit` is injected
    # by the `site` module for interactive use and is missing when Python runs without it.
    raise SystemExit(0)

def setup_handler(self):
    """
    Register self.handler for SIGTERM and SIGINT, so the run methods
    can install the handlers with a single call.
    """
    log.info('Setting up signal handlers')
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, self.handler)

def prepare_env(self, *args, **kwargs):
"""Method to be called to prepare the environment to run the
"""
Method to be called to prepare the environment to run the
application.
"""

def prepare_run(self, *args, **kwargs):
"""Code to run before a task gets processed. Used e.g. for fetching
"""
Code to run before a task gets processed. Used e.g. for fetching
inputs.
"""

def process_task(self, task):
"""The function to override, which processes the tasks themselves.
"""
The function to override, which processes the tasks themselves.
@param task: the task to process
"""
raise NotImplementedError

def cleanup_run(self, *args, **kwargs):
"""Code to run after a task has been processed.
"""
Code to run after a task has been processed.
"""

def cleanup_env(self, *args, **kwargs):
"""Method which gets called after the run method has completed.
"""
Method which gets called after the run method has completed.
"""


class RunActor(AbstractRunActor):
    """
    RunActor class with added stopping functionality.
    """

    def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=None, **stop_function_args):
        """
        Run method of the actor, executes the application code by iterating
        over the available tasks in CouchDB, including stop logic. The stop
        logic is also extended into the EndlessViewIterator to break it when
        the condition is met, otherwise it never stops.

        The stop conditions are checked after every processed task, in this
        order: stop_function, max_tasks, max_time.

        @param max_time: maximum time to run picas before stopping
        @param avg_time_factor: used for estimating when to stop with `max_time`,
            value is average time per token to run
        @param max_tasks: number of tasks that are performed before stopping,
            0 (the default) means no limit
        @param stop_function: custom function to stop the execution, must return bool
        @param stop_function_args: kwargs to supply to stop_function
        """
        self.time = Timer()
        self.prepare_env()

        # handler needs to be setup in overwritten method
        self.setup_handler()

        # Special case to break the while loop of the EndlessViewIterator:
        # The while loop can't reach the stop condition in the for loop below,
        # so pass the condition into the stop mechanism of the EVI, then the
        # iterator is stopped from the EVI and not from this run method.
        # NOTE(review): this also overwrites a stop_callback that was set on the
        # iterator beforehand, even when stop_function is None - confirm intended.
        if isinstance(self.iterator, EndlessViewIterator):
            self.iterator.stop_callback = stop_function
            self.iterator.stop_callback_args = stop_function_args

        try:
            for task in self.iterator:
                self._run(task)
                # The task is finished and saved: clear it right away, before any
                # stop condition can break out of the loop, so the handler leaves
                # the token alone when picas is killed afterwards.
                self.current_task = None

                log.debug("Tasks executed: %s", self.tasks_processed)

                if stop_function is not None and stop_function(**stop_function_args):
                    break

                # break once the number of tasks processed reaches the maximum; >= so
                # that a counter already past the limit still stops the loop
                if max_tasks and self.tasks_processed >= max_tasks:
                    break

                if max_time is not None:
                    # for a large number of tokens the avg time will be better (due to statistics)
                    # resulting in a better estimate of whether time.elapsed + avg_time (what will
                    # be added on the next iteration) is larger than the max_time.
                    will_elapse = self.time.elapsed() + avg_time_factor
                    if will_elapse > max_time:
                        break
        finally:
            self.cleanup_env()
1 change: 1 addition & 0 deletions picas/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ class Task(Document):
'hostname': '',
'scrub_count': 0,
'input': {},
'exit_code': '',
'output': {},
'uploads': {},
'error': [],
Expand Down
3 changes: 2 additions & 1 deletion picas/executers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ def execute(args, shell=False):
return (proc.returncode, stdout, stderr)
lnauta marked this conversation as resolved.
Show resolved Hide resolved



def execute_old(cmd):
"""Helper functino to execute an external application.
"""Helper function to execute an external application.
@param cmd: the command to be executed.
@return the exit code of the executed program.
"""
Expand Down
5 changes: 3 additions & 2 deletions picas/srm.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def remote_exists(self, loc):
surl = self.srm_host + loc
cmd = ['srmls', surl]
picaslogger.info(" ".join(cmd))
(returncode, stdout, _) = execute(cmd)
(proc, returncode, stdout, _) = execute(cmd)

if returncode == 0:
bn = path.basename(loc)
lines = stdout.split("\n")
Expand Down Expand Up @@ -150,7 +151,7 @@ def upload(self, local_file, srm_dir, check=False):
cmd = ['srmcp', '-2', '-server_mode=passive',
'file:///' + local_file, srm_url]
picaslogger.info(cmd)
(returncode, _, _) = execute(cmd)
(proc, returncode, _, _) = execute(cmd)
if returncode == 0:
pass
else:
Expand Down
Loading