From e80adfed3812eb536c1eacb9da9da4b1cc881dfb Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Fri, 2 Aug 2024 13:37:27 +0200 Subject: [PATCH 01/13] Add scrubbing example --- examples/scrub-example.py | 91 +++++++++++++++++++++++++++++++++++++++ examples/scrubExample.txt | 3 ++ 2 files changed, 94 insertions(+) create mode 100755 examples/scrub-example.py create mode 100644 examples/scrubExample.txt diff --git a/examples/scrub-example.py b/examples/scrub-example.py new file mode 100755 index 0000000..6ccc56a --- /dev/null +++ b/examples/scrub-example.py @@ -0,0 +1,91 @@ +import logging +import os +import time +import couchdb +import picasconfig + +from picas.actors import RunActor, RunActorWithStop +from picas.clients import CouchDB +from picas.iterators import TaskViewIterator +from picas.iterators import EndlessViewIterator +from picas.modifiers import BasicTokenModifier +from picas.executers import execute +from picas.util import Timer + +log = logging.getLogger("Scrub example") + +class ExampleActor(RunActorWithStop): + """ + The ExampleActor is the custom implementation of a RunActor that the user needs for the processing. + Example for scrubbing tokens and rerunning them. + """ + def __init__(self, db, modifier, view="todo", scrub_count=2, **viewargs): + super(ExampleActor, self).__init__(db, view=view, **viewargs) + self.timer = Timer() + self.iterator = EndlessViewIterator(self.iterator) + self.modifier = modifier + self.client = db + self.scrub_limit = scrub_count + + def process_task(self, token): + # Print token information + print("-----------------------") + print("Working on token: " +token['_id']) + for key, value in token.doc.items(): + print(key, value) + print("-----------------------") + + # Start running the main job + # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out + command = "/usr/bin/time -v ./process_task.sh " + "\"" +token['input'] + "\" " + token['_id'] + " 2> logs_" + str(token['_id']) + ".err 1> logs_" + str(token['_id']) + ".out" + + out = execute(command, shell=True) + self.subprocess = out[0] + + # Get the job exit code and done in the token + token['exit_code'] = out[1] + token = self.modifier.close(token) + + if (token['scrub_count'] < self.scrub_limit) and (out[1] != 0): + log.info(f"Scrubbing token {token['_id']}") + token = self.modifier.unclose(token) + token.scrub() + + # Attach logs in token + curdate = time.strftime("%d/%m/%Y_%H:%M:%S_") + try: + logsout = "logs_" + str(token['_id']) + ".out" + log_handle = open(logsout, 'rb') + token.put_attachment(logsout, log_handle.read()) + + logserr = "logs_" + str(token['_id']) + ".err" + log_handle = open(logserr, 'rb') + token.put_attachment(logserr, log_handle.read()) + except: + pass + + def time_elapsed(self, elapsed=30.): + """ + This function returns whether the class has been alive for more than `elapsed` seconds. This is needed because currently the maxtime argument in RunActor.run is broken: + The run method will break when the iterator is non-empty and then it checks if the maxtime has passed. If the iterator stays empty, it will run until a new token is + processed, and after processing the if statement is true, and run breaks. + + @param elapsed: lifetime of the Actor in seconds + + @returns: bool + """ + return self.timer.elapsed() > elapsed + +def main(): + # setup connection to db + client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD) + print("Connected to the database %s sucessfully. Now starting work..." %(picasconfig.PICAS_DATABASE)) + # Create token modifier + modifier = BasicTokenModifier() + # Create actor + actor = ExampleActor(client, modifier, scrub_count=2) + # Start work! + actor.run() + +if __name__ == '__main__': + main() diff --git a/examples/scrubExample.txt b/examples/scrubExample.txt new file mode 100644 index 0000000..46e7a81 --- /dev/null +++ b/examples/scrubExample.txt @@ -0,0 +1,3 @@ +sleep 9 +exit(1) +sleep 9 From f9caf9e17661e3da6ce2643b332c26298dd55c4f Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Fri, 2 Aug 2024 13:54:26 +0200 Subject: [PATCH 02/13] Clean scrub example --- examples/scrub-example.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/examples/scrub-example.py b/examples/scrub-example.py index 6ccc56a..8ea8f7c 100755 --- a/examples/scrub-example.py +++ b/examples/scrub-example.py @@ -4,9 +4,8 @@ import couchdb import picasconfig -from picas.actors import RunActor, RunActorWithStop +from picas.actors import RunActorWithStop from picas.clients import CouchDB -from picas.iterators import TaskViewIterator from picas.iterators import EndlessViewIterator from picas.modifiers import BasicTokenModifier from picas.executers import execute @@ -64,18 +63,6 @@ def process_task(self, token): except: pass - def time_elapsed(self, elapsed=30.): - """ - This function returns whether the class has been alive for more than `elapsed` seconds. This is needed because currently the maxtime argument in RunActor.run is broken: - The run method will break when the iterator is non-empty and then it checks if the maxtime has passed. If the iterator stays empty, it will run until a new token is - processed, and after processing the if statement is true, and run breaks. - - @param elapsed: lifetime of the Actor in seconds - - @returns: bool - """ - return self.timer.elapsed() > elapsed - def main(): # setup connection to db client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD) From 73e9fdb0a0ce9b17999f307a507ca36253b6d039 Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Fri, 2 Aug 2024 15:17:51 +0200 Subject: [PATCH 03/13] Add scrub after time expires example --- examples/scrub-timer-example.py | 79 +++++++++++++++++++++++++++++++++ examples/scrubExample.txt | 4 +- examples/scrubTimerExample.txt | 3 ++ 3 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 examples/scrub-timer-example.py create mode 100644 examples/scrubTimerExample.txt diff --git a/examples/scrub-timer-example.py b/examples/scrub-timer-example.py new file mode 100644 index 0000000..17b6724 --- /dev/null +++ b/examples/scrub-timer-example.py @@ -0,0 +1,79 @@ +import logging +import os +import time +import couchdb +import picasconfig + +from picas.actors import RunActorWithStop +from picas.clients import CouchDB +from picas.iterators import EndlessViewIterator +from picas.modifiers import BasicTokenModifier +from picas.executers import execute +from picas.util import Timer + +log = logging.getLogger("Scrub example") + +class ExampleActor(RunActorWithStop): + """ + The ExampleActor is the custom implementation of a RunActor that the user needs for the processing. + Example for scrubbing tokens and rerunning them. + """ + def __init__(self, db, modifier, view="todo", time_limit=1, scrub_count=0, **viewargs): + super(ExampleActor, self).__init__(db, view=view, **viewargs) + self.timer = Timer() + self.iterator = EndlessViewIterator(self.iterator) + self.modifier = modifier + self.client = db + self.time_limit = time_limit + self.scrub_limit = scrub_count + + def process_task(self, token): + # Print token information + print("-----------------------") + print("Working on token: " +token['_id']) + for key, value in token.doc.items(): + print(key, value) + print("-----------------------") + + # Start running the main job + # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out + command = "/usr/bin/time -v ./process_task.sh " + "\"" +token['input'] + "\" " + token['_id'] + " 2> logs_" + str(token['_id']) + ".err 1> logs_" + str(token['_id']) + ".out" + + out = execute(command, shell=True) + self.subprocess = out[0] + + # Get the job exit code and done in the token + token['exit_code'] = out[1] + token = self.modifier.close(token) + + if (self.time_limit < self.timer.elapsed()) and (token['scrub_count'] < self.scrub_limit): + log.info(f"Scrubbing token {token['_id']}") + token = self.modifier.unclose(token) + token.scrub() + + # Attach logs in token + curdate = time.strftime("%d/%m/%Y_%H:%M:%S_") + try: + logsout = "logs_" + str(token['_id']) + ".out" + log_handle = open(logsout, 'rb') + token.put_attachment(logsout, log_handle.read()) + + logserr = "logs_" + str(token['_id']) + ".err" + log_handle = open(logserr, 'rb') + token.put_attachment(logserr, log_handle.read()) + except: + pass + +def main(): + # setup connection to db + client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD) + print("Connected to the database %s sucessfully. Now starting work..." %(picasconfig.PICAS_DATABASE)) + # Create token modifier + modifier = BasicTokenModifier() + # Create actor + actor = ExampleActor(client, modifier, time_limit=3, scrub_count=2) + # Start work! + actor.run() + +if __name__ == '__main__': + main() diff --git a/examples/scrubExample.txt b/examples/scrubExample.txt index 46e7a81..b3ce0ce 100644 --- a/examples/scrubExample.txt +++ b/examples/scrubExample.txt @@ -1,3 +1,3 @@ -sleep 9 +sleep 10 exit(1) -sleep 9 +sleep 10 diff --git a/examples/scrubTimerExample.txt b/examples/scrubTimerExample.txt new file mode 100644 index 0000000..f3157dc --- /dev/null +++ b/examples/scrubTimerExample.txt @@ -0,0 +1,3 @@ +sleep 5 +sleep 5 +sleep 5 From 6813625da9fe029fe17c629ee36af7757a295e77 Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Fri, 2 Aug 2024 15:46:01 +0200 Subject: [PATCH 04/13] Add notes to scrub examples --- examples/scrub-example.py | 5 ++++- examples/scrub-timer-example.py | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/examples/scrub-example.py b/examples/scrub-example.py index 8ea8f7c..66ae5e4 100755 --- a/examples/scrub-example.py +++ b/examples/scrub-example.py @@ -16,7 +16,8 @@ class ExampleActor(RunActorWithStop): """ The ExampleActor is the custom implementation of a RunActor that the user needs for the processing. - Example for scrubbing tokens and rerunning them. + Example for scrubbing tokens and rerunning them. Scrubbing is done when a token fails to finish + properly and the user wants to rerun it. """ def __init__(self, db, modifier, view="todo", scrub_count=2, **viewargs): super(ExampleActor, self).__init__(db, view=view, **viewargs) @@ -24,6 +25,7 @@ def __init__(self, db, modifier, view="todo", scrub_count=2, **viewargs): self.iterator = EndlessViewIterator(self.iterator) self.modifier = modifier self.client = db + # scrub limit is the amount of retries self.scrub_limit = scrub_count def process_task(self, token): @@ -45,6 +47,7 @@ def process_task(self, token): token['exit_code'] = out[1] token = self.modifier.close(token) + # Scrub the token N times if it failed, scrubbing puts it back in 'todo' state if (token['scrub_count'] < self.scrub_limit) and (out[1] != 0): log.info(f"Scrubbing token {token['_id']}") token = self.modifier.unclose(token) diff --git a/examples/scrub-timer-example.py b/examples/scrub-timer-example.py index 17b6724..778286c 100644 --- a/examples/scrub-timer-example.py +++ b/examples/scrub-timer-example.py @@ -11,12 +11,13 @@ from picas.executers import execute from picas.util import Timer -log = logging.getLogger("Scrub example") +log = logging.getLogger("Scrub with timer example") class ExampleActor(RunActorWithStop): """ The ExampleActor is the custom implementation of a RunActor that the user needs for the processing. - Example for scrubbing tokens and rerunning them. + Example for scrubbing tokens and rerunning them. Scrubbing is done when a token fails to finish + within a given time limit and the user wants to rerun it. """ def __init__(self, db, modifier, view="todo", time_limit=1, scrub_count=0, **viewargs): super(ExampleActor, self).__init__(db, view=view, **viewargs) @@ -46,6 +47,7 @@ def process_task(self, token): token['exit_code'] = out[1] token = self.modifier.close(token) + # Scrub the token N times if it went over time, scrubbing puts it back in 'todo' state if (self.time_limit < self.timer.elapsed()) and (token['scrub_count'] < self.scrub_limit): log.info(f"Scrubbing token {token['_id']}") token = self.modifier.unclose(token) From ff36c5fb4ad2cab6550904593d465e00277f4328 Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Fri, 2 Aug 2024 16:39:04 +0200 Subject: [PATCH 05/13] Clean local example --- examples/local-example.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/local-example.py b/examples/local-example.py index bb26739..355fd24 100755 --- a/examples/local-example.py +++ b/examples/local-example.py @@ -13,13 +13,12 @@ ''' -#python imports +import logging import os import time import couchdb import picasconfig -#picas imports from picas.actors import RunActor, RunActorWithStop from picas.clients import CouchDB from picas.iterators import TaskViewIterator @@ -28,6 +27,8 @@ from picas.executers import execute from picas.util import Timer +log = logging.getLogger(__name__) + class ExampleActor(RunActorWithStop): """ The ExampleActor is the custom implementation of a RunActor that the user needs for the processing. @@ -36,7 +37,7 @@ class ExampleActor(RunActorWithStop): def __init__(self, db, modifier, view="todo", **viewargs): super(ExampleActor, self).__init__(db, view=view, **viewargs) self.timer = Timer() - self.iterator = EndlessViewIterator(self.iterator)#, stop_callback=self.time_elapsed, elapsed=15) # overwrite default iterator from super().init() + self.iterator = EndlessViewIterator(self.iterator) self.modifier = modifier self.client = db @@ -55,10 +56,9 @@ def process_task(self, token): out = execute(command, shell=True) self.subprocess = out[0] - ## Get the job exit code in the token + # Get the job exit code and done in the token token['exit_code'] = out[1] token = self.modifier.close(token) - #self.client.db[token['_id']] = token # necessary? # Attach logs in token curdate = time.strftime("%d/%m/%Y_%H:%M:%S_") @@ -94,7 +94,7 @@ def main(): # Create actor actor = ExampleActor(client, modifier) # Start work! - actor.run(max_tasks=2, stop_function=actor.time_elapsed, elapsed=11) + actor.run() if __name__ == '__main__': main() From 9961c7b0f3bf894d8b4d1e6fe5800742ce58b5da Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Thu, 26 Sep 2024 10:06:25 +0200 Subject: [PATCH 06/13] Add stopit fnality to stop single token --- picas/actors.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/picas/actors.py b/picas/actors.py index c7b3b18..6924771 100644 --- a/picas/actors.py +++ b/picas/actors.py @@ -13,6 +13,7 @@ from .iterators import TaskViewIterator, EndlessViewIterator from couchdb.http import ResourceConflict +from stopit import threading_timeoutable as timeoutable logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) @@ -45,7 +46,7 @@ def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], ** else: self.iterator = iterator - def _run(self, task): + def _run(self, task, timeout): """ Execution of the work on the iterator used in the run method. """ @@ -54,7 +55,7 @@ def _run(self, task): self.current_task = task try: - self.process_task(task) + self.process_task(task, timeout=timeout) except Exception as ex: msg = ("Exception {0} occurred during processing: {1}" .format(type(ex), ex)) @@ -74,7 +75,7 @@ def _run(self, task): self.cleanup_run() self.tasks_processed += 1 - def run(self): + def run(self, max_token_time=None): """ Run method of the actor, executes the application code by iterating over the available tasks in CouchDB. @@ -87,7 +88,7 @@ def run(self): self.prepare_env() try: for task in self.iterator: - self._run(task) + self._run(task, timeout=max_token_time) self.current_task = None # set to None so the handler leaves the token alone when picas is killed finally: self.cleanup_env() @@ -144,6 +145,7 @@ def prepare_run(self, *args, **kwargs): inputs. """ + @timeoutable() def process_task(self, task): """ The function to override, which processes the tasks themselves. @@ -167,13 +169,14 @@ class RunActor(AbstractRunActor): RunActor class with added stopping functionality. """ - def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=None, **stop_function_args): + def run(self, max_token_time=None, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=None, **stop_function_args): """ Run method of the actor, executes the application code by iterating over the available tasks in CouchDB, including stop logic. The stop logic is also extended into the EndlessViewIterator to break it when the condition is met, otherwise it never stops. + @param max_token_time: maximum time to run a single token before stopping @param max_time: maximum time to run picas before stopping @param avg_time_factor: used for estimating when to stop with `max_time`, value is average time per token to run @@ -197,7 +200,7 @@ def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=Non try: for task in self.iterator: - self._run(task) + self._run(task, timeout=max_token_time) logging.debug("Tasks executed: ", self.tasks_processed) From 73d46c7bacc6c444c3fa043a269405711e7adb6d Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Thu, 26 Sep 2024 10:07:29 +0200 Subject: [PATCH 07/13] Add scrubbing to the handler --- picas/actors.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/picas/actors.py b/picas/actors.py index 6924771..bd6ce04 100644 --- a/picas/actors.py +++ b/picas/actors.py @@ -118,6 +118,8 @@ def handler(self, signum, frame): # update the token state, if reset vaue is None, do nothing. if self.current_task and self.token_reset_values is not None: + # scrub goes first, as it reset lock and done to defaults, which could be overwritten below + self.current_task.scrub() self.current_task['lock'] = self.token_reset_values[0] self.current_task['done'] = self.token_reset_values[1] self.db.save(self.current_task) From 90e529195037565b47de3f49490586392552fcec Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Thu, 26 Sep 2024 10:09:05 +0200 Subject: [PATCH 08/13] Add stopit to CICD --- .github/workflows/python-app.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 08012ef..49257c3 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -34,7 +34,7 @@ jobs: python -m pip install --upgrade pip pip install -U . pip install -U ".[test]" - pip install flake8 pytest nose + pip install flake8 pytest nose stopit if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - name: Lint with flake8 run: | From d9b37eaae6e76d6657a5482befe10061a1f667a6 Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Thu, 26 Sep 2024 10:19:29 +0200 Subject: [PATCH 09/13] Add decorator to tests b/c inheritance --- tests/test_mock.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_mock.py b/tests/test_mock.py index aa3d34a..111c308 100644 --- a/tests/test_mock.py +++ b/tests/test_mock.py @@ -1,6 +1,7 @@ import random from picas.actors import AbstractRunActor, RunActor from picas.documents import Document +from stopit import threading_timeoutable as timeoutable class MockDB(object): @@ -48,6 +49,7 @@ def __init__(self, callback): self.callback = callback + @timeoutable(default=None) def process_task(self, task): self.callback(task) @@ -60,5 +62,6 @@ def __init__(self, callback): self.callback = callback + @timeoutable(default=None) def process_task(self, task): self.callback(task) From 01bbc4ac0f772df97ed6431702c1984ae3f0a4aa Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Thu, 26 Sep 2024 10:23:07 +0200 Subject: [PATCH 10/13] Add setuptools for p3.12 --- .github/workflows/python-app.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 49257c3..2fb4157 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -34,7 +34,7 @@ jobs: python -m pip install --upgrade pip pip install -U . pip install -U ".[test]" - pip install flake8 pytest nose stopit + pip install flake8 pytest nose stopit setuptools if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - name: Lint with flake8 run: | From 941efba7905820cc8548fe3f5f36aaa3b0ba9f16 Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Thu, 26 Sep 2024 10:24:29 +0200 Subject: [PATCH 11/13] Improve setup requirements --- picas/actors.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/picas/actors.py b/picas/actors.py index bd6ce04..bc50089 100644 --- a/picas/actors.py +++ b/picas/actors.py @@ -147,7 +147,7 @@ def prepare_run(self, *args, **kwargs): inputs. """ - @timeoutable() + @timeoutable(default=None) def process_task(self, task): """ The function to override, which processes the tasks themselves. diff --git a/setup.py b/setup.py index a0673b1..adab77f 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ url='https://github.com/sara-nl/picasclient', download_url='https://github.com/sara-nl/picasclient/tarball/0.3.0', packages=['picas'], - install_requires=['couchdb'], + install_requires=['couchdb', 'stopit'], license="MIT", extras_require={ 'test': ['flake8', 'pytest'], From 4d4de45af1d2bfe670449b50415ae375a817b14b Mon Sep 17 00:00:00 2001 From: Haili Hu Date: Mon, 7 Oct 2024 15:15:04 +0200 Subject: [PATCH 12/13] SPD-409 fixes (#23) * Catch TimeoutExeption * Move scrubbing to RunActor class * Simplify max_time implementation * Fix tests * Check max_total_time for all iterators * Added unit tests --------- Co-authored-by: hailihu@gmail.com --- examples/local-example.py | 13 +----- examples/scrub-example.py | 81 --------------------------------- examples/scrub-timer-example.py | 81 --------------------------------- examples/scrubExample.txt | 3 -- examples/scrubTimerExample.txt | 3 -- picas/actors.py | 57 ++++++++++++----------- picas/util.py | 11 +++++ tests/test_actors.py | 52 ++++++++++++++++++--- tests/test_mock.py | 26 +++++++++-- 9 files changed, 109 insertions(+), 218 deletions(-) delete mode 100755 examples/scrub-example.py delete mode 100644 examples/scrub-timer-example.py delete mode 100644 examples/scrubExample.txt delete mode 100644 examples/scrubTimerExample.txt diff --git a/examples/local-example.py b/examples/local-example.py index 4628840..fc0100a 100755 --- a/examples/local-example.py +++ b/examples/local-example.py @@ -71,17 +71,6 @@ def process_task(self, token): except: pass - def time_elapsed(self, elapsed=30.): - """ - This function returns whether the class has been alive for more than `elapsed` seconds. This is needed because currently the maxtime argument in RunActor.run is broken: - The run method will break when the iterator is non-empty and then it checks if the maxtime has passed. If the iterator stays empty, it will run until a new token is - processed, and after processing the if statement is true, and run breaks. - - @param elapsed: lifetime of the Actor in seconds - - @returns: bool - """ - return self.timer.elapsed() > elapsed def main(): # setup connection to db @@ -92,7 +81,7 @@ def main(): # Create actor actor = ExampleActor(client, modifier) # Start work! - actor.run() + actor.run(max_token_time=10, max_total_time=100, max_tasks=10, max_scrub=2) if __name__ == '__main__': main() diff --git a/examples/scrub-example.py b/examples/scrub-example.py deleted file mode 100755 index 66ae5e4..0000000 --- a/examples/scrub-example.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging -import os -import time -import couchdb -import picasconfig - -from picas.actors import RunActorWithStop -from picas.clients import CouchDB -from picas.iterators import EndlessViewIterator -from picas.modifiers import BasicTokenModifier -from picas.executers import execute -from picas.util import Timer - -log = logging.getLogger("Scrub example") - -class ExampleActor(RunActorWithStop): - """ - The ExampleActor is the custom implementation of a RunActor that the user needs for the processing. - Example for scrubbing tokens and rerunning them. Scrubbing is done when a token fails to finish - properly and the user wants to rerun it. - """ - def __init__(self, db, modifier, view="todo", scrub_count=2, **viewargs): - super(ExampleActor, self).__init__(db, view=view, **viewargs) - self.timer = Timer() - self.iterator = EndlessViewIterator(self.iterator) - self.modifier = modifier - self.client = db - # scrub limit is the amount of retries - self.scrub_limit = scrub_count - - def process_task(self, token): - # Print token information - print("-----------------------") - print("Working on token: " +token['_id']) - for key, value in token.doc.items(): - print(key, value) - print("-----------------------") - - # Start running the main job - # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out - command = "/usr/bin/time -v ./process_task.sh " + "\"" +token['input'] + "\" " + token['_id'] + " 2> logs_" + str(token['_id']) + ".err 1> logs_" + str(token['_id']) + ".out" - - out = execute(command, shell=True) - self.subprocess = out[0] - - # Get the job exit code and done in the token - token['exit_code'] = out[1] - token = self.modifier.close(token) - - # Scrub the token N times if it failed, scrubbing puts it back in 'todo' state - if (token['scrub_count'] < self.scrub_limit) and (out[1] != 0): - log.info(f"Scrubbing token {token['_id']}") - token = self.modifier.unclose(token) - token.scrub() - - # Attach logs in token - curdate = time.strftime("%d/%m/%Y_%H:%M:%S_") - try: - logsout = "logs_" + str(token['_id']) + ".out" - log_handle = open(logsout, 'rb') - token.put_attachment(logsout, log_handle.read()) - - logserr = "logs_" + str(token['_id']) + ".err" - log_handle = open(logserr, 'rb') - token.put_attachment(logserr, log_handle.read()) - except: - pass - -def main(): - # setup connection to db - client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD) - print("Connected to the database %s sucessfully. Now starting work..." %(picasconfig.PICAS_DATABASE)) - # Create token modifier - modifier = BasicTokenModifier() - # Create actor - actor = ExampleActor(client, modifier, scrub_count=2) - # Start work! - actor.run() - -if __name__ == '__main__': - main() diff --git a/examples/scrub-timer-example.py b/examples/scrub-timer-example.py deleted file mode 100644 index 778286c..0000000 --- a/examples/scrub-timer-example.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging -import os -import time -import couchdb -import picasconfig - -from picas.actors import RunActorWithStop -from picas.clients import CouchDB -from picas.iterators import EndlessViewIterator -from picas.modifiers import BasicTokenModifier -from picas.executers import execute -from picas.util import Timer - -log = logging.getLogger("Scrub with timer example") - -class ExampleActor(RunActorWithStop): - """ - The ExampleActor is the custom implementation of a RunActor that the user needs for the processing. - Example for scrubbing tokens and rerunning them. Scrubbing is done when a token fails to finish - within a given time limit and the user wants to rerun it. - """ - def __init__(self, db, modifier, view="todo", time_limit=1, scrub_count=0, **viewargs): - super(ExampleActor, self).__init__(db, view=view, **viewargs) - self.timer = Timer() - self.iterator = EndlessViewIterator(self.iterator) - self.modifier = modifier - self.client = db - self.time_limit = time_limit - self.scrub_limit = scrub_count - - def process_task(self, token): - # Print token information - print("-----------------------") - print("Working on token: " +token['_id']) - for key, value in token.doc.items(): - print(key, value) - print("-----------------------") - - # Start running the main job - # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out - command = "/usr/bin/time -v ./process_task.sh " + "\"" +token['input'] + "\" " + token['_id'] + " 2> logs_" + str(token['_id']) + ".err 1> logs_" + str(token['_id']) + ".out" - - out = execute(command, shell=True) - self.subprocess = out[0] - - # Get the job exit code and done in the token - token['exit_code'] = out[1] - token = self.modifier.close(token) - - # Scrub the token N times if it went over time, scrubbing puts it back in 'todo' state - if (self.time_limit < self.timer.elapsed()) and (token['scrub_count'] < self.scrub_limit): - log.info(f"Scrubbing token {token['_id']}") - token = self.modifier.unclose(token) - token.scrub() - - # Attach logs in token - curdate = time.strftime("%d/%m/%Y_%H:%M:%S_") - try: - logsout = "logs_" + str(token['_id']) + ".out" - log_handle = open(logsout, 'rb') - token.put_attachment(logsout, log_handle.read()) - - logserr = "logs_" + str(token['_id']) + ".err" - log_handle = open(logserr, 'rb') - token.put_attachment(logserr, log_handle.read()) - except: - pass - -def main(): - # setup connection to db - client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD) - print("Connected to the database %s sucessfully. Now starting work..." %(picasconfig.PICAS_DATABASE)) - # Create token modifier - modifier = BasicTokenModifier() - # Create actor - actor = ExampleActor(client, modifier, time_limit=3, scrub_count=2) - # Start work! - actor.run() - -if __name__ == '__main__': - main() diff --git a/examples/scrubExample.txt b/examples/scrubExample.txt deleted file mode 100644 index b3ce0ce..0000000 --- a/examples/scrubExample.txt +++ /dev/null @@ -1,3 +0,0 @@ -sleep 10 -exit(1) -sleep 10 diff --git a/examples/scrubTimerExample.txt b/examples/scrubTimerExample.txt deleted file mode 100644 index f3157dc..0000000 --- a/examples/scrubTimerExample.txt +++ /dev/null @@ -1,3 +0,0 @@ -sleep 5 -sleep 5 -sleep 5 diff --git a/picas/actors.py b/picas/actors.py index bc50089..adf3374 100644 --- a/picas/actors.py +++ b/picas/actors.py @@ -9,14 +9,16 @@ import signal import subprocess -from .util import Timer +from .util import Timer, time_elapsed from .iterators import TaskViewIterator, EndlessViewIterator from couchdb.http import ResourceConflict -from stopit import threading_timeoutable as timeoutable +from stopit import ThreadingTimeout logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) +stopit_logger = logging.getLogger('stopit') +stopit_logger.setLevel(logging.ERROR) class AbstractRunActor(object): @@ -55,13 +57,18 @@ def _run(self, task, timeout): self.current_task = task try: - self.process_task(task, timeout=timeout) + with ThreadingTimeout(timeout, swallow_exc=False) as context_manager: + self.process_task(task) except Exception as ex: msg = ("Exception {0} occurred during processing: {1}" .format(type(ex), ex)) task.error(msg, exception=ex) log.info(msg) + if context_manager.state == context_manager.TIMED_OUT: + msg = ("Token execution exceeded timeout limit of {0} seconds".format(timeout)) + log.info(msg) + while True: try: self.db.save(task) @@ -147,7 +154,6 @@ def prepare_run(self, *args, **kwargs): inputs. """ - @timeoutable(default=None) def process_task(self, task): """ The function to override, which processes the tasks themselves. @@ -165,13 +171,14 @@ def cleanup_env(self, *args, **kwargs): Method which gets called after the run method has completed. """ - + class RunActor(AbstractRunActor): """ RunActor class with added stopping functionality. """ - def run(self, max_token_time=None, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=None, **stop_function_args): + def run(self, max_token_time=None, max_total_time=None, max_tasks=None, max_scrub=0, + stop_function=None, **stop_function_args): """ Run method of the actor, executes the application code by iterating over the available tasks in CouchDB, including stop logic. The stop @@ -179,26 +186,22 @@ def run(self, max_token_time=None, max_time=None, avg_time_factor=0.0, max_tasks the condition is met, otherwise it never stops. @param max_token_time: maximum time to run a single token before stopping - @param max_time: maximum time to run picas before stopping - @param avg_time_factor: used for estimating when to stop with `max_time`, - value is average time per token to run + @param max_total_time: maximum time to run picas before stopping @param max_tasks: number of tasks that are performed before stopping + @param max_scrub: number of times a token can be reset ('scrubbed') after failing @param stop_function: custom function to stop the execution, must return bool @param stop_function_args: kwargs to supply to stop_function """ - self.time = Timer() + timer = Timer() self.prepare_env() # handler needs to be setup in overwritten method self.setup_handler() - # Special case to break the while loop of the EndlessViewIterator: - # The while loop cant reach the stop condition in the for loop below, - # so pass the condition into the stop mechanism of the EVI, then the - # iterator is stopped from EVI and not the RunActorWithStop - if isinstance(self.iterator, EndlessViewIterator): - self.iterator.stop_callback = stop_function - self.iterator.stop_callback_args = stop_function_args + # Break the while loop of the EndlessViewIterator if max_total_time is exceeded + if max_total_time is not None and isinstance(self.iterator, EndlessViewIterator): + self.iterator.stop_callback = time_elapsed + self.iterator.stop_callback_args = {"timer": timer, "max": max_total_time} try: for task in self.iterator: @@ -206,21 +209,23 @@ def run(self, max_token_time=None, max_time=None, avg_time_factor=0.0, max_tasks logging.debug("Tasks executed: ", self.tasks_processed) - if (stop_function is not None and - stop_function(**stop_function_args)): + # Scrub the token if it failed, scrubbing puts it back in 'todo' state + if (task['scrub_count'] < max_scrub) and (task['exit_code'] != 0): + log.info(f"Scrubbing token {task['_id']}") + task.scrub() + self.db.save(task) + + if (stop_function is not None and stop_function(**stop_function_args)): break # break if number of tasks processed is max set if max_tasks and self.tasks_processed == max_tasks: break - if max_time is not None: - # for a large number of tokens the avg time will be better (due to statistics) - # resulting in a better estimate of whether time.elapsed + avg_time (what will - # be added on the next iteration) is larger than the max_time. - will_elapse = (self.time.elapsed() + avg_time_factor) - if will_elapse > max_time: - break + # break if max_total_time is exceeded (needed because only EndlessViewIterator has stop callback) + if max_total_time is not None and timer.elapsed() > max_total_time: + break + self.current_task = None # set to None so the handler leaves the token alone when picas is killed finally: self.cleanup_env() diff --git a/picas/util.py b/picas/util.py index 07f2a33..0be0166 100644 --- a/picas/util.py +++ b/picas/util.py @@ -6,6 +6,17 @@ from copy import deepcopy +def time_elapsed(timer, max=30.): + """ + This function returns True whether the elapsed time is more than `max` seconds. + @param timer: Timer + @param max: maximum allowed time + + @returns: bool + """ + return timer.elapsed() > max + + def merge_dicts(dict1, dict2): """merge two dicts""" merge = deepcopy(dict1) diff --git a/tests/test_actors.py b/tests/test_actors.py index 890fd52..a07133a 100644 --- a/tests/test_actors.py +++ b/tests/test_actors.py @@ -4,7 +4,7 @@ import time import unittest -from test_mock import MockDB, MockRun, MockRunWithStop +from test_mock import MockDB, MockRun, MockRunWithStop, MockRunEmpty from unittest.mock import patch from picas import actors @@ -68,17 +68,55 @@ def _callback_timer(self, task): time.sleep(0.5) # force one token to "take" 0.5 s task['exit_code'] = 0 - def test_max_time(self): + def test_max_total_time(self): """ Test to stop running when the max time is about to be reached. """ self.count = 0 - self.max_time = 0.5 # one token takes 0.5, so it quits after 1 token - self.avg_time_fac = 0.5 - self.test_number = 1 + max_time = 1. runner = MockRunWithStop(self._callback_timer) - runner.run(max_time=self.max_time, avg_time_factor=self.avg_time_fac) - self.assertEqual(self.count, self.test_number) + start = time.time() + runner.run(max_total_time=max_time) + end = time.time() + exec_time = end-start + self.assertAlmostEqual(max_time, exec_time, 1) + + def test_max_total_time_empty(self): + + self.count = 0 + max_time = 1. + runner = MockRunEmpty(self._callback_timer) + start = time.time() + runner.run(max_total_time=max_time) + end = time.time() + exec_time = end-start + self.assertAlmostEqual(max_time, exec_time, 1) + + def _callback_error(self, task): + """ + Callback function that simulates an error. + """ + self.assertTrue(task.id in [t['_id'] for t in MockDB.TASKS]) + self.assertTrue(task['lock'] > 0) + self.count += 1 + task['exit_code'] = 1 + + def test_scrub(self): + """ + Test how many times a token is scrubbed. We can only test max_scrub + 0 or 1 because of the limitations of MockDB. + """ + self.count = 0 + max_scrub = 0 + runner = MockRunWithStop(self._callback_error) + runner.run(max_scrub=max_scrub) + for t in runner.db.saved: + self.assertEqual(runner.db.saved[t]["scrub_count"], max_scrub) + max_scrub = 1 + runner = MockRunWithStop(self._callback_error) + runner.run(max_scrub=max_scrub) + for t in runner.db.saved: + self.assertEqual(runner.db.saved[t]["scrub_count"], max_scrub) @patch('picas.actors.log') @patch('signal.signal') diff --git a/tests/test_mock.py b/tests/test_mock.py index 111c308..bff6a6c 100644 --- a/tests/test_mock.py +++ b/tests/test_mock.py @@ -1,11 +1,13 @@ import random from picas.actors import AbstractRunActor, RunActor from picas.documents import Document -from stopit import threading_timeoutable as timeoutable +from picas.iterators import EndlessViewIterator class MockDB(object): - TASKS = [{'_id': 'a', 'lock': 0}, {'_id': 'b', 'lock': 0}, {'_id': 'c', 'lock': 0}] + TASKS = [{'_id': 'a', 'lock': 0, 'scrub_count': 0}, + {'_id': 'b', 'lock': 0, 'scrub_count': 0}, + {'_id': 'c', 'lock': 0, 'scrub_count': 0}] JOBS = [{'_id': 'myjob'}] def __init__(self): @@ -41,15 +43,18 @@ def save(self, doc): return doc +class EmptyMockDB(MockDB): + TASKS = [] + JOBS = [] + + class MockRun(AbstractRunActor): def __init__(self, callback): db = MockDB() super(MockRun, self).__init__(db) - self.callback = callback - @timeoutable(default=None) def process_task(self, task): self.callback(task) @@ -59,9 +64,20 @@ class MockRunWithStop(RunActor): def __init__(self, callback): db = MockDB() super(MockRunWithStop, self).__init__(db) + self.callback = callback + # self.iterator = EndlessViewIterator(self.iterator) + + def process_task(self, task): + self.callback(task) + +class MockRunEmpty(RunActor): + + def __init__(self, callback): + db = EmptyMockDB() + super(MockRunEmpty, self).__init__(db) self.callback = callback + self.iterator = EndlessViewIterator(self.iterator) - @timeoutable(default=None) def process_task(self, task): self.callback(task) From ea948ea4ebfd2d7512d1f3ea8891eb37693a6b17 Mon Sep 17 00:00:00 2001 From: Haili Hu Date: Thu, 10 Oct 2024 09:33:30 +0200 Subject: [PATCH 13/13] Change errorcode for Exception from -1 to 99 to fix visibility in Picas DB Views (#25) Co-authored-by: hailihu@gmail.com --- picas/documents.py | 6 +++--- picas/modifiers.py | 4 ++-- tests/test_document.py | 4 ++-- tests/test_modifiers.py | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/picas/documents.py b/picas/documents.py index d07dd04..de7757e 100644 --- a/picas/documents.py +++ b/picas/documents.py @@ -265,8 +265,8 @@ def error(self, msg=None, exception=None): if exception is not None: error['exception'] = traceback.format_exc() - self.doc['lock'] = -1 - self.doc['done'] = -1 + self.doc['lock'] = 99 + self.doc['done'] = 99 if 'error' not in self.doc: self.doc['error'] = [] self.doc['error'].append(error) @@ -274,7 +274,7 @@ def error(self, msg=None, exception=None): def has_error(self): """Bool: check if document has an error""" - return self.doc['lock'] == -1 + return self.doc['lock'] == 99 def get_errors(self): """Get document error""" diff --git a/picas/modifiers.py b/picas/modifiers.py index 6111bb6..5affa95 100644 --- a/picas/modifiers.py +++ b/picas/modifiers.py @@ -138,8 +138,8 @@ def scrub(self, token): return token def set_error(self, token): - token['lock'] = -1 - token['done'] = -1 + token['lock'] = 99 + token['done'] = 99 return token diff --git a/tests/test_document.py b/tests/test_document.py index 674b07e..d42c3cb 100644 --- a/tests/test_document.py +++ b/tests/test_document.py @@ -88,8 +88,8 @@ def test_scrub(self): def test_error(self): self.task.error("some message") - self.assertEqual(self.task['lock'], -1) - self.assertEqual(self.task['done'], -1) + self.assertEqual(self.task['lock'], 99) + self.assertEqual(self.task['done'], 99) self.task.scrub() self.assertEqual(self.task['lock'], 0) self.assertEqual(self.task['done'], 0) diff --git a/tests/test_modifiers.py b/tests/test_modifiers.py index 562c427..2448669 100644 --- a/tests/test_modifiers.py +++ b/tests/test_modifiers.py @@ -37,8 +37,8 @@ def test_scrub(self): def test_seterror(self): self.modifier.set_error(self.token) - self.assertTrue(self.token['lock'] == -1) - self.assertTrue(self.token['done'] == -1) + self.assertTrue(self.token['lock'] == 99) + self.assertTrue(self.token['done'] == 99) def test_addoutput(self): self.modifier.add_output(self.token, {"output": "test"})