diff --git a/.gitignore b/.gitignore index 6985cf1..35dea2c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1,164 @@ -# Generated by Cargo -# will have compiled files and executables -debug/ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ target/ -# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries -# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html -Cargo.lock +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ -# These are backup files generated by rustfmt -**/*.rs.bk +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +.idea/ -# MSVC Windows builds of rustc generate these, which store debugging information -*.pdb +*.png diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..2a4fe14 --- /dev/null +++ b/Makefile @@ -0,0 +1,24 @@ +init: + pip install -r requirements/common.txt + +init-dev: + pip install -r requirements/dev.txt + +run-unify: + unify --in-place --recursive . + +run-black: + black . + +run-isort: + isort . + +reformat: run-unify run-black run-isort + +test: + pytest tests + +git-add: + git add . + +before-commit: reformat test git-add diff --git a/README.md b/README.md index 013b709..07ae88c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,129 @@ # Watchdog -This service watches the health of pipeline by monitoring ingress and egress queue lengths +This service watches the health of pipeline by monitoring one or more buffers in parallel. +It will stop or restart designated pipeline services if the buffer queue length exceeds a threshold value or the time since the last output or input message exceeds a specified time. +Queue monitoring helps detect the slow processing of messages, and ingress and egress monitoring is helpful in detecting how pipeline services are processing messages. +In other words, the service can detect if the pipeline is not processing messages at the expected rate or if the pipeline is not processing messages at all. + +## Configuration + +The watchdog service is configured using the following environment variables: +* `CONFIG_FILE_PATH` - The path to the configuration file. Required. +* `LOGLEVEL` - The log level for the service. Default is `INFO`. + +Configuration file is YAML file with the following structure: +```yaml +watch: + - buffer: + queue: + action: + length: + cooldown: + polling_interval: + container: + - labels: [] + # other labels + egress: + action: + idle: + cooldown: + polling_interval: + container: + - labels: [] + ingress: + action: + idle: + cooldown: + polling_interval: + container: + - labels: [] + # other labels + # other buffers +``` + +Where: +* `buffer` - url of the buffer to watch. +* `queue` - configuration for the buffer queue. Optional. + * `action` - action to take when the queue length exceeds the length threshold. It can be `restart` or `stop`. + * `length` - threshold length for the queue. + * `cooldown` - interval in seconds to wait after applying the action. + * `polling_interval` - interval in seconds to check the queue length. + * `container` - list of labels to match for the action. Actions are performed on containers that match any of the label sets. + * `labels` - one or more labels to match on the same container, i.e. the container must have all labels. +* `ingress` or `egress` - configuration for the input or output traffic of the buffer. Optional. + * `action` - action to take when the time since the last input or output message exceeds the idle threshold. It can be `restart` or `stop`. + * `idle` - threshold time in seconds since the last input or output message. + * `cooldown` - interval in seconds to wait after applying the action. + * `polling_interval` - interval in seconds between buffer traffic checks. Optional. Default equals to `idle`. + * `container` - list of labels to match for the action. Actions are performed on containers that match any of the label sets. + * `labels` - one or more labels to match on the same container, i.e. the container must have all labels. + +**Note**: For each buffer, at least one of the `queue`, `ingress`, or `egress` sections must be present. + +You can find an example configuration file in the [samples](samples/pipeline_monitoring/config.yml) folder. + +### Interpolation + +The configuration file supports variable interpolation. You can use a path to another node or environment variable in the configuration file by wrapping it in `${}`. For example: +* `${oc.env:BUFFER_URL}` - will be replaced with the value of the `BUFFER_URL` environment variable. +* `${.idle}` - will be replaced with the value of the `idle` key in the same section. + +For more information, refer to the [OmegaConf documentation](https://omegaconf.readthedocs.io/en/2.3_branch/usage.html#variable-interpolation). + + +## Sample + +The sample demonstrates how to start the watchdog service with an example pipeline to watch the buffer and restart the SDK client based on configuration and buffer state. + +### Run + +This sample is designed to run on x86 architecture only. + +```bash +docker compose -f samples/pipeline_monitoring/docker-compose.yml up --build -d +``` + +### Check + +After starting the pipeline, you can check the logs of the client container: + +```bash +docker logs -f pipeline_monitoring-client-1 +``` + +When the client stops processing messages for more than `egress.idle` seconds (see [config](samples/pipeline_monitoring/config.yml)) +you will see the following logs in the client container, and the container itself will be restarted: + +``` +Traceback (most recent call last): + File "/opt/savant/src/client.py", line 52, in + main() + File "/opt/savant/src/client.py", line 37, in main + time.sleep(sleep_duration) +KeyboardInterrupt +``` + +### Stop + +```bash +docker compose -f samples/pipeline_monitoring/docker-compose.yml down +``` + +## Development + +### Install requirements + +```bash +make init-dev +``` + +### Format code + +```bash +make reformat +``` + +### Run tests + +```bash +make test +``` diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..29027c9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,8 @@ +[tool.black] +skip-string-normalization = true + +[tool.pylint.messages_control] +max-line-length = 88 + +[tool.isort] +profile = "black" diff --git a/requirements/common.txt b/requirements/common.txt new file mode 100644 index 0000000..541785d --- /dev/null +++ b/requirements/common.txt @@ -0,0 +1,4 @@ +aiohttp~=3.9.5 +aiodocker~=0.22.1 +typing-extensions~=4.12.2 +omegaconf~=2.3.0 diff --git a/requirements/dev.txt b/requirements/dev.txt new file mode 100644 index 0000000..90c6ff6 --- /dev/null +++ b/requirements/dev.txt @@ -0,0 +1,6 @@ +-r common.txt +black~=24.4.2 +unify~=0.5 +pytest~=8.2.2 +pytest-asyncio~=0.23.8 +isort~=5.13.2 diff --git a/samples/pipeline_monitoring/Dockerfile b/samples/pipeline_monitoring/Dockerfile new file mode 100644 index 0000000..77979bb --- /dev/null +++ b/samples/pipeline_monitoring/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.10.12-slim + +COPY requirements/common.txt requirements.txt +RUN python -m pip install --no-cache-dir -r requirements.txt && \ + rm -rf requirements.txt + +WORKDIR /app +COPY src/ src/ + +ENV PYTHONPATH /app + +ENTRYPOINT ["python", "src/pipeline_watchdog/run.py"] diff --git a/samples/pipeline_monitoring/client.py b/samples/pipeline_monitoring/client.py new file mode 100755 index 0000000..fc460c8 --- /dev/null +++ b/samples/pipeline_monitoring/client.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +import os +import random +import signal +import sys +import time + +from savant.client import SinkBuilder +from savant.utils import logging + + +def main(): + # To gracefully shut down the adapter on SIGTERM (raise KeyboardInterrupt) + signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT)) + + logging.init_logging() + logger = logging.get_logger('Client') + + zmq_sink_endpoint = os.environ.get('ZMQ_SINK_ENDPOINT') + min_sleep = int(os.environ.get('MIN_SLEEP', 0)) + max_sleep = int(os.environ.get('MAX_SLEEP', 200)) + + if not zmq_sink_endpoint: + sys.exit( + 'ZMQ_SINK_ENDPOINT is not set. Please provide the ZMQ_SINK_ENDPOINT in the environment variable.' + ) + + # Build the sink + sink = SinkBuilder().with_socket(zmq_sink_endpoint).build() + + for result in sink: + sleep_duration = random.uniform(min_sleep, max_sleep) + if sleep_duration > 0: + logger.info('Stop processing for %s seconds', sleep_duration) + time.sleep(sleep_duration) + + if result is not None: + logger.debug( + 'Received frame %s/%s (keyframe=%s)', + result.frame_meta.source_id, + result.frame_meta.pts, + result.frame_meta.keyframe, + ) + else: + source_id = result.eos.source_id + logger.debug('Received EOS for source %s', source_id) + + +if __name__ == '__main__': + main() diff --git a/samples/pipeline_monitoring/config.yml b/samples/pipeline_monitoring/config.yml new file mode 100644 index 0000000..5594cd1 --- /dev/null +++ b/samples/pipeline_monitoring/config.yml @@ -0,0 +1,22 @@ +watch: + - buffer: 127.0.0.1:8000 + queue: + action: stop + length: 999 + cooldown: 60s + polling_interval: 10s + container: + - labels: [queue-client-label1, queue-client-label2=2] + egress: + action: restart + cooldown: 60s + idle: 100s + container: + - labels: egress-client-label=egress-client-value + - labels: some-label + ingress: + action: restart + cooldown: 30s + idle: 60s + container: + - labels: ingress-client-label=3 diff --git a/samples/pipeline_monitoring/docker-compose.yml b/samples/pipeline_monitoring/docker-compose.yml new file mode 100644 index 0000000..da60cca --- /dev/null +++ b/samples/pipeline_monitoring/docker-compose.yml @@ -0,0 +1,79 @@ +version: "3.3" +services: + + video-loop-source: + image: ghcr.io/insight-platform/savant-adapters-gstreamer:latest + restart: unless-stopped + volumes: + - /tmp/zmq-sockets:/tmp/zmq-sockets + - /tmp/video-loop-source-downloads:/tmp/video-loop-source-downloads + environment: + - LOCATION=https://eu-central-1.linodeobjects.com/savant-data/demo/Free_City_Street_Footage.mp4 + - DOWNLOAD_PATH=/tmp/video-loop-source-downloads + - ZMQ_ENDPOINT=pub+connect:ipc:///tmp/zmq-sockets/buffer.ipc + - SOURCE_ID=city-traffic + - SYNC_OUTPUT=True + entrypoint: /opt/savant/adapters/gst/sources/video_loop.sh + labels: + ingress-client-label: 3 + depends_on: + buffer: + condition: service_started + + buffer: + image: ghcr.io/insight-platform/savant-adapters-py:latest + restart: unless-stopped + ports: + - "8000:8000" + volumes: + - /tmp/zmq-sockets:/tmp/zmq-sockets + - /tmp/savant-adapter/buffer:/tmp/savant-adapter/buffer + environment: + - ZMQ_SRC_ENDPOINT=sub+bind:ipc:///tmp/zmq-sockets/buffer.ipc + - ZMQ_SINK_ENDPOINT=dealer+connect:ipc:///tmp/zmq-sockets/input-video.ipc + - BUFFER_PATH=/tmp/savant-adapter/buffer + - BUFFER_LEN=1000 + - INTERVAL=0.1 + - METRICS_FRAME_PERIOD=1000 + - METRICS_TIME_PERIOD=10 + - STATS_LOG_INTERVAL=10 + - METRICS_PROVIDER=prometheus + - METRICS_PROVIDER_PARAMS={"port":8000, "labels":{"adapter":"buffer"}} + command: python -m adapters.python.bridge.buffer + + client: + image: ghcr.io/insight-platform/savant-adapters-py:latest + environment: + - ZMQ_SINK_ENDPOINT=router+bind:ipc:///tmp/zmq-sockets/input-video.ipc + - LOGLEVEL=INFO + # uncomment the following lines to define the range for the random idle time (in seconds) + #- MIN_SLEEP=0 + #- MAX_SLEEP=100 + labels: + queue-client-label1: 1 + queue-client-label2: 2 + egress-client-label: egress-client-value + volumes: + - /tmp/zmq-sockets:/tmp/zmq-sockets + - ./client.py:/opt/savant/src/client.py + entrypoint: python /opt/savant/src/client.py + depends_on: + buffer: + condition: service_started + + pipeline-watchdog: + container_name: pipeline-watchdog + build: + context: ../.. + dockerfile: ./samples/pipeline_monitoring/Dockerfile + restart: unless-stopped + network_mode: host + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ./config.yml:/app/config.yml + environment: + - LOGLEVEL=INFO + - CONFIG_FILE_PATH=/app/config.yml + depends_on: + client: + condition: service_started diff --git a/src/pipeline_watchdog/__init__.py b/src/pipeline_watchdog/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pipeline_watchdog/buffer_metrics.py b/src/pipeline_watchdog/buffer_metrics.py new file mode 100644 index 0000000..9d4e105 --- /dev/null +++ b/src/pipeline_watchdog/buffer_metrics.py @@ -0,0 +1,26 @@ +import re +from typing import Dict + +import aiohttp + +METRIC_PATTERN = re.compile(r'(\w+){[^}]*} ([0-9.e+-]+) \d+') + + +async def get_metrics(buffer_url: str) -> str: + async with aiohttp.ClientSession() as session: + async with session.get(f'http://{buffer_url}/metrics') as response: + content = await response.text() + return content + + +async def parse_metrics(content: str) -> Dict[str, float]: + metrics = {} + + try: + for match in METRIC_PATTERN.finditer(content): + metric, value = match.groups() + metrics[metric] = float(value) + except TypeError as e: + raise RuntimeError(f'Failed to parse metrics: {e}') + + return metrics diff --git a/src/pipeline_watchdog/config/__init__.py b/src/pipeline_watchdog/config/__init__.py new file mode 100644 index 0000000..6292ed9 --- /dev/null +++ b/src/pipeline_watchdog/config/__init__.py @@ -0,0 +1 @@ +from .config import Action, FlowConfig, QueueConfig, WatchConfig diff --git a/src/pipeline_watchdog/config/config.py b/src/pipeline_watchdog/config/config.py new file mode 100644 index 0000000..e9b0436 --- /dev/null +++ b/src/pipeline_watchdog/config/config.py @@ -0,0 +1,74 @@ +# This file contains the configuration classes for the pipeline watchdog +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional + + +class Action(Enum): + STOP = 'stop' + RESTART = 'restart' + + +@dataclass +class QueueConfig: + """Configuration to watch a buffer queue.""" + + action: Action + """Action to take when buffer queue is full.""" + + length: int + """Maximum buffer queue length.""" + + cooldown: int + """Interval in seconds to wait after applying the action.""" + + polling_interval: int + """Interval in seconds between buffer queue length checks.""" + + container_labels: List[List[str]] + """List of labels to filter the containers to which the action is applied.""" + + +@dataclass +class FlowConfig: + """Configuration to watch a buffer incoming or outgoing traffic.""" + + action: Action + """Action to take when buffer traffic is idle.""" + + idle: int + """Maximum time in seconds buffer traffic can be idle.""" + + cooldown: int + """Interval in seconds to wait after applying the action.""" + + polling_interval: Optional[int] + """Interval in seconds between buffer traffic checks.""" + + container_labels: List[List[str]] + """List of labels to filter the containers to which the action is applied.""" + + +@dataclass +class WatchConfig: + """Configuration for a single buffer.""" + + buffer: str + """Buffer url to retrieve metrics.""" + + queue: Optional[QueueConfig] + """Queue watch configuration.""" + + egress: Optional[FlowConfig] + """Egress traffic watch configuration.""" + + ingress: Optional[FlowConfig] + """Ingress traffic watch configuration.""" + + +@dataclass +class Config: + """Pipeline watchdog configuration.""" + + watch_configs: List[WatchConfig] + """List of buffer watch configurations.""" diff --git a/src/pipeline_watchdog/config/parser.py b/src/pipeline_watchdog/config/parser.py new file mode 100644 index 0000000..6ba9ad3 --- /dev/null +++ b/src/pipeline_watchdog/config/parser.py @@ -0,0 +1,84 @@ +from omegaconf import DictConfig, ListConfig, OmegaConf +from omegaconf.errors import ConfigKeyError + +from src.pipeline_watchdog.config.config import * +from src.pipeline_watchdog.utils import convert_to_seconds + + +class ConfigParser: + + def __init__(self, config_path: str): + self._config_path = config_path + + @staticmethod + def __parse_labels(labels_list: list) -> list: + labels = [] + for label_dict in labels_list: + if isinstance(label_dict.labels, ListConfig): + labels.append(label_dict.labels) + else: + labels.append([label_dict.labels]) + return labels + + @staticmethod + def __parse_queue_config(queue_config: dict): + if queue_config is None: + return None + + return QueueConfig( + action=Action(queue_config['action']), + length=queue_config['length'], + cooldown=convert_to_seconds(queue_config['cooldown']), + polling_interval=convert_to_seconds(queue_config['polling_interval']), + container_labels=ConfigParser.__parse_labels(queue_config['container']), + ) + + @staticmethod + def __parse_flow_config(flow_config: dict): + if flow_config is None: + return None + + idle = convert_to_seconds(flow_config['idle']) + polling_interval = flow_config.get('polling_interval') + + return FlowConfig( + action=Action(flow_config['action']), + idle=idle, + cooldown=convert_to_seconds(flow_config['cooldown']), + polling_interval=( + convert_to_seconds(polling_interval) if polling_interval else idle + ), + container_labels=ConfigParser.__parse_labels(flow_config['container']), + ) + + @staticmethod + def __parse_watch_config(watch_config: dict): + return WatchConfig( + buffer=watch_config['buffer'], + queue=ConfigParser.__parse_queue_config(watch_config.get('queue')), + egress=ConfigParser.__parse_flow_config(watch_config.get('egress')), + ingress=ConfigParser.__parse_flow_config(watch_config.get('ingress')), + ) + + def parse(self) -> Config: + with open(self._config_path, 'r') as file: + parsed_yaml = OmegaConf.load(file) + watch = ( + parsed_yaml.get('watch') + if isinstance(parsed_yaml, DictConfig) + else None + ) + + if not watch: + raise ValueError( + 'No watch configs found in the config file. Please specify at least one.' + ) + + try: + config = Config([self.__parse_watch_config(w) for w in watch]) + except ConfigKeyError as e: + raise ValueError( + f'Field "{e.key}" must be specified in the watch config.' + ) + + return config diff --git a/src/pipeline_watchdog/config/validator.py b/src/pipeline_watchdog/config/validator.py new file mode 100644 index 0000000..b765ccb --- /dev/null +++ b/src/pipeline_watchdog/config/validator.py @@ -0,0 +1,10 @@ +from src.pipeline_watchdog.config.config import Config + + +def validate(config: Config): + if any( + [not w.queue and not w.ingress and not w.egress for w in config.watch_configs] + ): + raise ValueError( + 'Watch config must include at least one of the following: queue, ingress, or egress.' + ) diff --git a/src/pipeline_watchdog/run.py b/src/pipeline_watchdog/run.py new file mode 100644 index 0000000..cc5913b --- /dev/null +++ b/src/pipeline_watchdog/run.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 + +import asyncio +import logging +import os +import signal +import time +from typing import List + +import aiodocker +from aiodocker import DockerError +from aiodocker.containers import DockerContainer + +from src.pipeline_watchdog.buffer_metrics import get_metrics, parse_metrics +from src.pipeline_watchdog.config import Action, FlowConfig, QueueConfig, WatchConfig +from src.pipeline_watchdog.config.parser import ConfigParser +from src.pipeline_watchdog.config.validator import validate +from src.pipeline_watchdog.utils import init_logging + +LOG_LEVEL = os.environ.get('LOGLEVEL', 'INFO') + +BUFFER_SIZE_METRIC = 'buffer_size' +LAST_SENT_MESSAGE_METRIC = 'last_sent_message' +LAST_RECEIVED_MESSAGE_METRIC = 'last_received_message' + + +init_logging(LOG_LEVEL) +logger = logging.getLogger('PipelineWatchdog') + + +class DockerClient: + + def __init__(self): + self._client = aiodocker.Docker() + + async def get_containers( + self, container_labels: List[List[str]] + ) -> List[DockerContainer]: + containers = [] + for labels in container_labels: + try: + containers += await self._client.containers.list( + all=True, filters={'label': labels} + ) + except DockerError: + raise RuntimeError(f'Failed to list containers with labels {labels}') + + return containers + + @staticmethod + async def restart_container(container: DockerContainer): + try: + await container.restart() + logger.debug('Container %s restarted', container.id) + except DockerError: + logger.error('Failed to restart container %s. Skipping', container.id) + + @staticmethod + async def stop_container(container: DockerContainer): + try: + await container.stop() + logger.debug('Container %s stopped', container.id) + except DockerError: + logger.error('Failed to stop container %s. Skipping', container.id) + + async def close(self): + await self._client.close() + + +async def process_action( + docker_client: DockerClient, action: Action, container_labels: List[List[str]] +): + containers = await docker_client.get_containers(container_labels) + + if not containers: + logger.debug('No containers found with labels %s', container_labels) + return + + if action == Action.STOP: + logger.debug('Stopping containers') + for container in containers: + await docker_client.stop_container(container) + elif action == Action.RESTART: + logger.debug('Restarting containers') + for container in containers: + await docker_client.restart_container(container) + else: + raise RuntimeError(f'Unknown action: {action}') + + +async def watch_queue(docker_client: DockerClient, buffer: str, config: QueueConfig): + await asyncio.sleep(config.cooldown) + + while True: + content = await get_metrics(buffer) + metrics = await parse_metrics(content) + + buffer_size = metrics[BUFFER_SIZE_METRIC] + + if buffer_size > config.length: + logger.debug( + 'Buffer %s is full, processing action %s', buffer, config.action + ) + await process_action(docker_client, config.action, config.container_labels) + await asyncio.sleep(config.cooldown) + else: + await asyncio.sleep(config.polling_interval) + + +async def watch_egress(docker_client: DockerClient, buffer: str, config: FlowConfig): + await asyncio.sleep(config.cooldown) + + while True: + content = await get_metrics(buffer) + metrics = await parse_metrics(content) + + last_sent_message = metrics[LAST_SENT_MESSAGE_METRIC] + now = time.time() + + if now - last_sent_message > config.idle: + logger.debug( + 'Egress flow %s is idle, processing action %s', buffer, config.action + ) + await process_action(docker_client, config.action, config.container_labels) + await asyncio.sleep(config.cooldown) + else: + await asyncio.sleep(config.polling_interval) + + +async def watch_ingress(docker_client: DockerClient, buffer: str, config: FlowConfig): + await asyncio.sleep(config.cooldown) + + while True: + content = await get_metrics(buffer) + metrics = await parse_metrics(content) + + last_received_message = metrics[LAST_RECEIVED_MESSAGE_METRIC] + now = time.time() + + if now - last_received_message > config.idle: + logger.debug( + 'Ingress flow %s is idle, processing action %s', buffer, config.action + ) + await process_action(docker_client, config.action, config.container_labels) + await asyncio.sleep(config.cooldown) + else: + await asyncio.sleep(config.polling_interval) + + +async def watch_buffer(docker_client: DockerClient, config: WatchConfig): + logger.info('Watching buffer metrics %s', config.buffer) + await asyncio.gather( + watch_queue(docker_client, config.buffer, config.queue), + watch_egress(docker_client, config.buffer, config.egress), + watch_ingress(docker_client, config.buffer, config.ingress), + ) + + +def main(): + # To gracefully shutdown the adapter on SIGTERM (raise KeyboardInterrupt) + signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT)) + + config_file_path = os.environ.get('CONFIG_FILE_PATH') + if not config_file_path: + logger.error( + 'Configuration file path is not provided. Provide the CONFIG_FILE_PATH environment variable' + ) + exit(1) + + parser = ConfigParser(config_file_path) + + try: + config = parser.parse() + validate(config) + except Exception as e: + logger.error('Invalid configuration. %s: %s', type(e).__name__, e) + exit(1) + + docker_client = DockerClient() + + loop = asyncio.get_event_loop() + futures = asyncio.gather( + *[watch_buffer(docker_client, x) for x in config.watch_configs] + ) + try: + loop.run_until_complete(futures) + except KeyboardInterrupt: + logger.error('Shutting down the pipeline watchdog') + finally: + loop.close() + asyncio.run(docker_client.close()) + + +if __name__ == '__main__': + main() diff --git a/src/pipeline_watchdog/utils.py b/src/pipeline_watchdog/utils.py new file mode 100644 index 0000000..20f1c49 --- /dev/null +++ b/src/pipeline_watchdog/utils.py @@ -0,0 +1,22 @@ +import logging +import sys + +seconds_per_unit = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400, 'w': 604800} + + +def init_logging(loglevel: str): + logging.basicConfig( + stream=sys.stdout, + format='%(asctime)s %(levelname)s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=loglevel, + ) + + +def convert_to_seconds(s: str): + seconds = int(s[:-1]) * seconds_per_unit[s[-1]] + + if seconds < 0: + raise ValueError('Invalid input') + + return seconds diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/config/__init__.py b/tests/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/config/test_parser.py b/tests/config/test_parser.py new file mode 100644 index 0000000..d15eb3c --- /dev/null +++ b/tests/config/test_parser.py @@ -0,0 +1,36 @@ +import os + +import pytest + +from src.pipeline_watchdog.config import WatchConfig +from src.pipeline_watchdog.config.parser import ConfigParser + + +def test_parse(config_file_path, watch_config): + os.environ['POLLING_INTERVAL'] = '20s' + + config = ConfigParser(config_file_path).parse() + + assert len(config.watch_configs) == 2 + assert config.watch_configs[0] == watch_config + + # check optional fields + assert config.watch_configs[1] == WatchConfig( + buffer='buffer2:8002', queue=None, egress=None, ingress=None + ) + + +def test_parse_empty(empty_config_file_path): + with pytest.raises( + ValueError, + match='No watch configs found in the config file. Please specify at least one.', + ): + ConfigParser(empty_config_file_path).parse() + + +def test_parse_invalid(invalid_config_file_path): + with pytest.raises( + ValueError, + match='Field ".*" must be specified in the watch config.', + ): + ConfigParser(invalid_config_file_path).parse() diff --git a/tests/config/test_validator.py b/tests/config/test_validator.py new file mode 100644 index 0000000..3de0e2d --- /dev/null +++ b/tests/config/test_validator.py @@ -0,0 +1,24 @@ +import pytest + +from src.pipeline_watchdog.config.validator import validate + + +@pytest.mark.parametrize( + 'config1', + [ + 'config', + 'config_with_queue_only', + 'config_with_ingress_only', + 'config_with_egress_only', + ], +) +def test_validate(request, config1): + validate(request.getfixturevalue(config1)) + + +def test_validate_empty_watch(config_with_invalid_watch_config): + with pytest.raises( + ValueError, + match='Watch config must include at least one of the following: queue, ingress, or egress.', + ): + validate(config_with_invalid_watch_config) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..66d247c --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,111 @@ +import copy +import os + +import pytest +import yaml + +from src.pipeline_watchdog.config import Action, FlowConfig, QueueConfig, WatchConfig +from src.pipeline_watchdog.config.config import Config + + +@pytest.fixture(scope='session') +def watch_config() -> WatchConfig: + return WatchConfig( + buffer='buffer1:8000', + queue=QueueConfig( + action=Action.RESTART, + length=18, + cooldown=60, + polling_interval=10, + container_labels=[['label1', 'label2=2'], ['some-label']], + ), + egress=FlowConfig( + action=Action.STOP, + idle=100, + cooldown=60, + polling_interval=20, + container_labels=[['egress-label=egress-value'], ['some-label']], + ), + ingress=FlowConfig( + action=Action.RESTART, + idle=60, + cooldown=30, + polling_interval=60, + container_labels=[['some-label']], + ), + ) + + +@pytest.fixture(scope='session') +def config(watch_config) -> Config: + return Config(watch_configs=[watch_config]) + + +@pytest.fixture(scope='session') +def config_with_queue_only(watch_config) -> Config: + new_watch_config = copy.deepcopy(watch_config) + new_watch_config.ingress = None + new_watch_config.egress = None + + return Config(watch_configs=[new_watch_config]) + + +@pytest.fixture(scope='session') +def config_with_ingress_only(watch_config) -> Config: + new_watch_config = copy.deepcopy(watch_config) + new_watch_config.queue = None + new_watch_config.egress = None + + return Config(watch_configs=[new_watch_config]) + + +@pytest.fixture(scope='session') +def config_with_egress_only(watch_config) -> Config: + new_watch_config = copy.deepcopy(watch_config) + new_watch_config.queue = None + new_watch_config.ingress = None + + return Config(watch_configs=[new_watch_config]) + + +@pytest.fixture(scope='session') +def config_with_invalid_watch_config() -> Config: + return Config(watch_configs=[WatchConfig('buffer:8000', None, None, None)]) + + +@pytest.fixture(scope='session') +def config_file_path(): + return os.path.join(os.path.dirname(__file__), 'test_config.yml') + + +@pytest.fixture( + params=( + '', + 'watch', + 'watch:', + 'watch: []', + 'some-key: some-value', + ) +) +def empty_config_file_path(request, tmpdir): + config_file = tmpdir.join('empty_config.txt') + config_file.write(request.param) + + return str(config_file) + + +@pytest.fixture( + params=( + {'watch': [{'some-key': 'some-value'}]}, + {'watch': [{'buffer': 'buffer1:8000', 'queue': {'action': 'restart'}}]}, + {'watch': [{'buffer': 'buffer1:8000', 'egress': {'action': 'restart'}}]}, + {'watch': [{'buffer': 'buffer1:8000', 'ingress': {'action': 'restart'}}]}, + ) +) +def invalid_config_file_path(request, tmpdir): + config_file = tmpdir.join('empty_config.txt') + + yaml_str = yaml.dump(request.param) + config_file.write(yaml_str) + + return str(config_file) diff --git a/tests/test_buffer_metrics.py b/tests/test_buffer_metrics.py new file mode 100644 index 0000000..09cdebb --- /dev/null +++ b/tests/test_buffer_metrics.py @@ -0,0 +1,81 @@ +from unittest import mock +from unittest.mock import AsyncMock, MagicMock, call + +import pytest +from aiohttp import ClientResponse + +from src.pipeline_watchdog.buffer_metrics import get_metrics, parse_metrics + + +@pytest.mark.asyncio +@mock.patch('aiohttp.ClientSession', new_callable=MagicMock) +async def test_get_metrics(session_mock: MagicMock): + session = session_mock() + response_mock = MagicMock(ClientResponse) + response = response_mock() + + session_in_with = session.__aenter__.return_value + session_in_with.get = response_mock + response_in_with: AsyncMock = response.__aenter__.return_value + response_in_with.text = AsyncMock(return_value='content') + + result = await get_metrics('localhost:8080') + + assert result == 'content' + assert response_mock.call_count == 2 + assert response_mock.call_args_list[0] == call() # initial call in test itself + assert response_mock.call_args_list[1] == call('http://localhost:8080/metrics') + + +@pytest.mark.asyncio +async def test_get_metrics_session_exception(): + with mock.patch('aiohttp.ClientSession', side_effect=RuntimeError('error')): + with pytest.raises(RuntimeError, match='error'): + await get_metrics('localhost:8080') + + +@pytest.mark.asyncio +@mock.patch('aiohttp.ClientSession', new_callable=MagicMock) +async def test_get_metrics_response_exception(session_mock): + with pytest.raises(RuntimeError, match='error'): + session = session_mock() + response_mock = MagicMock(ClientResponse, side_effect=RuntimeError('error')) + + session_in_with = session.__aenter__.return_value + session_in_with.get = response_mock + + await get_metrics('localhost:8080') + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'content, expected', + [ + ('', {}), + ('no metrics', {}), + ( + ''' + # HELP received_messages_total Number of messages received by the adapter + # TYPE received_messages_total counter + received_messages_total{adapter="buffer"} 120.0 1720441634544 + # HELP pushed_messages_total Number of messages pushed to the buffer + # TYPE pushed_messages_total counter + pushed_messages_total{adapter="buffer"} 34.0 1720441634544 + ''', + {'received_messages_total': 120.0, 'pushed_messages_total': 34.0}, + ), + ], +) +async def test_parse_metrics(content, expected): + result = await parse_metrics(content) + + assert result == expected + + +@pytest.mark.asyncio +async def test_parse_metrics_invalid_content_type(): + with pytest.raises( + RuntimeError, + match='Failed to parse metrics: expected string or bytes-like object', + ): + await parse_metrics(123) # type: ignore diff --git a/tests/test_config.yml b/tests/test_config.yml new file mode 100644 index 0000000..78acba6 --- /dev/null +++ b/tests/test_config.yml @@ -0,0 +1,25 @@ +watch: + - buffer: buffer1:8000 + queue: + action: restart + length: 18 + cooldown: 60s + polling_interval: 10s + container: + - labels: [label1, label2=2] + - labels: some-label + egress: + action: stop + cooldown: 60s + idle: 100s + polling_interval: ${oc.env:POLLING_INTERVAL} + container: + - labels: egress-label=egress-value + - labels: some-label + ingress: + action: restart + cooldown: 30s + idle: 60s + container: + - labels: some-label + - buffer: buffer2:8002 diff --git a/tests/test_docker_client.py b/tests/test_docker_client.py new file mode 100644 index 0000000..19c5899 --- /dev/null +++ b/tests/test_docker_client.py @@ -0,0 +1,151 @@ +from unittest.mock import AsyncMock, Mock, call, patch + +import pytest +import pytest_asyncio +from aiodocker import DockerError +from aiodocker.containers import DockerContainer, DockerContainers + +from src.pipeline_watchdog.run import DockerClient + +DOCKER_ERROR = DockerError('status', {'message': 'error'}) +RUNTIME_ERROR = RuntimeError('Test error') + + +@pytest_asyncio.fixture +def docker_mock(): + with patch('aiodocker.Docker', autospec=True) as mock: + yield mock() + + +@pytest.mark.asyncio +async def test_get_containers(docker_mock): + label1 = ['label1'] + label2 = ['label2', 'label3'] + containers_for_label1 = [Mock(), Mock()] + containers_for_label2 = [Mock()] + + containers = Mock( + DockerContainers, + list=AsyncMock(side_effect=[containers_for_label1, containers_for_label2]), + ) + docker_mock.containers = containers + + client = DockerClient() + + result = await client.get_containers([label1, label2]) + + assert result == containers_for_label1 + containers_for_label2 + assert containers.list.call_count == 2 + assert containers.list.call_args_list[0] == call( + all=True, filters={'label': label1} + ) + assert containers.list.call_args_list[1] == call( + all=True, filters={'label': label2} + ) + + +@pytest.mark.asyncio +async def test_get_containers_empty_labels(docker_mock): + containers = Mock(DockerContainers, list=AsyncMock(side_effect=[])) + docker_mock.containers = containers + + client = DockerClient() + + result = await client.get_containers([]) + + assert result == [] + assert containers.list.call_count == 0 + + +@pytest.mark.asyncio +async def test_get_containers_docker_error(docker_mock): + containers = Mock( + DockerContainers, + list=AsyncMock(side_effect=DOCKER_ERROR), + ) + docker_mock.containers = containers + + client = DockerClient() + + with pytest.raises( + RuntimeError, match="Failed to list containers with labels \\['label1'\\]" + ): + await client.get_containers([['label1']]) + + +@pytest.mark.asyncio +async def test_get_containers_exception(docker_mock): + containers = Mock(DockerContainers, list=AsyncMock(side_effect=RUNTIME_ERROR)) + docker_mock.containers = containers + + client = DockerClient() + + with pytest.raises(RUNTIME_ERROR.__class__, match=str(RUNTIME_ERROR)): + await client.get_containers([['label1']]) + + +@pytest.mark.asyncio +async def test_close(docker_mock): + client = DockerClient() + + await client.close() + + docker_mock.close.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_restart_container(): + container = AsyncMock(DockerContainer) + + await DockerClient.restart_container(container) + + container.restart.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_restart_container_docker_error(): + container = Mock( + DockerContainer, + restart=AsyncMock(side_effect=DOCKER_ERROR), + ) + + await DockerClient.restart_container(container) + + container.restart.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_restart_container_exception(): + container = Mock(DockerContainer, restart=AsyncMock(side_effect=RUNTIME_ERROR)) + + with pytest.raises(RUNTIME_ERROR.__class__, match=str(RUNTIME_ERROR)): + await DockerClient.restart_container(container) + + +@pytest.mark.asyncio +async def test_stop_container(): + container = AsyncMock(DockerContainer) + + await DockerClient.stop_container(container) + + container.stop.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_stop_container_docker_error(): + container = Mock( + DockerContainer, + stop=AsyncMock(side_effect=DOCKER_ERROR), + ) + + await DockerClient.stop_container(container) + + container.stop.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_stop_container_exception(): + container = Mock(DockerContainer, stop=AsyncMock(side_effect=RUNTIME_ERROR)) + + with pytest.raises(RUNTIME_ERROR.__class__, match=str(RUNTIME_ERROR)): + await DockerClient.stop_container(container) diff --git a/tests/test_run.py b/tests/test_run.py new file mode 100644 index 0000000..5ea22bd --- /dev/null +++ b/tests/test_run.py @@ -0,0 +1,474 @@ +import asyncio +import sys +import time +from unittest import mock +from unittest.mock import AsyncMock, call + +import pytest +from aiodocker.containers import DockerContainer + +from src.pipeline_watchdog import run +from src.pipeline_watchdog.config import Action +from src.pipeline_watchdog.run import ( + process_action, + watch_buffer, + watch_egress, + watch_ingress, + watch_queue, +) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.DockerClient', autospec=True) +async def test_process_action_stop(docker_client_mock): + docker_client = docker_client_mock() + docker_container1 = AsyncMock(DockerContainer) + docker_container2 = AsyncMock(DockerContainer) + docker_client.get_containers = mock.AsyncMock( + return_value=[docker_container1, docker_container2] + ) + container_labels = [['label1']] + + await process_action(docker_client, Action.STOP, container_labels) + + docker_client.get_containers.assert_awaited_once_with(container_labels) + docker_client.stop_container.assert_has_awaits( + [call(docker_container1), call(docker_container2)] + ) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.DockerClient', autospec=True) +async def test_process_action_restart(docker_client_mock): + docker_client = docker_client_mock() + docker_container1 = AsyncMock(DockerContainer) + docker_container2 = AsyncMock(DockerContainer) + docker_client.get_containers = mock.AsyncMock( + return_value=[docker_container1, docker_container2] + ) + container_labels = [['label1']] + + await process_action(docker_client, Action.RESTART, container_labels) + + docker_client.get_containers.assert_awaited_once_with(container_labels) + docker_client.restart_container.assert_has_awaits( + [call(docker_container1), call(docker_container2)] + ) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.DockerClient', autospec=True) +@pytest.mark.parametrize( + 'action', + [ + Action.STOP, + Action.RESTART, + ], +) +async def test_process_action_empty_containers(docker_client_mock, action): + docker_client = docker_client_mock() + docker_client.get_containers = mock.AsyncMock(return_value=[]) + container_labels = [['label1']] + + await process_action(docker_client, action, container_labels) + + docker_client.get_containers.assert_awaited_once_with(container_labels) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.DockerClient', autospec=True) +async def test_process_action_invalid(docker_client_mock): + docker_client = docker_client_mock() + + with pytest.raises(RuntimeError, match='Unknown action: invalid_action'): + await process_action(docker_client, 'invalid_action', []) # type: ignore + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.watch_ingress') +@mock.patch('src.pipeline_watchdog.run.watch_egress') +@mock.patch('src.pipeline_watchdog.run.watch_queue') +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_buffer( + docker_client_mock, + watch_queue_mock, + watch_egress_mock, + watch_ingress_mock, + watch_config, +): + docker_client = docker_client_mock() + + await watch_buffer(docker_client, watch_config) + watch_queue_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.queue + ) + watch_egress_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.egress + ) + watch_ingress_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.ingress + ) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.watch_ingress') +@mock.patch('src.pipeline_watchdog.run.watch_egress') +@mock.patch('src.pipeline_watchdog.run.watch_queue', side_effect=RuntimeError('error')) +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_buffer_watch_queue_failed( + docker_client_mock, + watch_queue_mock, + watch_egress_mock, + watch_ingress_mock, + watch_config, +): + docker_client = docker_client_mock() + + with pytest.raises(RuntimeError, match='error'): + await watch_buffer(docker_client, watch_config) + + watch_queue_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.queue + ) + watch_egress_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.egress + ) + watch_ingress_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.ingress + ) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.watch_ingress') +@mock.patch('src.pipeline_watchdog.run.watch_egress', side_effect=RuntimeError('error')) +@mock.patch('src.pipeline_watchdog.run.watch_queue') +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_buffer_watch_egress_failed( + docker_client_mock, + watch_queue_mock, + watch_egress_mock, + watch_ingress_mock, + watch_config, +): + docker_client = docker_client_mock() + + with pytest.raises(RuntimeError, match='error'): + await watch_buffer(docker_client, watch_config) + + watch_queue_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.queue + ) + watch_egress_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.egress + ) + watch_ingress_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.ingress + ) + + +@pytest.mark.asyncio +@mock.patch( + 'src.pipeline_watchdog.run.watch_ingress', side_effect=RuntimeError('error') +) +@mock.patch('src.pipeline_watchdog.run.watch_egress') +@mock.patch('src.pipeline_watchdog.run.watch_queue') +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_buffer_watch_ingress_failed( + docker_client_mock, + watch_queue_mock, + watch_egress_mock, + watch_ingress_mock, + watch_config, +): + docker_client = docker_client_mock() + + with pytest.raises(RuntimeError, match='error'): + await watch_buffer(docker_client, watch_config) + + watch_queue_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.queue + ) + watch_egress_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.egress + ) + watch_ingress_mock.assert_awaited_once_with( + docker_client, watch_config.buffer, watch_config.ingress + ) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.process_action') +@mock.patch('src.pipeline_watchdog.run.get_metrics', return_value='content') +@mock.patch( + 'src.pipeline_watchdog.run.parse_metrics', return_value={'buffer_size': 999} +) +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_queue( + docker_client_mock, + parse_metrics_mock, + get_metrics_mock, + process_action_mock, + watch_config, +): + docker_client = docker_client_mock() + + with mock.patch( + 'asyncio.sleep', side_effect=[None, asyncio.CancelledError] + ) as sleep_mock: + try: + await watch_queue(docker_client, watch_config.buffer, watch_config.queue) + except asyncio.CancelledError: + sleep_mock.assert_has_awaits( + [call(watch_config.queue.cooldown), call(watch_config.queue.cooldown)] + ) + pass + + get_metrics_mock.assert_awaited_once_with(watch_config.buffer) + parse_metrics_mock.assert_awaited_once_with('content') + process_action_mock.assert_awaited_once_with( + docker_client, watch_config.queue.action, watch_config.queue.container_labels + ) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.process_action') +@mock.patch('src.pipeline_watchdog.run.get_metrics', return_value='content') +@mock.patch('src.pipeline_watchdog.run.parse_metrics', return_value={'buffer_size': 0}) +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_queue_empty( + docker_client_mock, + parse_metrics_mock, + get_metrics_mock, + process_action_mock, + watch_config, +): + docker_client = docker_client_mock() + + with mock.patch( + 'asyncio.sleep', side_effect=[None, asyncio.CancelledError] + ) as sleep_mock: + try: + await watch_queue(docker_client, watch_config.buffer, watch_config.queue) + except asyncio.CancelledError: + sleep_mock.assert_has_awaits( + [ + call(watch_config.queue.cooldown), + call(watch_config.queue.polling_interval), + ] + ) + pass + + get_metrics_mock.assert_awaited_once_with(watch_config.buffer) + parse_metrics_mock.assert_awaited_once_with('content') + process_action_mock.assert_not_awaited() + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.process_action') +@mock.patch('src.pipeline_watchdog.run.get_metrics', return_value='content') +@mock.patch( + 'src.pipeline_watchdog.run.parse_metrics', + return_value={'last_sent_message': time.time() - 999}, +) +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_egress( + docker_client_mock, + parse_metrics_mock, + get_metrics_mock, + process_action_mock, + watch_config, +): + docker_client = docker_client_mock() + + with mock.patch( + 'asyncio.sleep', side_effect=[None, asyncio.CancelledError] + ) as sleep_mock: + try: + await watch_egress(docker_client, watch_config.buffer, watch_config.egress) + except asyncio.CancelledError: + sleep_mock.assert_has_awaits( + [call(watch_config.egress.cooldown), call(watch_config.egress.cooldown)] + ) + pass + + get_metrics_mock.assert_awaited_once_with(watch_config.buffer) + parse_metrics_mock.assert_awaited_once_with('content') + process_action_mock.assert_awaited_once_with( + docker_client, watch_config.egress.action, watch_config.egress.container_labels + ) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.process_action') +@mock.patch('src.pipeline_watchdog.run.get_metrics', return_value='content') +@mock.patch( + 'src.pipeline_watchdog.run.parse_metrics', + return_value={'last_sent_message': time.time()}, +) +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_egress_message_just_sent( + docker_client_mock, + parse_metrics_mock, + get_metrics_mock, + process_action_mock, + watch_config, +): + docker_client = docker_client_mock() + + with mock.patch( + 'asyncio.sleep', side_effect=[None, asyncio.CancelledError] + ) as sleep_mock: + try: + await watch_egress(docker_client, watch_config.buffer, watch_config.egress) + except asyncio.CancelledError: + sleep_mock.assert_has_awaits( + [ + call(watch_config.egress.cooldown), + call(watch_config.egress.polling_interval), + ] + ) + pass + + get_metrics_mock.assert_awaited_once_with(watch_config.buffer) + parse_metrics_mock.assert_awaited_once_with('content') + process_action_mock.assert_not_awaited() + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.process_action') +@mock.patch('src.pipeline_watchdog.run.get_metrics', return_value='content') +@mock.patch( + 'src.pipeline_watchdog.run.parse_metrics', + return_value={'last_received_message': time.time() - 999}, +) +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_ingress( + docker_client_mock, + parse_metrics_mock, + get_metrics_mock, + process_action_mock, + watch_config, +): + docker_client = docker_client_mock() + + with mock.patch( + 'asyncio.sleep', side_effect=[None, asyncio.CancelledError] + ) as sleep_mock: + try: + await watch_ingress( + docker_client, watch_config.buffer, watch_config.ingress + ) + except asyncio.CancelledError: + sleep_mock.assert_has_awaits( + [ + call(watch_config.ingress.cooldown), + call(watch_config.ingress.cooldown), + ] + ) + pass + + get_metrics_mock.assert_awaited_once_with(watch_config.buffer) + parse_metrics_mock.assert_awaited_once_with('content') + process_action_mock.assert_awaited_once_with( + docker_client, + watch_config.ingress.action, + watch_config.ingress.container_labels, + ) + + +@pytest.mark.asyncio +@mock.patch('src.pipeline_watchdog.run.process_action') +@mock.patch('src.pipeline_watchdog.run.get_metrics', return_value='content') +@mock.patch( + 'src.pipeline_watchdog.run.parse_metrics', + return_value={'last_received_message': time.time()}, +) +@mock.patch('src.pipeline_watchdog.run.DockerClient') +async def test_watch_ingress_message_just_received( + docker_client_mock, + parse_metrics_mock, + get_metrics_mock, + process_action_mock, + watch_config, +): + docker_client = docker_client_mock() + + with mock.patch( + 'asyncio.sleep', side_effect=[None, asyncio.CancelledError] + ) as sleep_mock: + try: + await watch_ingress( + docker_client, watch_config.buffer, watch_config.ingress + ) + except asyncio.CancelledError: + sleep_mock.assert_has_awaits( + [ + call(watch_config.ingress.cooldown), + call(watch_config.ingress.polling_interval), + ] + ) + pass + + get_metrics_mock.assert_awaited_once_with(watch_config.buffer) + parse_metrics_mock.assert_awaited_once_with('content') + process_action_mock.assert_not_awaited() + + +@pytest.mark.parametrize('config', [None, '']) +@mock.patch('os.environ.get') +def test_main_no_config_file_path(environ_mock, config): + environ_mock.return_value = config + + with pytest.raises(SystemExit, match='1'): + run.main() + + +@mock.patch('src.pipeline_watchdog.run.validate', side_effect=RuntimeError('error')) +@mock.patch('src.pipeline_watchdog.run.ConfigParser') +@mock.patch('os.environ.get', return_value='config.yml') +def test_main_invalid_config(environ_mock, config_parser_mock, validate_mock): + config_parser = config_parser_mock.return_value + + with pytest.raises(SystemExit, match='1'): + run.main() + + config_parser_mock.assert_called_once_with('config.yml') + config_parser.parse.assert_called_once() + parsed_config = config_parser.parse.return_value + validate_mock.assert_called_once_with(parsed_config) + + +@mock.patch('src.pipeline_watchdog.run.watch_buffer') +@mock.patch('src.pipeline_watchdog.run.DockerClient', autospec=True) +@mock.patch('src.pipeline_watchdog.run.validate') +@mock.patch('src.pipeline_watchdog.run.ConfigParser') +@mock.patch('os.environ.get', return_value='config.yml') +def test_main_keyboard_interrupt( + environ_mock, + config_parser_mock, + validate_mock, + docker_client_mock, + watch_buffer_mock, + config, +): + config_parser = config_parser_mock.return_value + config_parser.parse.return_value = config + + def raise_exception(): + raise KeyboardInterrupt() + + loop = asyncio.new_event_loop() + loop.call_later(1, raise_exception) + asyncio.set_event_loop(loop) + + run.main() + + assert loop.is_closed() + docker_client_mock.return_value.close.assert_awaited_once() + + config_parser_mock.assert_called_once_with('config.yml') + config_parser.parse.assert_called_once() + validate_mock.assert_called_once_with(config) + watch_buffer_mock.assert_awaited_once_with( + docker_client_mock.return_value, config.watch_configs[0] + ) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..2f83ccc --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,49 @@ +import sys +from unittest import mock + +import pytest + +from src.pipeline_watchdog.utils import convert_to_seconds, init_logging + + +@mock.patch('logging.basicConfig') +def test_init_logging(basic_config_mock): + log_level = 'DEBUG' + + init_logging(log_level) + + basic_config_mock.assert_called_once_with( + stream=sys.stdout, + format='%(asctime)s %(levelname)s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=log_level, + ) + + +@pytest.mark.parametrize( + 'string, expected', + [ + ('0s', 0), + ('1s', 1), + ('1m', 60), + ('1h', 3600), + ('1d', 86400), + ('1w', 604800), + ], +) +def test_convert_to_seconds(string, expected): + assert convert_to_seconds(string) == expected + + +@pytest.mark.parametrize( + 'string, expected_error', + [ + ('1x', KeyError), + ('-1s', ValueError), + ('xs', ValueError), + ('xx', ValueError), + ], +) +def test_convert_to_seconds_invalid_input(string, expected_error): + with pytest.raises(expected_error): + convert_to_seconds(string)