From feac0b5de4ec2bee12c0806a23aaac98f6b3370d Mon Sep 17 00:00:00 2001 From: Lodewijk Nauta Date: Fri, 16 Aug 2024 15:19:53 +0200 Subject: [PATCH] Add first examples to the notebook --- examples/examples.ipynb | 257 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 252 insertions(+), 5 deletions(-) diff --git a/examples/examples.ipynb b/examples/examples.ipynb index ddd787c..8291ec3 100644 --- a/examples/examples.ipynb +++ b/examples/examples.ipynb @@ -1,23 +1,270 @@ { "cells": [ + { + "cell_type": "markdown", + "id": "1e7dbcf6-9dff-46f1-8ee4-be0dc91e7130", + "metadata": {}, + "source": [ + "regular run: \n", + "`actor.run()` --> done \n", + "regular run with a timer in the iterator: \n", + "`actor.iterator = EndlessViewIterator` --> done\n", + "\n", + "set max tasks \n", + "`actor.run(max_tasks=2)` --> in branch SPD-410 \n", + "see only 2 tasks being taken and run\n", + "\n", + "stop after elapsed time \n", + "`actor.run(stop_function=actor.time_elapsed, elapsed=11)` --> in branch SPD-410 (through run() istead of iterator) \n", + "see that picas doesnt start a new token after 11 seconds of processing\n", + "\n", + "stop when you expect to run out of time \n", + "`actor.run(max_time=1, avg_time_factor=0.9)` --> in branch SPD-410 \n", + "add 3 tokens that sleep for 0.9 seconds and see that picas stops after 1.\n", + "\n", + "not resetting the token when killing picas \n", + "`super(ExampleActor, self).__init__(db, view=view, token_reset_values=None, **viewargs)` --> in branch SPD-409 \n", + "the opposite is the default: killing picas resets the token automatically, also needs to be shown " + ] + }, { "cell_type": "markdown", "id": "b49cd6b9-d958-4e5a-a20d-b3c0d3da5bf4", "metadata": {}, "source": [ - "# Examples notebook\n", + "# PiCaS examples" + ] + }, + { + "cell_type": "markdown", + "id": "ef844679-a768-426b-a6ca-7fb5fae95023", + "metadata": {}, + "source": [ + "## Pushing work to the database\n", + "We need to push tokens or tasks to the CouchDB instance, so that PiCaS can fetch the work and execute it one by one.\n", + "To accomplish this, we define some functions up next, that will push lines from an input file as commands to tokens. Each line becomes a single token." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8d7513b3-c960-4975-be01-68045292e51e", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import random\n", + "import sys\n", + "import time\n", + "\n", + "import couchdb\n", + "import picasconfig" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b055b008-ecda-440d-b290-989641c09a28", + "metadata": {}, + "outputs": [], + "source": [ + "def getNextIndex():\n", + " \"\"\"Function to set the index sequentially, instead of default random string\"\"\"\n", + " db = get_db()\n", + " index = 0\n", + " while db.get(f\"token_{index}\") is not None:\n", + " index+=1\n", + "\n", + " return index\n", + "\n", + "def loadTokens(db):\n", + " \"\"\"Create the tokens from the given file and push to the server\"\"\"\n", + " tokens = []\n", + " tokensfile = '/home/lodewijkn/picasclient/examples/quickExample.txt' # put your own path here\n", + " with open(tokensfile) as f:\n", + " input = f.read().splitlines()\n", + "\n", + " i = getNextIndex()\n", + " for fractal in input:\n", + " token = {\n", + " '_id': 'token_' + str(i),\n", + " 'type': 'token',\n", + " 'lock': 0,\n", + " 'done': 0,\n", + " 'hostname': '',\n", + " 'scrub_count': 0,\n", + " 'input': fractal,\n", + " 'exit_code': ''\n", + " }\n", + " tokens.append(token)\n", + " i = i +1\n", + " db.update(tokens)\n", + "\n", + "def get_db():\n", + " \"\"\"Fetch the server instance\"\"\"\n", + " server = couchdb.Server(picasconfig.PICAS_HOST_URL)\n", + " username = picasconfig.PICAS_USERNAME\n", + " pwd = picasconfig.PICAS_PASSWORD\n", + " server.resource.credentials = (username,pwd)\n", + " db = server[picasconfig.PICAS_DATABASE]\n", + " return db\n", + "\n", + "print(f\"Pushing tokens to {picasconfig.PICAS_DATABASE} at {picasconfig.PICAS_HOST_URL}\")\n", + "#Create a connection to the server\n", + "db = get_db()\n", + "#Load the tokens to the database\n", + "loadTokens(db)\n", + "print(\"Tokens have been pushed.\")" + ] + }, + { + "cell_type": "markdown", + "id": "ce2cee42-cb49-4896-a816-3dae556965af", + "metadata": {}, + "source": [ + "## Processing tasks stored in the database using PiCaS classes\n", + "Next, we define a custom class that is based (inherited) on the RunActor. We need to define the \"process_task\" method to define how each token is processed. The automation is then taken case of in the base class implementation. \n", + "Of course, you are free to overwrite more parts of the class in case that is needed. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "31b18dbc-78f2-436c-b074-e60d2eef1fb1", + "metadata": {}, + "outputs": [], + "source": [ + "from picas.actors import RunActor\n", + "from picas.clients import CouchDB\n", + "from picas.iterators import TaskViewIterator, EndlessViewIterator\n", + "from picas.modifiers import BasicTokenModifier\n", + "from picas.executors import execute\n", + "from picas.util import Timer\n", + "\n", + "class ExampleActor(RunActor):\n", + " \"\"\"\n", + " The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.\n", + " Feel free to adjust to whatever you need, a template can be found at: example-template.py\n", + " \"\"\"\n", + " def __init__(self, db, modifier, view=\"todo\", **viewargs):\n", + " super().__init__(db, view=view, **viewargs)\n", + " self.timer = Timer()\n", + " self.modifier = modifier\n", + " self.client = db\n", + "\n", + " def process_task(self, token):\n", + " # Print token information\n", + " print(\"-----------------------\")\n", + " print(\"Working on token: \" +token['_id'])\n", + " for key, value in token.doc.items():\n", + " print(key, value)\n", + " print(\"-----------------------\")\n", + "\n", + " # Start running the main job\n", + " # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out\n", + " command = \"/usr/bin/time -v ./process_task.sh \" + \"\\\"\" +token['input'] + \"\\\" \" + token['_id'] + \" 2> logs_\" + str(token['_id']) + \".err 1> logs_\" + str(token['_id']) + \".out\"\n", + " out = execute(command, shell=True)\n", + "\n", + " ## Get the job exit code in the token\n", + " token['exit_code'] = out[0]\n", + " token = self.modifier.close(token)\n", + " \n", + " # Attach logs in token\n", + " curdate = time.strftime(\"%d/%m/%Y_%H:%M:%S_\")\n", + " try:\n", + " logsout = \"logs_\" + str(token['_id']) + \".out\"\n", + " log_handle = open(logsout, 'rb')\n", + " token.put_attachment(logsout, log_handle.read())\n", + "\n", + " logserr = \"logs_\" + str(token['_id']) + \".err\"\n", + " log_handle = open(logserr, 'rb')\n", + " token.put_attachment(logserr, log_handle.read())\n", + " except:\n", + " pass\n", "\n", - "Please put your examples here" + "\n", + "client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)\n", + "print(f\"Connected to the database {picasconfig.PICAS_DATABASE} sucessfully. Now starting work...\")\n", + "modifier = BasicTokenModifier()\n", + "\n", + "actor = ExampleActor(client, modifier)\n", + "actor.run()" + ] + }, + { + "cell_type": "markdown", + "id": "9966f3b6-3459-415d-b712-09fe2beb4ced", + "metadata": {}, + "source": [ + "Now we want to let the Actor run indefinitely, waiting for work and starting it immediately once its found in the DB. \n", + "Such an Actor should time-out eventually, and for this we define a timer boolean function:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "77800422-0bc2-4d1a-ae66-27df484de012", + "metadata": {}, + "outputs": [], + "source": [ + "def time_elapsed(timer, elapsed=30.):\n", + " \"\"\"\n", + " @param timer: Timer object from the Actor class\n", + " \n", + " @param elapsed: lifetime of the Actor in seconds\n", + "\n", + " @returns: bool\n", + " \"\"\"\n", + " return timer.elapsed() > elapsed" + ] + }, + { + "cell_type": "markdown", + "id": "977a4503-9206-49ae-a5e0-752f7b6a7ca4", + "metadata": {}, + "source": [ + "This boolean is passed into the EndlessViewIterator's `stop_function`, so the Iterator knows when to stop: when the boolean becomes `True`." + ] + }, + { + "cell_type": "markdown", + "id": "33837de7-54d8-406d-b5d3-a056397cbf19", + "metadata": {}, + "source": [ + "We push some more tokens, and define a new RunActor that will go on indefinitely, except we gave it a `stop_function` to stop after some seconds of waiting." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "803680e0-4954-43db-9f75-bb4899895883", + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"Pushing tokens to {picasconfig.PICAS_DATABASE} at {picasconfig.PICAS_HOST_URL}\")\n", + "db = get_db()\n", + "loadTokens(db)\n", + "print(\"Tokens have been pushed.\")" + ] + }, + { + "cell_type": "markdown", + "id": "65c8280f-70db-45ec-9260-90e903ddd957", + "metadata": {}, + "source": [ + "Now we overwrite the Iterator in the Actor, to use an iterator that does not stop, until `stop_function` is called. For this function we use the `time_elapsed` to stop it after 11 seconds, or two scans of the DB for work." ] }, { "cell_type": "code", "execution_count": null, - "id": "8e30fd48-f220-4625-96bd-41585c1aed56", + "id": "b98685ad-ab37-4f59-9144-eb93dbba6607", "metadata": {}, "outputs": [], "source": [ - "1+1" + "actor = ExampleActor(client, modifier)\n", + "actor.iterator = EndlessViewIterator(actor.iterator, stop_callback=time_elapsed, timer=actor.timer, elapsed=11)\n", + "actor.run()" ] } ], @@ -37,7 +284,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.13" + "version": "3.10.14" } }, "nbformat": 4,