Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPD-409 Scrubbing added #17

Merged
merged 15 commits into from
Oct 11, 2024
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
python -m pip install --upgrade pip
pip install -U .
pip install -U ".[test]"
pip install flake8 pytest nose
pip install flake8 pytest nose stopit setuptools
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |
Expand Down
13 changes: 1 addition & 12 deletions examples/local-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,6 @@ def process_task(self, token):
except:
pass

def time_elapsed(self, elapsed=30.):
"""
This function returns whether the class has been alive for more than `elapsed` seconds. This is needed because currently the maxtime argument in RunActor.run is broken:
The run method will break when the iterator is non-empty and then it checks if the maxtime has passed. If the iterator stays empty, it will run until a new token is
processed, and after processing the if statement is true, and run breaks.

@param elapsed: lifetime of the Actor in seconds

@returns: bool
"""
return self.timer.elapsed() > elapsed

def main():
# setup connection to db
Expand All @@ -92,7 +81,7 @@ def main():
# Create actor
actor = ExampleActor(client, modifier)
# Start work!
actor.run(max_tasks=2, stop_function=actor.time_elapsed, elapsed=11)
actor.run(max_token_time=10, max_total_time=100, max_tasks=10, max_scrub=2)

if __name__ == '__main__':
main()
66 changes: 38 additions & 28 deletions picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
import signal
import subprocess

from .util import Timer
from .util import Timer, time_elapsed
from .iterators import TaskViewIterator, EndlessViewIterator

from couchdb.http import ResourceConflict
from stopit import ThreadingTimeout

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
stopit_logger = logging.getLogger('stopit')
stopit_logger.setLevel(logging.ERROR)


class AbstractRunActor(object):
Expand Down Expand Up @@ -45,7 +48,7 @@ def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], **
else:
self.iterator = iterator

def _run(self, task):
def _run(self, task, timeout):
"""
Execution of the work on the iterator used in the run method.
"""
Expand All @@ -54,13 +57,18 @@ def _run(self, task):
self.current_task = task

try:
self.process_task(task)
with ThreadingTimeout(timeout, swallow_exc=False) as context_manager:
self.process_task(task)
except Exception as ex:
msg = ("Exception {0} occurred during processing: {1}"
.format(type(ex), ex))
task.error(msg, exception=ex)
log.info(msg)

if context_manager.state == context_manager.TIMED_OUT:
msg = ("Token execution exceeded timeout limit of {0} seconds".format(timeout))
log.info(msg)

while True:
try:
self.db.save(task)
Expand All @@ -74,7 +82,7 @@ def _run(self, task):
self.cleanup_run()
self.tasks_processed += 1

def run(self):
def run(self, max_token_time=None):
"""
Run method of the actor, executes the application code by iterating
over the available tasks in CouchDB.
Expand All @@ -87,7 +95,7 @@ def run(self):
self.prepare_env()
try:
for task in self.iterator:
self._run(task)
self._run(task, timeout=max_token_time)
self.current_task = None # set to None so the handler leaves the token alone when picas is killed
finally:
self.cleanup_env()
Expand Down Expand Up @@ -117,6 +125,8 @@ def handler(self, signum, frame):

# update the token state; if the reset value is None, do nothing.
if self.current_task and self.token_reset_values is not None:
# scrub goes first, as it resets lock and done to defaults, which could be overwritten below
self.current_task.scrub()
self.current_task['lock'] = self.token_reset_values[0]
self.current_task['done'] = self.token_reset_values[1]
self.db.save(self.current_task)
Expand Down Expand Up @@ -161,61 +171,61 @@ def cleanup_env(self, *args, **kwargs):
Method which gets called after the run method has completed.
"""


class RunActor(AbstractRunActor):
    """
    RunActor class with added stopping functionality.
    """

    def run(self, max_token_time=None, max_total_time=None, max_tasks=None, max_scrub=0,
            stop_function=None, **stop_function_args):
        """
        Run method of the actor, executes the application code by iterating
        over the available tasks in CouchDB, including stop logic. The time
        limit is also passed into the EndlessViewIterator to break it when
        the limit is exceeded, otherwise it never stops.

        Note that `stop_function`, `max_tasks` and the in-loop check of
        `max_total_time` are only evaluated after a task has been processed.

        @param max_token_time: maximum time to run a single token before stopping
        @param max_total_time: maximum time to run picas before stopping
        @param max_tasks: number of tasks that are performed before stopping
        @param max_scrub: number of times a token can be reset ('scrubbed') after failing
        @param stop_function: custom function to stop the execution, must return bool
        @param stop_function_args: kwargs to supply to stop_function
        """
        timer = Timer()
        self.prepare_env()

        # handler needs to be setup in overwritten method
        self.setup_handler()

        # Break the while loop of the EndlessViewIterator if max_total_time is exceeded
        if max_total_time is not None and isinstance(self.iterator, EndlessViewIterator):
            self.iterator.stop_callback = time_elapsed
            self.iterator.stop_callback_args = {"timer": timer, "max": max_total_time}

        try:
            for task in self.iterator:
                self._run(task, timeout=max_token_time)

                # Use a placeholder for the counter: passing it as a bare extra
                # argument makes the logging module fail when formatting the record.
                log.debug("Tasks executed: %s", self.tasks_processed)

                # Scrub the token if it failed, scrubbing puts it back in 'todo' state
                if (task['scrub_count'] < max_scrub) and (task['exit_code'] != 0):
                    log.info(f"Scrubbing token {task['_id']}")
                    task.scrub()
                    self.db.save(task)

                if (stop_function is not None and stop_function(**stop_function_args)):
                    break

                # break if number of tasks processed is max set
                if max_tasks and self.tasks_processed == max_tasks:
                    break

                # break if max_total_time is exceeded (needed because only EndlessViewIterator has stop callback)
                if max_total_time is not None and timer.elapsed() > max_total_time:
                    break

                self.current_task = None  # set to None so the handler leaves the token alone when picas is killed
        finally:
            self.cleanup_env()
