Skip to content

Commit

Permalink
Add first examples to the notebook
Browse files Browse the repository at this point in the history
  • Loading branch information
lnauta committed Aug 16, 2024
1 parent ec8d177 commit feac0b5
Showing 1 changed file with 252 additions and 5 deletions.
257 changes: 252 additions & 5 deletions examples/examples.ipynb
Original file line number Diff line number Diff line change
@@ -1,23 +1,270 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "1e7dbcf6-9dff-46f1-8ee4-be0dc91e7130",
"metadata": {},
"source": [
"regular run: \n",
"`actor.run()` --> done \n",
"regular run with a timer in the iterator: \n",
"`actor.iterator = EndlessViewIterator` --> done\n",
"\n",
"set max tasks \n",
"`actor.run(max_tasks=2)` --> in branch SPD-410 \n",
"see only 2 tasks being taken and run\n",
"\n",
"stop after elapsed time \n",
"`actor.run(stop_function=actor.time_elapsed, elapsed=11)` --> in branch SPD-410 (through run() istead of iterator) \n",
"see that picas doesnt start a new token after 11 seconds of processing\n",
"\n",
"stop when you expect to run out of time \n",
"`actor.run(max_time=1, avg_time_factor=0.9)` --> in branch SPD-410 \n",
"add 3 tokens that sleep for 0.9 seconds and see that picas stops after 1.\n",
"\n",
"not resetting the token when killing picas \n",
"`super(ExampleActor, self).__init__(db, view=view, token_reset_values=None, **viewargs)` --> in branch SPD-409 \n",
"the opposite is the default: killing picas resets the token automatically, also needs to be shown "
]
},
{
"cell_type": "markdown",
"id": "b49cd6b9-d958-4e5a-a20d-b3c0d3da5bf4",
"metadata": {},
"source": [
"# Examples notebook\n",
"# PiCaS examples"
]
},
{
"cell_type": "markdown",
"id": "ef844679-a768-426b-a6ca-7fb5fae95023",
"metadata": {},
"source": [
"## Pushing work to the database\n",
"We need to push tokens or tasks to the CouchDB instance, so that PiCaS can fetch the work and execute it one by one.\n",
"To accomplish this, we define some functions up next, that will push lines from an input file as commands to tokens. Each line becomes a single token."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8d7513b3-c960-4975-be01-68045292e51e",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import random\n",
"import sys\n",
"import time\n",
"\n",
"import couchdb\n",
"import picasconfig"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b055b008-ecda-440d-b290-989641c09a28",
"metadata": {},
"outputs": [],
"source": [
"def getNextIndex():\n",
" \"\"\"Function to set the index sequentially, instead of default random string\"\"\"\n",
" db = get_db()\n",
" index = 0\n",
" while db.get(f\"token_{index}\") is not None:\n",
" index+=1\n",
"\n",
" return index\n",
"\n",
"def loadTokens(db):\n",
" \"\"\"Create the tokens from the given file and push to the server\"\"\"\n",
" tokens = []\n",
" tokensfile = '/home/lodewijkn/picasclient/examples/quickExample.txt' # put your own path here\n",
" with open(tokensfile) as f:\n",
" input = f.read().splitlines()\n",
"\n",
" i = getNextIndex()\n",
" for fractal in input:\n",
" token = {\n",
" '_id': 'token_' + str(i),\n",
" 'type': 'token',\n",
" 'lock': 0,\n",
" 'done': 0,\n",
" 'hostname': '',\n",
" 'scrub_count': 0,\n",
" 'input': fractal,\n",
" 'exit_code': ''\n",
" }\n",
" tokens.append(token)\n",
" i = i +1\n",
" db.update(tokens)\n",
"\n",
"def get_db():\n",
" \"\"\"Fetch the server instance\"\"\"\n",
" server = couchdb.Server(picasconfig.PICAS_HOST_URL)\n",
" username = picasconfig.PICAS_USERNAME\n",
" pwd = picasconfig.PICAS_PASSWORD\n",
" server.resource.credentials = (username,pwd)\n",
" db = server[picasconfig.PICAS_DATABASE]\n",
" return db\n",
"\n",
"print(f\"Pushing tokens to {picasconfig.PICAS_DATABASE} at {picasconfig.PICAS_HOST_URL}\")\n",
"#Create a connection to the server\n",
"db = get_db()\n",
"#Load the tokens to the database\n",
"loadTokens(db)\n",
"print(\"Tokens have been pushed.\")"
]
},
{
"cell_type": "markdown",
"id": "ce2cee42-cb49-4896-a816-3dae556965af",
"metadata": {},
"source": [
"## Processing tasks stored in the database using PiCaS classes\n",
"Next, we define a custom class that is based (inherited) on the RunActor. We need to define the \"process_task\" method to define how each token is processed. The automation is then taken case of in the base class implementation. \n",
"Of course, you are free to overwrite more parts of the class in case that is needed. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "31b18dbc-78f2-436c-b074-e60d2eef1fb1",
"metadata": {},
"outputs": [],
"source": [
"from picas.actors import RunActor\n",
"from picas.clients import CouchDB\n",
"from picas.iterators import TaskViewIterator, EndlessViewIterator\n",
"from picas.modifiers import BasicTokenModifier\n",
"from picas.executors import execute\n",
"from picas.util import Timer\n",
"\n",
"class ExampleActor(RunActor):\n",
" \"\"\"\n",
" The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.\n",
" Feel free to adjust to whatever you need, a template can be found at: example-template.py\n",
" \"\"\"\n",
" def __init__(self, db, modifier, view=\"todo\", **viewargs):\n",
" super().__init__(db, view=view, **viewargs)\n",
" self.timer = Timer()\n",
" self.modifier = modifier\n",
" self.client = db\n",
"\n",
" def process_task(self, token):\n",
" # Print token information\n",
" print(\"-----------------------\")\n",
" print(\"Working on token: \" +token['_id'])\n",
" for key, value in token.doc.items():\n",
" print(key, value)\n",
" print(\"-----------------------\")\n",
"\n",
" # Start running the main job\n",
" # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out\n",
" command = \"/usr/bin/time -v ./process_task.sh \" + \"\\\"\" +token['input'] + \"\\\" \" + token['_id'] + \" 2> logs_\" + str(token['_id']) + \".err 1> logs_\" + str(token['_id']) + \".out\"\n",
" out = execute(command, shell=True)\n",
"\n",
" ## Get the job exit code in the token\n",
" token['exit_code'] = out[0]\n",
" token = self.modifier.close(token)\n",
" \n",
" # Attach logs in token\n",
" curdate = time.strftime(\"%d/%m/%Y_%H:%M:%S_\")\n",
" try:\n",
" logsout = \"logs_\" + str(token['_id']) + \".out\"\n",
" log_handle = open(logsout, 'rb')\n",
" token.put_attachment(logsout, log_handle.read())\n",
"\n",
" logserr = \"logs_\" + str(token['_id']) + \".err\"\n",
" log_handle = open(logserr, 'rb')\n",
" token.put_attachment(logserr, log_handle.read())\n",
" except:\n",
" pass\n",
"\n",
"Please put your examples here"
"\n",
"client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)\n",
"print(f\"Connected to the database {picasconfig.PICAS_DATABASE} sucessfully. Now starting work...\")\n",
"modifier = BasicTokenModifier()\n",
"\n",
"actor = ExampleActor(client, modifier)\n",
"actor.run()"
]
},
{
"cell_type": "markdown",
"id": "9966f3b6-3459-415d-b712-09fe2beb4ced",
"metadata": {},
"source": [
"Now we want to let the Actor run indefinitely, waiting for work and starting it immediately once its found in the DB. \n",
"Such an Actor should time-out eventually, and for this we define a timer boolean function:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "77800422-0bc2-4d1a-ae66-27df484de012",
"metadata": {},
"outputs": [],
"source": [
"def time_elapsed(timer, elapsed=30.):\n",
" \"\"\"\n",
" @param timer: Timer object from the Actor class\n",
" \n",
" @param elapsed: lifetime of the Actor in seconds\n",
"\n",
" @returns: bool\n",
" \"\"\"\n",
" return timer.elapsed() > elapsed"
]
},
{
"cell_type": "markdown",
"id": "977a4503-9206-49ae-a5e0-752f7b6a7ca4",
"metadata": {},
"source": [
"This boolean is passed into the EndlessViewIterator's `stop_function`, so the Iterator knows when to stop: when the boolean becomes `True`."
]
},
{
"cell_type": "markdown",
"id": "33837de7-54d8-406d-b5d3-a056397cbf19",
"metadata": {},
"source": [
"We push some more tokens, and define a new RunActor that will go on indefinitely, except we gave it a `stop_function` to stop after some seconds of waiting."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "803680e0-4954-43db-9f75-bb4899895883",
"metadata": {},
"outputs": [],
"source": [
"print(f\"Pushing tokens to {picasconfig.PICAS_DATABASE} at {picasconfig.PICAS_HOST_URL}\")\n",
"db = get_db()\n",
"loadTokens(db)\n",
"print(\"Tokens have been pushed.\")"
]
},
{
"cell_type": "markdown",
"id": "65c8280f-70db-45ec-9260-90e903ddd957",
"metadata": {},
"source": [
"Now we overwrite the Iterator in the Actor, to use an iterator that does not stop, until `stop_function` is called. For this function we use the `time_elapsed` to stop it after 11 seconds, or two scans of the DB for work."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8e30fd48-f220-4625-96bd-41585c1aed56",
"id": "b98685ad-ab37-4f59-9144-eb93dbba6607",
"metadata": {},
"outputs": [],
"source": [
"1+1"
"actor = ExampleActor(client, modifier)\n",
"actor.iterator = EndlessViewIterator(actor.iterator, stop_callback=time_elapsed, timer=actor.timer, elapsed=11)\n",
"actor.run()"
]
}
],
Expand All @@ -37,7 +284,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.13"
"version": "3.10.14"
}
},
"nbformat": 4,
Expand Down

0 comments on commit feac0b5

Please sign in to comment.