Skip to content

Commit

Permalink
Merge pull request #31 from ReznikovRoman/feature/#19-elk_sentry
Browse files Browse the repository at this point in the history
feature/#19-elk
  • Loading branch information
ReznikovRoman authored Jul 11, 2022
2 parents f43aba4 + 9d113ed commit 9680e83
Show file tree
Hide file tree
Showing 16 changed files with 227 additions and 7 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,16 @@ NUGC_QUEUE_PROGRESS_CONSUMERS=2
NUGC_QUEUE_BOOKMARKS_NAME=bookmarks-topic
NUGC_QUEUE_BOOKMARKS_GROUP=bookmarks-group
NUGC_QUEUE_BOOKMARKS_CONSUMERS=2
# Kafka
NUGC_KAFKA_URL=kafka:9092
# ELK
NUGC_LOGSTASH_HOST=logstash
NUGC_LOGSTASH_PORT=5044
NUGC_LOGSTASH_LOGGER_VERSION=1
# Config
NUGC_USE_STUBS=0
NUGC_TESTING=0
NUGC_CI=0
# Kafka
NUGC_KAFKA_URL=kafka:9092
```

### Запуск проекта:
Expand Down
16 changes: 16 additions & 0 deletions conf/elk/logstash/logstash.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Logstash pipeline for the UGC service.
# Receives JSON-encoded log events over UDP and indexes them in Elasticsearch.
input {
  udp {
    # Events arrive already JSON-encoded (python-logstash UDP handler).
    codec => "json"
    port => "5044"
  }
}
# No transformation — events are forwarded as-is.
filter {
}
output {
  # Mirror events to stdout for container-log debugging.
  stdout {
  }
  elasticsearch {
    # ES_HOST is injected via the docker-compose environment.
    hosts => [ "${ES_HOST}" ]
    # One index per month: ugc.YYYY.MM
    index => "ugc.%{+YYYY.MM}"
  }
}
42 changes: 42 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,49 @@ services:
schema-registry:
condition: service_started

  # Single-node Elasticsearch for the ELK logging stack (security off — dev only).
  elasticsearch:
    image: elasticsearch:7.17.2
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
    ports:
      - "9200:9200"
    volumes:
      - es_data_elk:/usr/share/elasticsearch/data
    healthcheck:
      # NOTE(review): credentials are irrelevant while xpack security is disabled.
      test: curl -u elastic:elastic -s -f elasticsearch:9200/_cat/health >/dev/null || exit 1
      interval: 5s
      timeout: 5s
      retries: 5

  # Ships UDP JSON logs from the app into Elasticsearch (pipeline in conf/elk/logstash).
  logstash:
    image: logstash:7.17.2
    environment:
      - LS_JAVA_OPTS=-Xms256m -Xmx256m
      # Consumed by logstash.conf's elasticsearch output.
      - ES_HOST=elasticsearch:9200
      - XPACK_MONITORING_ENABLED=false
      # NOTE(review): logstash.conf reads only ES_HOST; this var looks unused — confirm.
      - LOGSTASH_ELASTICSEARCH_HOST=http://elasticsearch:9200
    ports:
      - "5044:5044/udp"
    volumes:
      - ./conf/elk/logstash/logstash.conf:/config/logstash.conf:ro
    command: logstash -f /config/logstash.conf
    depends_on:
      elasticsearch:
        condition: service_healthy

  # Kibana UI on :5601 for browsing the ugc.* indices.
  kibana:
    image: kibana:7.17.2
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy


volumes:
  redis_data:
  mongodb_data:
  # Named volume backing Elasticsearch data.
  es_data_elk:
42 changes: 42 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,49 @@ services:
depends_on:
- server

  # Single-node Elasticsearch for the ELK logging stack (security off — dev only).
  elasticsearch:
    image: elasticsearch:7.17.2
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
    ports:
      - "9200:9200"
    volumes:
      - es_data_elk:/usr/share/elasticsearch/data
    healthcheck:
      # NOTE(review): credentials are irrelevant while xpack security is disabled.
      test: curl -u elastic:elastic -s -f elasticsearch:9200/_cat/health >/dev/null || exit 1
      interval: 5s
      timeout: 5s
      retries: 5

  # Ships UDP JSON logs from the app into Elasticsearch (pipeline in conf/elk/logstash).
  logstash:
    image: logstash:7.17.2
    environment:
      - LS_JAVA_OPTS=-Xms256m -Xmx256m
      # Consumed by logstash.conf's elasticsearch output.
      - ES_HOST=elasticsearch:9200
      - XPACK_MONITORING_ENABLED=false
      # NOTE(review): logstash.conf reads only ES_HOST; this var looks unused — confirm.
      - LOGSTASH_ELASTICSEARCH_HOST=http://elasticsearch:9200
    ports:
      - "5044:5044/udp"
    volumes:
      - ./conf/elk/logstash/logstash.conf:/config/logstash.conf:ro
    command: logstash -f /config/logstash.conf
    depends_on:
      elasticsearch:
        condition: service_healthy

  # Kibana UI on :5601 for browsing the ugc.* indices.
  kibana:
    image: kibana:7.17.2
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy


volumes:
  redis_data:
  mongodb_data:
  # Named volume backing Elasticsearch data.
  es_data_elk:
3 changes: 3 additions & 0 deletions requirements/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ motor==3.0.0

gunicorn==20.1.0
uvloop==0.16.0
loguru==0.6.0
ecs-logging==2.0.0
python-logstash==0.4.8
11 changes: 11 additions & 0 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ ecdsa==0.17.0 \
--hash=sha256:5cf31d5b33743abe0dfc28999036c849a69d548f994b535e527ee3cb7f3ef676 \
--hash=sha256:b9f500bb439e4153d0330610f5d26baaf18d17b8ced1bc54410d189385ea68aa
# via python-jose
ecs-logging==2.0.0 \
--hash=sha256:8acbc164a7c14d96cb6b3b3e90e8b6972f93ddbd0215e71c6823856518f742ff \
--hash=sha256:d4b791f044e8b96a4707edd4156a35fbe0e593c27ba5f05bae6b007cfab7622c
# via -r requirements.in
email-validator==1.2.1 \
--hash=sha256:6757aea012d40516357c0ac2b1a4c31219ab2f899d26831334c5d069e8b6c3d8 \
--hash=sha256:c8589e691cf73eb99eed8d10ce0e9cbb05a0886ba920c8bcb7c82873f4c5789c
Expand Down Expand Up @@ -439,6 +443,10 @@ kafka-python==2.0.2 \
--hash=sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3 \
--hash=sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e
# via aiokafka
loguru==0.6.0 \
--hash=sha256:066bd06758d0a513e9836fd9c6b5a75bfb3fd36841f4b996bc60b547a309d41c \
--hash=sha256:4e2414d534a2ab57573365b3e6d0234dfb1d84b68b7f3b948e6fb743860a77c3
# via -r requirements.in
markupsafe==2.1.1 \
--hash=sha256:0212a68688482dc52b2d45013df70d169f542b7394fc744c02a57374a4207003 \
--hash=sha256:089cf3dbf0cd6c100f02945abeb18484bd1ee57a079aefd52cffd17fba910b88 \
Expand Down Expand Up @@ -759,6 +767,9 @@ python-jose==3.3.0 \
--hash=sha256:55779b5e6ad599c6336191246e95eb2293a9ddebd555f796a65f838f07e5d78a \
--hash=sha256:9b1376b023f8b298536eedd47ae1089bcdb848f1535ab30555cd92002d78923a
# via -r requirements.in
python-logstash==0.4.8 \
--hash=sha256:d04e1ce11ecc107e4a4f3b807fc57d96811e964a554081b3bbb44732f74ef5f9
# via -r requirements.in
python-ulid==1.1.0 \
--hash=sha256:5fb5e4a91db8ca93e8938a613360b3def299b60d41f847279a8c39c9b2e9c65e \
--hash=sha256:88c952f6be133dbede19c907d72d26717d2691ec8421512b573144794d891e24
Expand Down
3 changes: 3 additions & 0 deletions src/ugc/common/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from typing import Final

# Header every incoming request must carry; the request-id middleware rejects
# requests without it and binds its value to log records for correlation.
REQUEST_ID_HEADER: Final[str] = "X-Request-Id"
8 changes: 8 additions & 0 deletions src/ugc/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,11 @@ class AuthorizationError(NetflixUGCError):
message = "Authorization error"
code = "authorization_error"
status_code = HTTPStatus.UNAUTHORIZED


class RequiredHeaderMissingError(NetflixUGCError):
    """A required header is missing from the request."""

    # No annotation on `status_code`, matching the attribute style of the
    # sibling error classes (cf. AuthorizationError above).
    message = "Required header is missing"
    code = "missing_header"
    status_code = HTTPStatus.BAD_REQUEST
23 changes: 23 additions & 0 deletions src/ugc/containers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import logging.config
import sys
from functools import partial

import loguru
import orjson
from dependency_injector import containers, providers

from ugc.core import logging as ugc_logging
from ugc.domain import bookmarks, processors, progress, reviews
from ugc.domain.bookmarks.models import FilmBookmark
from ugc.domain.progress.models import UserFilmProgress
Expand All @@ -27,6 +30,20 @@ class Container(containers.DeclarativeContainer):

config = providers.Configuration()

logstash_handler = providers.Resource(
ugc_logging.init_logstash_handler,
host=config.LOGSTASH_HOST,
port=config.LOGSTASH_PORT,
version=config.LOGSTASH_LOGGER_VERSION,
)

logger = providers.Resource(
ugc_logging.init_logger,
handler=logstash_handler,
log_format="[{time}] [{level}] [Request: {request_id}] [{name}]: {message}",
level=logging.INFO,
)

configure_logging = providers.Resource(
logging.basicConfig,
level=logging.INFO,
Expand Down Expand Up @@ -223,10 +240,16 @@ async def dummy_resource() -> None:


def _override_with_dummy_resources(container: Container) -> Container:
    """Swap Kafka clients and the logger for no-op stand-ins in CI/test runs.

    Outside CI/testing the container is returned untouched.
    """
    if not (container.config.CI() or container.config.TESTING()):
        return container
    container.logger.override(providers.Resource(_init_dummy_logger))
    kafka_clients = (
        container.kafka_producer_client,
        container.kafka_consumer_progress_client,
        container.kafka_consumer_bookmark_client,
    )
    for kafka_client in kafka_clients:
        kafka_client.override(providers.Resource(dummy_resource))
    kafka_workers = (
        container.kafka_producer,
        container.kafka_bookmark_consumer,
        container.kafka_progress_consumer,
    )
    for kafka_worker in kafka_workers:
        kafka_worker.override(sentinel)
    return container


def _init_dummy_logger() -> None:
    """Attach a plain stderr sink to loguru for CI/test runs.

    The original passed ``filter="my_module"`` (copy-pasted from the loguru
    docs), which silently drops every record not emitted from a module
    literally named ``my_module`` — i.e. all records in this project. The
    filter is removed so the dummy sink actually emits.
    """
    loguru.logger.add(sys.stderr, format="{time} {level} {message}", level="INFO")
5 changes: 5 additions & 0 deletions src/ugc/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ class Settings(BaseSettings):
# Kafka
KAFKA_URL: str

# ELK
LOGSTASH_HOST: str | None = Field("localhost")
LOGSTASH_PORT: int | None = Field(5044)
LOGSTASH_LOGGER_VERSION: int | None = Field(1)

# Config
USE_STUBS: bool = Field(False)
TESTING: bool = Field(False)
Expand Down
33 changes: 33 additions & 0 deletions src/ugc/core/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Iterator

from logstash import LogstashHandler
from loguru import logger

if TYPE_CHECKING:
    from loguru import Logger, Record


def init_logstash_handler(host: str, port: int, version: int) -> Iterator[LogstashHandler]:
    """Yield a configured python-logstash UDP handler.

    Written as a generator so it can serve as a `dependency_injector` Resource
    (setup before ``yield``, teardown after). The return annotation is
    ``Iterator[LogstashHandler]`` — the original annotated the bare handler
    type, which is wrong for a generator function.

    Args:
        host: Logstash host name.
        port: Logstash UDP port.
        version: Logstash event schema version.
    """
    logstash_handler = LogstashHandler(host=host, port=port, version=version)
    yield logstash_handler


def init_logger(handler: logging.Handler, log_format: str, level: str | int) -> Iterator[Logger]:
    """Register *handler* on the global loguru logger and yield the logger.

    Generator-style `dependency_injector` Resource: the sink is attached on
    setup and removed on teardown, so repeated container init/shutdown cycles
    (e.g. in tests) do not accumulate duplicate sinks on the global logger.
    The original never removed the sink and annotated the bare ``Logger``
    return type instead of ``Iterator[Logger]``.

    Args:
        handler: stdlib logging handler to attach as a loguru sink.
        log_format: loguru format string (may reference ``{request_id}``).
        level: minimum log level for the sink.
    """
    handler_id = logger.add(
        handler,
        format=log_format,
        level=level,
        # Copies extra["request_id"] onto the record so `{request_id}` resolves.
        filter=request_id_filter,
    )
    yield logger
    # Teardown: detach the sink added above.
    logger.remove(handler_id)


def request_id_filter(record: Record) -> bool:
    """Copy ``extra["request_id"]`` onto the record so ``{request_id}`` formats.

    Always returns True (never filters a record out); its only job is the
    side effect of promoting the field. Guards against a record without an
    ``extra`` mapping — the original crashed with ``AttributeError`` on
    ``None.get`` in that case. ``request_id`` stays ``None`` when absent.
    """
    extra_fields = record.get("extra") or {}
    record["request_id"] = extra_fields.get("request_id")
    return True
5 changes: 3 additions & 2 deletions src/ugc/domain/processors.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import dataclasses
import logging
from typing import Awaitable, Callable, List

from loguru import logger

from ugc.infrastructure.queue.consumers import AsyncConsumer
from ugc.infrastructure.queue.typedefs import IConsumerRecord

Expand Down Expand Up @@ -39,7 +40,7 @@ async def process_event(self) -> None:
try:
await self.message_callback(message)
except Exception as e:
logging.error("Error while processing message: {exception}".format(exception=e))
logger.error("Error while processing message: {exception}".format(exception=e))
raise e

async def _processor_loop(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions src/ugc/infrastructure/queue/producers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import logging
from abc import ABC, abstractmethod
from typing import AsyncIterator, Awaitable, Callable

from aiokafka.errors import KafkaError
from aiokafka.producer import AIOKafkaProducer
from aiokafka.structs import RecordMetadata
from loguru import logger

from .typedefs import IAsyncRecordMetadata, Message

Expand All @@ -28,7 +28,7 @@ async def send(self, queue: str, /, key: str, message: Message) -> Awaitable[Rec
try:
return await self._client.send(queue, key=key, value=message)
except KafkaError as e:
logging.error(e)
logger.error(e)


async def init_kafka_producer_client(
Expand Down
3 changes: 2 additions & 1 deletion src/ugc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ugc.api.v1.routes import setup_routes_v1
from ugc.core.config import get_settings
from ugc.middleware.errors import exceptions_middleware
from ugc.middleware.request_id import request_id_middleware

from .containers import Container, get_processors, override_providers

Expand All @@ -17,7 +18,7 @@ async def create_app() -> web.Application:

container.config.from_pydantic(settings)

app = web.Application(middlewares=[exceptions_middleware])
app = web.Application(middlewares=[request_id_middleware, exceptions_middleware])
app.container = container

api_v1 = web.Application()
Expand Down
23 changes: 23 additions & 0 deletions src/ugc/middleware/request_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Awaitable, Callable

from loguru import logger

from aiohttp import web

from ugc.common.constants import REQUEST_ID_HEADER
from ugc.common.exceptions import RequiredHeaderMissingError


@web.middleware
async def request_id_middleware(request: web.Request, handler: Callable[..., Awaitable]) -> web.Response:
    """Enforce and propagate the mandatory `X-Request-Id` header.

    Rejects requests missing the header with `RequiredHeaderMissingError`
    (HTTP 400), then binds the id into loguru's context for the duration of
    the request so log records can carry it.

    The original called `_add_extra_logging_field`, which mutates loguru's
    *global* extra via `logger.configure` — concurrent requests would clobber
    each other's `request_id`. `logger.contextualize` scopes the field to the
    current task instead.
    """
    request_id = request.headers.get(REQUEST_ID_HEADER)
    if not request_id:
        raise RequiredHeaderMissingError(message="Required header `X-Request-Id` is missing")
    with logger.contextualize(request_id=request_id):
        return await handler(request)


def _add_extra_logging_field(request_id: str) -> None:
    # Deprecated: kept only for backward compatibility. `logger.configure`
    # rewrites global state and is unsafe under concurrent requests; the
    # middleware above uses `logger.contextualize` instead.
    logger.configure(extra={"request_id": request_id})
5 changes: 5 additions & 0 deletions tests/unit/testlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class APIClient(TestClient):
def __init__(self, *args, **kwargs):
    """Pass construction straight through to ``TestClient``."""
    super().__init__(*args, **kwargs)

async def _request(self, method: str, path: str, **kwargs: Any) -> ClientResponse:
    """Inject a default ``X-Request-Id`` header into every test request.

    Uses ``setdefault`` so an individual test can still supply its own value
    to exercise the request-id middleware. The original ``headers.update``
    unconditionally clobbered any caller-supplied header, making the default
    impossible to override from a test.
    """
    headers = kwargs.pop("headers", {})
    headers.setdefault("X-Request-Id", "XXX-XXX-XXX")
    return await super()._request(method, path, headers=headers, **kwargs)

async def head(self, *args, **kwargs) -> APIResponse:
    """Perform a HEAD request, asserting the expected status code (default 200)."""
    expected_status = kwargs.get("expected_status_code", 200)
    return await self._api_call("head", expected_status, *args, **kwargs)

Expand Down

0 comments on commit 9680e83

Please sign in to comment.