Skip to content

Commit

Permalink
Merge branch 'master' into SPD-410
Browse files Browse the repository at this point in the history
  • Loading branch information
lnauta authored Sep 3, 2024
2 parents 5284b8f + 714da43 commit feb96d4
Show file tree
Hide file tree
Showing 24 changed files with 589 additions and 148 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12"]



steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
*.py[co]
.DS_Store

# Packages
*.egg
Expand Down
292 changes: 292 additions & 0 deletions examples/examples.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "1e7dbcf6-9dff-46f1-8ee4-be0dc91e7130",
"metadata": {},
"source": [
"regular run: \n",
"`actor.run()` --> done \n",
"regular run with a timer in the iterator: \n",
"`actor.iterator = EndlessViewIterator` --> done\n",
"\n",
"set max tasks \n",
"`actor.run(max_tasks=2)` --> in branch SPD-410 \n",
"see only 2 tasks being taken and run\n",
"\n",
"stop after elapsed time \n",
"`actor.run(stop_function=actor.time_elapsed, elapsed=11)` --> in branch SPD-410 (through run() istead of iterator) \n",
"see that picas doesnt start a new token after 11 seconds of processing\n",
"\n",
"stop when you expect to run out of time \n",
"`actor.run(max_time=1, avg_time_factor=0.9)` --> in branch SPD-410 \n",
"add 3 tokens that sleep for 0.9 seconds and see that picas stops after 1.\n",
"\n",
"not resetting the token when killing picas \n",
"`super(ExampleActor, self).__init__(db, view=view, token_reset_values=None, **viewargs)` --> in branch SPD-409 \n",
"the opposite is the default: killing picas resets the token automatically, also needs to be shown "
]
},
{
"cell_type": "markdown",
"id": "b49cd6b9-d958-4e5a-a20d-b3c0d3da5bf4",
"metadata": {},
"source": [
"# PiCaS examples"
]
},
{
"cell_type": "markdown",
"id": "ef844679-a768-426b-a6ca-7fb5fae95023",
"metadata": {},
"source": [
"## Pushing work to the database\n",
"We need to push tokens or tasks to the CouchDB instance, so that PiCaS can fetch the work and execute it one by one.\n",
"To accomplish this, we define some functions up next, that will push lines from an input file as commands to tokens. Each line becomes a single token."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8d7513b3-c960-4975-be01-68045292e51e",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import random\n",
"import sys\n",
"import time\n",
"\n",
"import couchdb\n",
"import picasconfig"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b055b008-ecda-440d-b290-989641c09a28",
"metadata": {},
"outputs": [],
"source": [
"def getNextIndex():\n",
" \"\"\"Function to set the index sequentially, instead of default random string\"\"\"\n",
" db = get_db()\n",
" index = 0\n",
" while db.get(f\"token_{index}\") is not None:\n",
" index+=1\n",
"\n",
" return index\n",
"\n",
"def loadTokens(db):\n",
" \"\"\"Create the tokens from the given file and push to the server\"\"\"\n",
" tokens = []\n",
" tokensfile = '/home/lodewijkn/picasclient/examples/quickExample.txt' # put your own path here\n",
" with open(tokensfile) as f:\n",
" input = f.read().splitlines()\n",
"\n",
" i = getNextIndex()\n",
" for fractal in input:\n",
" token = {\n",
" '_id': 'token_' + str(i),\n",
" 'type': 'token',\n",
" 'lock': 0,\n",
" 'done': 0,\n",
" 'hostname': '',\n",
" 'scrub_count': 0,\n",
" 'input': fractal,\n",
" 'exit_code': ''\n",
" }\n",
" tokens.append(token)\n",
" i = i +1\n",
" db.update(tokens)\n",
"\n",
"def get_db():\n",
" \"\"\"Fetch the server instance\"\"\"\n",
" server = couchdb.Server(picasconfig.PICAS_HOST_URL)\n",
" username = picasconfig.PICAS_USERNAME\n",
" pwd = picasconfig.PICAS_PASSWORD\n",
" server.resource.credentials = (username,pwd)\n",
" db = server[picasconfig.PICAS_DATABASE]\n",
" return db\n",
"\n",
"print(f\"Pushing tokens to {picasconfig.PICAS_DATABASE} at {picasconfig.PICAS_HOST_URL}\")\n",
"#Create a connection to the server\n",
"db = get_db()\n",
"#Load the tokens to the database\n",
"loadTokens(db)\n",
"print(\"Tokens have been pushed.\")"
]
},
{
"cell_type": "markdown",
"id": "ce2cee42-cb49-4896-a816-3dae556965af",
"metadata": {},
"source": [
"## Processing tasks stored in the database using PiCaS classes\n",
"Next, we define a custom class that is based (inherited) on the RunActor. We need to define the \"process_task\" method to define how each token is processed. The automation is then taken case of in the base class implementation. \n",
"Of course, you are free to overwrite more parts of the class in case that is needed. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "31b18dbc-78f2-436c-b074-e60d2eef1fb1",
"metadata": {},
"outputs": [],
"source": [
"from picas.actors import RunActor\n",
"from picas.clients import CouchDB\n",
"from picas.iterators import TaskViewIterator, EndlessViewIterator\n",
"from picas.modifiers import BasicTokenModifier\n",
"from picas.executers import execute\n",
"from picas.util import Timer\n",
"\n",
"class ExampleActor(RunActor):\n",
" \"\"\"\n",
" The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.\n",
" Feel free to adjust to whatever you need, a template can be found at: example-template.py\n",
" \"\"\"\n",
" def __init__(self, db, modifier, view=\"todo\", **viewargs):\n",
" super().__init__(db, view=view, **viewargs)\n",
" self.timer = Timer()\n",
" self.modifier = modifier\n",
" self.client = db\n",
"\n",
" def process_task(self, token):\n",
" # Print token information\n",
" print(\"-----------------------\")\n",
" print(\"Working on token: \" +token['_id'])\n",
" for key, value in token.doc.items():\n",
" print(key, value)\n",
" print(\"-----------------------\")\n",
"\n",
" # Start running the main job\n",
" # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out\n",
" command = \"/usr/bin/time -v ./process_task.sh \" + \"\\\"\" +token['input'] + \"\\\" \" + token['_id'] + \" 2> logs_\" + str(token['_id']) + \".err 1> logs_\" + str(token['_id']) + \".out\"\n",
" out = execute(command, shell=True)\n",
"\n",
" ## Get the job exit code in the token\n",
" token['exit_code'] = out[0]\n",
" token = self.modifier.close(token)\n",
" \n",
" # Attach logs in token\n",
" curdate = time.strftime(\"%d/%m/%Y_%H:%M:%S_\")\n",
" try:\n",
" logsout = \"logs_\" + str(token['_id']) + \".out\"\n",
" log_handle = open(logsout, 'rb')\n",
" token.put_attachment(logsout, log_handle.read())\n",
"\n",
" logserr = \"logs_\" + str(token['_id']) + \".err\"\n",
" log_handle = open(logserr, 'rb')\n",
" token.put_attachment(logserr, log_handle.read())\n",
" except:\n",
" pass\n",
"\n",
"\n",
"client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)\n",
"print(f\"Connected to the database {picasconfig.PICAS_DATABASE} sucessfully. Now starting work...\")\n",
"modifier = BasicTokenModifier()\n",
"\n",
"actor = ExampleActor(client, modifier)\n",
"actor.run()"
]
},
{
"cell_type": "markdown",
"id": "9966f3b6-3459-415d-b712-09fe2beb4ced",
"metadata": {},
"source": [
"Now we want to let the Actor run indefinitely, waiting for work and starting it immediately once its found in the DB. \n",
"Such an Actor should time-out eventually, and for this we define a timer boolean function:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "77800422-0bc2-4d1a-ae66-27df484de012",
"metadata": {},
"outputs": [],
"source": [
"def time_elapsed(timer, elapsed=30.):\n",
" \"\"\"\n",
" @param timer: Timer object from the Actor class\n",
" \n",
" @param elapsed: lifetime of the Actor in seconds\n",
"\n",
" @returns: bool\n",
" \"\"\"\n",
" return timer.elapsed() > elapsed"
]
},
{
"cell_type": "markdown",
"id": "977a4503-9206-49ae-a5e0-752f7b6a7ca4",
"metadata": {},
"source": [
"This boolean is passed into the EndlessViewIterator's `stop_function`, so the Iterator knows when to stop: when the boolean becomes `True`."
]
},
{
"cell_type": "markdown",
"id": "33837de7-54d8-406d-b5d3-a056397cbf19",
"metadata": {},
"source": [
"We push some more tokens, and define a new RunActor that will go on indefinitely, except we gave it a `stop_function` to stop after some seconds of waiting."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "803680e0-4954-43db-9f75-bb4899895883",
"metadata": {},
"outputs": [],
"source": [
"print(f\"Pushing tokens to {picasconfig.PICAS_DATABASE} at {picasconfig.PICAS_HOST_URL}\")\n",
"db = get_db()\n",
"loadTokens(db)\n",
"print(\"Tokens have been pushed.\")"
]
},
{
"cell_type": "markdown",
"id": "65c8280f-70db-45ec-9260-90e903ddd957",
"metadata": {},
"source": [
"Now we overwrite the Iterator in the Actor, to use an iterator that does not stop, until `stop_function` is called. For this function we use the `time_elapsed` to stop it after 11 seconds, or two scans of the DB for work."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b98685ad-ab37-4f59-9144-eb93dbba6607",
"metadata": {},
"outputs": [],
"source": [
"actor = ExampleActor(client, modifier)\n",
"actor.iterator = EndlessViewIterator(actor.iterator, stop_callback=time_elapsed, timer=actor.timer, elapsed=11)\n",
"actor.run()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
12 changes: 1 addition & 11 deletions picas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,14 @@
"""

import logging
from .documents import Document, Task, Job, User
from .clients import CouchDB
from .iterators import (ViewIterator, TaskViewIterator, EndlessViewIterator,
PrioritizedViewIterator)
from .actors import RunActor


version = "0.3.0"

picaslogger = logging.getLogger("PiCaS")
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
picaslogger.addHandler(ch)
picaslogger.setLevel(logging.ERROR)
VERSION = "0.3.0"

__all__ = [
'CouchDB',
Expand Down
9 changes: 3 additions & 6 deletions picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
@Copyright (c) 2016, Jan Bot
@author: Jan Bot, Joris Borgdorff
"""

import logging
import signal
import subprocess
Expand Down Expand Up @@ -141,14 +142,12 @@ def prepare_env(self, *args, **kwargs):
Method to be called to prepare the environment to run the
application.
"""
pass

def prepare_run(self, *args, **kwargs):
"""
Code to run before a task gets processed. Used e.g. for fetching
inputs.
"""
pass

def process_task(self, task):
"""
Expand All @@ -161,15 +160,13 @@ def cleanup_run(self, *args, **kwargs):
"""
Code to run after a task has been processed.
"""
pass

def cleanup_env(self, *args, **kwargs):
"""
Method which gets called after the run method has completed.
"""
pass



class RunActorWithStop(RunActor):
"""
RunActor class with added stopping functionality.
Expand Down Expand Up @@ -225,4 +222,4 @@ def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=Non
break
self.current_task = None # set to None so the handler leaves the token alone when picas is killed
finally:
self.cleanup_env()
self.cleanup_env()
Loading

0 comments on commit feb96d4

Please sign in to comment.