Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/#21-movie rating #32

Merged
merged 13 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ jobs:
export NUGC_QUEUE_BOOKMARKS_GROUP=bookmarks-group
export NUGC_QUEUE_BOOKMARKS_CONSUMERS=2
export NUGC_QUEUE_BOOKMARKS_NAME=bookmarks-topic
export NUGC_QUEUE_FILM_RATING_NAME=film-rating-topic
export NUGC_QUEUE_FILM_RATING_GROUP=film-rating-group
export NUGC_FILM_RATING_CONSUMERS=2
export NUGC_KAFKA_URL=kafka:9092
export NUGC_USE_STUBS=1
export NUGC_CI=1
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ NUGC_QUEUE_PROGRESS_CONSUMERS=2
NUGC_QUEUE_BOOKMARKS_NAME=bookmarks-topic
NUGC_QUEUE_BOOKMARKS_GROUP=bookmarks-group
NUGC_QUEUE_BOOKMARKS_CONSUMERS=2
NUGC_QUEUE_FILM_RATING_NAME=film-rating-topic
NUGC_QUEUE_FILM_RATING_GROUP=film-rating-group
NUGC_FILM_RATING_CONSUMERS=2
# Kafka
NUGC_KAFKA_URL=kafka:9092
# ELK
Expand Down Expand Up @@ -178,6 +181,9 @@ NUGC_QUEUE_PROGRESS_CONSUMERS=2
NUGC_QUEUE_BOOKMARKS_NAME=bookmarks-topic
NUGC_QUEUE_BOOKMARKS_GROUP=bookmarks-group
NUGC_QUEUE_BOOKMARKS_CONSUMERS=2
NUGC_QUEUE_FILM_RATING_NAME=film-rating-topic
NUGC_QUEUE_FILM_RATING_GROUP=film-rating-group
NUGC_FILM_RATING_CONSUMERS=2
# Kafka
NUGC_KAFKA_URL=kafka:9092
# Config
Expand Down
22 changes: 22 additions & 0 deletions conf/clickhouse/scripts/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,25 @@ ENGINE = Distributed('ugc_cluster', ugc, bookmarks, rand());
CREATE MATERIALIZED VIEW IF NOT EXISTS ugc.bookmarks_consumer ON CLUSTER 'ugc_cluster' TO ugc.bookmarks_distributed
AS SELECT user_id, film_id, bookmarked, bookmarked_at
FROM ugc.bookmarks_queue;

-- Rating
CREATE TABLE IF NOT EXISTS ugc.rating_queue ON CLUSTER 'ugc_cluster'
(
user_id UUID,
film_id UUID,
rating Int32
)
ENGINE=Kafka('kafka:9092', 'film-rating-topic', 'rating-group-kafka', 'JSONEachRow');

CREATE TABLE IF NOT EXISTS ugc.rating ON CLUSTER 'ugc_cluster'
AS ugc.rating_queue
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/rating', '{replica}_rating')
ORDER BY (user_id, film_id, rating);

CREATE TABLE IF NOT EXISTS ugc.rating_distributed ON CLUSTER 'ugc_cluster'
AS ugc.rating
ENGINE = Distributed('ugc_cluster', ugc, rating, rand());

CREATE MATERIALIZED VIEW IF NOT EXISTS ugc.rating_consumer ON CLUSTER 'ugc_cluster' TO ugc.rating_distributed
AS SELECT user_id, film_id, rating
FROM ugc.rating_queue;
1 change: 1 addition & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ services:
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic progress-topic --replication-factor 1 --partitions 3
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic bookmarks-topic --replication-factor 1 --partitions 3
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic film-rating-topic --replication-factor 1 --partitions 3

echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
Expand Down
2 changes: 1 addition & 1 deletion src/ugc/api/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def get_token_from_header(headers: Mapping[str, Any]) -> str:

@inject
def get_user_id_from_jwt(headers: Mapping[str, Any], config=Provide[Container.config]) -> UUID:
"""Получение id пользователя из JWT токена."""
"""Получение id пользователя из JWT."""
token = get_token_from_header(headers)
try:
payload = jwt.decode(token, config["JWT_AUTH_SECRET_KEY"], algorithms=[config["JWT_AUTH_ALGORITHM"]])
Expand Down
33 changes: 33 additions & 0 deletions src/ugc/api/v1/handlers/ugc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
from ugc.api.utils import orjson_response
from ugc.api.v1 import openapi, serializers
from ugc.containers import Container
from ugc.domain.ratings.exceptions import NoFilmRatingError

if TYPE_CHECKING:
from ugc.domain.bookmarks import BookmarkDispatcherService, BookmarkService
from ugc.domain.progress import ProgressDispatcherService, ProgressService
from ugc.domain.ratings import FilmRatingDispatcherService, FilmRatingService
from ugc.domain.reviews import ReviewService


