Skip to content

Commit

Permalink
add test code for grpc part
Browse files Browse the repository at this point in the history
  • Loading branch information
airkei committed Jan 7, 2025
1 parent 0779418 commit a41fd72
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 80 deletions.
3 changes: 2 additions & 1 deletion proto/otaclient_iot_logging_server_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ enum LogLevel {

enum ErrorCode {
NO_FAILURE = 0;
SERVER_ERROR = 1;
SERVER_QUEUE_FULL = 1;
NOT_ALLOWED_ECU_ID = 2;
NO_MESSAGE = 3;
}

message PutLogRequest {
Expand Down
2 changes: 1 addition & 1 deletion src/otaclient_iot_logging_server/v1/api_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ def __init__(self, otaclient_iot_logging_server_stub: Any):
self._stub = otaclient_iot_logging_server_stub

async def PutLog(self, request: pb2.PutLogRequest, context) -> pb2.PutLogResponse:
response = await self._stub.put_log(types.UpdateRequest.convert(request))
response = await self._stub.put_log(types.PutLogRequest.convert(request))
return response.export_pb()

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ class LogLevel(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
class ErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
NO_FAILURE: _ClassVar[ErrorCode]
SERVER_ERROR: _ClassVar[ErrorCode]
SERVER_QUEUE_FULL: _ClassVar[ErrorCode]
NOT_ALLOWED_ECU_ID: _ClassVar[ErrorCode]
NO_MESSAGE: _ClassVar[ErrorCode]

LOG: LogType
METRICS: LogType
Expand All @@ -36,8 +37,9 @@ WARN: LogLevel
ERROR: LogLevel
FATAL: LogLevel
NO_FAILURE: ErrorCode
SERVER_ERROR: ErrorCode
SERVER_QUEUE_FULL: ErrorCode
NOT_ALLOWED_ECU_ID: ErrorCode
NO_MESSAGE: ErrorCode

class PutLogRequest(_message.Message):
__slots__ = ["ecu_id", "log_type", "timestamp", "level", "message"]
Expand Down
11 changes: 7 additions & 4 deletions src/otaclient_iot_logging_server/v1/servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ async def put_log(self, request: PutLogRequest) -> PutLogResponse:
_timestamp = request.timestamp
_message = request.message
_allowed_ecus = self._allowed_ecus
# don't allow empty request or unknowned ECUs
# don't allow empty request
if not _message:
return PutLogResponse(code=ErrorCode.NO_MESSAGE)
# don't allow unknowned ECUs
# if ECU id is unknown(not listed in ecu_info.yaml), drop this log.
if not _message or (_allowed_ecus and _ecu_id not in _allowed_ecus):
return PutLogResponse(ErrorCode.NOT_ALLOWED_ECU_ID)
if _allowed_ecus and _ecu_id not in _allowed_ecus:
return PutLogResponse(code=ErrorCode.NOT_ALLOWED_ECU_ID)

_logging_msg = LogMessage(
timestamp=_timestamp * 1000, # milliseconds
Expand All @@ -74,6 +77,6 @@ async def put_log(self, request: PutLogRequest) -> PutLogResponse:
self._queue.put_nowait((_ecu_id, _logging_msg))
except Full:
logger.debug(f"message dropped: {_logging_msg}")
return PutLogResponse(code=ErrorCode.SERVER_ERROR)
return PutLogResponse(code=ErrorCode.SERVER_QUEUE_FULL)

return PutLogResponse(code=ErrorCode.NO_FAILURE)
3 changes: 2 additions & 1 deletion src/otaclient_iot_logging_server/v1/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ class LogLevel(EnumWrapper):

class ErrorCode(EnumWrapper):
NO_FAILURE = pb2.NO_FAILURE
SERVER_ERROR = pb2.SERVER_ERROR
SERVER_QUEUE_FULL = pb2.SERVER_QUEUE_FULL
NOT_ALLOWED_ECU_ID = pb2.NOT_ALLOWED_ECU_ID
NO_MESSAGE = pb2.NO_MESSAGE


# message wrapper definitions
Expand Down
134 changes: 67 additions & 67 deletions tests/test_log_proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@
import os
import random
from dataclasses import dataclass
from http import HTTPStatus
from pathlib import Path
from queue import Queue
from urllib.parse import urljoin

import aiohttp
import aiohttp.client_exceptions
import grpc
import pytest
from aiohttp import web
from pytest_mock import MockerFixture

import otaclient_iot_logging_server.log_proxy_server as log_server_module
from otaclient_iot_logging_server._common import LogsQueue
from otaclient_iot_logging_server.ecu_info import parse_ecu_info
from otaclient_iot_logging_server.log_proxy_server import LoggingPostHandler
from otaclient_iot_logging_server.v1 import otaclient_iot_logging_server_v1_pb2 as pb2
from otaclient_iot_logging_server.v1 import (
otaclient_iot_logging_server_v1_pb2_grpc as v1_grpc,
)
from otaclient_iot_logging_server.v1 import types
from otaclient_iot_logging_server.v1.api_stub import OtaClientIoTLoggingServiceV1
from otaclient_iot_logging_server.v1.servicer import OTAClientIoTLoggingServerServicer

logger = logging.getLogger(__name__)

Expand All @@ -58,6 +60,9 @@ class _ServerConfig:
@dataclass
class MessageEntry:
ecu_id: str
log_type: LogType
timestamp: int
level: LogLevel
message: str


Expand All @@ -72,78 +77,71 @@ def generate_random_msgs(
) -> list[MessageEntry]:
_res: list[MessageEntry] = []
for _ in range(msg_num):
_ecu, *_ = random.sample(ecus_list, 1)
_msg = os.urandom(msg_len).hex()
_res.append(MessageEntry(_ecu, _msg))
_ecu_id, *_ = random.sample(ecus_list, 1)
_log_type = random.choice(list(types.LogType))
_timestamp = random.randint(0, 2**64 - 1)
_level = random.choice(list(types.LogLevel))
_message = os.urandom(msg_len).hex()
_res.append(MessageEntry(_ecu_id, _log_type, _timestamp, _level, _message))
return _res


class TestLogProxyServer:

SERVER_URL = (
f"http://{_test_server_cfg.LISTEN_ADDRESS}:{_test_server_cfg.LISTEN_PORT}/"
)
SERVER_URL = f"{_test_server_cfg.LISTEN_ADDRESS}:{_test_server_cfg.LISTEN_PORT}"
TOTAL_MSG_NUM = 4096

@pytest.fixture(autouse=True)
def mock_ecu_info(self, mocker: MockerFixture):
ecu_info = parse_ecu_info(TEST_DIR / "ecu_info.yaml")
mocker.patch(f"{MODULE}.ecu_info", ecu_info)
self._ecu_info = parse_ecu_info(TEST_DIR / "ecu_info.yaml")
mocker.patch(f"{MODULE}.ecu_info", self._ecu_info)

@pytest.fixture(autouse=True)
async def launch_server(self, mocker: MockerFixture, mock_ecu_info):
"""
See https://docs.aiohttp.org/en/stable/web_advanced.html#custom-resource-implementation
for more details.
"""
mocker.patch(f"{MODULE}.server_cfg", _test_server_cfg)

queue: LogsQueue = Queue()
self._queue = queue

handler = LoggingPostHandler(queue)
app = web.Application()
# mute the aiohttp server logging
aiohttp_server_logger = logging.getLogger("aiohttp")
aiohttp_server_logger.setLevel("ERROR")
# add handler to the server
app.add_routes([web.post(r"/{ecu_id}", handler.logging_post_handler)])
# star the server
runner = web.AppRunner(app)
try:
await runner.setup()
site = web.TCPSite(
runner, _test_server_cfg.LISTEN_ADDRESS, _test_server_cfg.LISTEN_PORT
)
await site.start()
logger.info(f"test log_proxy_server started at {self.SERVER_URL}")
yield
finally:
await runner.cleanup()
servicer = OTAClientIoTLoggingServerServicer(
ecu_info=self._ecu_info,
queue=queue,
)

@pytest.fixture(autouse=True)
async def client_sesion(self):
client_session = aiohttp.ClientSession(
raise_for_status=True,
timeout=aiohttp.ClientTimeout(total=0.2), # for speedup testing
server = grpc.aio.server()
v1_grpc.add_OtaClientIoTLoggingServiceServicer_to_server(
servicer=OtaClientIoTLoggingServiceV1(servicer), server=server
)
server.add_insecure_port(self.SERVER_URL)
try:
yield client_session
await server.start()
yield
finally:
await client_session.close()
await server.stop(None)

@pytest.fixture(autouse=True)
def prepare_test_data(self):
self._msgs = generate_random_msgs(msg_num=self.TOTAL_MSG_NUM)

async def test_server(self, client_sesion: aiohttp.ClientSession):
async def test_server(self):
# ------ execution ------ #
logger.info(f"sending {self.TOTAL_MSG_NUM} msgs to {self.SERVER_URL}...")

async def send_msg(item):
_req = pb2.PutLogRequest(
ecu_id=item.ecu_id,
log_type=item.log_type,
timestamp=item.timestamp,
level=item.level,
message=item.message,
)
async with grpc.aio.insecure_channel(self.SERVER_URL) as channel:
stub = v1_grpc.OtaClientIoTLoggingServiceStub(channel)
_response = await stub.PutLog(_req)
assert _response.code == pb2.ErrorCode.NO_FAILURE

for item in self._msgs:
_ecu_id, _msg = item.ecu_id, item.message
_log_upload_endpoint_url = urljoin(self.SERVER_URL, _ecu_id)
async with client_sesion.post(_log_upload_endpoint_url, data=_msg):
pass # raise_for_status is set on session
await send_msg(item)

# ------ check result ------ #
# ensure the all msgs are sent in order to the queue by the server.
logger.info("checking all the received messages...")
Expand All @@ -153,20 +151,22 @@ async def test_server(self, client_sesion: aiohttp.ClientSession):
assert _log_msg["message"] == item.message
assert self._queue.empty()

@pytest.mark.parametrize(
"_ecu_id, _data",
[
# unknowned ECU's request will be dropped
("bad_ecu_id", "valid_msg"),
# empty message will be dropped
("main", ""),
],
)
async def test_reject_invalid_request(
self, _ecu_id: str, _data: str, client_sesion: aiohttp.ClientSession
):
with pytest.raises(aiohttp.client_exceptions.ClientResponseError) as exc_info:
_log_upload_endpoint_url = urljoin(self.SERVER_URL, _ecu_id)
async with client_sesion.post(_log_upload_endpoint_url, data=_data):
pass # raise_for_status is set on session
assert exc_info.value.status == HTTPStatus.BAD_REQUEST
async def test_reject_invalid_ecu_id(self):
_req = pb2.PutLogRequest(
ecu_id="bad_ecu_id",
message="valid_msg",
)
async with grpc.aio.insecure_channel(self.SERVER_URL) as channel:
stub = v1_grpc.OtaClientIoTLoggingServiceStub(channel)
_response = await stub.PutLog(_req)
assert _response.code == pb2.ErrorCode.NOT_ALLOWED_ECU_ID

async def test_reject_invalid_message(self):
_req = pb2.PutLogRequest(
ecu_id="main",
message="",
)
async with grpc.aio.insecure_channel(self.SERVER_URL) as channel:
stub = v1_grpc.OtaClientIoTLoggingServiceStub(channel)
_response = await stub.PutLog(_req)
assert _response.code == pb2.ErrorCode.NO_MESSAGE

0 comments on commit a41fd72

Please sign in to comment.