diff --git a/README.rst b/README.rst index 407e5fb..7fb018b 100644 --- a/README.rst +++ b/README.rst @@ -1,4 +1,4 @@ -.. _readme:: +.. _readme: ================ NodeOpenRiverCam @@ -344,7 +344,7 @@ is sufficient drive space, starting with the oldest files first. If that is not sufficient, then results files will also be removed. If for some reason the space goes down further, then nodeorc will shut itself down to ensure that the compute device can still be accessed remotely. In the -:ref:`configuration ` section you will find that you can alter the +configuration section you will find that you can alter the thresholds, which default to 2GB and 1GB respectively. @@ -390,7 +390,7 @@ current device. This would look as follows in the JSON-configuration file. -.. code-block:: +.. code-block:: json { ... @@ -442,7 +442,7 @@ in order to ensure that there is never a timezone issue between the platform on This file naming convention can be configured by altering the field ``video_file_fmt`` under the ``settings`` section in the JSON file. -.. code-block:: +.. code-block:: json { ... @@ -464,6 +464,7 @@ Change the callback url details During setup, you will have configured the LiveORC on which the device will report and checks in for reconfiguration. You may alter this by changing the following settings: + * ``url``: the main url to the LiveORC server * ``token_refresh_end_point``: normally you should never change this, unless in a later moment in time the name of this end point changes in the LiveORC code. @@ -474,13 +475,18 @@ settings: refresh token is used, it also automatically expires and gets replaced by a new refresh token. -{ - "callback_url": { - "url": "http://127.0.0.1:8000", - "token_refresh_end_point": "/api/token/refresh/", - "token_refresh": "", - "token_access": "" - } +Your resulting configuration section should look like this (tokens shown are not valid and should of course be replaced +by your own token set). + +.. code-block:: json + + { + "callback_url": { + "url": "http://openrivercam.com", + "token_refresh_end_point": "/api/token/refresh/", + "token_refresh": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoicmVmcmVzaCIsImV4cCI6MjAyMjE2NTU5NywiaWF0IjoxNzA2ODA1NTk3LCJqdGkiOiI2YWJiODgxNGExNDA0NTFlYmE1YzQyMTMyNDM2YmUxZCIsInVzZXJfaWQiOjF9.ps1P8zA7EBRrRb2A4iA1X53D2TzSkCx1AzncIVpcdUE", + "token_access": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoiYWNjZXNzIiwiZXhwIjoxNzA2ODI3MTk3LCJpYXQiOjE3MDY4MDU1OTcsImp0aSI6IjU2MGE4NTM4MTViYzQ5ZDVhMzEwNWNhODU4NmQ2MzVmIiwidXNlcl9pZCI6MX0._ORAHl8z2bpkmP31jXfPkB_hLgEX2Rx6R5IFzcVmcyw" + } Water level file naming format and datetime format ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docker-compose.yml b/docker-compose-test.yml similarity index 100% rename from docker-compose.yml rename to docker-compose-test.yml diff --git a/nodeorc/db/models.py b/nodeorc/db/models.py index 98ed97e..9bc1fae 100644 --- a/nodeorc/db/models.py +++ b/nodeorc/db/models.py @@ -102,7 +102,7 @@ def as_dict(self): device_info["id"] = str(self.id) device_info["status"] = self.status.value device_info["form_status"] = self.form_status.value - device_info["nodeorc_version"] = self.nodeorc_version.value + device_info["nodeorc_version"] = self.nodeorc_version return device_info @@ -170,6 +170,12 @@ class Settings(BaseConfig): comment="Flag for enabling automated shutdown after a task is performed. Must only be used if a power cycling " "scheme is implemented and is meant to save power only." ) + reboot_after = Column( + Float, + default=0, + nullable=False, + comment="Float indicating the amount of seconds after which device reboots (0 means never reboot)" + ) def __str__(self): diff --git a/nodeorc/main.py b/nodeorc/main.py index 9a1e3c6..e8a9237 100644 --- a/nodeorc/main.py +++ b/nodeorc/main.py @@ -42,7 +42,7 @@ def get_docs_settings(): docs += """================================================\n\n""" for f in fields: attr_doc = getattr(db.models.ActiveConfig, f).comment - docs += " {} : {}\n\n".format(f, attr_doc) + docs += " {} : {}\n\n".format(f[:-3], attr_doc) return docs @@ -197,7 +197,8 @@ def start(storage, listen): session=session, callback_url=callback_url, device=device, - logger=logger + logger=logger, + reboot_after=active_config.settings.reboot_after ) else: # check for a new form with one single request diff --git a/nodeorc/models/callback.py b/nodeorc/models/callback.py index 266f4a3..350d731 100644 --- a/nodeorc/models/callback.py +++ b/nodeorc/models/callback.py @@ -33,8 +33,9 @@ def get_body(self): def to_db(self): from .. import db # import lazily to prevent circular imports + session_data = db.init_basedata.get_data_session() rec = db.models.Callback( body=self.model_dump_json() ) - db.session.add(rec) - db.session.commit() \ No newline at end of file + session_data.add(rec) + session_data.commit() \ No newline at end of file diff --git a/nodeorc/models/config.py b/nodeorc/models/config.py index 72a908f..4f0e424 100644 --- a/nodeorc/models/config.py +++ b/nodeorc/models/config.py @@ -87,6 +87,7 @@ class Settings(BaseModel): water_level_datetimefmt: str allowed_dt: float shutdown_after_task: StrictBool = False + reboot_after: float = 0 @field_validator("video_file_fmt") @classmethod diff --git a/nodeorc/tasks/local_task.py b/nodeorc/tasks/local_task.py index 671db4a..f693dca 100644 --- a/nodeorc/tasks/local_task.py +++ b/nodeorc/tasks/local_task.py @@ -14,9 +14,10 @@ from urllib.parse import urljoin from requests.exceptions import ConnectionError -from .. import models, disk_mng, db, config +from .. import models, disk_mng, db, config, utils from . import request_task_form + device = db.session.query(db.models.Device).first() REPLACE_ARGS = ["input_files", "output_files", "storage", "callbacks"] @@ -39,8 +40,10 @@ def __init__( self.disk_management = models.DiskManagement(**disk_management) self.callback_url = models.CallbackUrl(**callback_url) self.storage = models.Storage(**storage) - self.max_workers = max_workers + self.max_workers = max_workers # for now we always only do one job at the time self.logger = logger + self.processing = False # state for checking if any processing is going on + self.reboot = False # state that checks if a scheduled reboot should be done self.water_level_file_template = os.path.join(self.disk_management.water_level_path, self.settings["water_level_fmt"]) # make a list for processed files or files that are being processed so that they are not duplicated self.processed_files = set() @@ -74,11 +77,17 @@ def await_task(self): ) # start the timer for the disk manager disk_mng_t0 = time.time() + reboot_t0 = time.time() + get_task_form_t0 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: while not self.event.is_set(): if not os.path.isdir(self.disk_management.home_folder): # usually happens when USB drive is removed - self.logger.info(f"Home folder {self.disk_management.home_folder} is not available, probably disk is removed. Reboot to try to find disk.") + self.logger.info( + f"Home folder {self.disk_management.home_folder} is not " + f"available, probably disk is removed. Reboot to try to find " + f"disk. " + ) os._exit(1) file_paths = disk_mng.scan_folder( @@ -86,7 +95,11 @@ def await_task(self): suffix=self.video_file_ext ) for file_path in file_paths: - if os.path.isfile(file_path) and file_path not in self.processed_files: + # each file is checked if it is not yet in the queue and not + # being written into + if os.path.isfile(file_path) and \ + file_path not in self.processed_files and \ + not utils.is_file_size_changing(file_path): self.logger.info(f"Found file: {file_path}") # Submit the file processing task to the thread pool executor.submit( @@ -97,6 +110,15 @@ def await_task(self): # duplicated to another thread instance self.processed_files.add(file_path) time.sleep(5) # wait for 5 secs before re-investigating the monitored folder for new files + # do housekeeping, reboots, new task forms, disk management + if self.settings["reboot_after"] != 0: + if time.time() - reboot_t0 > max(self.settings["reboot_after"], 3600) and not self.reboot: + self.logger.info(f"Reboot scheduled after any running task after {max(self.settings['reboot_after'], 3600)} seconds") + self.reboot = True + self.reboot_now_or_not() + if time.time() - get_task_form_t0 > 300: + get_task_form_t0 = time.time() + self.get_new_task_form() if time.time() - disk_mng_t0 > self.disk_management.frequency: # reset the disk management t0 counter disk_mng_t0 = time.time() @@ -110,6 +132,7 @@ def await_task(self): self.cleanup_space(free_space=free_space) + def cleanup_space(self, free_space): """ Free up space on disk by removing oldest files first. @@ -161,11 +184,7 @@ def cleanup_space(self, free_space): self.logger.warning( f"Free space is {free_space} which is not yet under critical space {self.disk_management.critical_space}. Please contact your supplier for information.") - def process_file( - self, - file_path, - ): - # before any processing, check for new task forms online + def get_new_task_form(self): new_task_form_row = request_task_form( session=db.session, callback_url=self.callback_url, @@ -176,6 +195,15 @@ def process_file( # replace the task form template self.task_form_template = new_task_form_row.task_body + def process_file( + self, + file_path, + ): + + # before any processing, check for new task forms online + self.get_new_task_form() + + task_uuid = uuid.uuid4() task_path = os.path.join( self.disk_management.tmp_path, @@ -184,6 +212,8 @@ def process_file( # ensure the tmp path is in place if not(os.path.isdir(task_path)): os.makedirs(task_path) + # now we really start processing + self.processing = True try: url, filename = os.path.split(file_path) cur_path = file_path @@ -272,9 +302,15 @@ def process_file( # once done, the file is removed from list of considered files for processing self.processed_files.remove(file_path) # if callbacks were successful, try to send off old callbacks that were not successful earlier + self.logger.debug("Checking for old callbacks to send") self._post_old_callbacks() + # processing done, so set back to False + self.logger.debug("Processing done, setting processing state to False") + self.processing = False # shutdown if configured to shutdown after task self._shutdown_or_not() + # check if any reboots are needed and reboot + self.reboot_now_or_not() def _set_results_to_final_path(self, cur_path, dst_path, filename, task_path): """ move the results from temp to final destination and cleanup """ @@ -332,7 +368,22 @@ def _post_callbacks(self, callbacks): return success + def reboot_now_or_not(self): + """ + Check for reboot requirement and reboot if nothing is being processed + """ + if self.reboot: + # check if any processing is happening, if not, then reboot, else wait + if not self.processing: + self.logger.info("Rebooting now") + utils.reboot_now() + + def _post_old_callbacks(self): + """ + Try to post remaining non-posted callbacks and change their states in database + if successful + """ session_data = db.init_basedata.get_data_session() callback_records = session_data.query(db.models.Callback) callbacks = [cb.callback for cb in callback_records] @@ -364,6 +415,24 @@ def get_timestamp( parse_from_fn, fn_fmt, ): + """ + Find time stamp from file name using expected file name template with datetime fmt + + Parameters + ---------- + fn : str + filename path + parse_from_fn : bool + If set to True, filename is used to parse timestamp using a filename template, + if False, timestamp is retrieved from the last change datetime of the file + fn_fmt : filename template with datetime format between {} + + Returns + ------- + datetime + timestamp of video file + + """ if parse_from_fn: datetime_fmt = fn_fmt.split("{")[1].split("}")[0] fn_template = fn_fmt.replace(datetime_fmt, "") @@ -386,6 +455,22 @@ def get_timestamp( def read_water_level_file(fn, fmt): + """ + Parse water level file using supplied datetime format + + Parameters + ---------- + fn : str + water level file + fmt : str + datetime format + + Returns + ------- + pd.DataFrame + content of water level file + + """ date_parser = lambda x: datetime.datetime.strptime(x, fmt) df = pd.read_csv( fn, @@ -405,8 +490,26 @@ def get_water_level( file_fmt, datetime_fmt, allowed_dt=None, - ): + """ + Get water level from file(s) + + Parameters + ---------- + timestamp : datetime + timestamp to seek in water level file(s) + file_fmt : str + water level file template with possible datetime format in between {} + datetime_fmt : str + datetime format used inside water level files + allowed_dt : float + maximum difference between closest water level and video timestamp + + Returns + ------- + float + water level + """ if "{" in file_fmt and "}" in file_fmt: datetimefmt = file_fmt.split("{")[1].split("}")[0] water_level_template = file_fmt.replace(datetimefmt, ":s") @@ -441,6 +544,33 @@ def create_task( h_a, logger=logging ): + """ + Create task for specific video file from generic task form + + Parameters + ---------- + task_form_template : dict + task form with specifics to be filled in + task_uuid : uuid + unique task id + task_path : str + path to temporary location where task is conducted + storage : permanent storage for results + filename : str + name of video file + timestamp : datetime + timestamp of video + h_a : float + actual water level during video + logger : logging + logger object + + Returns + ------- + Task + specific task for video + + """ task_form = copy.deepcopy(task_form_template) # prepare input_files field in task definition input_files = { diff --git a/nodeorc/tasks/task_form.py b/nodeorc/tasks/task_form.py index f9b3623..9679c86 100644 --- a/nodeorc/tasks/task_form.py +++ b/nodeorc/tasks/task_form.py @@ -5,18 +5,20 @@ import os import requests import time +import traceback + from urllib.parse import urljoin import uuid import json from ..models import Task from ..db.models import TaskForm, TaskFormStatus, DeviceFormStatus -from .. import __home__ +from .. import utils, __home__ config_file = os.path.join(__home__, "task_form.json") -def wait_for_task_form(session, callback_url, device, timeout=5, logger=logging): +def wait_for_task_form(session, callback_url, device, timeout=300, reboot_after=0., logger=logging): """ Keep looking for a task form for the device, remotely and locally. When found, the service will shutdown and auto-reboot if running as service. @@ -33,7 +35,7 @@ def wait_for_task_form(session, callback_url, device, timeout=5, logger=logging) logger of nodeorc """ - + reboot_t0 = time.time() while True: # keep on trying to get a new task form from configured server until successful task_form = request_task_form(session, callback_url, device, logger=logger) @@ -44,7 +46,9 @@ def wait_for_task_form(session, callback_url, device, timeout=5, logger=logging) # new task form found, reboot service logger.info("New task form setup. Reboot service...") os._exit(0) - + if time.time() - reboot_t0 > reboot_after: + logger.info(f"Rebooting device after {reboot_after} seconds") + utils.reboot_now() time.sleep(timeout) def request_task_form(session, callback_url, device, logger=logging): @@ -161,6 +165,7 @@ def request_remote_task_form(session, callback_url, device, logger=logging): except Exception as e: logger.error(f"Could not connect to server {callback_url.url}, with following information: {e}. check your connection...") + traceback.print_exc() return None diff --git a/nodeorc/utils.py b/nodeorc/utils.py index cbfb0db..d12962e 100644 --- a/nodeorc/utils.py +++ b/nodeorc/utils.py @@ -1,6 +1,7 @@ import boto3 import os import logging +import time def check_bucket(s3, bucket_name): try: @@ -66,4 +67,37 @@ def upload_io(obj, bucket, dest=None, logger=logging): r = bucket.upload_fileobj(obj, dest) logger.info(f"{bucket}/{dest} created") - return r \ No newline at end of file + return r + + +def is_file_size_changing(fn, delay=1): + """ + Check if the file size changes over a certain amount of time. Can be used to check + if a file is being written into by another process. + + Parameters + ---------- + fn : str + path to file + delay : float + amount of delay time to check if file size changes + + Returns + ------- + bool + True (False) if file does (not) change + + """ + if not(os.path.isfile(fn)): + raise IOError(f"File {fn} does not exist") + # check if file is being written into, by checking changes in file size over a delay + size1 = os.path.getsize(fn) + time.sleep(delay) + if size1 != os.path.getsize(fn): + return True + else: + return False + + +def reboot_now(): + os.system("/sbin/shutdown -r now") diff --git a/pyproject.toml b/pyproject.toml index 17f9dca..c4d34ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ ] requires-python =">=3.9" -readme = "README.md" +readme = "README.rst" classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", diff --git a/settings/config_template.json b/settings/config_template.json index a6d6e57..244e86a 100644 --- a/settings/config_template.json +++ b/settings/config_template.json @@ -15,7 +15,9 @@ "water_level_fmt": "all_levels.txt", "water_level_datetimefmt": "%Y%m%d_%H%M%S", "allowed_dt": 3600, - "shutdown_after_task": false + "shutdown_after_task": false, + "reboot_after": 86400 + }, "disk_management": { "home_folder": "",