Expand Down Expand Up @@ -83,6 +85,37 @@ async def get_film_progress(
return orjson_response(progress, status=HTTPStatus.OK)


@docs(**openapi.get_film_rating)
@inject
async def get_film_rating(
request: web.Request, *,
film_rating_service: FilmRatingService = Provide[Container.film_rating_service],
) -> web.Response:
"""Получение рейтинга фильма."""
film_id: UUID = request.match_info["film_id"]
try:
film_rating = await film_rating_service.get_film_rating(film_id)
except NoFilmRatingError:
return orjson_response(status=HTTPStatus.NO_CONTENT)
return orjson_response(film_rating, status=HTTPStatus.OK)


@docs(**openapi.add_film_rating)
@request_schema(serializers.FilmRatingCreate)
@inject
async def add_film_rating(
request: web.Request, *,
film_rating_dispatcher: FilmRatingDispatcherService = Provide[Container.film_rating_dispatcher_service],
) -> web.Response:
"""Добавление пользовательского рейтинга фильму."""
film_id: UUID = request.match_info["film_id"]
user_id = get_user_id_from_jwt(request.headers)
validated_data = request["data"]
rating = validated_data["rating"]
await film_rating_dispatcher.dispatch_film_rating(film_id=film_id, user_id=user_id, rating=rating)
return orjson_response(status=HTTPStatus.ACCEPTED)


@docs(**openapi.create_film_review)
@request_schema(serializers.FilmReviewCreate)
@inject
Expand Down
24 changes: 24 additions & 0 deletions src/ugc/api/v1/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,30 @@
},
}

get_film_rating = {
"tags": ["ratings"],
"summary": "Получить рейтинг фильма.",
"responses": {
HTTPStatus.OK: {
"description": "Рейтинг фильма.",
"schema": serializers.FilmRating,
},
HTTPStatus.NO_CONTENT: {"description": "У фильма нет рейтинга."},
HTTPStatus.INTERNAL_SERVER_ERROR: {"description": "Ошибка сервера."},
},
}

add_film_rating = {
"tags": ["ratings"],
"summary": "Поставить оценку фильму.",
"security": [{"JWT": []}],
"responses": {
HTTPStatus.ACCEPTED: {"description": "Рейтинг фильма сохранен."},
HTTPStatus.UNAUTHORIZED: {"description": "Пользователь не авторизован."},
HTTPStatus.INTERNAL_SERVER_ERROR: {"description": "Ошибка сервера."},
},
}

create_film_review = {
"tags": ["review"],
"summary": "Создать пользовательскую рецензию на фильм.",
Expand Down
11 changes: 11 additions & 0 deletions src/ugc/api/v1/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ def setup_routes_v1(app: web.Application) -> None:
allow_head=False,
),

# Film ratings
web.get(
path="/ratings/films/{film_id}",
handler=ugc.get_film_rating,
allow_head=False,
),
web.post(
path="/users/me/ratings/films/{film_id}",
handler=ugc.add_film_rating,
),

# Reviews
web.post(
path="/users/me/reviews/films/{film_id}",
Expand Down
14 changes: 14 additions & 0 deletions src/ugc/api/v1/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ class FilmProgressDetail(Schema):
viewed_frame = fields.Integer()


class FilmRatingCreate(Schema):
"""Сериалайзер для создания оценки фильму."""

rating = fields.Integer(
strict=True, required=True, validate=[Range(min=1, max=10, error="Rating must be between 1 and 10")])


class FilmRating(Schema):
"""Сериалайзер средней оценки фильма."""

film_id = fields.UUID()
rating = fields.Float()


class FilmReviewCreate(Schema):
"""Сериалайзер для создания рецензии на фильм."""

Expand Down
6 changes: 5 additions & 1 deletion src/ugc/common/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from http import HTTPStatus


class NetflixUGCError(Exception):
class BaseNetflixUGCError(Exception):
"""Базовая ошибка сервиса."""


class NetflixUGCError(BaseNetflixUGCError):
"""Ошибка сервиса Netflix UGC."""

message: str
Expand Down
101 changes: 81 additions & 20 deletions src/ugc/containers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import logging.config
import sys
from functools import partial
from typing import AsyncIterator

import loguru
import orjson
from dependency_injector import containers, providers

from ugc.core import logging as ugc_logging
from ugc.domain import bookmarks, processors, progress, reviews
from ugc.domain import bookmarks, processors, progress, ratings, reviews
from ugc.domain.bookmarks.models import FilmBookmark
from ugc.domain.progress.models import UserFilmProgress
from ugc.domain.ratings.models import FilmRating
from ugc.domain.reviews.constants import REVIEWS_COLLECTION_NAME
from ugc.helpers import sentinel
from ugc.infrastructure.db import mongo, redis, repositories
Expand Down Expand Up @@ -114,6 +116,18 @@ class Container(containers.DeclarativeContainer):
client=kafka_consumer_bookmark_client,
)

kafka_consumer_film_rating_client = providers.Resource(
consumers.init_kafka_consumer_client,
config=config,
topic=config.QUEUE_FILM_RATING_NAME,
group_id=config.QUEUE_FILM_RATING_GROUP,
**consumer_client_config,
)
kafka_film_rating_consumer = providers.Singleton(
consumers.KafkaConsumer,
client=kafka_consumer_film_rating_client,
)

