diff --git a/proto/otaclient_iot_logging_server_v1.proto b/proto/otaclient_iot_logging_server_v1.proto index 9e35fd5..fb0b279 100644 --- a/proto/otaclient_iot_logging_server_v1.proto +++ b/proto/otaclient_iot_logging_server_v1.proto @@ -38,8 +38,9 @@ enum LogLevel { enum ErrorCode { NO_FAILURE = 0; - SERVER_ERROR = 1; + SERVER_QUEUE_FULL = 1; NOT_ALLOWED_ECU_ID = 2; + NO_MESSAGE = 3; } message PutLogRequest { diff --git a/src/otaclient_iot_logging_server/v1/api_stub.py b/src/otaclient_iot_logging_server/v1/api_stub.py index 88fe9b2..503023c 100644 --- a/src/otaclient_iot_logging_server/v1/api_stub.py +++ b/src/otaclient_iot_logging_server/v1/api_stub.py @@ -29,5 +29,5 @@ def __init__(self, otaclient_iot_logging_server_stub: Any): self._stub = otaclient_iot_logging_server_stub async def PutLog(self, request: pb2.PutLogRequest, context) -> pb2.PutLogResponse: - response = await self._stub.put_log(types.UpdateRequest.convert(request)) + response = await self._stub.put_log(types.PutLogRequest.convert(request)) return response.export_pb() diff --git a/src/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py b/src/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py index 9a2c790..41ca817 100644 --- a/src/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py +++ b/src/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py @@ -13,7 +13,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"y\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x11\n\ttimestamp\x18\x03 \x01(\x04\x12\x18\n\x05level\x18\x04 \x01(\x0e\x32\t.LogLevel\x12\x0f\n\x07message\x18\x05 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\x08LogLevel\x12\x0e\n\nUNSPECIFIC\x10\x00\x12\t\n\x05TRACE\x10\x01\x12\t\n\x05\x44\x45\x42UG\x10\x02\x12\x08\n\x04INFO\x10\x03\x12\x08\n\x04WARN\x10\x04\x12\t\n\x05\x45RROR\x10\x05\x12\t\n\x05\x46\x41TAL\x10\x06*E\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x10\n\x0cSERVER_ERROR\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x32I\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x62\x06proto3' + b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"y\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x11\n\ttimestamp\x18\x03 \x01(\x04\x12\x18\n\x05level\x18\x04 \x01(\x0e\x32\t.LogLevel\x12\x0f\n\x07message\x18\x05 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\x08LogLevel\x12\x0e\n\nUNSPECIFIC\x10\x00\x12\t\n\x05TRACE\x10\x01\x12\t\n\x05\x44\x45\x42UG\x10\x02\x12\x08\n\x04INFO\x10\x03\x12\x08\n\x04WARN\x10\x04\x12\t\n\x05\x45RROR\x10\x05\x12\t\n\x05\x46\x41TAL\x10\x06*Z\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x15\n\x11SERVER_QUEUE_FULL\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x12\x0e\n\nNO_MESSAGE\x10\x03\x32I\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x62\x06proto3' ) _globals = globals() @@ -31,11 +31,11 @@ _globals["_LOGLEVEL"]._serialized_start = 294 _globals["_LOGLEVEL"]._serialized_end = 384 _globals["_ERRORCODE"]._serialized_start = 386 - _globals["_ERRORCODE"]._serialized_end = 455 + _globals["_ERRORCODE"]._serialized_end = 476 _globals["_PUTLOGREQUEST"]._serialized_start = 77 _globals["_PUTLOGREQUEST"]._serialized_end = 198 _globals["_PUTLOGRESPONSE"]._serialized_start = 200 _globals["_PUTLOGRESPONSE"]._serialized_end = 259 - _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 457 - _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 530 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 478 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 551 # @@protoc_insertion_point(module_scope) diff --git a/src/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi b/src/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi index 566bad8..bddccd3 100644 --- a/src/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi +++ b/src/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi @@ -23,8 +23,9 @@ class LogLevel(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): class ErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = [] NO_FAILURE: _ClassVar[ErrorCode] - SERVER_ERROR: _ClassVar[ErrorCode] + SERVER_QUEUE_FULL: _ClassVar[ErrorCode] NOT_ALLOWED_ECU_ID: _ClassVar[ErrorCode] + NO_MESSAGE: _ClassVar[ErrorCode] LOG: LogType METRICS: LogType @@ -36,8 +37,9 @@ WARN: LogLevel ERROR: LogLevel FATAL: LogLevel NO_FAILURE: ErrorCode -SERVER_ERROR: ErrorCode +SERVER_QUEUE_FULL: ErrorCode NOT_ALLOWED_ECU_ID: ErrorCode +NO_MESSAGE: ErrorCode class PutLogRequest(_message.Message): __slots__ = ["ecu_id", "log_type", "timestamp", "level", "message"] diff --git a/src/otaclient_iot_logging_server/v1/servicer.py b/src/otaclient_iot_logging_server/v1/servicer.py index 23795f7..ab0e187 100644 --- a/src/otaclient_iot_logging_server/v1/servicer.py +++ b/src/otaclient_iot_logging_server/v1/servicer.py @@ -60,10 +60,13 @@ async def put_log(self, request: PutLogRequest) -> PutLogResponse: _timestamp = request.timestamp _message = request.message _allowed_ecus = self._allowed_ecus - # don't allow empty request or unknowned ECUs + # don't allow empty request + if not _message: + return PutLogResponse(code=ErrorCode.NO_MESSAGE) + # don't allow unknowned ECUs # if ECU id is unknown(not listed in ecu_info.yaml), drop this log. - if not _message or (_allowed_ecus and _ecu_id not in _allowed_ecus): - return PutLogResponse(ErrorCode.NOT_ALLOWED_ECU_ID) + if _allowed_ecus and _ecu_id not in _allowed_ecus: + return PutLogResponse(code=ErrorCode.NOT_ALLOWED_ECU_ID) _logging_msg = LogMessage( timestamp=_timestamp * 1000, # milliseconds @@ -74,6 +77,6 @@ async def put_log(self, request: PutLogRequest) -> PutLogResponse: self._queue.put_nowait((_ecu_id, _logging_msg)) except Full: logger.debug(f"message dropped: {_logging_msg}") - return PutLogResponse(code=ErrorCode.SERVER_ERROR) + return PutLogResponse(code=ErrorCode.SERVER_QUEUE_FULL) return PutLogResponse(code=ErrorCode.NO_FAILURE) diff --git a/src/otaclient_iot_logging_server/v1/types.py b/src/otaclient_iot_logging_server/v1/types.py index 56b3ca1..39539d7 100644 --- a/src/otaclient_iot_logging_server/v1/types.py +++ b/src/otaclient_iot_logging_server/v1/types.py @@ -44,8 +44,9 @@ class LogLevel(EnumWrapper): class ErrorCode(EnumWrapper): NO_FAILURE = pb2.NO_FAILURE - SERVER_ERROR = pb2.SERVER_ERROR + SERVER_QUEUE_FULL = pb2.SERVER_QUEUE_FULL NOT_ALLOWED_ECU_ID = pb2.NOT_ALLOWED_ECU_ID + NO_MESSAGE = pb2.NO_MESSAGE # message wrapper definitions diff --git a/tests/test_log_proxy_server.py b/tests/test_log_proxy_server.py index 9113cfa..ffe451f 100644 --- a/tests/test_log_proxy_server.py +++ b/tests/test_log_proxy_server.py @@ -19,21 +19,23 @@ import os import random from dataclasses import dataclass -from http import HTTPStatus from pathlib import Path from queue import Queue -from urllib.parse import urljoin -import aiohttp -import aiohttp.client_exceptions +import grpc import pytest -from aiohttp import web from pytest_mock import MockerFixture import otaclient_iot_logging_server.log_proxy_server as log_server_module from otaclient_iot_logging_server._common import LogsQueue from otaclient_iot_logging_server.ecu_info import parse_ecu_info -from otaclient_iot_logging_server.log_proxy_server import LoggingPostHandler +from otaclient_iot_logging_server.v1 import otaclient_iot_logging_server_v1_pb2 as pb2 +from otaclient_iot_logging_server.v1 import ( + otaclient_iot_logging_server_v1_pb2_grpc as v1_grpc, +) +from otaclient_iot_logging_server.v1 import types +from otaclient_iot_logging_server.v1.api_stub import OtaClientIoTLoggingServiceV1 +from otaclient_iot_logging_server.v1.servicer import OTAClientIoTLoggingServerServicer logger = logging.getLogger(__name__) @@ -58,6 +60,9 @@ class _ServerConfig: @dataclass class MessageEntry: ecu_id: str + log_type: LogType + timestamp: int + level: LogLevel message: str @@ -72,78 +77,71 @@ def generate_random_msgs( ) -> list[MessageEntry]: _res: list[MessageEntry] = [] for _ in range(msg_num): - _ecu, *_ = random.sample(ecus_list, 1) - _msg = os.urandom(msg_len).hex() - _res.append(MessageEntry(_ecu, _msg)) + _ecu_id, *_ = random.sample(ecus_list, 1) + _log_type = random.choice(list(types.LogType)) + _timestamp = random.randint(0, 2**64 - 1) + _level = random.choice(list(types.LogLevel)) + _message = os.urandom(msg_len).hex() + _res.append(MessageEntry(_ecu_id, _log_type, _timestamp, _level, _message)) return _res class TestLogProxyServer: - - SERVER_URL = ( - f"http://{_test_server_cfg.LISTEN_ADDRESS}:{_test_server_cfg.LISTEN_PORT}/" - ) + SERVER_URL = f"{_test_server_cfg.LISTEN_ADDRESS}:{_test_server_cfg.LISTEN_PORT}" TOTAL_MSG_NUM = 4096 @pytest.fixture(autouse=True) def mock_ecu_info(self, mocker: MockerFixture): - ecu_info = parse_ecu_info(TEST_DIR / "ecu_info.yaml") - mocker.patch(f"{MODULE}.ecu_info", ecu_info) + self._ecu_info = parse_ecu_info(TEST_DIR / "ecu_info.yaml") + mocker.patch(f"{MODULE}.ecu_info", self._ecu_info) @pytest.fixture(autouse=True) async def launch_server(self, mocker: MockerFixture, mock_ecu_info): - """ - See https://docs.aiohttp.org/en/stable/web_advanced.html#custom-resource-implementation - for more details. - """ mocker.patch(f"{MODULE}.server_cfg", _test_server_cfg) queue: LogsQueue = Queue() self._queue = queue - handler = LoggingPostHandler(queue) - app = web.Application() - # mute the aiohttp server logging - aiohttp_server_logger = logging.getLogger("aiohttp") - aiohttp_server_logger.setLevel("ERROR") - # add handler to the server - app.add_routes([web.post(r"/{ecu_id}", handler.logging_post_handler)]) - # star the server - runner = web.AppRunner(app) - try: - await runner.setup() - site = web.TCPSite( - runner, _test_server_cfg.LISTEN_ADDRESS, _test_server_cfg.LISTEN_PORT - ) - await site.start() - logger.info(f"test log_proxy_server started at {self.SERVER_URL}") - yield - finally: - await runner.cleanup() + servicer = OTAClientIoTLoggingServerServicer( + ecu_info=self._ecu_info, + queue=queue, + ) - @pytest.fixture(autouse=True) - async def client_sesion(self): - client_session = aiohttp.ClientSession( - raise_for_status=True, - timeout=aiohttp.ClientTimeout(total=0.2), # for speedup testing + server = grpc.aio.server() + v1_grpc.add_OtaClientIoTLoggingServiceServicer_to_server( + servicer=OtaClientIoTLoggingServiceV1(servicer), server=server ) + server.add_insecure_port(self.SERVER_URL) try: - yield client_session + await server.start() + yield finally: - await client_session.close() + await server.stop(None) @pytest.fixture(autouse=True) def prepare_test_data(self): self._msgs = generate_random_msgs(msg_num=self.TOTAL_MSG_NUM) - async def test_server(self, client_sesion: aiohttp.ClientSession): + async def test_server(self): # ------ execution ------ # logger.info(f"sending {self.TOTAL_MSG_NUM} msgs to {self.SERVER_URL}...") + + async def send_msg(item): + _req = pb2.PutLogRequest( + ecu_id=item.ecu_id, + log_type=item.log_type, + timestamp=item.timestamp, + level=item.level, + message=item.message, + ) + async with grpc.aio.insecure_channel(self.SERVER_URL) as channel: + stub = v1_grpc.OtaClientIoTLoggingServiceStub(channel) + _response = await stub.PutLog(_req) + assert _response.code == pb2.ErrorCode.NO_FAILURE + for item in self._msgs: - _ecu_id, _msg = item.ecu_id, item.message - _log_upload_endpoint_url = urljoin(self.SERVER_URL, _ecu_id) - async with client_sesion.post(_log_upload_endpoint_url, data=_msg): - pass # raise_for_status is set on session + await send_msg(item) + # ------ check result ------ # # ensure the all msgs are sent in order to the queue by the server. logger.info("checking all the received messages...") @@ -153,20 +151,22 @@ async def test_server(self, client_sesion: aiohttp.ClientSession): assert _log_msg["message"] == item.message assert self._queue.empty() - @pytest.mark.parametrize( - "_ecu_id, _data", - [ - # unknowned ECU's request will be dropped - ("bad_ecu_id", "valid_msg"), - # empty message will be dropped - ("main", ""), - ], - ) - async def test_reject_invalid_request( - self, _ecu_id: str, _data: str, client_sesion: aiohttp.ClientSession - ): - with pytest.raises(aiohttp.client_exceptions.ClientResponseError) as exc_info: - _log_upload_endpoint_url = urljoin(self.SERVER_URL, _ecu_id) - async with client_sesion.post(_log_upload_endpoint_url, data=_data): - pass # raise_for_status is set on session - assert exc_info.value.status == HTTPStatus.BAD_REQUEST + async def test_reject_invalid_ecu_id(self): + _req = pb2.PutLogRequest( + ecu_id="bad_ecu_id", + message="valid_msg", + ) + async with grpc.aio.insecure_channel(self.SERVER_URL) as channel: + stub = v1_grpc.OtaClientIoTLoggingServiceStub(channel) + _response = await stub.PutLog(_req) + assert _response.code == pb2.ErrorCode.NOT_ALLOWED_ECU_ID + + async def test_reject_invalid_message(self): + _req = pb2.PutLogRequest( + ecu_id="main", + message="", + ) + async with grpc.aio.insecure_channel(self.SERVER_URL) as channel: + stub = v1_grpc.OtaClientIoTLoggingServiceStub(channel) + _response = await stub.PutLog(_req) + assert _response.code == pb2.ErrorCode.NO_MESSAGE