11 changes: 11 additions & 0 deletions picas/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@
from copy import deepcopy


def time_elapsed(timer, max=30.):
    """
    Return True if more than `max` seconds have elapsed on `timer`.

    The comparison is strict: an elapsed time exactly equal to `max`
    returns False.

    @param timer: object with an `elapsed()` method returning the elapsed time (e.g. a Timer)
    @param max: maximum allowed time; the name shadows the builtin but is part of the
                keyword interface of this function, so it is kept as is

    @returns: bool
    """
    return timer.elapsed() > max


def merge_dicts(dict1, dict2):
"""merge two dicts"""
merge = deepcopy(dict1)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
url='https://github.com/sara-nl/picasclient',
download_url='https://github.com/sara-nl/picasclient/tarball/0.3.0',
packages=['picas'],
install_requires=['couchdb'],
install_requires=['couchdb', 'stopit'],
license="MIT",
extras_require={
'test': ['flake8', 'pytest'],
Expand Down
52 changes: 45 additions & 7 deletions tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
import unittest

from test_mock import MockDB, MockRun, MockRunWithStop
from test_mock import MockDB, MockRun, MockRunWithStop, MockRunEmpty
from unittest.mock import patch

from picas import actors
Expand Down Expand Up @@ -68,17 +68,55 @@ def _callback_timer(self, task):
time.sleep(0.5) # force one token to "take" 0.5 s
task['exit_code'] = 0

def test_max_time(self):
def test_max_total_time(self):
"""
Test to stop running when the max time is about to be reached.
"""
self.count = 0
self.max_time = 0.5 # one token takes 0.5, so it quits after 1 token
self.avg_time_fac = 0.5
self.test_number = 1
max_time = 1.
runner = MockRunWithStop(self._callback_timer)
runner.run(max_time=self.max_time, avg_time_factor=self.avg_time_fac)
self.assertEqual(self.count, self.test_number)
start = time.time()
runner.run(max_total_time=max_time)
end = time.time()
exec_time = end-start
self.assertAlmostEqual(max_time, exec_time, 1)

def test_max_total_time_empty(self):
    """
    Run a MockRunEmpty actor with a total time limit and check that the
    run takes approximately that long.
    """
    self.count = 0
    time_limit = 1.
    actor = MockRunEmpty(self._callback_timer)
    started = time.time()
    actor.run(max_total_time=time_limit)
    duration = time.time() - started
    self.assertAlmostEqual(time_limit, duration, 1)

def _callback_error(self, task):
    """
    Callback that simulates a failing task by setting a non-zero exit code.
    """
    known_ids = [t['_id'] for t in MockDB.TASKS]
    self.assertTrue(task.id in known_ids)
    self.assertTrue(task['lock'] > 0)
    self.count += 1
    task['exit_code'] = 1

def test_scrub(self):
    """
    Check that every saved token ends up with a scrub_count equal to
    max_scrub when processing always fails. Only max_scrub values of
    0 and 1 can be tested because of the limitations of MockDB.
    """
    self.count = 0
    for max_scrub in (0, 1):
        runner = MockRunWithStop(self._callback_error)
        runner.run(max_scrub=max_scrub)
        for key in runner.db.saved:
            self.assertEqual(runner.db.saved[key]["scrub_count"], max_scrub)

@patch('picas.actors.log')
@patch('signal.signal')
Expand Down
23 changes: 21 additions & 2 deletions tests/test_mock.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import random
from picas.actors import AbstractRunActor, RunActor
from picas.documents import Document
from picas.iterators import EndlessViewIterator


class MockDB(object):
TASKS = [{'_id': 'a', 'lock': 0}, {'_id': 'b', 'lock': 0}, {'_id': 'c', 'lock': 0}]
TASKS = [{'_id': 'a', 'lock': 0, 'scrub_count': 0},
{'_id': 'b', 'lock': 0, 'scrub_count': 0},
{'_id': 'c', 'lock': 0, 'scrub_count': 0}]
JOBS = [{'_id': 'myjob'}]

def __init__(self):
Expand Down Expand Up @@ -40,12 +43,16 @@ def save(self, doc):
return doc


class EmptyMockDB(MockDB):
    """MockDB variant whose task and job lists are empty."""
    JOBS = []
    TASKS = []


class MockRun(AbstractRunActor):

def __init__(self, callback):
db = MockDB()
super(MockRun, self).__init__(db)

self.callback = callback

def process_task(self, task):
Expand All @@ -57,8 +64,20 @@ class MockRunWithStop(RunActor):
def __init__(self, callback):
db = MockDB()
super(MockRunWithStop, self).__init__(db)
self.callback = callback
# self.iterator = EndlessViewIterator(self.iterator)

def process_task(self, task):
self.callback(task)


class MockRunEmpty(RunActor):
    """
    RunActor backed by an EmptyMockDB, with its iterator wrapped in an
    EndlessViewIterator; tasks are handed to the supplied callback.
    """

    def __init__(self, callback):
        super().__init__(EmptyMockDB())
        self.iterator = EndlessViewIterator(self.iterator)
        self.callback = callback

    def process_task(self, task):
        """Pass the task to the callback given at construction."""
        self.callback(task)