Skip to content

Commit

Permalink
feature: support systemd sd_notify protocol (#6)
Browse files Browse the repository at this point in the history
This PR adds support for system sd_notify protocol, supporting otaclient-logger service running with Type=notify. otaclient-logger will wait for 2 seconds before reporting the READY status to systemd for the otaclient-logger server itself is ready.
  • Loading branch information
Bodong-Yang authored Aug 1, 2024
1 parent 0efe1a3 commit f772d83
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 3 deletions.
67 changes: 67 additions & 0 deletions src/otaclient_iot_logging_server/_sd_notify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2022 TIER IV, INC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A simple implementation of systemd sd_nofity protocol."""


from __future__ import annotations

import logging
import os
import socket

logger = logging.getLogger(__name__)

READY_MSG = "READY=1"
SD_NOTIFY_SOCKET_ENV = "NOTIFY_SOCKET"


def sd_notify_enabled() -> bool:
return bool(os.getenv(SD_NOTIFY_SOCKET_ENV))


def get_notify_socket() -> str | None:
"""Get the notify socket provided by systemd if set.
If the provided socket_path is an abstract socket which starts
with a "@" char, regulate the socket_path by replacing the
"@" char with NULL char and then return the regulated one.
"""
socket_path = os.getenv(SD_NOTIFY_SOCKET_ENV)
if not socket_path:
return

# systemd provide abstract socket to us
if socket_path.startswith("@"):
socket_path = "\0" + socket_path[1:]
return socket_path


def sd_notify(msg: str) -> bool | None:
if not (notify_socket := get_notify_socket()):
return

with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as socket_conn:
try:
socket_conn.connect(notify_socket)
except Exception as e:
logger.warning(f"failed to connect to {notify_socket=}: {e!r}")
return False

try:
socket_conn.sendall(msg.encode())
logger.info(f"sent ready message to {notify_socket=}")
return True
except Exception as e:
logger.warning(f"failed to send ready message to {notify_socket=}: {e!r}")
return False
31 changes: 28 additions & 3 deletions src/otaclient_iot_logging_server/log_proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from __future__ import annotations

import asyncio
import logging
import time
from http import HTTPStatus
Expand All @@ -24,6 +25,11 @@
from aiohttp.web import Request

from otaclient_iot_logging_server._common import LogMessage, LogsQueue
from otaclient_iot_logging_server._sd_notify import (
READY_MSG,
sd_notify,
sd_notify_enabled,
)
from otaclient_iot_logging_server.configs import server_cfg
from otaclient_iot_logging_server.ecu_info import ecu_info

Expand Down Expand Up @@ -56,7 +62,6 @@ async def logging_post_handler(self, request: Request):
_ecu_id = request.match_info["ecu_id"]
_raw_logging = await request.text()
_allowed_ecus = self._allowed_ecus

# don't allow empty request or unknowned ECUs
# if ECU id is unknown(not listed in ecu_info.yaml), drop this log.
if not _raw_logging or (_allowed_ecus and _ecu_id not in _allowed_ecus):
Expand All @@ -67,7 +72,6 @@ async def logging_post_handler(self, request: Request):
message=_raw_logging,
)
# logger.debug(f"receive log from {_ecu_id}: {_logging_msg}")

try:
self._queue.put_nowait((_ecu_id, _logging_msg))
except Full:
Expand All @@ -77,10 +81,31 @@ async def logging_post_handler(self, request: Request):
return web.Response(status=HTTPStatus.OK)


WAIT_BEFORE_SEND_READY_MSG = 2 # seconds


def launch_server(queue: Queue[tuple[str, LogMessage]]) -> None:
handler = LoggingPostHandler(queue=queue)
app = web.Application()
app.add_routes([web.post(r"/{ecu_id}", handler.logging_post_handler)])

loop = asyncio.new_event_loop()

if sd_notify_enabled():
logger.info(
"otaclient-logger service is configured to send ready msg to systemd, "
f"wait for {WAIT_BEFORE_SEND_READY_MSG} seconds for the server starting up ..."
)

loop.call_later(
WAIT_BEFORE_SEND_READY_MSG,
sd_notify,
READY_MSG,
)
# typing: run_app is a NoReturn method, unless received signal
web.run_app(app, host=server_cfg.LISTEN_ADDRESS, port=server_cfg.LISTEN_PORT) # type: ignore
web.run_app(
app,
host=server_cfg.LISTEN_ADDRESS,
port=server_cfg.LISTEN_PORT,
loop=loop,
) # type: ignore
94 changes: 94 additions & 0 deletions tests/test__sd_notify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright 2022 TIER IV, INC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

import socket
from typing import Any

import pytest
import pytest_mock

from otaclient_iot_logging_server import _sd_notify

SD_NOTIFY_MODULE = _sd_notify.__name__


@pytest.fixture
def socket_object_mock(mocker: pytest_mock.MockerFixture):
return mocker.MagicMock(spec=socket.socket)


@pytest.fixture
def socket_conn_mock(mocker: pytest_mock.MockerFixture, socket_object_mock):
class DummySocketObject:

_mock: Any = socket_object_mock

def __new__(cls, _family, _type, *args):
cls._family = _family
cls._type = _type
return object.__new__(cls)

def __enter__(self):
return self._mock

def __exit__(self, *_):
return

mocker.patch(f"{SD_NOTIFY_MODULE}.socket.socket", DummySocketObject)
yield DummySocketObject


@pytest.mark.parametrize(
"input, expected",
(
(socket_path := "/a/normal/socket/path", socket_path),
("@a/abstract/unix/socket", "\0a/abstract/unix/socket"),
),
)
def test_get_notify_socket(input, expected, monkeypatch):
monkeypatch.setenv(_sd_notify.SD_NOTIFY_SOCKET_ENV, input)
assert _sd_notify.get_notify_socket() == expected


@pytest.mark.parametrize(
"input, expected",
(
("/a/normal/socket/path", True),
("@a/abstract/unix/socket", True),
(None, False),
),
)
def test_sd_notify_enabled(input, expected, monkeypatch):
if input:
monkeypatch.setenv(_sd_notify.SD_NOTIFY_SOCKET_ENV, input)
assert _sd_notify.sd_notify_enabled() == expected


def test_sd_notify(socket_conn_mock, socket_object_mock, monkeypatch):
NOTIFY_SOCKET = "any_non_empty_value"
monkeypatch.setenv(_sd_notify.SD_NOTIFY_SOCKET_ENV, NOTIFY_SOCKET)

# ------ execute ------ #
_sd_notify.sd_notify(_sd_notify.READY_MSG)

# ------ check result ------ #
dummy_socket_class = socket_conn_mock
assert dummy_socket_class._family == socket.AF_UNIX
assert dummy_socket_class._type == socket.SOCK_DGRAM

socket_object_mock.connect.assert_called_once_with(NOTIFY_SOCKET)
socket_object_mock.sendall.assert_called_once_with(_sd_notify.READY_MSG.encode())

2 comments on commit f772d83

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
src/otaclient_iot_logging_server
   __init__.py30100% 
   __main__.py18194%49
   _common.py150100% 
   _log_setting.py271062%63, 65–66, 68–69, 73–74, 77–78, 80
   _sd_notify.py33875%42, 52, 57–59, 65–67
   _utils.py53296%73, 137
   _version.py80100% 
   aws_iot_logger.py1005545%65–67, 69–72, 75, 81–87, 90–92, 95–100, 104–107, 111–113, 116–118, 121–126, 139, 145–147, 149–153, 157, 197, 204–207
   boto3_session.py35974%50, 58–59, 61, 78–79, 83, 85, 93
   config_file_monitor.py43686%64–66, 83–85
   configs.py44197%74
   ecu_info.py37197%75
   greengrass_config.py96594%155, 266–269
   log_proxy_server.py441272%52, 77–79, 88–90, 92, 94–95, 100, 106
TOTAL55611080% 

Tests Skipped Failures Errors Time
44 0 💤 0 ❌ 0 🔥 12.045s ⏱️

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
src/otaclient_iot_logging_server
   __init__.py30100% 
   __main__.py18194%49
   _common.py150100% 
   _log_setting.py271062%63, 65–66, 68–69, 73–74, 77–78, 80
   _sd_notify.py33875%42, 52, 57–59, 65–67
   _utils.py54296%73, 137
   _version.py80100% 
   aws_iot_logger.py1015545%65–67, 69–72, 75, 81–87, 90–92, 95–100, 104–107, 111–113, 116–118, 121–126, 139, 145–147, 149–153, 157, 197, 204–207
   boto3_session.py35974%50, 58–59, 61, 78–79, 83, 85, 93
   config_file_monitor.py44686%64–66, 83–85
   configs.py45197%74
   ecu_info.py37197%75
   greengrass_config.py97594%155, 266–269
   log_proxy_server.py441272%52, 77–79, 88–90, 92, 94–95, 100, 106
TOTAL56111080% 

Tests Skipped Failures Errors Time
44 0 💤 0 ❌ 0 🔥 11.942s ⏱️

Please sign in to comment.