diff --git a/.github/workflows/test_pr_merge.yaml b/.github/workflows/test_pr_merge.yaml index 5b771d6..934db18 100644 --- a/.github/workflows/test_pr_merge.yaml +++ b/.github/workflows/test_pr_merge.yaml @@ -10,11 +10,17 @@ on: jobs: tests: - runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.6", "3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] + runs-on: ubuntu-20.04 steps: - - name: Checkout repo content - uses: actions/checkout@v2 - - name: install dependencies + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies run: ./scripts/ci_tests.sh install - - name: run tests - run: ./scripts/ci_tests.sh test \ No newline at end of file + - name: Run tests + run: ./scripts/ci_tests.sh test diff --git a/Makefile b/Makefile index 7c9b62d..81d01c3 100644 --- a/Makefile +++ b/Makefile @@ -20,14 +20,11 @@ devenv: .venv ## create a python virtual environment with tools to dev, run and @echo "To activate the virtual environment, run 'source $) threshold, the corresponding manager will report busy. 
+- `ACTIVITY_MONITOR_BUSY_THRESHOLD_CPU_PERCENT` [percentage(%)], default=`1000`: used by cpu usage monitor +- `ACTIVITY_MONITOR_BUSY_THRESHOLD_DISK_READ_BPS` [bytes], default=`1099511627776`: used by disk usage monitor +- `ACTIVITY_MONITOR_BUSY_THRESHOLD_DISK_WRITE_BPS` [bytes], default=`1099511627776`: used by disk usage monitor +- `ACTIVITY_MONITOR_BUSY_THRESHOLD_NETWORK_RECEIVE_BPS` [bytes], default=`1099511627776`: used by network usage monitor +- `ACTIVITY_MONITOR_BUSY_THRESHOLD_NETWORK_SENT__BPS` [bytes], default=`1099511627776`: used by network usage monitor + +##### Other: +- `ACTIVITY_MONITOR_JUPYTER_NOTEBOOK_BASE_URL` [str] default=`http://localhost:8888`: endpoint where the jupyter notebook is exposed +- `ACTIVITY_MONITOR_JUPYTER_NOTEBOOK_KERNEL_CHECK_INTERVAL_S` [float] default=`5`: used by the jupyter kernel monitor to update its metrics +- `ACTIVITY_MONITOR_MONITOR_INTERVAL_S` [float] default=`1`: all other monitors use this interval to update their metrics +- `ACTIVITY_MONITOR_LISTEN_PORT` [int] default=`19597`: port on which the http server will be exposed + + + +# Exposed API + + +### `GET /activity` + +Used by oSPARC to retrieve the status of the service if it's active or not + +```json +{"seconds_inactive": 0} +``` + +```bash +curl http://localhost:19597/activity +``` + +### `GET /debug` + +Used for debugging and not used by oSPARC + +```json +{ + "kernel_monitor": {"is_busy": true}, + "cpu_usage": {"is_busy": false, "total": 0}, + "disk_usage": {"is_busy": false, "total": {"bytes_read_per_second": 0, "bytes_write_per_second": 0}}, + "network_usage": {"is_busy": false, "total": {"bytes_received_per_second": 345452, "bytes_sent_per_second": 343809}} +} +``` + +```bash +curl http://localhost:19597/debug +``` + +### `GET /metrics` + +Exposes Prometheus metrics related to the running processes. + +``` +# HELP network_bytes_received_total Total number of bytes received across all network interfaces. 
+# TYPE network_bytes_received_total counter +network_bytes_received_total 23434790 + +# HELP network_bytes_sent_total Total number of bytes sent across all network interfaces. +# TYPE network_bytes_sent_total counter +network_bytes_sent_total 22893843 +``` + +```bash +curl http://localhost:19597/metrics +``` \ No newline at end of file diff --git a/requirements/test.in b/requirements/test.in index 202a1b1..d7dc480 100644 --- a/requirements/test.in +++ b/requirements/test.in @@ -1,12 +1,10 @@ -# from jupyter +# required packages psutil -tornado # testing pytest -pytest-asyncio pytest-cov pytest-mock requests diff --git a/requirements/test.txt b/requirements/test.txt deleted file mode 100644 index 43fae43..0000000 --- a/requirements/test.txt +++ /dev/null @@ -1,54 +0,0 @@ -# -# This file is autogenerated by pip-compile with Python 3.10 -# by the following command: -# -# pip-compile --output-file=requirements/test.txt requirements/test.in -# -certifi==2024.2.2 - # via requests -charset-normalizer==3.3.2 - # via requests -coverage[toml]==7.4.4 - # via pytest-cov -exceptiongroup==1.2.0 - # via pytest -idna==3.6 - # via requests -iniconfig==2.0.0 - # via pytest -packaging==24.0 - # via pytest -pluggy==1.4.0 - # via pytest -psutil==5.9.8 - # via -r requirements/test.in -pytest==8.1.1 - # via - # -r requirements/test.in - # pytest-asyncio - # pytest-cov - # pytest-mock -pytest-asyncio==0.23.6 - # via -r requirements/test.in -pytest-cov==4.1.0 - # via -r requirements/test.in -pytest-mock==3.12.0 - # via -r requirements/test.in -requests==2.31.0 - # via - # -r requirements/test.in - # requests-mock -requests-mock==1.11.0 - # via -r requirements/test.in -six==1.16.0 - # via requests-mock -tenacity==8.2.3 - # via -r requirements/test.in -tomli==2.0.1 - # via - # coverage - # pytest -tornado==6.4 - # via -r requirements/test.in -urllib3==2.2.1 - # via requests diff --git a/scripts/ci_tests.sh b/scripts/ci_tests.sh index b5626c2..e253230 100755 --- a/scripts/ci_tests.sh +++ 
b/scripts/ci_tests.sh @@ -11,7 +11,6 @@ install() { make .venv source .venv/bin/activate make install-test - pip list --verbose } test() { diff --git a/scripts/install.sh b/scripts/install.sh index d4f2a29..4f7386e 100644 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -15,7 +15,7 @@ IFS=$'\n\t' # Function to display usage information usage() { echo "Usage: $0 " - echo "Example: $0 v.0.0.9-debug" + echo "Example: $0 v.0.0.1" exit 1 } @@ -24,22 +24,23 @@ if [ $# -ne 1 ]; then usage fi -TAG=$1 -URL="https://github.com/ITISFoundation/service-activity-monitor/releases/download/$TAG/release_archive_$TAG.zip" # Download and install +TAG=$1 +URL="https://github.com/ITISFoundation/service-activity-monitor/releases/download/$TAG/release_archive_$TAG.zip" echo "Downloading release $TAG..." curl -sSL -o /tmp/release.zip "$URL" -echo "Extracting files..." +echo "Installing..." + +# python scripts mkdir -p /usr/local/bin/service-monitor unzip -q /tmp/release.zip -d /usr/local/bin/service-monitor +# requirements +pip install psutil -echo "Installing..." -# Here you can write your installation steps, for now let's just echo the installation is complete echo "Installation complete." # Cleanup rm /tmp/release.zip - echo "Done!" 
diff --git a/src/activity.py b/src/activity.py index c8c7989..83058bd 100644 --- a/src/activity.py +++ b/src/activity.py @@ -1,4 +1,14 @@ +import os + import requests -r = requests.get("http://localhost:19597") -print(r.text) +LISTEN_PORT: int = int(os.environ.get("ACTIVITY_MONITOR_LISTEN_PORT", 19597)) + + +def main(): + response = requests.get(f"http://localhost:{LISTEN_PORT}/activity") + print(response.text) + + +if __name__ == "__main__": + main() diff --git a/src/activity_monitor.py b/src/activity_monitor.py index 7b763ac..c0a6f8d 100755 --- a/src/activity_monitor.py +++ b/src/activity_monitor.py @@ -3,6 +3,7 @@ import psutil import requests import time +import os from abc import abstractmethod from concurrent.futures import ThreadPoolExecutor, as_completed @@ -10,36 +11,113 @@ from datetime import datetime from http.server import HTTPServer, BaseHTTPRequestHandler from threading import Thread -from typing import Final - +from typing import Any, Dict, List, Tuple, Set, Union + + +_TB: int = 1024 * 1024 * 1024 * 1024 +_ENV_VAR_PREFIX: str = "ACTIVITY_MONITOR" + +# NOTE: using high thresholds to make service by default +# considered inactive. +# If the service owner does not change these, by lowering +# them to an adequate value the service will always be shut +# down as soon as the inactivity period is detected. 
+_THRESHOLD_PREFIX: str = f"{_ENV_VAR_PREFIX}_BUSY_THRESHOLD" +BUSY_USAGE_THRESHOLD_CPU: float = float( + os.environ.get(f"{_THRESHOLD_PREFIX}_CPU_PERCENT", 1000) +) +BUSY_USAGE_THRESHOLD_DISK_READ: int = int( + os.environ.get(f"{_THRESHOLD_PREFIX}_DISK_READ_BPS", 1 * _TB) +) +BUSY_USAGE_THRESHOLD_DISK_WRITE: int = int( + os.environ.get(f"{_THRESHOLD_PREFIX}_DISK_WRITE_BPS", 1 * _TB) +) +BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED: int = int( + os.environ.get(f"{_THRESHOLD_PREFIX}_NETWORK_RECEIVE_BPS", 1 * _TB) +) +BUSY_USAGE_THRESHOLD_NETWORK_SENT: int = int( + os.environ.get(f"{_THRESHOLD_PREFIX}_NETWORK_SENT__BPS", 1 * _TB) +) + +# NOTE: set the following flags to disable a specific monitor +DISABLE_JUPYTER_KERNEL_MONITOR: bool = ( + os.environ.get(f"{_ENV_VAR_PREFIX}_DISABLE_JUPYTER_KERNEL_MONITOR", None) + is not None +) +DISABLE_CPU_USAGE_MONITOR: bool = ( + os.environ.get(f"{_ENV_VAR_PREFIX}_DISABLE_CPU_USAGE_MONITOR", None) is not None +) +DISABLE_DISK_USAGE_MONITOR: bool = ( + os.environ.get(f"{_ENV_VAR_PREFIX}_DISABLE_DISK_USAGE_MONITOR", None) is not None +) +DISABLE_NETWORK_USAGE_MONITOR: bool = ( + os.environ.get(f"{_ENV_VAR_PREFIX}_DISABLE_NETWORK_USAGE_MONITOR", None) is not None +) + +# NOTE: Other configuration options +JUPYTER_NOTEBOOK_BASE_URL: str = os.environ.get( + f"{_ENV_VAR_PREFIX}_JUPYTER_NOTEBOOK_BASE_URL", "http://localhost:8888" +) +JUPYTER_NOTEBOOK_KERNEL_CHECK_INTERVAL_S: float = float( + os.environ.get(f"{_ENV_VAR_PREFIX}_JUPYTER_NOTEBOOK_KERNEL_CHECK_INTERVAL_S", 5) +) +MONITOR_INTERVAL_S: float = float( + os.environ.get(f"{_ENV_VAR_PREFIX}_MONITOR_INTERVAL_S", 1) +) +LISTEN_PORT: int = int(os.environ.get(f"{_ENV_VAR_PREFIX}_LISTEN_PORT", 19597)) + +# Internals +_THREAD_EXECUTOR_WORKERS: int = 10 _logger = logging.getLogger(__name__) -LISTEN_PORT: Final[int] = 19597 +############### Utils + + +_METRICS_COUNTER_TEMPLATE: str = """ +# HELP {name} {help} +# TYPE {name} counter +{name} {value} +""" + -KERNEL_CHECK_INTERVAL_S: Final[float] = 5 -CHECK_INTERVAL_S: 
Final[float] = 1 -THREAD_EXECUTOR_WORKERS: Final[int] = 10 +MetricEntry = Dict[str, Union[str, int]] -_KB: Final[int] = 1024 -BUSY_USAGE_THRESHOLD_CPU: Final[float] = 0.5 # percent in range [0, 100] -BUSY_USAGE_THRESHOLD_DISK_READ: Final[int] = 0 -BUSY_USAGE_THRESHOLD_DISK_WRITE: Final[int] = 0 -BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED: Final[int] = 1 * _KB -BUSY_USAGE_THRESHOLD_NETWORK_SENT: Final[int] = 1 * _KB +class MetricsManager: + def __init__(self) -> None: + self._metrics: Dict[str, MetricEntry] = {} + + def register_metric( + self, name: str, *, help: str, initial_value: Union[int, float] + ) -> None: + self._metrics[name] = {"help": help, "value": initial_value} + + def inc_metric(self, name: str, value: Union[int, float]) -> None: + self._metrics[name]["value"] += value + + def format_metrics(self) -> str: + result = "" + + for name, metric_entry in self._metrics.items(): + entry = _METRICS_COUNTER_TEMPLATE.format( + name=name, help=metric_entry["help"], value=metric_entry["value"] + ) + result += f"{entry}" + + return result -# Utilities class AbstractIsBusyMonitor: - def __init__(self, poll_interval: float) -> None: + def __init__(self, poll_interval: float, metrics: MetricsManager) -> None: self._poll_interval: float = poll_interval self._keep_running: bool = True self._thread: Thread | None = None + self.metrics = metrics self.is_busy: bool = True - self.thread_executor = ThreadPoolExecutor(max_workers=THREAD_EXECUTOR_WORKERS) + self.thread_executor = ThreadPoolExecutor(max_workers=_THREAD_EXECUTOR_WORKERS) @abstractmethod def _check_if_busy(self) -> bool: @@ -50,6 +128,14 @@ def _check_if_busy(self) -> bool: bool: True if considered busy """ + @abstractmethod + def get_debug_entry(self) -> Dict[str, Any]: + """Information about the current internal state to be exported + + Returns: + dict[str, Any]: json serializable data + """ + def _worker(self) -> None: while self._keep_running: try: @@ -80,14 +166,14 @@ def __exit__(self, exc_type, exc_value, 
traceback): self.stop() -def __get_children_processes_recursive(pid) -> list[psutil.Process]: +def __get_children_processes_recursive(pid) -> List[psutil.Process]: try: return psutil.Process(pid).children(recursive=True) except psutil.NoSuchProcess: return [] -def _get_sibling_processes() -> list[psutil.Process]: +def _get_sibling_processes() -> List[psutil.Process]: # Returns the CPU usage of all processes except this one. # ASSUMPTIONS: # - `CURRENT_PROC` is a child of root process @@ -105,19 +191,18 @@ def _get_sibling_processes() -> list[psutil.Process]: return [c for c in all_children if c.pid != current_process.pid] -# Monitors +############### Monitors class JupyterKernelMonitor(AbstractIsBusyMonitor): - BASE_URL = "http://localhost:8888" - HEADERS = {"accept": "application/json"} - - def __init__(self, poll_interval: float) -> None: - super().__init__(poll_interval=poll_interval) + def __init__(self, poll_interval: float, metrics: MetricsManager) -> None: + super().__init__(poll_interval=poll_interval, metrics=metrics) self.are_kernels_busy: bool = False def _get(self, path: str) -> dict: - r = requests.get(f"{self.BASE_URL}{path}", headers=self.HEADERS) + r = requests.get( + f"{JUPYTER_NOTEBOOK_BASE_URL}{path}", headers={"accept": "application/json"} + ) return r.json() def _update_kernels_activity(self) -> None: @@ -138,6 +223,9 @@ def _check_if_busy(self) -> bool: self._update_kernels_activity() return self.are_kernels_busy + def get_debug_entry(self) -> Dict[str, Any]: + return {"kernel_monitor": {"is_busy": self.is_busy}} + ProcessID = int TimeSeconds = float @@ -149,8 +237,10 @@ class CPUUsageMonitor(AbstractIsBusyMonitor): and averages over 1 second. 
""" - def __init__(self, poll_interval: float, *, busy_threshold: float): - super().__init__(poll_interval=poll_interval) + def __init__( + self, poll_interval: float, metrics: MetricsManager, *, busy_threshold: float + ): + super().__init__(poll_interval=poll_interval, metrics=metrics) self.busy_threshold = busy_threshold # snapshot @@ -162,13 +252,13 @@ def __init__(self, poll_interval: float, *, busy_threshold: float): @staticmethod def _sample_cpu_usage( process: psutil.Process, - ) -> tuple[ProcessID, tuple[TimeSeconds, PercentCPU]]: + ) -> Tuple[ProcessID, Tuple[TimeSeconds, PercentCPU]]: """returns: tuple[pid, tuple[time, percent_cpu_usage]]""" return (process.pid, (time.time(), process.cpu_percent())) def _sample_total_cpu_usage( self, - ) -> dict[ProcessID, tuple[TimeSeconds, PercentCPU]]: + ) -> Dict[ProcessID, Tuple[TimeSeconds, PercentCPU]]: futures = [ self.thread_executor.submit(self._sample_cpu_usage, p) for p in _get_sibling_processes() @@ -177,7 +267,7 @@ def _sample_total_cpu_usage( @staticmethod def _get_cpu_over_1_second( - last: tuple[TimeSeconds, PercentCPU], current: tuple[TimeSeconds, PercentCPU] + last: Tuple[TimeSeconds, PercentCPU], current: Tuple[TimeSeconds, PercentCPU] ) -> float: interval = current[0] - last[0] measured_cpu_in_interval = current[1] @@ -205,6 +295,11 @@ def _check_if_busy(self) -> bool: self._update_total_cpu_usage() return self.total_cpu_usage > self.busy_threshold + def get_debug_entry(self) -> Dict[str, Any]: + return { + "cpu_usage": {"is_busy": self.is_busy, "total": self.total_cpu_usage}, + } + BytesRead = int BytesWrite = int @@ -214,11 +309,12 @@ class DiskUsageMonitor(AbstractIsBusyMonitor): def __init__( self, poll_interval: float, + metrics: MetricsManager, *, read_usage_threshold: int, write_usage_threshold: int, ): - super().__init__(poll_interval=poll_interval) + super().__init__(poll_interval=poll_interval, metrics=metrics) self.read_usage_threshold = read_usage_threshold self.write_usage_threshold = 
write_usage_threshold @@ -232,13 +328,13 @@ def __init__( @staticmethod def _sample_disk_usage( process: psutil.Process, - ) -> tuple[ProcessID, tuple[TimeSeconds, BytesRead, BytesWrite]]: + ) -> Tuple[ProcessID, Tuple[TimeSeconds, BytesRead, BytesWrite]]: counters = process.io_counters() return (process.pid, (time.time(), counters.read_bytes, counters.write_bytes)) def _sample_total_disk_usage( self, - ) -> dict[ProcessID, tuple[TimeSeconds, BytesRead, BytesWrite]]: + ) -> Dict[ProcessID, Tuple[TimeSeconds, BytesRead, BytesWrite]]: futures = [ self.thread_executor.submit(self._sample_disk_usage, p) for p in _get_sibling_processes() @@ -247,9 +343,9 @@ def _sample_total_disk_usage( @staticmethod def _get_bytes_over_one_second( - last: tuple[TimeSeconds, BytesRead, BytesWrite], - current: tuple[TimeSeconds, BytesRead, BytesWrite], - ) -> tuple[BytesRead, BytesWrite]: + last: Tuple[TimeSeconds, BytesRead, BytesWrite], + current: Tuple[TimeSeconds, BytesRead, BytesWrite], + ) -> Tuple[BytesRead, BytesWrite]: interval = current[0] - last[0] measured_bytes_read_in_interval = current[1] - last[1] measured_bytes_write_in_interval = current[2] - last[2] @@ -288,6 +384,17 @@ def _check_if_busy(self) -> bool: or self.total_bytes_write > self.write_usage_threshold ) + def get_debug_entry(self) -> Dict[str, Any]: + return { + "disk_usage": { + "is_busy": self.is_busy, + "total": { + "bytes_read_per_second": self.total_bytes_read, + "bytes_write_per_second": self.total_bytes_write, + }, + } + } + InterfaceName = str BytesReceived = int @@ -295,18 +402,19 @@ def _check_if_busy(self) -> bool: class NetworkUsageMonitor(AbstractIsBusyMonitor): - _EXCLUDE_INTERFACES: set[InterfaceName] = { + _EXCLUDE_INTERFACES: Set[InterfaceName] = { "lo", } def __init__( self, poll_interval: float, + metrics: MetricsManager, *, received_usage_threshold: int, sent_usage_threshold: int, ): - super().__init__(poll_interval=poll_interval) + super().__init__(poll_interval=poll_interval, 
metrics=metrics) self.received_usage_threshold = received_usage_threshold self.sent_usage_threshold = sent_usage_threshold @@ -316,9 +424,20 @@ def __init__( self.bytes_received: BytesReceived = 0 self.bytes_sent: BytesSent = 0 + self.metrics.register_metric( + "network_bytes_received_total", + help="Total number of bytes received across all network interfaces.", + initial_value=0, + ) + self.metrics.register_metric( + "network_bytes_sent_total", + help="Total number of bytes sent across all network interfaces.", + initial_value=0, + ) + def _sample_total_network_usage( self, - ) -> tuple[TimeSeconds, BytesReceived, BytesSent]: + ) -> Tuple[TimeSeconds, BytesReceived, BytesSent]: net_io_counters = psutil.net_io_counters(pernic=True) total_bytes_received: int = 0 @@ -334,9 +453,9 @@ def _sample_total_network_usage( @staticmethod def _get_bytes_over_one_second( - last: tuple[TimeSeconds, BytesReceived, BytesSent], - current: tuple[TimeSeconds, BytesReceived, BytesSent], - ) -> tuple[BytesReceived, BytesSent]: + last: Tuple[TimeSeconds, BytesReceived, BytesSent], + current: Tuple[TimeSeconds, BytesReceived, BytesSent], + ) -> Tuple[BytesReceived, BytesSent]: interval = current[0] - last[0] measured_bytes_received_in_interval = current[1] - last[1] measured_bytes_sent_in_interval = current[2] - last[2] @@ -360,6 +479,9 @@ def _update_total_network_usage(self) -> None: self.bytes_received = bytes_received self.bytes_sent = bytes_sent + self.metrics.inc_metric("network_bytes_received_total", bytes_received) + self.metrics.inc_metric("network_bytes_sent_total", bytes_sent) + def _check_if_busy(self) -> bool: self._update_total_network_usage() return ( @@ -367,6 +489,17 @@ def _check_if_busy(self) -> bool: or self.bytes_sent > self.sent_usage_threshold ) + def get_debug_entry(self) -> Dict[str, Any]: + return { + "network_usage": { + "is_busy": self.is_busy, + "total": { + "bytes_received_per_second": self.bytes_received, + "bytes_sent_per_second": self.bytes_sent, + }, + 
} + } + class ActivityManager: def __init__(self, interval: float) -> None: @@ -376,29 +509,45 @@ def __init__(self, interval: float) -> None: self.interval = interval self.last_idle: datetime | None = None - self.jupyter_kernel_monitor = JupyterKernelMonitor(KERNEL_CHECK_INTERVAL_S) - self.cpu_usage_monitor = CPUUsageMonitor( - CHECK_INTERVAL_S, - busy_threshold=BUSY_USAGE_THRESHOLD_CPU, - ) - self.disk_usage_monitor = DiskUsageMonitor( - CHECK_INTERVAL_S, - read_usage_threshold=BUSY_USAGE_THRESHOLD_DISK_READ, - write_usage_threshold=BUSY_USAGE_THRESHOLD_DISK_WRITE, - ) - self.network_monitor = NetworkUsageMonitor( - CHECK_INTERVAL_S, - received_usage_threshold=BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED, - sent_usage_threshold=BUSY_USAGE_THRESHOLD_NETWORK_SENT, - ) + self._monitors: list[AbstractIsBusyMonitor] = [] + + self.metrics = MetricsManager() + + if not DISABLE_JUPYTER_KERNEL_MONITOR: + self._monitors.append( + JupyterKernelMonitor( + JUPYTER_NOTEBOOK_KERNEL_CHECK_INTERVAL_S, self.metrics + ) + ) + if not DISABLE_CPU_USAGE_MONITOR: + self._monitors.append( + CPUUsageMonitor( + MONITOR_INTERVAL_S, + self.metrics, + busy_threshold=BUSY_USAGE_THRESHOLD_CPU, + ) + ) + if not DISABLE_DISK_USAGE_MONITOR: + self._monitors.append( + DiskUsageMonitor( + MONITOR_INTERVAL_S, + self.metrics, + read_usage_threshold=BUSY_USAGE_THRESHOLD_DISK_READ, + write_usage_threshold=BUSY_USAGE_THRESHOLD_DISK_WRITE, + ) + ) + if not DISABLE_NETWORK_USAGE_MONITOR: + self._monitors.append( + NetworkUsageMonitor( + MONITOR_INTERVAL_S, + self.metrics, + received_usage_threshold=BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED, + sent_usage_threshold=BUSY_USAGE_THRESHOLD_NETWORK_SENT, + ) + ) def check(self): - is_busy = ( - self.jupyter_kernel_monitor.is_busy - or self.cpu_usage_monitor.is_busy - or self.disk_usage_monitor.is_busy - or self.network_monitor.is_busy - ) + is_busy = any(x.is_busy for x in self._monitors) if is_busy: self.last_idle = None @@ -413,6 +562,15 @@ def get_idle_seconds(self) -> 
float: idle_seconds = (datetime.utcnow() - self.last_idle).total_seconds() return idle_seconds if idle_seconds > 0 else 0 + def get_debug(self) -> Dict[str, Any]: + merged_dict: dict[str, Any] = {} + for x in self._monitors: + merged_dict.update(x.get_debug_entry()) + return merged_dict + + def get_metrics(self) -> str: + return self.metrics.format_metrics() + def _worker(self) -> None: while self._keep_running: with suppress(Exception): @@ -420,10 +578,8 @@ def _worker(self) -> None: time.sleep(self.interval) def start(self) -> None: - self.jupyter_kernel_monitor.start() - self.cpu_usage_monitor.start() - self.disk_usage_monitor.start() - self.network_monitor.start() + for monitor in self._monitors: + monitor.start() self._thread = Thread( target=self._worker, @@ -433,42 +589,14 @@ def start(self) -> None: self._thread.start() def stop(self) -> None: - self.jupyter_kernel_monitor.stop() - self.cpu_usage_monitor.stop() - self.disk_usage_monitor.stop() - self.network_monitor.stop() + for monitor in self._monitors: + monitor.stop() self._keep_running = False self._thread.join() -def _get_response_debug(activity_manager: ActivityManager) -> dict: - return { - "seconds_inactive": activity_manager.get_idle_seconds(), - "cpu_usage": { - "is_busy": activity_manager.cpu_usage_monitor.is_busy, - "total": activity_manager.cpu_usage_monitor.total_cpu_usage, - }, - "disk_usage": { - "is_busy": activity_manager.disk_usage_monitor.is_busy, - "total": { - "bytes_read_per_second": activity_manager.disk_usage_monitor.total_bytes_read, - "bytes_write_per_second": activity_manager.disk_usage_monitor.total_bytes_write, - }, - }, - "network_usage": { - "is_busy": activity_manager.network_monitor.is_busy, - "total": { - "bytes_received_per_second": activity_manager.network_monitor.bytes_received, - "bytes_sent_per_second": activity_manager.network_monitor.bytes_sent, - }, - }, - "kernel_monitor": {"is_busy": activity_manager.jupyter_kernel_monitor.is_busy}, - } - - -def 
_get_response_root(activity_manager: ActivityManager) -> dict: - return {"seconds_inactive": activity_manager.get_idle_seconds()} +############### Http Server class ServerState: @@ -481,33 +609,43 @@ def __init__(self, server_address, RequestHandlerClass, state): super().__init__(server_address, RequestHandlerClass) -class JSONRequestHandler(BaseHTTPRequestHandler): - def _send_response(self, code: int, data: dict) -> None: +class MainRequestHandler(BaseHTTPRequestHandler): + def _send_json(self, code: int, data: Any) -> None: self.send_response(code) self.send_header("Content-type", "application/json") self.end_headers() self.wfile.write(json.dumps(data).encode("utf-8")) - def do_GET(self): - state = self.server.state + def _send_text(self, code: int, text: str) -> None: + self.send_response(code) + self.send_header("Content-type", "text/plain") + self.end_headers() + self.wfile.write(text.encode("utf-8")) - if self.path == "/": # The root endpoint - self._send_response(200, _get_response_root(state.activity_manager)) - elif self.path == "/debug": # The debug endpoint - self._send_response(200, _get_response_debug(state.activity_manager)) - else: # Handle case where the endpoint is not found - self._send_response( - 404, _get_response_debug({"error": "Resource not found"}) + @property + def activity_manager(self) -> ActivityManager: + return self.server.state.activity_manager + + def do_GET(self): + if self.path == "/activity": + self._send_json( + 200, {"seconds_inactive": self.activity_manager.get_idle_seconds()} ) + elif self.path == "/debug": + self._send_json(200, self.activity_manager.get_debug()) + elif self.path == "/metrics": + self._send_text(200, self.activity_manager.get_metrics()) + else: # Handle case where the endpoint is not found + self._send_json(404, {"error": "Resource not found"}) def make_server(port: int) -> HTTPServerWithState: state = ServerState() - state.activity_manager = ActivityManager(CHECK_INTERVAL_S) + state.activity_manager = 
ActivityManager(MONITOR_INTERVAL_S) state.activity_manager.start() server_address = ("", port) # Listen on all interfaces, port 8000 - return HTTPServerWithState(server_address, JSONRequestHandler, state) + return HTTPServerWithState(server_address, MainRequestHandler, state) def main(): diff --git a/tests/_import_utils.py b/tests/_import_utils.py index 0358140..e975136 100644 --- a/tests/_import_utils.py +++ b/tests/_import_utils.py @@ -10,5 +10,3 @@ def allow_imports() -> None: path = (_CURRENT_DIR / "..." / ".." / ".." / "src").absolute().resolve() assert path.exists() sys.path.append(f"{path}") - - import activity_monitor diff --git a/tests/conftest.py b/tests/conftest.py index 78984a4..d9c3de0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,17 +1,30 @@ import ctypes import pytest import socket +import json import threading import time +import requests +import requests_mock from concurrent.futures import ThreadPoolExecutor, wait from multiprocessing import Array, Process from tempfile import NamedTemporaryFile +from tenacity import Retrying +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed +from typing import Callable, Iterable, TYPE_CHECKING -from typing import Callable, Final, Iterable +if TYPE_CHECKING: + from ..docker import activity_monitor +else: + from _import_utils import allow_imports + allow_imports() + import activity_monitor -_LOCAL_LISTEN_PORT: Final[int] = 12345 + +_LOCAL_LISTEN_PORT: int = 12345 class _ListenSocketServer: @@ -133,3 +146,48 @@ def _(*, network: bool, cpu: bool, disk: bool) -> _ActivityGenerator: for instance in created: instance.stop() + + +@pytest.fixture +def mock_jupyter_kernel_monitor(are_kernels_busy: bool) -> Iterable[None]: + with requests_mock.Mocker(real_http=True) as m: + m.get("http://localhost:8888/api/kernels", text=json.dumps([{"id": "atest1"}])) + m.get( + "http://localhost:8888/api/kernels/atest1", + text=json.dumps( + {"execution_state": "running" if are_kernels_busy 
else "idle"} + ), + ) + yield + + +@pytest.fixture +def server_url() -> str: + return f"http://localhost:{activity_monitor.LISTEN_PORT}" + + +@pytest.fixture +def http_server(mock_jupyter_kernel_monitor: None, server_url: str) -> None: + server = activity_monitor.make_server(activity_monitor.LISTEN_PORT) + + def _run_server_worker() -> None: + server.serve_forever() + + thread = threading.Thread(target=_run_server_worker, daemon=True) + thread.start() + + # ensure server is running + for attempt in Retrying( + stop=stop_after_delay(3), wait=wait_fixed(0.1), reraise=True + ): + with attempt: + result = requests.get(f"{server_url}/activity", timeout=1) + assert result.status_code == 200, result.text + + yield None + + server.shutdown() + server.server_close() + + with pytest.raises(requests.exceptions.RequestException): + requests.get(f"{server_url}/activity", timeout=1) diff --git a/tests/test_activity.py b/tests/test_activity.py new file mode 100644 index 0000000..7e0c313 --- /dev/null +++ b/tests/test_activity.py @@ -0,0 +1,23 @@ +import pytest +import json +from typing import TYPE_CHECKING + + +if TYPE_CHECKING: + from ..docker import activity +else: + from _import_utils import allow_imports + + allow_imports() + import activity + + +@pytest.fixture +def are_kernels_busy() -> bool: + return True + + +def test_activity(http_server: None, capfd: pytest.CaptureFixture): + activity.main() + capture_result = capfd.readouterr() + assert json.loads(capture_result.out) == {"seconds_inactive": 0} diff --git a/tests/test_activity_monitor.py b/tests/test_activity_monitor.py index 61962e2..9b49507 100644 --- a/tests/test_activity_monitor.py +++ b/tests/test_activity_monitor.py @@ -1,16 +1,11 @@ -import asyncio -import json import psutil import pytest -import pytest_asyncio import requests -import requests_mock -import threading import time -from typing import Callable, Final, Iterable, TYPE_CHECKING +from typing import Callable, List, Dict, TYPE_CHECKING from pytest_mock 
import MockFixture -from tenacity import AsyncRetrying +from tenacity import Retrying from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed from conftest import _ActivityGenerator @@ -24,14 +19,12 @@ allow_imports() import activity_monitor -pytestmark = pytest.mark.asyncio - @pytest.fixture def mock__get_sibling_processes( mocker: MockFixture, -) -> Callable[[list[int]], list[psutil.Process]]: - def _get_processes(pids: list[int]) -> list[psutil.Process]: +) -> Callable[[List[int]], List[psutil.Process]]: + def _get_processes(pids: List[int]) -> List[psutil.Process]: results = [] for pid in pids: proc = psutil.Process(pid) @@ -39,7 +32,7 @@ def _get_processes(pids: list[int]) -> list[psutil.Process]: results.append(proc) return results - def _(pids: list[int]) -> None: + def _(pids: List[int]) -> None: mocker.patch( "activity_monitor._get_sibling_processes", return_value=_get_processes(pids) ) @@ -47,16 +40,24 @@ def _(pids: list[int]) -> None: return _ -async def test_cpu_usage_monitor_not_busy( +@pytest.fixture +def metrics_manager() -> activity_monitor.MetricsManager: + return activity_monitor.MetricsManager() + + +def test_cpu_usage_monitor_not_busy( socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], + mock__get_sibling_processes: Callable[[List[int]], List[psutil.Process]], create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], + metrics_manager: activity_monitor.MetricsManager, ): activity_generator = create_activity_generator(network=False, cpu=False, disk=False) mock__get_sibling_processes([activity_generator.get_pid()]) - with activity_monitor.CPUUsageMonitor(1, busy_threshold=5) as cpu_usage_monitor: - async for attempt in AsyncRetrying( + with activity_monitor.CPUUsageMonitor( + 1, metrics_manager, busy_threshold=5 + ) as cpu_usage_monitor: + for attempt in Retrying( stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True ): with attempt: @@ -64,35 +65,39 
@@ async def test_cpu_usage_monitor_not_busy( assert cpu_usage_monitor.is_busy is False -async def test_cpu_usage_monitor_still_busy( +def test_cpu_usage_monitor_still_busy( socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], + mock__get_sibling_processes: Callable[[List[int]], List[psutil.Process]], create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], + metrics_manager: activity_monitor.MetricsManager, ): activity_generator = create_activity_generator(network=False, cpu=True, disk=False) mock__get_sibling_processes([activity_generator.get_pid()]) - with activity_monitor.CPUUsageMonitor(0.5, busy_threshold=5) as cpu_usage_monitor: + with activity_monitor.CPUUsageMonitor( + 0.5, metrics_manager, busy_threshold=5 + ) as cpu_usage_monitor: # wait for monitor to trigger - await asyncio.sleep(1) + time.sleep(1) # must still result busy assert cpu_usage_monitor.total_cpu_usage > 0 assert cpu_usage_monitor.is_busy is True -async def test_disk_usage_monitor_not_busy( +def test_disk_usage_monitor_not_busy( socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], + mock__get_sibling_processes: Callable[[List[int]], List[psutil.Process]], create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], + metrics_manager: activity_monitor.MetricsManager, ): activity_generator = create_activity_generator(network=False, cpu=False, disk=False) mock__get_sibling_processes([activity_generator.get_pid()]) with activity_monitor.DiskUsageMonitor( - 0.5, read_usage_threshold=0, write_usage_threshold=0 + 0.5, metrics_manager, read_usage_threshold=0, write_usage_threshold=0 ) as disk_usage_monitor: - async for attempt in AsyncRetrying( + for attempt in Retrying( stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True ): with attempt: @@ -103,19 +108,20 @@ async def test_disk_usage_monitor_not_busy( assert disk_usage_monitor.is_busy is False -async def 
test_disk_usage_monitor_still_busy( +def test_disk_usage_monitor_still_busy( socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], + mock__get_sibling_processes: Callable[[List[int]], List[psutil.Process]], create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], + metrics_manager: activity_monitor.MetricsManager, ): activity_generator = create_activity_generator(network=False, cpu=False, disk=True) mock__get_sibling_processes([activity_generator.get_pid()]) with activity_monitor.DiskUsageMonitor( - 0.5, read_usage_threshold=0, write_usage_threshold=0 + 0.5, metrics_manager, read_usage_threshold=0, write_usage_threshold=0 ) as disk_usage_monitor: # wait for monitor to trigger - await asyncio.sleep(1) + time.sleep(1) write_bytes = disk_usage_monitor.total_bytes_write # NOTE: due to os disk cache reading is not reliable not testing it assert write_bytes > 0 @@ -132,19 +138,20 @@ def mock_no_network_activity(mocker: MockFixture) -> None: ) -async def test_network_usage_monitor_not_busy( +def test_network_usage_monitor_not_busy( mock_no_network_activity: None, socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], + mock__get_sibling_processes: Callable[[List[int]], List[psutil.Process]], create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], + metrics_manager: activity_monitor.MetricsManager, ): activity_generator = create_activity_generator(network=False, cpu=False, disk=False) mock__get_sibling_processes([activity_generator.get_pid()]) with activity_monitor.NetworkUsageMonitor( - 0.5, received_usage_threshold=0, sent_usage_threshold=0 + 0.5, metrics_manager, received_usage_threshold=0, sent_usage_threshold=0 ) as network_usage_monitor: - async for attempt in AsyncRetrying( + for attempt in Retrying( stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True ): with attempt: @@ -159,105 +166,55 @@ def 
mock_network_monitor_exclude_interfaces(mocker: MockFixture) -> None: assert activity_monitor.NetworkUsageMonitor._EXCLUDE_INTERFACES == set() -async def test_network_usage_monitor_still_busy( +def test_network_usage_monitor_still_busy( mock_network_monitor_exclude_interfaces: None, socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], + mock__get_sibling_processes: Callable[[List[int]], List[psutil.Process]], create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], + metrics_manager: activity_monitor.MetricsManager, ): activity_generator = create_activity_generator(network=True, cpu=False, disk=False) mock__get_sibling_processes([activity_generator.get_pid()]) with activity_monitor.NetworkUsageMonitor( - 0.5, received_usage_threshold=0, sent_usage_threshold=0 + 0.5, metrics_manager, received_usage_threshold=0, sent_usage_threshold=0 ) as network_usage_monitor: # wait for monitor to trigger - await asyncio.sleep(1) + time.sleep(1) assert network_usage_monitor.bytes_received > 0 assert network_usage_monitor.bytes_sent > 0 assert network_usage_monitor.is_busy is True -@pytest.fixture -def mock_jupyter_kernel_monitor(are_kernels_busy: bool) -> Iterable[None]: - with requests_mock.Mocker(real_http=True) as m: - m.get("http://localhost:8888/api/kernels", text=json.dumps([{"id": "atest1"}])) - m.get( - "http://localhost:8888/api/kernels/atest1", - text=json.dumps( - {"execution_state": "running" if are_kernels_busy else "idle"} - ), - ) - yield - - @pytest.mark.parametrize("are_kernels_busy", [True, False]) -async def test_jupyter_kernel_monitor( - mock_jupyter_kernel_monitor: None, are_kernels_busy: bool +def test_jupyter_kernel_monitor( + mock_jupyter_kernel_monitor: None, + are_kernels_busy: bool, + metrics_manager: activity_monitor.MetricsManager, ): - kernel_monitor = activity_monitor.JupyterKernelMonitor(1) + kernel_monitor = activity_monitor.JupyterKernelMonitor(1, metrics_manager) 
kernel_monitor._update_kernels_activity() assert kernel_monitor.are_kernels_busy is are_kernels_busy -@pytest_asyncio.fixture -async def server_url() -> str: - return f"http://localhost:{activity_monitor.LISTEN_PORT}" - - -@pytest_asyncio.fixture -async def http_server(mock_jupyter_kernel_monitor: None, server_url: str) -> None: - server = activity_monitor.make_server(activity_monitor.LISTEN_PORT) - - def _run_server_worker() -> None: - server.serve_forever() - - thread = threading.Thread(target=_run_server_worker, daemon=True) - thread.start() - - # ensure server is running - async for attempt in AsyncRetrying( - stop=stop_after_delay(3), wait=wait_fixed(0.1), reraise=True - ): - with attempt: - result = requests.get(f"{server_url}/", timeout=1) - assert result.status_code == 200, result.text - - yield None - - server.shutdown() - server.server_close() - - with pytest.raises(requests.exceptions.RequestException): - requests.get(f"{server_url}/", timeout=1) - - @pytest.mark.parametrize("are_kernels_busy", [False]) -async def test_http_server_ok(http_server: None, server_url: str): - result = requests.get(f"{server_url}/", timeout=5) +def test_http_server_ok(http_server: None, server_url: str): + result = requests.get(f"{server_url}/activity", timeout=1) assert result.status_code == 200 -_BIG_THRESHOLD: Final[int] = int(1e10) - - @pytest.fixture def mock_activity_manager_config(mocker: MockFixture) -> None: - mocker.patch("activity_monitor.CHECK_INTERVAL_S", 1) - mocker.patch("activity_monitor.KERNEL_CHECK_INTERVAL_S", 1) - - mocker.patch( - "activity_monitor.BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED", _BIG_THRESHOLD - ) - mocker.patch("activity_monitor.BUSY_USAGE_THRESHOLD_NETWORK_SENT", _BIG_THRESHOLD) + mocker.patch("activity_monitor.MONITOR_INTERVAL_S", 1) + mocker.patch("activity_monitor.JUPYTER_NOTEBOOK_KERNEL_CHECK_INTERVAL_S", 1) @pytest.mark.parametrize("are_kernels_busy", [False]) -async def test_activity_monitor_becomes_not_busy( +def 
test_activity_monitor_becomes_not_busy( mock_activity_manager_config: None, socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], + mock__get_sibling_processes: Callable[[List[int]], List[psutil.Process]], create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], http_server: None, server_url: str, @@ -265,12 +222,12 @@ async def test_activity_monitor_becomes_not_busy( activity_generator = create_activity_generator(network=False, cpu=False, disk=False) mock__get_sibling_processes([activity_generator.get_pid()]) - async for attempt in AsyncRetrying( + for attempt in Retrying( stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True ): with attempt: # check that all become not busy - result = requests.get(f"{server_url}/debug", timeout=5) + result = requests.get(f"{server_url}/debug", timeout=1) assert result.status_code == 200 debug_response = result.json() assert debug_response["cpu_usage"]["is_busy"] is False @@ -278,7 +235,39 @@ async def test_activity_monitor_becomes_not_busy( assert debug_response["kernel_monitor"]["is_busy"] is False assert debug_response["network_usage"]["is_busy"] is False - result = requests.get(f"{server_url}/", timeout=2) + result = requests.get(f"{server_url}/activity", timeout=1) assert result.status_code == 200 response = result.json() assert response["seconds_inactive"] > 0 + + +@pytest.mark.parametrize("are_kernels_busy", [False]) +def test_metrics_endpoint( + mock_activity_manager_config: None, + socket_server: None, + mock__get_sibling_processes: Callable[[List[int]], List[psutil.Process]], + create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], + http_server: None, + server_url: str, +): + activity_generator = create_activity_generator(network=False, cpu=False, disk=False) + mock__get_sibling_processes([activity_generator.get_pid()]) + + result = requests.get(f"{server_url}/metrics", timeout=1) + assert result.status_code == 200 + assert 
result.text.count("network_bytes_received_total") == 3 + assert result.text.count("network_bytes_sent_total") == 3 + + +def test_metric_manager(metrics_manager: activity_monitor.MetricsManager): + metrics_manager.register_metric("metric_name", help="metric_help", initial_value=0) + + total_count = 0 + for i in range(10): + total_count += i + + metrics_manager.inc_metric("metric_name", i) + formatted_metrics = metrics_manager.format_metrics() + assert "# HELP metric_name metric_help" in formatted_metrics + assert "# TYPE metric_name counter" in formatted_metrics + assert f"metric_name {total_count}" in formatted_metrics diff --git a/tests/test_scripts_coupling.py b/tests/test_scripts_coupling.py new file mode 100644 index 0000000..ab40af7 --- /dev/null +++ b/tests/test_scripts_coupling.py @@ -0,0 +1,25 @@ +import pytest +from importlib import reload + +from typing import TYPE_CHECKING + +if not TYPE_CHECKING: + from _import_utils import allow_imports + + allow_imports() + + +def test_same_listen_port(monkeypatch: pytest.MonkeyPatch): + import activity + import activity_monitor + + assert activity_monitor.LISTEN_PORT == activity.LISTEN_PORT + + mocked_port = 314 + monkeypatch.setenv("ACTIVITY_MONITOR_LISTEN_PORT", str(mocked_port)) + + reload(activity) + reload(activity_monitor) + + assert activity_monitor.LISTEN_PORT == mocked_port + assert activity.LISTEN_PORT == mocked_port