# Domain -> Progress

progress_factory = providers.Factory(progress.FilmProgressFactory)
Expand All @@ -129,19 +143,19 @@ class Container(containers.DeclarativeContainer):
progress_repository=progress_repository,
)

progress_processor = providers.Singleton(
progress.ProgressProcessor,
progress_factory=progress_factory,
progress_service=progress_service,
)

progress_dispatcher_service = providers.Factory(
progress.ProgressDispatcherService,
progress_factory=progress_factory,
producer=kafka_producer,
config=config,
)

progress_processor = providers.Singleton(
progress.ProgressProcessor,
progress_factory=progress_factory,
progress_service=progress_service,
)

progress_processor_service = providers.Factory(
processors.ProcessorService,
consumer=kafka_progress_consumer,
Expand All @@ -164,26 +178,62 @@ class Container(containers.DeclarativeContainer):
bookmark_repository=bookmark_repository,
)

bookmark_processor = providers.Singleton(
bookmarks.BookmarkProcessor,
bookmark_factory=bookmark_factory,
bookmark_service=bookmark_service,
)

bookmark_dispatcher_service = providers.Factory(
bookmarks.BookmarkDispatcherService,
bookmark_factory=bookmark_factory,
producer=kafka_producer,
config=config,
)

bookmark_processor = providers.Singleton(
bookmarks.BookmarkProcessor,
bookmark_factory=bookmark_factory,
bookmark_service=bookmark_service,
)

bookmark_processor_service = providers.Factory(
processors.ProcessorService,
consumer=kafka_bookmark_consumer,
concurrency=config.QUEUE_BOOKMARKS_CONSUMERS,
message_callback=bookmark_processor,
)

# Domain -> FilmRating

film_rating_factory = providers.Factory(ratings.FilmRatingFactory)

film_rating_repository = providers.Singleton(
ratings.FilmRatingRepository,
redis_client=redis_client,
film_rating_factory=film_rating_factory,
redis_repository=redis_repository_factory(model=FilmRating),
)

film_rating_service = providers.Singleton(
ratings.FilmRatingService,
film_rating_repository=film_rating_repository,
)

film_rating_dispatcher_service = providers.Factory(
ratings.FilmRatingDispatcherService,
film_rating_factory=film_rating_factory,
producer=kafka_producer,
config=config,
)

film_rating_processor = providers.Singleton(
ratings.FilmRatingProcessor,
film_rating_factory=film_rating_factory,
film_rating_service=film_rating_service,
)

film_rating_processor_service = providers.Factory(
processors.ProcessorService,
consumer=kafka_film_rating_consumer,
concurrency=config.QUEUE_FILM_RATING_CONSUMERS,
message_callback=film_rating_processor,
)

# Domain -> Reviews

review_factory = providers.Factory(reviews.FilmReviewFactory)
Expand Down Expand Up @@ -217,22 +267,31 @@ def override_providers(container: Container) -> Container:
InMemoryProcessor,
queue=providers.Singleton(InMemoryQueue),
)
film_rating_processor = providers.Singleton(
InMemoryProcessor,
queue=providers.Singleton(InMemoryQueue),
)
container.progress_dispatcher_service.add_kwargs(producer=progress_processor)
container.bookmark_dispatcher_service.add_kwargs(producer=bookmark_processor)
container.film_rating_dispatcher_service.add_kwargs(producer=film_rating_processor)
container.progress_processor_service.add_kwargs(consumer=progress_processor)
container.bookmark_processor_service.add_kwargs(consumer=bookmark_processor)
container.film_rating_processor_service.add_kwargs(consumer=film_rating_processor)

return container


async def get_processors(container: Container) -> list[processors.ProcessorService]:
processor_services = [
container.progress_processor_service(),
container.bookmark_processor_service(),
async def get_processors(container: Container) -> AsyncIterator[processors.ProcessorService]:
processor_providers = [
container.progress_processor_service,
container.bookmark_processor_service,
container.film_rating_processor_service,
]
if container.progress_processor_service.is_async_mode_enabled():
return [await processor_service for processor_service in processor_services]
return processor_services
for provider in processor_providers:
try:
yield await provider()
except TypeError:
yield provider()


async def dummy_resource() -> None:
Expand All @@ -245,9 +304,11 @@ def _override_with_dummy_resources(container: Container) -> Container:
container.kafka_producer_client.override(providers.Resource(dummy_resource))
container.kafka_consumer_progress_client.override(providers.Resource(dummy_resource))
container.kafka_consumer_bookmark_client.override(providers.Resource(dummy_resource))
container.kafka_consumer_film_rating_client.override(providers.Resource(dummy_resource))
container.kafka_producer.override(sentinel)
container.kafka_bookmark_consumer.override(sentinel)
container.kafka_progress_consumer.override(sentinel)
container.kafka_film_rating_consumer.override(sentinel)
return container


Expand Down
Loading