Issue 55 (#57)
* added check for video file still being written. Requires tests #55

* add reboot #54. Must be done only after a task is done using a flag.

* changed reboot and made safe by waiting until processing ended #54

* flit checks #10

* fix #54 #55
hcwinsemius authored Apr 5, 2024
1 parent 8423ab5 commit 3eaffba
Showing 11 changed files with 219 additions and 33 deletions.
28 changes: 17 additions & 11 deletions README.rst
@@ -1,4 +1,4 @@
- .. _readme::
+ .. _readme:

================
NodeOpenRiverCam
@@ -344,7 +344,7 @@ is sufficient drive space, starting with the oldest files first. If that is not
sufficient, then results files will also be removed. If for
some reason the space goes down further, then nodeorc will shut itself down to ensure
that the compute device can still be accessed remotely. In the
- :ref:`configuration <config>` section you will find that you can alter the
+ configuration section you will find that you can alter the
thresholds, which default to 2GB and 1GB respectively.
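
For reference, these thresholds live in the ``disk_management`` section of the JSON-configuration. The sketch below is for illustration only: ``critical_space`` and ``frequency`` appear in the code, but the other field names and the units shown are assumptions and may differ in your version.

.. code-block:: json

    {
        ...
        "disk_management": {
            "home_folder": "/home/nodeorc",
            "min_free_space": 2,
            "critical_space": 1,
            "frequency": 3600
        }
    }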


@@ -390,7 +390,7 @@ current device.

This would look as follows in the JSON-configuration file.

- .. code-block::
+ .. code-block:: json
{
...
@@ -442,7 +442,7 @@ in order to ensure that there is never a timezone issue between the platform on
This file naming convention can be configured by altering the field ``video_file_fmt`` under the ``settings`` section in
the JSON file.

- .. code-block::
+ .. code-block:: json
{
...
@@ -464,6 +464,7 @@ Change the callback url details
During setup, you will have configured the LiveORC server to which the device reports
and on which it checks in for reconfiguration. You may alter this by changing the
following settings:

* ``url``: the main url to the LiveORC server
* ``token_refresh_end_point``: normally you should never change this, unless the
  name of this end point changes in the LiveORC code at some later moment.
@@ -474,13 +475,18 @@ settings:
refresh token is used, it also automatically expires and gets replaced by a new
refresh token.

- {
-     "callback_url": {
-         "url": "http://127.0.0.1:8000",
-         "token_refresh_end_point": "/api/token/refresh/",
-         "token_refresh": "",
-         "token_access": ""
-     }
+ Your resulting configuration section should look like this (tokens shown are not valid and should of course be replaced
+ by your own token set).
+
+ .. code-block:: json
+
+     {
+         "callback_url": {
+             "url": "http://openrivercam.com",
+             "token_refresh_end_point": "/api/token/refresh/",
+             "token_refresh": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoicmVmcmVzaCIsImV4cCI6MjAyMjE2NTU5NywiaWF0IjoxNzA2ODA1NTk3LCJqdGkiOiI2YWJiODgxNGExNDA0NTFlYmE1YzQyMTMyNDM2YmUxZCIsInVzZXJfaWQiOjF9.ps1P8zA7EBRrRb2A4iA1X53D2TzSkCx1AzncIVpcdUE",
+             "token_access": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoiYWNjZXNzIiwiZXhwIjoxNzA2ODI3MTk3LCJpYXQiOjE3MDY4MDU1OTcsImp0aSI6IjU2MGE4NTM4MTViYzQ5ZDVhMzEwNWNhODU4NmQ2MzVmIiwidXNlcl9pZCI6MX0._ORAHl8z2bpkmP31jXfPkB_hLgEX2Rx6R5IFzcVmcyw"
+         }
+     }
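
To illustrate how these fields fit together, the sketch below builds the token refresh request a client could send. This is an assumption for illustration: the ``/api/token/refresh/`` end point shown in the configuration follows the common JWT pattern of POSTing ``{"refresh": <token>}``, but LiveORC's exact payload may differ.

```python
# Sketch: combine the configured callback_url fields into a refresh request.
from urllib.parse import urljoin

callback_url = {
    "url": "http://openrivercam.com",
    "token_refresh_end_point": "/api/token/refresh/",
    "token_refresh": "<your-refresh-token>",
}

# join the base url and the end point into the full refresh URL
refresh_url = urljoin(callback_url["url"], callback_url["token_refresh_end_point"])
payload = {"refresh": callback_url["token_refresh"]}
print(refresh_url)  # http://openrivercam.com/api/token/refresh/
# a real client would now POST `payload` to `refresh_url`,
# e.g. requests.post(refresh_url, json=payload)
```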
Water level file naming format and datetime format
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File renamed without changes.
8 changes: 7 additions & 1 deletion nodeorc/db/models.py
@@ -102,7 +102,7 @@ def as_dict(self):
device_info["id"] = str(self.id)
device_info["status"] = self.status.value
device_info["form_status"] = self.form_status.value
-         device_info["nodeorc_version"] = self.nodeorc_version.value
+         device_info["nodeorc_version"] = self.nodeorc_version

return device_info

@@ -170,6 +170,12 @@ class Settings(BaseConfig):
comment="Flag for enabling automated shutdown after a task is performed. Must only be used if a power cycling "
"scheme is implemented and is meant to save power only."
)
reboot_after = Column(
Float,
default=0,
nullable=False,
comment="Float indicating the amount of seconds after which device reboots (0 means never reboot)"
)


def __str__(self):
5 changes: 3 additions & 2 deletions nodeorc/main.py
@@ -42,7 +42,7 @@ def get_docs_settings():
docs += """================================================\n\n"""
for f in fields:
attr_doc = getattr(db.models.ActiveConfig, f).comment
-         docs += " {} : {}\n\n".format(f, attr_doc)
+         docs += " {} : {}\n\n".format(f[:-3], attr_doc)
return docs


@@ -197,7 +197,8 @@ def start(storage, listen):
session=session,
callback_url=callback_url,
device=device,
-             logger=logger
+             logger=logger,
+             reboot_after=active_config.settings.reboot_after
)
else:
# check for a new form with one single request
5 changes: 3 additions & 2 deletions nodeorc/models/callback.py
@@ -33,8 +33,9 @@ def get_body(self):

      def to_db(self):
          from .. import db  # import lazily to prevent circular imports
+         session_data = db.init_basedata.get_data_session()
          rec = db.models.Callback(
              body=self.model_dump_json()
          )
-         db.session.add(rec)
-         db.session.commit()
+         session_data.add(rec)
+         session_data.commit()
1 change: 1 addition & 0 deletions nodeorc/models/config.py
@@ -87,6 +87,7 @@ class Settings(BaseModel):
water_level_datetimefmt: str
allowed_dt: float
shutdown_after_task: StrictBool = False
reboot_after: float = 0

@field_validator("video_file_fmt")
@classmethod
150 changes: 140 additions & 10 deletions nodeorc/tasks/local_task.py
@@ -14,9 +14,10 @@
from urllib.parse import urljoin
from requests.exceptions import ConnectionError

- from .. import models, disk_mng, db, config
+ from .. import models, disk_mng, db, config, utils
from . import request_task_form


device = db.session.query(db.models.Device).first()

REPLACE_ARGS = ["input_files", "output_files", "storage", "callbacks"]
@@ -39,8 +40,10 @@ def __init__(
self.disk_management = models.DiskManagement(**disk_management)
self.callback_url = models.CallbackUrl(**callback_url)
self.storage = models.Storage(**storage)
-         self.max_workers = max_workers
+         self.max_workers = max_workers  # for now we always only do one job at a time
          self.logger = logger
+         self.processing = False  # state for checking if any processing is going on
+         self.reboot = False  # state that checks if a scheduled reboot should be done
self.water_level_file_template = os.path.join(self.disk_management.water_level_path, self.settings["water_level_fmt"])
# make a list for processed files or files that are being processed so that they are not duplicated
self.processed_files = set()
@@ -74,19 +77,29 @@ def await_task(self):
)
# start the timer for the disk manager
disk_mng_t0 = time.time()
reboot_t0 = time.time()
get_task_form_t0 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
while not self.event.is_set():
if not os.path.isdir(self.disk_management.home_folder):
# usually happens when USB drive is removed
-                     self.logger.info(f"Home folder {self.disk_management.home_folder} is not available, probably disk is removed. Reboot to try to find disk.")
+                     self.logger.info(
+                         f"Home folder {self.disk_management.home_folder} is not "
+                         f"available, probably disk is removed. Reboot to try to find "
+                         f"disk. "
+                     )
os._exit(1)

file_paths = disk_mng.scan_folder(
self.disk_management.incoming_path,
suffix=self.video_file_ext
)
for file_path in file_paths:
-                     if os.path.isfile(file_path) and file_path not in self.processed_files:
+                     # each file is checked if it is not yet in the queue and not
+                     # being written into
+                     if os.path.isfile(file_path) and \
+                         file_path not in self.processed_files and \
+                         not utils.is_file_size_changing(file_path):
self.logger.info(f"Found file: {file_path}")
# Submit the file processing task to the thread pool
executor.submit(
@@ -97,6 +110,15 @@
# duplicated to another thread instance
self.processed_files.add(file_path)
time.sleep(5) # wait for 5 secs before re-investigating the monitored folder for new files
# do housekeeping, reboots, new task forms, disk management
if self.settings["reboot_after"] != 0:
if time.time() - reboot_t0 > max(self.settings["reboot_after"], 3600) and not self.reboot:
self.logger.info(f"Reboot scheduled after any running task after {max(self.settings['reboot_after'], 3600)} seconds")
self.reboot = True
self.reboot_now_or_not()
if time.time() - get_task_form_t0 > 300:
get_task_form_t0 = time.time()
self.get_new_task_form()
if time.time() - disk_mng_t0 > self.disk_management.frequency:
# reset the disk management t0 counter
disk_mng_t0 = time.time()
@@ -110,6 +132,7 @@ def await_task(self):
self.cleanup_space(free_space=free_space)
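
The ``utils.is_file_size_changing`` helper referenced earlier in ``await_task`` is not shown in this diff. A plausible sketch (an assumption: the real helper may use a different delay or retry scheme) samples the file size twice:

```python
import os
import time


def is_file_size_changing(file_path, delay=1.0):
    """Return True if the file appears to still be written into.

    Sketch only: sample the file size twice with a short delay; a file
    that grows in between is assumed to still be recorded or copied.
    """
    size_before = os.path.getsize(file_path)
    time.sleep(delay)
    size_after = os.path.getsize(file_path)
    return size_after != size_before
```

With a check like this in place, a video that is still being copied into the incoming folder is skipped in the current iteration and picked up again once its size has settled.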



def cleanup_space(self, free_space):
"""
Free up space on disk by removing oldest files first.
@@ -161,11 +184,7 @@ def cleanup_space(self, free_space):
self.logger.warning(
f"Free space is {free_space} which is not yet under critical space {self.disk_management.critical_space}. Please contact your supplier for information.")

-     def process_file(
-             self,
-             file_path,
-     ):
-         # before any processing, check for new task forms online
+     def get_new_task_form(self):
new_task_form_row = request_task_form(
session=db.session,
callback_url=self.callback_url,
@@ -176,6 +195,15 @@ def process_file(
# replace the task form template
self.task_form_template = new_task_form_row.task_body

def process_file(
self,
file_path,
):

# before any processing, check for new task forms online
self.get_new_task_form()


task_uuid = uuid.uuid4()
task_path = os.path.join(
self.disk_management.tmp_path,
Expand All @@ -184,6 +212,8 @@ def process_file(
# ensure the tmp path is in place
if not(os.path.isdir(task_path)):
os.makedirs(task_path)
# now we really start processing
self.processing = True
try:
url, filename = os.path.split(file_path)
cur_path = file_path
@@ -272,9 +302,15 @@ def process_file(
# once done, the file is removed from list of considered files for processing
self.processed_files.remove(file_path)
# if callbacks were successful, try to send off old callbacks that were not successful earlier
self.logger.debug("Checking for old callbacks to send")
self._post_old_callbacks()
# processing done, so set back to False
self.logger.debug("Processing done, setting processing state to False")
self.processing = False
# shutdown if configured to shutdown after task
self._shutdown_or_not()
# check if any reboots are needed and reboot
self.reboot_now_or_not()

def _set_results_to_final_path(self, cur_path, dst_path, filename, task_path):
""" move the results from temp to final destination and cleanup """
@@ -332,7 +368,22 @@ def _post_callbacks(self, callbacks):

return success

def reboot_now_or_not(self):
"""
Check for reboot requirement and reboot if nothing is being processed
"""
if self.reboot:
# check if any processing is happening, if not, then reboot, else wait
if not self.processing:
self.logger.info("Rebooting now")
utils.reboot_now()


def _post_old_callbacks(self):
"""
Try to post remaining non-posted callbacks and change their states in database
if successful
"""
session_data = db.init_basedata.get_data_session()
callback_records = session_data.query(db.models.Callback)
callbacks = [cb.callback for cb in callback_records]
@@ -364,6 +415,24 @@ def get_timestamp(
parse_from_fn,
fn_fmt,
):
"""
Find time stamp from file name using expected file name template with datetime fmt
Parameters
----------
fn : str
filename path
parse_from_fn : bool
If set to True, filename is used to parse timestamp using a filename template,
if False, timestamp is retrieved from the last change datetime of the file
fn_fmt : filename template with datetime format between {}
Returns
-------
datetime
timestamp of video file
"""
if parse_from_fn:
datetime_fmt = fn_fmt.split("{")[1].split("}")[0]
fn_template = fn_fmt.replace(datetime_fmt, "")
@@ -386,6 +455,22 @@
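
The template handling visible above can be followed with a worked example. This is a sketch: the remainder of ``get_timestamp`` is collapsed in this diff, so the final ``strptime`` step, the helper name, and the example filename format are assumptions.

```python
import datetime


def timestamp_from_filename(fn, fn_fmt="video_{%Y%m%dT%H%M%S}.mp4"):
    # extract the datetime format between { and }, as in get_timestamp
    datetime_fmt = fn_fmt.split("{")[1].split("}")[0]
    # template without the datetime part, e.g. "video_{}.mp4"
    fn_template = fn_fmt.replace(datetime_fmt, "")
    prefix, suffix = fn_template.split("{}")
    # assumption: the collapsed part of the function strips the fixed
    # prefix/suffix and parses the remaining substring with strptime
    timestamp_str = fn[len(prefix):len(fn) - len(suffix)]
    return datetime.datetime.strptime(timestamp_str, datetime_fmt)


print(timestamp_from_filename("video_20240405T120000.mp4"))
```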


def read_water_level_file(fn, fmt):
"""
Parse water level file using supplied datetime format
Parameters
----------
fn : str
water level file
fmt : str
datetime format
Returns
-------
pd.DataFrame
content of water level file
"""
date_parser = lambda x: datetime.datetime.strptime(x, fmt)
df = pd.read_csv(
fn,
@@ -405,8 +490,26 @@ def get_water_level(
file_fmt,
datetime_fmt,
allowed_dt=None,

):
"""
Get water level from file(s)
Parameters
----------
timestamp : datetime
timestamp to seek in water level file(s)
file_fmt : str
water level file template with possible datetime format in between {}
datetime_fmt : str
datetime format used inside water level files
allowed_dt : float
maximum difference between closest water level and video timestamp
Returns
-------
float
water level
"""
if "{" in file_fmt and "}" in file_fmt:
datetimefmt = file_fmt.split("{")[1].split("}")[0]
water_level_template = file_fmt.replace(datetimefmt, ":s")
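
The body of ``get_water_level`` is largely collapsed in this diff. Below is a self-contained sketch of the closest-match logic its docstring describes (an assumption: the actual function obtains the levels via ``read_water_level_file`` and may differ in detail, e.g. by working on a DataFrame):

```python
import datetime


def closest_water_level(timestamp, levels, allowed_dt=None):
    """Pick the water level whose timestamp is closest to `timestamp`.

    `levels` maps datetime -> water level. If the closest entry is further
    away than `allowed_dt` seconds, no level is considered valid.
    """
    t_closest = min(levels, key=lambda t: abs((t - timestamp).total_seconds()))
    dt = abs((t_closest - timestamp).total_seconds())
    if allowed_dt is not None and dt > allowed_dt:
        raise ValueError(
            f"Closest water level is {dt:.0f} s away, exceeds allowed_dt={allowed_dt}"
        )
    return levels[t_closest]
```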
@@ -441,6 +544,33 @@ def create_task(
h_a,
logger=logging
):
"""
Create task for specific video file from generic task form
Parameters
----------
task_form_template : dict
task form with specifics to be filled in
task_uuid : uuid
unique task id
task_path : str
path to temporary location where task is conducted
storage : permanent storage for results
filename : str
name of video file
timestamp : datetime
timestamp of video
h_a : float
actual water level during video
logger : logging
logger object
Returns
-------
Task
specific task for video
"""
task_form = copy.deepcopy(task_form_template)
# prepare input_files field in task definition
input_files = {
