Reactive: Add sigterm handler and process termination. (#332)
* remove logrotate config for runner logs

* add sigterm handler

* kill unnecessary processes

* use list comprehension

* just exit, kombu will requeue

* restore default handler at the end
cbartz authored Jul 31, 2024
1 parent 7c95111 commit 7453734
Showing 9 changed files with 191 additions and 101 deletions.
7 changes: 3 additions & 4 deletions src-docs/logrotate.md
@@ -9,12 +9,11 @@ Logrotate setup and configuration.
---------------
- **LOG_ROTATE_TIMER_SYSTEMD_SERVICE**
- **METRICS_LOGROTATE_CONFIG**
- **RUNNER_LOGROTATE_CONFIG**
- **REACTIVE_LOGROTATE_CONFIG**

---

<a href="../src/logrotate.py#L85"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../src/logrotate.py#L77"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

## <kbd>function</kbd> `setup`

@@ -33,7 +32,7 @@ Enable and configure logrotate.

---

<a href="../src/logrotate.py#L22"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../src/logrotate.py#L21"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

## <kbd>class</kbd> `LogrotateFrequency`
The frequency of log rotation.
@@ -53,7 +52,7 @@ The frequency of log rotation.

---

<a href="../src/logrotate.py#L38"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../src/logrotate.py#L37"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

## <kbd>class</kbd> `LogrotateConfig`
Configuration for logrotate.
6 changes: 3 additions & 3 deletions src-docs/metrics.md
@@ -10,9 +10,6 @@ Package for common metrics-related code.
- **events**: # Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

- **runner_logs**: # Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

- **storage**: # Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

@@ -25,5 +22,8 @@ Package for common metrics-related code.
- **github**: # Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

- **runner_logs**: # Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.



27 changes: 24 additions & 3 deletions src-docs/reactive.consumer.md
@@ -8,7 +8,7 @@ Module responsible for consuming jobs from the message queue.

---

<a href="../src/reactive/consumer.py#L32"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../src/reactive/consumer.py#L36"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

## <kbd>function</kbd> `consume`

@@ -36,7 +36,28 @@ Log the job details and acknowledge the message. If the job details are invalid,

---

<a href="../src/reactive/consumer.py#L16"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../reactive/consumer/signal_handler#L66"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

## <kbd>function</kbd> `signal_handler`

```python
signal_handler(signal_code: Signals) → Generator[NoneType, NoneType, NoneType]
```

Set a signal handler and after the context, restore the default handler.

The signal handler exits the process.



**Args:**

- <b>`signal_code`</b>: The signal code to handle.
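
A minimal sketch of the pattern this function documents, using only the standard library. The name `exit_on_signal` is hypothetical, and the handler body is an assumption modeled on the description above rather than a copy of the module's code.

```python
import contextlib
import signal
import sys
from types import FrameType
from typing import Generator, Optional


@contextlib.contextmanager
def exit_on_signal(signal_code: signal.Signals) -> Generator[None, None, None]:
    """Exit the process on signal_code inside the block, then restore SIG_DFL."""

    def handler(received: int, _: Optional[FrameType]) -> None:
        # sys.exit raises SystemExit, so enclosing `finally` blocks and
        # context managers still get a chance to clean up.
        print(f"Signal '{signal.strsignal(received)}' received.", file=sys.stderr)
        sys.exit(received)

    signal.signal(signal_code, handler)
    try:
        yield
    finally:
        # Restores the default disposition, not whatever handler was
        # installed before the context was entered.
        signal.signal(signal_code, signal.SIG_DFL)


with exit_on_signal(signal.SIGTERM):
    inside = signal.getsignal(signal.SIGTERM)  # the custom handler
after = signal.getsignal(signal.SIGTERM)  # the default handler again
```

Note that `signal.signal` can only be called from the main thread of the main interpreter, so a context manager like this is only usable there.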


---

<a href="../src/reactive/consumer.py#L20"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

## <kbd>class</kbd> `JobDetails`
A class to translate the payload.
@@ -54,7 +75,7 @@ A class to translate the payload.

---

<a href="../src/reactive/consumer.py#L28"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../src/reactive/consumer.py#L32"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

## <kbd>class</kbd> `JobError`
Raised when a job error occurs.
10 changes: 5 additions & 5 deletions src-docs/reactive.runner_manager.md
@@ -10,15 +10,15 @@ Module for managing reactive runners.
- **MQ_URI_ENV_VAR**
- **QUEUE_NAME_ENV_VAR**
- **REACTIVE_RUNNER_SCRIPT_FILE**
- **REACTIVE_RUNNER_TIMEOUT_INTERVAL**
- **PYTHON_BIN**
- **ACTIVE_SCRIPTS_COMMAND_LINE**
- **TIMEOUT_BIN**
- **REACTIVE_RUNNER_CMD_LINE_PREFIX**
- **PID_CMD_COLUMN_WIDTH**
- **PIDS_COMMAND_LINE**
- **UBUNTU_USER**

---

<a href="../src/reactive/runner_manager.py#L31"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../src/reactive/runner_manager.py#L39"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

## <kbd>function</kbd> `reconcile`

@@ -46,7 +46,7 @@ Raises a ReactiveRunnerError if the runner fails to spawn.

---

<a href="../src/reactive/runner_manager.py#L27"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../src/reactive/runner_manager.py#L35"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

## <kbd>class</kbd> `ReactiveRunnerError`
Raised when a reactive runner error occurs.
9 changes: 0 additions & 9 deletions src/logrotate.py
@@ -10,7 +10,6 @@

from errors import LogrotateSetupError
from metrics.events import METRICS_LOG_PATH
from metrics.runner_logs import RUNNER_LOGS_DIR_PATH
from reactive.runner_manager import REACTIVE_RUNNER_LOG_DIR

LOG_ROTATE_TIMER_SYSTEMD_SERVICE = "logrotate.timer"
@@ -64,13 +63,6 @@ class LogrotateConfig(BaseModel):
create=True,
)

RUNNER_LOGROTATE_CONFIG = LogrotateConfig(
name="github-runner-logs",
log_path_glob_pattern=f"{RUNNER_LOGS_DIR_PATH}/*",
rotate=0,
create=False,
notifempty=False,
)

REACTIVE_LOGROTATE_CONFIG = LogrotateConfig(
name="reactive-runner",
@@ -118,7 +110,6 @@ def _configure() -> None:
"""Configure logrotate."""
_write_config(REACTIVE_LOGROTATE_CONFIG)
_write_config(METRICS_LOGROTATE_CONFIG)
_write_config(RUNNER_LOGROTATE_CONFIG)


def _write_config(logrotate_config: LogrotateConfig) -> None:
80 changes: 67 additions & 13 deletions src/reactive/consumer.py
@@ -2,9 +2,13 @@
# See LICENSE file for licensing details.

"""Module responsible for consuming jobs from the message queue."""
import contextlib
import logging
import signal
import sys
from contextlib import closing
from typing import cast
from types import FrameType
from typing import Generator, cast

from kombu import Connection
from kombu.simple import SimpleQueue
@@ -44,15 +48,65 @@ def consume(mongodb_uri: str, queue_name: str) -> None:
"""
with Connection(mongodb_uri) as conn:
with closing(SimpleQueue(conn, queue_name)) as simple_queue:
msg = simple_queue.get(block=True)
try:
job_details = cast(JobDetails, JobDetails.parse_raw(msg.payload))
except ValidationError as exc:
msg.reject(requeue=True)
raise JobError(f"Invalid job details: {msg.payload}") from exc
logger.info(
"Received job with labels %s and run_url %s",
job_details.labels,
job_details.run_url,
)
msg.ack()
with signal_handler(signal.SIGTERM):
msg = simple_queue.get(block=True)
try:
job_details = cast(JobDetails, JobDetails.parse_raw(msg.payload))
except ValidationError as exc:
msg.reject(requeue=True)
raise JobError(f"Invalid job details: {msg.payload}") from exc
logger.info(
"Received job with labels %s and run_url %s",
job_details.labels,
job_details.run_url,
)
msg.ack()


@contextlib.contextmanager
def signal_handler(signal_code: signal.Signals) -> Generator[None, None, None]:
"""Set a signal handler and after the context, restore the default handler.
The signal handler exits the process.
Args:
signal_code: The signal code to handle.
"""
_set_signal_handler(signal_code)
try:
yield
finally:
_restore_signal_handler(signal_code)


def _set_signal_handler(signal_code: signal.Signals) -> None:
"""Set a signal handler which exits the process.
Args:
signal_code: The signal code to handle.
"""

def sigterm_handler(signal_code: int, _: FrameType | None) -> None:
"""Handle a signal.
Call sys.exit with the signal code. Kombu should automatically
requeue unacknowledged messages.
Args:
signal_code: The signal code to handle.
"""
print(
f"Signal '{signal.strsignal(signal_code)}' received. Will terminate.", file=sys.stderr
)
sys.exit(signal_code)

signal.signal(signal_code, sigterm_handler)


def _restore_signal_handler(signal_code: signal.Signals) -> None:
"""Restore the default signal handler.
Args:
signal_code: The signal code to restore.
"""
signal.signal(signal_code, signal.SIG_DFL)
63 changes: 38 additions & 25 deletions src/reactive/runner_manager.py
@@ -3,7 +3,9 @@

"""Module for managing reactive runners."""
import logging
import os
import shutil
import signal

# All commands run by subprocess are secure.
import subprocess # nosec
@@ -17,10 +19,16 @@
QUEUE_NAME_ENV_VAR = "QUEUE_NAME"
REACTIVE_RUNNER_LOG_DIR = Path("/var/log/reactive_runner")
REACTIVE_RUNNER_SCRIPT_FILE = "scripts/reactive_runner.py"
REACTIVE_RUNNER_TIMEOUT_INTERVAL = "1h"
PYTHON_BIN = "/usr/bin/python3"
ACTIVE_SCRIPTS_COMMAND_LINE = ["ps", "axo", "cmd", "--no-headers"]
TIMEOUT_BIN = "/usr/bin/timeout"
REACTIVE_RUNNER_CMD_LINE_PREFIX = f"{PYTHON_BIN} {REACTIVE_RUNNER_SCRIPT_FILE}"
PID_CMD_COLUMN_WIDTH = len(REACTIVE_RUNNER_CMD_LINE_PREFIX)
PIDS_COMMAND_LINE = [
"ps",
"axo",
f"cmd:{PID_CMD_COLUMN_WIDTH},pid",
"--no-headers",
"--sort=-start_time",
]
UBUNTU_USER = "ubuntu"


@@ -41,45 +49,52 @@ def reconcile(quantity: int, mq_uri: str, queue_name: str) -> int:
Returns:
The number of reactive runner processes spawned.
"""
current_quantity = _get_current_quantity()
pids = _get_pids()
current_quantity = len(pids)
logger.info("Current quantity of reactive runner processes: %s", current_quantity)
delta = quantity - current_quantity
if delta > 0:
logger.info("Will spawn %d new reactive runner processes", delta)
logger.info("Will spawn %d new reactive runner process(es)", delta)
_setup_logging_for_processes()
for _ in range(delta):
_spawn_runner(mq_uri=mq_uri, queue_name=queue_name)
elif delta < 0:
logger.info(
"%d reactive runner processes are running. "
"Will skip spawning. Additional processes should terminate after %s.",
current_quantity,
REACTIVE_RUNNER_TIMEOUT_INTERVAL,
)
logger.info("Will kill %d process(es).", -delta)
for pid in pids[:-delta]:
logger.info("Killing reactive runner process with pid %s", pid)
try:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
# There can be a race condition that the process has already terminated.
# We just ignore and log the fact.
logger.info(
"Failed to kill process with pid %s. Process might have terminated itself.",
pid,
)
else:
logger.info("No changes to number of reactive runner processes needed.")

return max(delta, 0)
return delta
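
The slice `pids[:-delta]` in the kill branch is easy to misread, because `delta` is negative there. The sketch below isolates only the selection step; `select_pids_to_kill` is a hypothetical helper that is not part of this commit, and it assumes the PID list is ordered newest first, as the `--sort=-start_time` flag suggests.

```python
def select_pids_to_kill(pids: list[int], quantity: int) -> list[int]:
    """Return the PIDs that exceed the desired quantity.

    Assumes `pids` is sorted by start time in descending order, so the
    most recently started processes are selected first.
    """
    delta = quantity - len(pids)
    if delta >= 0:
        # Nothing to kill: the current count is at or below the target.
        return []
    # delta is negative here, so -delta is the surplus count and
    # pids[:-delta] is the same as pids[:surplus].
    return pids[:-delta]


# Three hypothetical PIDs, newest first, with a target of one process.
surplus = select_pids_to_kill([300, 200, 100], quantity=1)
```

Under that ordering assumption, the most recently started processes are the ones terminated, and the oldest one is left running.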


def _get_current_quantity() -> int:
"""Determine the current quantity of reactive runners.
def _get_pids() -> list[int]:
"""Get the PIDs of the reactive runner processes.
Returns:
The number of reactive runners.
The PIDs of the reactive runner processes sorted by start time in descending order.
Raises:
ReactiveRunnerError: If the number of reactive runners cannot be determined
ReactiveRunnerError: If the command to get the PIDs fails
"""
result = secure_run_subprocess(cmd=ACTIVE_SCRIPTS_COMMAND_LINE)
result = secure_run_subprocess(cmd=PIDS_COMMAND_LINE)
if result.returncode != 0:
raise ReactiveRunnerError("Failed to get list of processes")
commands = result.stdout.decode().split("\n") if result.stdout else []
return sum(
1
for command in commands
if command.startswith(f"{PYTHON_BIN} {REACTIVE_RUNNER_SCRIPT_FILE}")
)

return [
int(line.rstrip().rsplit(maxsplit=1)[-1])
for line in result.stdout.decode().split("\n")
if line.startswith(REACTIVE_RUNNER_CMD_LINE_PREFIX)
]
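
To illustrate the parsing in the new list comprehension, here is a self-contained sketch run against made-up `ps` output. The sample lines and the `parse_pids` helper are assumptions for illustration only; the prefix is built from the `PYTHON_BIN` and `REACTIVE_RUNNER_SCRIPT_FILE` values shown in this diff.

```python
# Prefix built from the PYTHON_BIN and REACTIVE_RUNNER_SCRIPT_FILE values.
PREFIX = "/usr/bin/python3 scripts/reactive_runner.py"


def parse_pids(ps_output: str, prefix: str = PREFIX) -> list[int]:
    """Extract the trailing PID column from lines whose command matches prefix."""
    return [
        # The PID is the last whitespace-separated field on the line.
        int(line.rstrip().rsplit(maxsplit=1)[-1])
        for line in ps_output.split("\n")
        if line.startswith(prefix)
    ]


# Hypothetical output in the shape of `ps axo cmd:<width>,pid --no-headers`.
sample = (
    "/usr/bin/python3 scripts/reactive_runner.py   4242\n"
    "/usr/bin/python3 scripts/reactive_runner.py   4100\n"
    "/usr/sbin/sshd -D                                17\n"
)
pids = parse_pids(sample)
```

Fixing the `cmd` column width to the prefix length appears intended to keep the command's own arguments out of the line, so that the last field is always the PID.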


def _setup_logging_for_processes() -> None:
@@ -105,8 +120,6 @@ def _spawn_runner(mq_uri: str, queue_name: str) -> None:
# We trust the command.
command = " ".join(
[
TIMEOUT_BIN,
REACTIVE_RUNNER_TIMEOUT_INTERVAL,
PYTHON_BIN,
REACTIVE_RUNNER_SCRIPT_FILE,
">>",