diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 6ca85a7..08012ef 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -19,14 +19,14 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.9", "3.10", "3.11"] + python-version: ["3.9", "3.10", "3.11", "3.12"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/.gitignore b/.gitignore index a02076f..d76a7ff 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.py[co] +.DS_Store # Packages *.egg diff --git a/examples/examples.ipynb b/examples/examples.ipynb new file mode 100644 index 0000000..c385b1b --- /dev/null +++ b/examples/examples.ipynb @@ -0,0 +1,292 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "1e7dbcf6-9dff-46f1-8ee4-be0dc91e7130", + "metadata": {}, + "source": [ + "regular run: \n", + "`actor.run()` --> done \n", + "regular run with a timer in the iterator: \n", + "`actor.iterator = EndlessViewIterator` --> done\n", + "\n", + "set max tasks \n", + "`actor.run(max_tasks=2)` --> in branch SPD-410 \n", + "see only 2 tasks being taken and run\n", + "\n", + "stop after elapsed time \n", + "`actor.run(stop_function=actor.time_elapsed, elapsed=11)` --> in branch SPD-410 (through run() istead of iterator) \n", + "see that picas doesnt start a new token after 11 seconds of processing\n", + "\n", + "stop when you expect to run out of time \n", + "`actor.run(max_time=1, avg_time_factor=0.9)` --> in branch SPD-410 \n", + "add 3 tokens that sleep for 0.9 seconds and see that picas stops after 1.\n", + "\n", + "not resetting the token when killing picas \n", + "`super(ExampleActor, self).__init__(db, view=view, token_reset_values=None, **viewargs)` --> in branch SPD-409 \n", + "the opposite is the default: killing picas resets the token automatically, also needs to be shown " + ] + }, + { + "cell_type": "markdown", + "id": "b49cd6b9-d958-4e5a-a20d-b3c0d3da5bf4", + "metadata": {}, + "source": [ + "# PiCaS examples" + ] + }, + { + "cell_type": "markdown", + "id": "ef844679-a768-426b-a6ca-7fb5fae95023", + "metadata": {}, + "source": [ + "## Pushing work to the database\n", + "We need to push tokens or tasks to the CouchDB instance, so that PiCaS can fetch the work and execute it one by one.\n", + "To accomplish this, we define some functions up next, that will push lines from an input file as commands to tokens. Each line becomes a single token." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8d7513b3-c960-4975-be01-68045292e51e", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import random\n", + "import sys\n", + "import time\n", + "\n", + "import couchdb\n", + "import picasconfig" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b055b008-ecda-440d-b290-989641c09a28", + "metadata": {}, + "outputs": [], + "source": [ + "def getNextIndex():\n", + " \"\"\"Function to set the index sequentially, instead of default random string\"\"\"\n", + " db = get_db()\n", + " index = 0\n", + " while db.get(f\"token_{index}\") is not None:\n", + " index+=1\n", + "\n", + " return index\n", + "\n", + "def loadTokens(db):\n", + " \"\"\"Create the tokens from the given file and push to the server\"\"\"\n", + " tokens = []\n", + " tokensfile = '/home/lodewijkn/picasclient/examples/quickExample.txt' # put your own path here\n", + " with open(tokensfile) as f:\n", + " input = f.read().splitlines()\n", + "\n", + " i = getNextIndex()\n", + " for fractal in input:\n", + " token = {\n", + " '_id': 'token_' + str(i),\n", + " 'type': 'token',\n", + " 'lock': 0,\n", + " 'done': 0,\n", + " 'hostname': '',\n", + " 'scrub_count': 0,\n", + " 'input': fractal,\n", + " 'exit_code': ''\n", + " }\n", + " tokens.append(token)\n", + " i = i +1\n", + " db.update(tokens)\n", + "\n", + "def get_db():\n", + " \"\"\"Fetch the server instance\"\"\"\n", + " server = couchdb.Server(picasconfig.PICAS_HOST_URL)\n", + " username = picasconfig.PICAS_USERNAME\n", + " pwd = picasconfig.PICAS_PASSWORD\n", + " server.resource.credentials = (username,pwd)\n", + " db = server[picasconfig.PICAS_DATABASE]\n", + " return db\n", + "\n", + "print(f\"Pushing tokens to {picasconfig.PICAS_DATABASE} at {picasconfig.PICAS_HOST_URL}\")\n", + "#Create a connection to the server\n", + "db = get_db()\n", + "#Load the tokens to the database\n", + "loadTokens(db)\n", + "print(\"Tokens have been pushed.\")" + ] + }, + { + "cell_type": "markdown", + "id": "ce2cee42-cb49-4896-a816-3dae556965af", + "metadata": {}, + "source": [ + "## Processing tasks stored in the database using PiCaS classes\n", + "Next, we define a custom class that is based (inherited) on the RunActor. We need to define the \"process_task\" method to define how each token is processed. The automation is then taken case of in the base class implementation. \n", + "Of course, you are free to overwrite more parts of the class in case that is needed. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "31b18dbc-78f2-436c-b074-e60d2eef1fb1", + "metadata": {}, + "outputs": [], + "source": [ + "from picas.actors import RunActor\n", + "from picas.clients import CouchDB\n", + "from picas.iterators import TaskViewIterator, EndlessViewIterator\n", + "from picas.modifiers import BasicTokenModifier\n", + "from picas.executers import execute\n", + "from picas.util import Timer\n", + "\n", + "class ExampleActor(RunActor):\n", + " \"\"\"\n", + " The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.\n", + " Feel free to adjust to whatever you need, a template can be found at: example-template.py\n", + " \"\"\"\n", + " def __init__(self, db, modifier, view=\"todo\", **viewargs):\n", + " super().__init__(db, view=view, **viewargs)\n", + " self.timer = Timer()\n", + " self.modifier = modifier\n", + " self.client = db\n", + "\n", + " def process_task(self, token):\n", + " # Print token information\n", + " print(\"-----------------------\")\n", + " print(\"Working on token: \" +token['_id'])\n", + " for key, value in token.doc.items():\n", + " print(key, value)\n", + " print(\"-----------------------\")\n", + "\n", + " # Start running the main job\n", + " # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out\n", + " command = \"/usr/bin/time -v ./process_task.sh \" + \"\\\"\" +token['input'] + \"\\\" \" + token['_id'] + \" 2> logs_\" + str(token['_id']) + \".err 1> logs_\" + str(token['_id']) + \".out\"\n", + " out = execute(command, shell=True)\n", + "\n", + " ## Get the job exit code in the token\n", + " token['exit_code'] = out[0]\n", + " token = self.modifier.close(token)\n", + " \n", + " # Attach logs in token\n", + " curdate = time.strftime(\"%d/%m/%Y_%H:%M:%S_\")\n", + " try:\n", + " logsout = \"logs_\" + str(token['_id']) + \".out\"\n", + " log_handle = open(logsout, 'rb')\n", + " token.put_attachment(logsout, log_handle.read())\n", + "\n", + " logserr = \"logs_\" + str(token['_id']) + \".err\"\n", + " log_handle = open(logserr, 'rb')\n", + " token.put_attachment(logserr, log_handle.read())\n", + " except:\n", + " pass\n", + "\n", + "\n", + "client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)\n", + "print(f\"Connected to the database {picasconfig.PICAS_DATABASE} sucessfully. Now starting work...\")\n", + "modifier = BasicTokenModifier()\n", + "\n", + "actor = ExampleActor(client, modifier)\n", + "actor.run()" + ] + }, + { + "cell_type": "markdown", + "id": "9966f3b6-3459-415d-b712-09fe2beb4ced", + "metadata": {}, + "source": [ + "Now we want to let the Actor run indefinitely, waiting for work and starting it immediately once its found in the DB. \n", + "Such an Actor should time-out eventually, and for this we define a timer boolean function:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "77800422-0bc2-4d1a-ae66-27df484de012", + "metadata": {}, + "outputs": [], + "source": [ + "def time_elapsed(timer, elapsed=30.):\n", + " \"\"\"\n", + " @param timer: Timer object from the Actor class\n", + " \n", + " @param elapsed: lifetime of the Actor in seconds\n", + "\n", + " @returns: bool\n", + " \"\"\"\n", + " return timer.elapsed() > elapsed" + ] + }, + { + "cell_type": "markdown", + "id": "977a4503-9206-49ae-a5e0-752f7b6a7ca4", + "metadata": {}, + "source": [ + "This boolean is passed into the EndlessViewIterator's `stop_function`, so the Iterator knows when to stop: when the boolean becomes `True`." + ] + }, + { + "cell_type": "markdown", + "id": "33837de7-54d8-406d-b5d3-a056397cbf19", + "metadata": {}, + "source": [ + "We push some more tokens, and define a new RunActor that will go on indefinitely, except we gave it a `stop_function` to stop after some seconds of waiting." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "803680e0-4954-43db-9f75-bb4899895883", + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"Pushing tokens to {picasconfig.PICAS_DATABASE} at {picasconfig.PICAS_HOST_URL}\")\n", + "db = get_db()\n", + "loadTokens(db)\n", + "print(\"Tokens have been pushed.\")" + ] + }, + { + "cell_type": "markdown", + "id": "65c8280f-70db-45ec-9260-90e903ddd957", + "metadata": {}, + "source": [ + "Now we overwrite the Iterator in the Actor, to use an iterator that does not stop, until `stop_function` is called. For this function we use the `time_elapsed` to stop it after 11 seconds, or two scans of the DB for work." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b98685ad-ab37-4f59-9144-eb93dbba6607", + "metadata": {}, + "outputs": [], + "source": [ + "actor = ExampleActor(client, modifier)\n", + "actor.iterator = EndlessViewIterator(actor.iterator, stop_callback=time_elapsed, timer=actor.timer, elapsed=11)\n", + "actor.run()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.14" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/picas/__init__.py b/picas/__init__.py index b272845..48c9a0a 100644 --- a/picas/__init__.py +++ b/picas/__init__.py @@ -11,7 +11,6 @@ """ -import logging from .documents import Document, Task, Job, User from .clients import CouchDB from .iterators import (ViewIterator, TaskViewIterator, EndlessViewIterator, @@ -19,16 +18,7 @@ from .actors import RunActor -version = "0.3.0" - -picaslogger = logging.getLogger("PiCaS") -formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') -ch = logging.StreamHandler() -ch.setLevel(logging.DEBUG) -ch.setFormatter(formatter) -picaslogger.addHandler(ch) -picaslogger.setLevel(logging.ERROR) +VERSION = "0.3.0" __all__ = [ 'CouchDB', diff --git a/picas/actors.py b/picas/actors.py index 31d0af9..e7c20a9 100644 --- a/picas/actors.py +++ b/picas/actors.py @@ -4,6 +4,7 @@ @Copyright (c) 2016, Jan Bot @author: Jan Bot, Joris Borgdorff """ + import logging import signal import subprocess @@ -141,14 +142,12 @@ def prepare_env(self, *args, **kwargs): Method to be called to prepare the environment to run the application. """ - pass def prepare_run(self, *args, **kwargs): """ Code to run before a task gets processed. Used e.g. for fetching inputs. """ - pass def process_task(self, task): """ @@ -161,15 +160,13 @@ def cleanup_run(self, *args, **kwargs): """ Code to run after a task has been processed. """ - pass def cleanup_env(self, *args, **kwargs): """ Method which gets called after the run method has completed. """ - pass - + class RunActorWithStop(RunActor): """ RunActor class with added stopping functionality. @@ -225,4 +222,4 @@ def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=Non break self.current_task = None # set to None so the handler leaves the token alone when picas is killed finally: - self.cleanup_env() + self.cleanup_env() \ No newline at end of file diff --git a/picas/batchid.py b/picas/batchid.py index a668ab5..6f0b542 100644 --- a/picas/batchid.py +++ b/picas/batchid.py @@ -1,7 +1,9 @@ # -*- coding: utf-8 -*- -from os import environ +""" +@author Maarten Kooyman +""" -''' @author Maarten Kooyman ''' +from os import environ def add_batch_management_id(doc): diff --git a/picas/clients.py b/picas/clients.py index ec90628..2b0dcb2 100644 --- a/picas/clients.py +++ b/picas/clients.py @@ -1,31 +1,31 @@ # -*- coding: utf-8 -*- """ - @licence: The MIT License (MIT) @Copyright (c) 2016, Jan Bot @author: Jan Bot @author: Joris Borgdorff """ -from .documents import Document import random import sys -# Couchdb imports import couchdb from couchdb.design import ViewDefinition from couchdb.http import ResourceConflict +from .documents import Document +from .picaslogger import picaslogger + -class CouchDB(object): +class CouchDB: - """Client class to handle communication with the CouchDB back-end. - """ + """Client class to handle communication with the CouchDB back-end.""" def __init__(self, url="http://localhost:5984", db="test", username=None, password="", ssl_verification=True, create=False): - """Create a CouchClient object. + """ + Create a CouchClient object. :param url: the location where the CouchDB instance is located, including the port at which it's listening. Default: http://localhost:5984 @@ -43,6 +43,7 @@ def __init__(self, url="http://localhost:5984", db="test", self.db = server[db] def copy(self): + """Copy the DB connection.""" resource = self.db.resource try: username, password = resource.credentials @@ -122,7 +123,8 @@ def save(self, doc): return doc def save_documents(self, docs): - """Save a sequence of Documents to the database. + """ + Save a sequence of Documents to the database. - If the document was newly created and the _id is already is in the database the document will not be added. @@ -145,9 +147,9 @@ def save_documents(self, docs): return result - def add_view(self, view, map_fun, reduce_fun=None, design_doc="Monitor", - *args, **kwargs): - """ Add a view to the database + def add_view(self, view, map_fun, *args, reduce_fun=None, design_doc="Monitor", **kwargs): + """ + Add a view to the database All extra parameters are passed to couchdb.design.ViewDefinition :param view: name of the view :param map_fun: string of the javascript map function @@ -185,16 +187,13 @@ def delete_documents(self, docs): try: self.delete(doc) except ResourceConflict as ex: - print("Could not delete document {0} (rev {1}) " - "due to resource conflict: {2}". - format(doc.id, doc.rev, str(ex)), - file=sys.stderr) + picaslogger.info( + f"Could not delete document {doc.id} (rev {doc.rev}) due to resource conflict: {str(ex)}", + file=sys.stderr) result[i] = False except Exception as ex: - print("Could not delete document {0!s}: {1!s}". - format(str(doc), str(ex)), file=sys.stderr) + picaslogger.info(f"Could not delete document {str(doc)}: {str(ex)}", file=sys.stderr) result[i] = False - return result def delete_from_view(self, view, design_doc="Monitor"): @@ -210,6 +209,7 @@ def delete_from_view(self, view, design_doc="Monitor"): def set_users(self, admins=None, members=None, admin_roles=None, member_roles=None): + """Set permissions for users.""" security = self.db.resource.get_json("_security")[2] def try_set(value, d, key, subkey): diff --git a/picas/documents.py b/picas/documents.py index d6ef096..b642239 100644 --- a/picas/documents.py +++ b/picas/documents.py @@ -1,17 +1,19 @@ +""" +@author Joris Borgdorff +""" + import socket -from .util import merge_dicts, seconds import mimetypes import base64 import traceback from uuid import uuid4 -from . import batchid - -''' @author Joris Borgdorff ''' +from . import batchid +from .util import merge_dicts, seconds -class Document(object): - ''' A CouchDB document ''' +class Document: + """A CouchDB document.""" def __init__(self, data=None, base=None): if data is None: @@ -50,34 +52,37 @@ def __iter__(self): @property def id(self): + """id getter""" try: return self.doc['_id'] - except KeyError: - raise AttributeError("_id for document is not set") + except KeyError as ex: + raise AttributeError("_id for document is not set") from ex @property def rev(self): + """revision getter""" try: return self.doc['_rev'] - except KeyError: + except KeyError as ex: raise AttributeError("_rev is not available: document is not " - "retrieved from database") + "retrieved from database") from ex @id.setter def id(self, new_id): + """id setter""" self.doc['_id'] = new_id @property def value(self): + """doc getter""" return self.doc def update(self, values): - """Add the output of the RunActor to the task. - """ + """Add the output of the RunActor to the task.""" self.doc.update(values) def put_attachment(self, name, data, mimetype=None): - ''' + """ Put an attachment in the document. The attachment data must be provided as str in Python 2 and bytes in @@ -85,12 +90,12 @@ def put_attachment(self, name, data, mimetype=None): The mimetype, if not provided, is guessed from the filename and defaults to text/plain. - ''' + """ if '_attachments' not in self.doc: self.doc['_attachments'] = {} if mimetype is None: - mimetype, encoding = mimetypes.guess_type(name) + mimetype, _ = mimetypes.guess_type(name) if mimetype is None: mimetype = 'text/plain' @@ -105,7 +110,8 @@ def put_attachment(self, name, data, mimetype=None): 'content_type': mimetype, 'data': b64data.decode()} def get_attachment(self, name, retrieve_from_database=None): - ''' Gets an attachment dict from the document. + """ + Gets an attachment dict from the document. Attachment data may not have been copied over from the database, in that case it will have an md5 checksum. A CouchDB database may be set in retrieve_from_database to retrieve @@ -115,7 +121,7 @@ def get_attachment(self, name, retrieve_from_database=None): Python 3. Raises KeyError if attachment does not exist. - ''' + """ # Copy all attributes except data, it may be very large attachment = {} for key in self.doc['_attachments'][name]: @@ -139,25 +145,29 @@ def get_attachment(self, name, retrieve_from_database=None): return attachment def remove_attachment(self, name): + """Remove attachment from document""" del self.doc['_attachments'][name] return self def _update_hostname(self): + """Set hostname in document""" self.doc['hostname'] = socket.gethostname() return self class User(Document): - ''' CouchDB user ''' + """ + CouchDB user + """ def __init__(self, username, password, roles=None, data=None): if roles is None: roles = [] if data is None: data = {} - super(User, self).__init__( + super().__init__( data=data, base={ - '_id': 'org.couchdb.user:{0}'.format(username), + '_id': f'org.couchdb.user:{username}', 'name': username, 'type': 'user', 'password': password, @@ -166,6 +176,9 @@ def __init__(self, username, password, roles=None, data=None): class Task(Document): + """ + Class to manage task modifications with. + """ __BASE = { 'type': 'task', 'lock': 0, @@ -179,25 +192,22 @@ class Task(Document): 'error': [], } - """Class to manage task modifications with. - """ - def __init__(self, task=None): if task is None: task = {} - super(Task, self).__init__(task, Task.__BASE) + super().__init__(task, Task.__BASE) if '_id' not in self.doc: self.doc['_id'] = 'task_' + uuid4().hex def lock(self): - """Function which modifies the task such that it is locked. - """ + """Function which modifies the task such that it is locked.""" self.doc['lock'] = seconds() batchid.add_batch_management_id(self.doc) return self._update_hostname() def done(self): - """Function which modifies the task such that it is closed for ever + """ + Function which modifies the task such that it is closed for ever to the view that has supplied it. """ self.doc['done'] = seconds() @@ -205,12 +215,12 @@ def done(self): @property def input(self): - """ Get input """ + """Get input""" return self.doc['input'] @input.setter def input(self, value): - """ Set input """ + """Set input""" self.doc['input'] = value @property @@ -220,16 +230,17 @@ def output(self): @output.setter def output(self, output): - """Add the output of the RunActor to the task. - """ + """Add the output of the RunActor to the task.""" self.doc['output'] = output @property def uploads(self): + """Uploads getter""" return self.doc['uploads'] @uploads.setter def uploads(self, uploads): + """Uploads setter""" self.doc['uploads'] = uploads def scrub(self): @@ -246,6 +257,7 @@ def scrub(self): return self._update_hostname() def error(self, msg=None, exception=None): + """Set error message in the document""" error = {'time': seconds()} if msg is not None: error['message'] = str(msg) @@ -261,19 +273,25 @@ def error(self, msg=None, exception=None): return self def has_error(self): + """Bool: check if document has an error""" return self.doc['lock'] == -1 def get_errors(self): + """Get document error""" try: return self.doc['error'] except KeyError(): return [] def is_done(self): + """Bool: is document done""" return self.doc['done'] != 0 class Job(Document): + """ + Job class is more explicit in the timing and archives the work. + """ __BASE = { 'type': 'job', 'hostname': '', @@ -285,11 +303,12 @@ class Job(Document): } def __init__(self, job): - super(Job, self).__init__(job, Job.__BASE) + super().__init__(job, Job.__BASE) if '_id' not in self.doc: raise ValueError('Job ID must be set') def queue(self, method, host=None): + """Set queue time""" self.doc['method'] = method if host is not None: self.doc['hostname'] = host @@ -297,16 +316,19 @@ def queue(self, method, host=None): return self def start(self): + """Set start time""" self.doc['start'] = seconds() self.doc['done'] = 0 self.doc['archive'] = 0 return self._update_hostname() def finish(self): + """Set end time""" self.doc['done'] = seconds() return self def archive(self): + """Set archive time""" if self.doc['done'] <= 0: self.doc['done'] = seconds() self.doc['archive'] = seconds() @@ -315,4 +337,5 @@ def archive(self): return self def is_done(self): + """Bool: is done""" return self.doc['done'] != 0 diff --git a/picas/executers.py b/picas/executers.py index b1bcaa5..dbb454a 100644 --- a/picas/executers.py +++ b/picas/executers.py @@ -21,6 +21,7 @@ def execute(args, shell=False): return (proc, proc.returncode, stdout, stderr) + def execute_old(cmd): """Helper function to execute an external application. @param cmd: the command to be executed. diff --git a/picas/generators.py b/picas/generators.py index 003be4c..6df5c98 100644 --- a/picas/generators.py +++ b/picas/generators.py @@ -6,14 +6,16 @@ """ -class TokenGenerator(object): - """Object to generate the standard tokens with. +class TokenGenerator: + """ + Object to generate the standard tokens with. """ def __init__(self): pass @staticmethod def get_empty_token(): + """Generate empty token""" token = { 'lock': 0, 'done': 0, diff --git a/picas/iterators.py b/picas/iterators.py index a403d03..b13eb5b 100644 --- a/picas/iterators.py +++ b/picas/iterators.py @@ -5,12 +5,15 @@ @Copyright (c) 2016, Jan Bot """ -from .documents import Task -from couchdb.http import ResourceConflict import time +from couchdb.http import ResourceConflict + +from .documents import Task +from .picaslogger import picaslogger + -class ViewIterator(object): +class ViewIterator: """ Dummy class to show what to implement for a PICaS iterator. """ @@ -23,17 +26,17 @@ def __iter__(self): return self def reset(self): + """Reset the iterator.""" self._stop = False def stop(self): + """Stop the iterator.""" self._stop = True def is_stopped(self): + """Bool: iterator stopped.""" return self._stop - def next(self): - return self.__next__() - def __next__(self): """ Get the next task. @@ -46,14 +49,12 @@ def __next__(self): try: return self.claim_task() - except IndexError: + except IndexError as ex: self.stop() - raise StopIteration + raise StopIteration from ex def claim_task(self): - """ - Get the first available task from a view. - """ + """Get the first available task from a view.""" raise NotImplementedError("claim_task function not implemented.") @@ -71,8 +72,7 @@ def _claim_task(database, view, allowed_failures=10, **view_params): class TaskViewIterator(ViewIterator): - """Iterator object to fetch tasks while available. - """ + """Iterator object to fetch tasks while available.""" def __init__(self, database, view, **view_params): """ @param database: CouchDB database to get tasks from. @@ -80,7 +80,7 @@ def __init__(self, database, view, **view_params): @param view_params: parameters which need to be passed on to the view (optional). """ - super(TaskViewIterator, self).__init__() + super().__init__() self.database = database self.view = view self.view_params = view_params @@ -106,7 +106,7 @@ def __init__(self, database, high_priority_view, low_priority_view, @param view_params: parameters which need to be passed on to the view (optional). """ - super(PrioritizedViewIterator, self).__init__() + super().__init__() self.database = database self.high_priority_view = high_priority_view self.low_priority_view = low_priority_view @@ -138,13 +138,14 @@ def __init__(self, view_iterator, sleep_sec=10, stop_callback=None, iterator should stop feeding tasks @param stop_callback_args: arguments to the stop_callback function. """ - super(EndlessViewIterator, self).__init__() + super().__init__() self.iterator = view_iterator self.sleep_sec = sleep_sec self.stop_callback = stop_callback self.stop_callback_args = stop_callback_args def is_cancelled(self): + """Bool to check if the while should be stopped""" return (self.is_stopped() or (self.stop_callback is not None and self.stop_callback(**self.stop_callback_args))) @@ -156,10 +157,10 @@ def __next__(self): except StopIteration: self.iterator.reset() time.sleep(self.sleep_sec) - print("Iterator is waiting for work...") + picaslogger.info("Iterator is waiting for work...") # no longer continue self.iterator.stop() self.stop() - print("Iterator is finishing.") + picaslogger.info("Iterator is finishing.") raise StopIteration diff --git a/picas/modifiers.py b/picas/modifiers.py index 9c7c64d..8370129 100644 --- a/picas/modifiers.py +++ b/picas/modifiers.py @@ -13,30 +13,39 @@ from . import batchid -class TokenModifier(object): +class TokenModifier: + """(semi)Abstract class for token modifiers + """ def __init__(self, timeout=86400): self.timeout = timeout def lock(self, *args, **kwargs): + """Set the token to locked state""" raise NotImplementedError("Lock function not implemented.") def unlock(self, *args, **kwargs): + """Set the token to unlocked state""" raise NotImplementedError("Unlock functin not implemented.") def close(self, *args, **kwargs): + """Set the token to closed state""" raise NotImplementedError("Close function not implemented.") def unclose(self, *args, **kwargs): + """Set the token to not closed state""" raise NotImplementedError("Unclose function not implemented.") def add_output(self, *args, **kwargs): + """Add output to the token""" raise NotImplementedError("Add_output function not implemented.") def scrub(self, *args, **kwargs): + """Scrub the token""" raise NotImplementedError("Scrub function not implemented.") def set_error(self, *args, **kwargs): + """Set the token to error state""" raise NotImplementedError("set_error function not implemented.") @@ -138,6 +147,8 @@ def set_error(self, token): class NestedTokenModifier(TokenModifier): + """Nested token modifier class + """ def __init__(self, timeout=86400): self.timeout = timeout @@ -153,17 +164,17 @@ def _get_token_from_value(self, ref, record): return record[ref] def _get_token(self, ref, record): - if (isinstance(ref, list)): + if isinstance(ref, list): return self._get_token_from_list(ref, record) - else: - return self._get_token_from_value(ref, record) + return self._get_token_from_value(ref, record) def get_token(self, ref, record): + """Get a nested token with a reference""" return self._get_token(ref, record) def _update_record(self, ref, record, token): r = record - if (isinstance(ref, list)): + if isinstance(ref, list): for k in ref[1:-1]: r = r[k] r[ref[-1]] = token diff --git a/picas/picaslogger.py b/picas/picaslogger.py new file mode 100644 index 0000000..2fd5cab --- /dev/null +++ b/picas/picaslogger.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +""" +@licence: The MIT License (MIT) +@Copyright (c) 2016, Jan Bot +@author: Lodewijk Nauta +""" + +import logging + +picaslogger = logging.getLogger("PiCaS") +formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) +ch.setFormatter(formatter) +picaslogger.addHandler(ch) +picaslogger.setLevel(logging.INFO) diff --git a/picas/srm.py b/picas/srm.py index eca82ee..a775cb0 100644 --- a/picas/srm.py +++ b/picas/srm.py @@ -13,18 +13,19 @@ from os import path from .executers import execute, execute_old +from .picaslogger import picaslogger -def download(remotefile, local_dir): - logging.debug("Downloading: " + remotefile) - raise NotImplementedError( - "Download function not implemented yet. Use SRMClient class.") +def download(remotefile): + """Download file""" + logging.debug(f"Downloading: {remotefile}") + raise NotImplementedError("Download function not implemented yet. Use SRMClient class.") -def upload(localfile, srm_dir): - logging.debug("Uploading: " + localfile) - raise NotImplementedError( - "Upload function not implemented yet. Use SRMClient class.") +def upload(localfile): + """Upload file""" + logging.debug(f"Uploading: {localfile}") + raise NotImplementedError("Upload function not implemented yet. Use SRMClient class.") def download_many(files, poolsize=10, logger=None): @@ -39,7 +40,7 @@ def download_many(files, poolsize=10, logger=None): q.put(v) thread_pool = [] - for i in range(poolsize): + for _ in range(poolsize): d = Downloader(q, logger) d.start() thread_pool.append(d) @@ -50,8 +51,8 @@ def download_many(files, poolsize=10, logger=None): def upload_many(files, poolsize=10): - raise NotImplementedError( - "upload_many function not implemented. Use SRMClient class.") + """Upload multiple files""" + raise NotImplementedError("upload_many function not implemented. Use SRMClient class.") class Downloader(threading.Thread): @@ -61,14 +62,14 @@ class Downloader(threading.Thread): SRM with too many request. """ - def __init__(self, queue, logger=None): + def __init__(self, q, logger=None): """Initialization. - @param queue: Python queue object containing all the files that need + @param q: Python queue object containing all the files that need to be downloaded. @param logger: Python logger object. """ threading.Thread.__init__(self) - self.q = queue + self.q = q if logger is None: self.logger = logging.getLogger('SRM') else: @@ -93,13 +94,12 @@ def run(self): self.q.task_done() - if (count > 24): - self.logger.error("Download of " + f + - " failed after multiple tries.") + if count > 24: + self.logger.error(f"Download of {f} failed after multiple tries.") raise EnvironmentError("Download failed of: " + f) -class SRMClient(object): +class SRMClient: """Helper class to easily down- and upload files to/from SRM. """ @@ -119,17 +119,16 @@ def remote_exists(self, loc): """ surl = self.srm_host + loc cmd = ['srmls', surl] - print(" ".join(cmd)) - (proc, returncode, stdout, stderr) = execute(cmd) + picaslogger.info(" ".join(cmd)) + (proc, returncode, stdout, _) = execute(cmd) + if returncode == 0: bn = path.basename(loc) lines = stdout.split("\n") for line in lines: if bn in line: return True - return False - else: - return False + return False def upload(self, local_file, srm_dir, check=False): """Upload local file to the SRM. @@ -151,8 +150,8 @@ def upload(self, local_file, srm_dir, check=False): cmd = ['srmcp', '-2', '-server_mode=passive', 'file:///' + local_file, srm_url] - print(cmd) - (proc, returncode, stdout, stderr) = execute(cmd) + picaslogger.info(cmd) + (proc, returncode, _, _) = execute(cmd) if returncode == 0: pass else: diff --git a/picas/srmclient.py b/picas/srmclient.py index d8ce3ee..02061f5 100644 --- a/picas/srmclient.py +++ b/picas/srmclient.py @@ -10,32 +10,35 @@ import logging import queue -from picas import SRMClient +from picas.srm import SRMClient +from .picaslogger import picaslogger def download(files, threads=10): + """Download wrapper""" q = queue.Queue() - for k, v in files.items(): + for _, v in files.items(): q.put(v) thread_pool = [] - for i in range(threads): + for _ in range(threads): d = Downloader(q) d.start() thread_pool.append(d) q.join() - print("Download work done, joining threads") + picaslogger.info("Download work done, joining threads") for d in thread_pool: - print("Joining: {0!s}".format(str(d.ident))) + picaslogger.info(f"Joining: {d.ident}") d.join(1) class Downloader(threading.Thread): + """Download class using SRM""" - def __init__(self, queue): + def __init__(self, q): threading.Thread.__init__(self) - self.q = queue + self.q = q self.logger = logging.getLogger('Pindel') self.srm = SRMClient(self.logger) self.daemon = False @@ -51,7 +54,7 @@ def run(self): done = True except Exception(): count += 1 - if (count > 9): + if count > 9: raise EnvironmentError("Download failed.") self.q.task_done() - print("Exiting while loop, thread should close itself...") + picaslogger.info("Exiting while loop, thread should close itself...") diff --git a/picas/util.py b/picas/util.py index d14ae62..07f2a33 100644 --- a/picas/util.py +++ b/picas/util.py @@ -1,28 +1,35 @@ +""" +@author Joris Borgdorff +""" + import time from copy import deepcopy -''' @author Joris Borgdorff ''' - def merge_dicts(dict1, dict2): + """merge two dicts""" merge = deepcopy(dict1) merge.update(dict2) return merge def seconds(): + """get time in seconds""" return int(time.time()) -class Timer(object): +class Timer: + """Timer class""" def __init__(self): self.t = time.time() def elapsed(self): + """Get elapsed since class init""" return time.time() - self.t def reset(self): + """Reset timer""" new_t = time.time() diff = new_t - self.t self.t = new_t diff --git a/setup.cfg b/setup.cfg index b88034e..c40b046 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,5 @@ [metadata] description-file = README.md + +[flake8] +max-line-length = 199 diff --git a/tests/test_document.py b/tests/test_document.py index cf1396a..674b07e 100644 --- a/tests/test_document.py +++ b/tests/test_document.py @@ -1,22 +1,23 @@ +""" +@author Joris Borgdorff +""" + import base64 import unittest from picas.documents import Document, Task from picas.util import seconds -''' @author Joris Borgdorff ''' test_id = 'mydoc' test_other_id = 'myotherdoc' - class TestTask(unittest.TestCase): def setUp(self): self.task = Task({'_id': test_id}) - def test_create(self): doc = Document({'_id': test_id}) self.assertEqual(doc.id, test_id) @@ -25,17 +26,17 @@ def test_create(self): self.assertEqual(doc.id, test_other_id) self.assertEqual(doc.value, {'_id': test_other_id}) - def test_no_id(self): doc = Document({'someattr': 1}) - self.assertRaises(AttributeError, getattr(doc), 'id') - self.assertRaises(AttributeError, getattr(doc), 'rev') + with self.assertRaises(AttributeError): + getattr(doc, 'id') + with self.assertRaises(AttributeError): + getattr(doc, 'rev') def test_empty(self): Document({}) - def test_attachment(self): doc = Document() data = b"This is it" @@ -46,7 +47,7 @@ def test_attachment(self): self.assertEqual(attach['content_type'], 'text/plain') self.assertEqual(attach['data'], data) self.assertEqual(doc['_attachments'][textfile]['data'], - base64.b64encode(data).decode()) + base64.b64encode(data).decode()) doc.remove_attachment(textfile) self.assertTrue(textfile not in doc['_attachments']) self.assertEqual(attach['data'], data) @@ -54,13 +55,12 @@ def test_attachment(self): attach = doc.get_attachment(jsonfile) self.assertEqual(attach['content_type'], 'application/json') - def test_id(self): self.assertEqual(self.task.id, test_id) self.assertEqual(self.task.value['_id'], test_id) self.assertEqual(self.task['_id'], test_id) - def test_no_id(self): + def test_id_len(self): t = Task() self.assertTrue(len(t.id) > 10) diff --git a/tests/test_executers.py b/tests/test_executers.py new file mode 100644 index 0000000..127b0b1 --- /dev/null +++ b/tests/test_executers.py @@ -0,0 +1,13 @@ +import unittest + +from picas.executers import execute + + +class TestExecutors(unittest.TestCase): + + def test_run_command(self): + returncode, stdout, stderr = execute(["echo", "'hello world'"]) + + self.assertEqual(returncode, 0) + self.assertEqual(stdout, b"'hello world'\n") + self.assertEqual(stderr, b'') diff --git a/tests/test_generators.py b/tests/test_generators.py new file mode 100644 index 0000000..a4c3c8d --- /dev/null +++ b/tests/test_generators.py @@ -0,0 +1,17 @@ +import unittest + +from picas.generators import TokenGenerator + + +class TestGenerators(unittest.TestCase): + + def setUp(self): + self.generator = TokenGenerator() + + def test_get_token(self): + token = self.generator.get_empty_token() + self.assertTrue(token['lock'] == 0) + self.assertTrue(token['done'] == 0) + self.assertTrue(token['hostname'] == '') + self.assertTrue(token['scrub_count'] == 0) + self.assertTrue(token['type'] == 'token') diff --git a/tests/test_iterators.py b/tests/test_iterators.py index 6d5b1b0..5f0c189 100644 --- a/tests/test_iterators.py +++ b/tests/test_iterators.py @@ -1,13 +1,12 @@ import unittest -from picas.iterators import TaskViewIterator +from picas.iterators import TaskViewIterator, EndlessViewIterator from test_mock import MockDB -#from nose.tools import assert_equals, assert_true class TestTask(unittest.TestCase): - def test_iterator(self): + def test_taskviewiterator(self): self.db = MockDB() for task in TaskViewIterator(self.db, 'view'): self.assertTrue(task['lock'] > 0) @@ -16,3 +15,18 @@ def test_iterator(self): break # process one task only self.assertEqual(len(self.db.saved), 1) + + def stop_function(self, stop_value=2): + self.stop_value = stop_value + return len(self.db.saved) == stop_value + + def test_endlessviewiterator(self): + self.db = MockDB() + self.iterator = TaskViewIterator(self.db, 'view') + for task in EndlessViewIterator(self.iterator, stop_callback=self.stop_function): + self.assertTrue(task['lock'] > 0) + self.assertEqual(task.rev, 'something') + self.assertEqual(self.db.saved[task.id], task.value) + + self.assertEqual(len(self.db.saved), self.stop_value) + self.assertEqual(len(self.db.TASKS), 3) diff --git a/tests/test_mock.py b/tests/test_mock.py index 6a5702c..553fbe7 100644 --- a/tests/test_mock.py +++ b/tests/test_mock.py @@ -4,7 +4,7 @@ class MockDB(object): - TASKS = [{'_id': 'a', 'lock': 0}, {'_id': 'b', 'lock': 0}] + TASKS = [{'_id': 'a', 'lock': 0}, {'_id': 'b', 'lock': 0}, {'_id': 'c', 'lock': 0}] JOBS = [{'_id': 'myjob'}] def __init__(self): diff --git a/tests/test_modifiers.py b/tests/test_modifiers.py new file mode 100644 index 0000000..b5574c8 --- /dev/null +++ b/tests/test_modifiers.py @@ -0,0 +1,46 @@ +import unittest + +from picas.documents import Task +from picas.modifiers import BasicTokenModifier + + +class TestModifier(unittest.TestCase): + + def setUp(self): + self.modifier = BasicTokenModifier() + self.token = Task() + + def test_lock(self): + self.modifier.lock(self.token) + self.assertTrue(self.token['hostname'] != "") + self.assertTrue(self.token['lock'] > 0) + self.assertTrue(self.token['dirac_jobid'] is None) + + def test_unlock(self): + self.modifier.unlock(self.token) + self.assertTrue(self.token['hostname'] != "") + self.assertTrue(self.token['lock'] == 0) + + def test_close(self): + self.modifier.close(self.token) + self.assertTrue(self.token['done'] > 0) + + def test_unclose(self): + self.modifier.unclose(self.token) + self.assertTrue(self.token['done'] == 0) + + def test_scrub(self): + self.modifier.scrub(self.token) + self.assertTrue(self.token['scrub_count'] == 1) + self.modifier.scrub(self.token) + self.assertTrue(self.token['scrub_count'] == 2) + self.assertTrue(self.token['lock'] == 0) + + def test_seterror(self): + self.modifier.set_error(self.token) + self.assertTrue(self.token['lock'] == -1) + self.assertTrue(self.token['done'] == -1) + + def test_addoutput(self): + self.modifier.add_output(self.token, {"output": "test"}) + self.assertTrue(self.token['output'] == "test") diff --git a/tests/test_util.py b/tests/test_util.py index fd1b0d9..dd9c717 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -33,6 +33,8 @@ def test_empty_empty_merge(self): self.assertEqual(merge_dicts({}, {}), {}) +class TestTimer(unittest.TestCase): + def test_timer(self): timer = Timer() time.sleep(0.2)