diff --git a/apps/recommendation/api/src/huggy/core/endpoint/retrieval_endpoint.py b/apps/recommendation/api/src/huggy/core/endpoint/retrieval_endpoint.py index bc2ef222..40723f6e 100644 --- a/apps/recommendation/api/src/huggy/core/endpoint/retrieval_endpoint.py +++ b/apps/recommendation/api/src/huggy/core/endpoint/retrieval_endpoint.py @@ -1,6 +1,7 @@ from abc import abstractmethod from dataclasses import dataclass from datetime import datetime +from typing import Optional from fastapi.encoders import jsonable_encoder from huggy.core.endpoint import VERTEX_CACHE, AbstractEndpoint @@ -271,48 +272,54 @@ def _log_cache_usage(self, cache_key: str, action: str) -> None: ) -class FilterRetrievalEndpoint(RetrievalEndpoint): - MODEL_TYPE = "filter" +class BookingNumberRetrievalEndpoint(RetrievalEndpoint): + MODEL_TYPE = "tops" def get_instance(self, size: int): return { - "model_type": "filter", + "model_type": "tops", + "user_id": str(self.user.user_id), "size": size, "params": self.get_params(), "call_id": self.call_id, "debug": 1, "vector_column_name": "booking_number_desc", "similarity_metric": "dot", + "re_rank": 1, } class CreationTrendRetrievalEndpoint(RetrievalEndpoint): - MODEL_TYPE = "filter" + MODEL_TYPE = "tops" def get_instance(self, size: int): return { - "model_type": "filter", + "model_type": "tops", + "user_id": str(self.user.user_id), "size": size, "params": self.get_params(), "call_id": self.call_id, "debug": 1, "vector_column_name": "booking_creation_trend_desc", "similarity_metric": "dot", + "re_rank": 1, } class ReleaseTrendRetrievalEndpoint(RetrievalEndpoint): - MODEL_TYPE = "filter" + MODEL_TYPE = "tops" def get_instance(self, size: int): return { - "model_type": "filter", + "model_type": "tops", + "user_id": str(self.user.user_id), "size": size, "params": self.get_params(), "call_id": self.call_id, "debug": 1, "vector_column_name": "booking_release_trend_desc", "similarity_metric": "dot", + "re_rank": 1, } @@ -328,7 +335,6 @@ def get_instance(self, size: int): "call_id": self.call_id, "debug": 1, "prefilter": 1, - "vector_column_name": "raw_embeddings", "similarity_metric": "dot", } @@ -339,19 +345,16 @@ class OfferRetrievalEndpoint(RetrievalEndpoint): def init_input( self, user: UserContext, - offer: Offer, + input_offers: Optional[list[Offer]], params_in: PlaylistParams, call_id: str, ): self.user = user - self.offer = offer + self.input_offers = input_offers self.call_id = call_id - if params_in.offers: - self.items = [offer.item_id for offer in params_in.offers] - else: - self.items = [str(self.offer.item_id)] self.params_in = params_in - self.is_geolocated = self.offer.is_geolocated if self.offer else False + self.items = [offer.item_id for offer in self.input_offers] + self.is_geolocated = any(offer.is_geolocated for offer in self.input_offers) def get_instance(self, size: int): return { @@ -363,8 +366,8 @@ def get_instance(self, size: int): "debug": 1, "similarity_metric": "l2", "prefilter": 1, - "vector_column_name": "raw_embeddings", "user_id": str(self.user.user_id), + "re_rank": 1, } @@ -374,27 +377,28 @@ class OfferSemanticRetrievalEndpoint(OfferRetrievalEndpoint): def get_instance(self, size: int): return { "model_type": "similar_offer", - "offer_id": str(self.item_id), + "items": self.items, "size": size, "params": self.get_params(), "call_id": self.call_id, "debug": 1, "similarity_metric": "l2", "prefilter": 1, - "vector_column_name": "raw_embeddings", } -class OfferFilterRetrievalEndpoint(OfferRetrievalEndpoint): - MODEL_TYPE = "filter" +class OfferBookingNumberRetrievalEndpoint(OfferRetrievalEndpoint): + MODEL_TYPE = "tops" def get_instance(self, size: int): return { - "model_type": "filter", + "model_type": "tops", "size": size, "params": self.get_params(), "call_id": self.call_id, "debug": 1, "vector_column_name": "booking_number_desc", + "user_id": str(self.user.user_id), "similarity_metric": "dot", + "re_rank": 1, } diff --git a/apps/recommendation/api/src/huggy/core/model_engine/__init__.py b/apps/recommendation/api/src/huggy/core/model_engine/__init__.py index e40656ab..96b26637 100644 --- a/apps/recommendation/api/src/huggy/core/model_engine/__init__.py +++ b/apps/recommendation/api/src/huggy/core/model_engine/__init__.py @@ -1,5 +1,6 @@ import datetime from abc import ABC, abstractmethod +from typing import Optional import huggy.schemas.offer as o import pytz @@ -19,26 +20,34 @@ class ModelEngine(ABC): """ - Abstract base class to build the scoring pipeline used in the recommendation system. + Abstract base class for building the scoring pipeline used in the recommendation system. Attributes: - user (UserContext): The user context. - params_in (PlaylistParams): The playlist parameters. - call_id (str): The call ID. - context (str): The context. - offer (o.Offer, optional): The offer. Defaults to None. - reco_origin (str): The recommendation origin. One of "unknown", "cold_start", "algo". - model_origin (str): The model origin. - model_params (ModelConfiguration): The model configuration. - scorer (OfferScorer): The offer scorer. + user (UserContext): Contains user-specific data used for generating personalized recommendations. + params_in (PlaylistParams): Input parameters defining the playlist or set of offers being processed. + call_id (str): Unique identifier for the recommendation call session. + context (str): Additional context regarding the recommendation, such as session data or request origin. + input_offers (list[o.Offer], optional): List of offer objects to be scored. Defaults to None if no offers are provided. + reco_origin (str): Indicates the origin of the recommendation. It can be "unknown", "cold_start", or "algo". + model_origin (str): Identifies the origin of the model used for scoring (e.g., algorithm type). + model_params (ModelConfiguration): Configuration object containing the model parameters. + scorer (OfferScorer): Initialized scorer object responsible for evaluating and ranking the offers. Methods: - get_model_configuration: Method to get the model configuration. - get_scorer: Initializes the endpoints (retrieval and ranking) and returns the offer scorer. - get_scoring: Returns a list of offer IDs to be sent to the user. - save_context: Saves the context and offer information to the database. - log_extra_data: Logs extra data related to the model engine. + get_model_configuration(user, params_in): + Retrieves the model configuration based on user data and playlist parameters. + get_scorer(): + Initializes the scoring mechanisms (retrieval and ranking) and returns an OfferScorer instance. + + get_scoring(): + Generates and returns a list of offer IDs, scored and ranked, to be presented to the user. + + save_context(): + Saves the current recommendation context, including the offers and user session data, to the database for tracking and auditing. + + log_extra_data(): + Logs any additional data related to the model's execution, such as performance metrics or anomalies, for monitoring and debugging. """ def __init__( @@ -47,13 +56,10 @@ def __init__( params_in: PlaylistParams, call_id: str, context: str, - offer: o.Offer = None, + input_offers: Optional[list[o.Offer]] = None, ): self.user = user - self.offer = offer - self.offers = ( - list(params_in.offers) if isinstance(params_in.offers, list) else [offer] - ) + self.input_offers = input_offers self.params_in = params_in self.call_id = call_id self.context = context @@ -89,7 +95,7 @@ def get_scorer(self) -> OfferScorer: model_params=self.model_params, retrieval_endpoints=self.model_params.retrieval_endpoints, ranking_endpoint=self.model_params.ranking_endpoint, - offer=self.offer, + input_offers=self.input_offers, ) async def get_scoring(self, db: AsyncSession) -> list[str]: @@ -109,7 +115,7 @@ async def get_scoring(self, db: AsyncSession) -> list[str]: # apply diversification filter if diversification_params.is_active: scored_offers = order_offers_by_score_and_diversify_features( - offers=scored_offers, + scored_offers=scored_offers, score_column=diversification_params.order_column, score_order_ascending=diversification_params.order_ascending, shuffle_recommendation=diversification_params.is_reco_shuffled, @@ -121,7 +127,7 @@ async def get_scoring(self, db: AsyncSession) -> list[str]: scoring_size = min(len(scored_offers), NUMBER_OF_RECOMMENDATIONS) await self.save_context( session=db, - offers=scored_offers[:scoring_size], + scored_offers=scored_offers[:scoring_size], context=self.context, user=self.user, ) @@ -131,18 +137,20 @@ async def get_scoring(self, db: AsyncSession) -> list[str]: async def save_context( self, session: AsyncSession, - offers: list[RankedOffer], + scored_offers: list[RankedOffer], context: str, user: UserContext, ) -> None: - if len(offers) > 0: + if len(scored_offers) > 0: date = datetime.datetime.now(pytz.utc) context_extra_data = await self.log_extra_data() # add similar offer_id origin input. - if self.offer is not None: - context_extra_data["offer_origin_id"] = self.offer.offer_id + if self.input_offers is not None: + context_extra_data["offer_origin_ids"] = ":".join( + [offer.offer_id for offer in self.input_offers] + ) - for idx, o in enumerate(offers): + for idx, o in enumerate(scored_offers): session.add( PastOfferContext( call_id=self.call_id, diff --git a/apps/recommendation/api/src/huggy/core/model_engine/factory.py b/apps/recommendation/api/src/huggy/core/model_engine/factory.py new file mode 100644 index 00000000..53111920 --- /dev/null +++ b/apps/recommendation/api/src/huggy/core/model_engine/factory.py @@ -0,0 +1,156 @@ +from dataclasses import dataclass +from typing import Optional + +import huggy.schemas.offer as o +from huggy.core.model_engine import ModelEngine +from huggy.core.model_engine.recommendation import Recommendation +from huggy.core.model_engine.similar_offer import SimilarOffer +from huggy.schemas.playlist_params import PlaylistParams +from huggy.schemas.user import UserContext +from sqlalchemy.ext.asyncio import AsyncSession + + +@dataclass +class ModelEngineOut: + model: ModelEngine + results: list[str] + + +class ModelEngineFactory: + """ + Factory for creating the appropriate model engine handler. + """ + + @staticmethod + async def handle_prediction( + db: AsyncSession, + user: UserContext, + params_in: PlaylistParams, + call_id: str, + context: str, + *, + use_fallback: bool, + input_offers: Optional[list[o.Offer]] = None, + ) -> ModelEngineOut: + """ + Returns the appropriate model engine based on input context and offers. + Fallback to default recommendation if no results are found or specific conditions apply. + """ + input_offers = input_offers or [] + + model_engine = ModelEngineFactory._determine_model_engine( + user, params_in, call_id, context, input_offers + ) + + # Get results from the selected model engine + results = await model_engine.get_scoring(db) + + # Handle fallback scenario if enabled and no results are found + if use_fallback and len(results) == 0: + model_engine = await ModelEngineFactory._handle_fallback( + user, params_in, call_id, input_offers + ) + results = await model_engine.get_scoring(db) + + return ModelEngineOut(model=model_engine, results=results) + + @staticmethod + def _determine_model_engine( + user: UserContext, + params_in: PlaylistParams, + call_id: str, + context: str, + input_offers: Optional[list[o.Offer]], + ) -> ModelEngine: + """ + Determines the appropriate model engine based on the context and input offers. + """ + if context == "similar_offer": + return ModelEngineFactory._get_similar_offer_model( + user, params_in, call_id, input_offers + ) + elif context == "recommendation": + return ModelEngineFactory._get_recommendation_model( + user, params_in, call_id, input_offers + ) + else: + raise Exception(f"context {context} is not available") + + @staticmethod + def _get_similar_offer_model( + user: UserContext, + params_in: PlaylistParams, + call_id: str, + input_offers: Optional[list[o.Offer]], + ) -> ModelEngine: + """ + Selects a model engine for the 'similar_offer' context. + """ + if input_offers: + if any(offer.is_sensitive for offer in input_offers): + return Recommendation( + user=user, + params_in=params_in, + call_id=call_id, + context="recommendation_fallback", + ) + else: + return SimilarOffer( + user=user, + params_in=params_in, + call_id=call_id, + context="similar_offer", + input_offers=input_offers, + ) + else: + return Recommendation( + user=user, + params_in=params_in, + call_id=call_id, + context="recommendation_fallback", + ) + + @staticmethod + def _get_recommendation_model( + user: UserContext, + params_in: PlaylistParams, + call_id: str, + input_offers: Optional[list[o.Offer]], + ) -> ModelEngine: + """ + Selects a model engine for the 'recommendation' context. + """ + if input_offers: + return SimilarOffer( + user=user, + params_in=params_in, + call_id=call_id, + context="hybrid_recommendation", + input_offers=input_offers, + ) + else: + return Recommendation( + user=user, + params_in=params_in, + call_id=call_id, + context="recommendation", + ) + + @staticmethod + async def _handle_fallback( + user: UserContext, + params_in: PlaylistParams, + call_id: str, + input_offers: Optional[list[o.Offer]], + ) -> ModelEngine: + """ + Handles fallback by using the 'recommendation_fallback' model. + """ + fallback_model = Recommendation( + user=user, + params_in=params_in, + call_id=call_id, + context="recommendation_fallback", + input_offers=input_offers, + ) + return fallback_model diff --git a/apps/recommendation/api/src/huggy/core/model_engine/similar_offer.py b/apps/recommendation/api/src/huggy/core/model_engine/similar_offer.py index 95378d60..3e8d130a 100644 --- a/apps/recommendation/api/src/huggy/core/model_engine/similar_offer.py +++ b/apps/recommendation/api/src/huggy/core/model_engine/similar_offer.py @@ -3,7 +3,6 @@ from huggy.core.model_selection.model_configuration.configuration import ForkOut from huggy.schemas.playlist_params import PlaylistParams from huggy.schemas.user import UserContext -from sqlalchemy.ext.asyncio import AsyncSession class SimilarOffer(ModelEngine): @@ -25,7 +24,7 @@ def get_model_configuration( self, user: UserContext, params_in: PlaylistParams ) -> ForkOut: return select_sim_model_params( - params_in.model_endpoint, offer=self.offer, offers=params_in.offers + params_in.model_endpoint, input_offers=self.input_offers ) def get_scorer(self): @@ -33,7 +32,7 @@ def get_scorer(self): for endpoint in self.model_params.retrieval_endpoints: endpoint.init_input( user=self.user, - offer=self.offer, + input_offers=self.input_offers, params_in=self.params_in, call_id=self.call_id, ) @@ -49,14 +48,5 @@ def get_scorer(self): model_params=self.model_params, retrieval_endpoints=self.model_params.retrieval_endpoints, ranking_endpoint=self.model_params.ranking_endpoint, - offer=self.offer, + input_offers=self.input_offers, ) - - async def get_scoring(self, db: AsyncSession) -> list[str]: - if self.offer is not None and self.offer.item_id is None: - return [] - if ( - len(self.offers) > 0 and len([offer.item_id for offer in self.offers]) == 0 - ): # to review - return [] - return await super().get_scoring(db) diff --git a/apps/recommendation/api/src/huggy/core/model_selection/__init__.py b/apps/recommendation/api/src/huggy/core/model_selection/__init__.py index f95f1280..16ac70b7 100644 --- a/apps/recommendation/api/src/huggy/core/model_selection/__init__.py +++ b/apps/recommendation/api/src/huggy/core/model_selection/__init__.py @@ -1,3 +1,5 @@ +from typing import Optional + from fastapi.exceptions import HTTPException from huggy.core.model_selection.model_configuration.configuration import ( ForkOut, @@ -248,7 +250,7 @@ def select_reco_model_params(model_endpoint: str, user: UserContext) -> ForkOut: def select_sim_model_params( - model_endpoint: str, offer: Offer, offers: list[Offer] + model_endpoint: str, input_offers: Optional[list[Offer]] ) -> ForkOut: """ Choose the model to apply for Similar Offers based on offer interaction. @@ -264,7 +266,7 @@ def select_sim_model_params( model_name = SIMILAR_OFFER_MODEL_CONTEXT model_fork = SIMILAR_OFFER_ENDPOINTS[model_name].generate() return model_fork.get_offer_status( - offer=offer, offers=offers, model_origin=model_name + input_offers=input_offers, model_origin=model_name ) diff --git a/apps/recommendation/api/src/huggy/core/model_selection/endpoint/offer_retrieval.py b/apps/recommendation/api/src/huggy/core/model_selection/endpoint/offer_retrieval.py index 72e4aea2..7aa4e7bd 100644 --- a/apps/recommendation/api/src/huggy/core/model_selection/endpoint/offer_retrieval.py +++ b/apps/recommendation/api/src/huggy/core/model_selection/endpoint/offer_retrieval.py @@ -1,5 +1,5 @@ from huggy.core.endpoint.retrieval_endpoint import ( - OfferFilterRetrievalEndpoint, + OfferBookingNumberRetrievalEndpoint, OfferRetrievalEndpoint, OfferSemanticRetrievalEndpoint, ) @@ -24,7 +24,7 @@ ) -offer_filter_retrieval_endpoint = OfferFilterRetrievalEndpoint( +offer_filter_retrieval_endpoint = OfferBookingNumberRetrievalEndpoint( endpoint_name=RetrievalEndpointName.recommendation_user_retrieval, size=50, use_cache=True, diff --git a/apps/recommendation/api/src/huggy/core/model_selection/endpoint/user_retrieval.py b/apps/recommendation/api/src/huggy/core/model_selection/endpoint/user_retrieval.py index e60f4075..5e642462 100644 --- a/apps/recommendation/api/src/huggy/core/model_selection/endpoint/user_retrieval.py +++ b/apps/recommendation/api/src/huggy/core/model_selection/endpoint/user_retrieval.py @@ -1,13 +1,12 @@ from huggy.core.endpoint.retrieval_endpoint import ( + BookingNumberRetrievalEndpoint, CreationTrendRetrievalEndpoint, - FilterRetrievalEndpoint, RecommendationRetrievalEndpoint, ReleaseTrendRetrievalEndpoint, ) from huggy.core.model_selection.endpoint import RetrievalEndpointName -# default -filter_retrieval_endpoint = FilterRetrievalEndpoint( +filter_retrieval_endpoint = BookingNumberRetrievalEndpoint( endpoint_name=RetrievalEndpointName.recommendation_user_retrieval, size=150, use_cache=True, @@ -32,7 +31,7 @@ ) # version B -filter_retrieval_endpoint_version_b = FilterRetrievalEndpoint( +filter_retrieval_endpoint_version_b = BookingNumberRetrievalEndpoint( endpoint_name=RetrievalEndpointName.recommendation_user_retrieval_version_b, size=150, use_cache=True, diff --git a/apps/recommendation/api/src/huggy/core/model_selection/model_configuration/configuration.py b/apps/recommendation/api/src/huggy/core/model_selection/model_configuration/configuration.py index 635b66e9..b8bb07c1 100644 --- a/apps/recommendation/api/src/huggy/core/model_selection/model_configuration/configuration.py +++ b/apps/recommendation/api/src/huggy/core/model_selection/model_configuration/configuration.py @@ -1,6 +1,7 @@ import copy import typing as t from dataclasses import dataclass +from typing import Optional import huggy.core.model_selection.endpoint.user_ranking as user_ranking import huggy.core.scorer.offer as offer_scorer @@ -133,36 +134,17 @@ def get_user_status(self, user: UserContext, model_origin: str) -> ForkOut: ) def get_offer_status( - self, offer: Offer, offers: list[Offer], model_origin: str + self, input_offers: Optional[list[Offer]], model_origin: str ) -> ForkOut: """ Get model status based on Offer interactions """ - - if offers: - return ForkOut( - copy.deepcopy(self.warm_start_model), - reco_origin="algo", - model_origin=model_origin, - ) - - if not offer.found: - return ForkOut( - copy.deepcopy(self.cold_start_model), - reco_origin="unknown", - model_origin=model_origin, - ) - if self.bookings_count is not None: - if offer.booking_number >= self.bookings_count: - return ForkOut( - copy.deepcopy(self.warm_start_model), - reco_origin="algo", - model_origin=model_origin, - ) + # No cold start logic for similar offer. + # TODO: remove cold_start logic for recommendation return ForkOut( - copy.deepcopy(self.cold_start_model), - reco_origin="cold_start", + copy.deepcopy(self.warm_start_model), + reco_origin="algo", model_origin=model_origin, ) diff --git a/apps/recommendation/api/src/huggy/core/scorer/offer.py b/apps/recommendation/api/src/huggy/core/scorer/offer.py index 168cd912..c728190e 100644 --- a/apps/recommendation/api/src/huggy/core/scorer/offer.py +++ b/apps/recommendation/api/src/huggy/core/scorer/offer.py @@ -39,11 +39,10 @@ def __init__( retrieval_endpoints: list[RetrievalEndpoint], ranking_endpoint: RankingEndpoint, model_params, - offer: t.Optional[o.Offer] = None, + input_offers: t.Optional[list[o.Offer]] = None, ): self.user = user - self.offer = offer - self.offers = params_in.offers + self.input_offers = input_offers self.model_params = model_params self.params_in = params_in self.retrieval_endpoints = retrieval_endpoints @@ -153,7 +152,7 @@ async def get_recommendable_offers( db, self.user, recommendable_items_ids, - offer=self.offer, + input_offers=self.input_offers, query_order=self.model_params.query_order, ) if self.use_cache and result is not None: @@ -165,8 +164,9 @@ async def get_distance( self, item: i.RecommendableItem, user: u.UserContext, - offer: o.Offer, default_max_distance: int, + offer_latitude: t.Optional[float] = None, + offer_longitude: t.Optional[float] = None, ) -> float: # If item is not geolocated then return if not item.is_geolocated: @@ -180,31 +180,55 @@ async def get_distance( user.longitude, ) - if offer is not None and offer.is_geolocated: + if offer_latitude is not None and offer_longitude is not None: distance = haversine_distance( item.example_venue_latitude, item.example_venue_longitude, - offer.latitude, - offer.longitude, + offer_latitude, + offer_longitude, ) within_radius = distance <= default_max_distance return distance, within_radius + async def get_mean_offer_coordinates( + self, input_offers: t.Optional[list[o.Offer]] = None + ) -> tuple[t.Optional[float], t.Optional[float]]: + if input_offers: + geolocated_offers = [offer for offer in input_offers if offer.is_geolocated] + if len(geolocated_offers) > 0: + longitude = sum([offer.longitude for offer in geolocated_offers]) / len( + geolocated_offers + ) + latitude = sum([offer.latitude for offer in geolocated_offers]) / len( + geolocated_offers + ) + return latitude, longitude + return None, None + async def get_nearest_offers( self, db: AsyncSession, user: u.UserContext, recommendable_items_ids: dict[str, i.RecommendableItem], limit: int = 500, - offer: t.Optional[o.Offer] = None, + input_offers: t.Optional[list[o.Offer]] = None, query_order: QueryOrderChoices = QueryOrderChoices.ITEM_RANK, ) -> RecommendableOfferResult: recommendable_offers = [] multiple_item_offers = [] + + offer_latitude, offer_longitude = await self.get_mean_offer_coordinates( + input_offers + ) + for v in recommendable_items_ids.values(): if v.total_offers == 1 or not v.is_geolocated: user_distance, within_radius = await self.get_distance( - v, user, offer, default_max_distance=100_000 + v, + user, + default_max_distance=100_000, + offer_latitude=offer_latitude, + offer_longitude=offer_longitude, ) if within_radius: recommendable_offers.append( @@ -225,7 +249,7 @@ async def get_nearest_offers( user=user, recommendable_items_ids=multiple_item_offers, limit=limit, - offer=offer, + input_offers=input_offers, query_order=query_order, ) for found_offers in offer_distances: diff --git a/apps/recommendation/api/src/huggy/crud/offer.py b/apps/recommendation/api/src/huggy/crud/offer.py index d68e456e..23b2342c 100644 --- a/apps/recommendation/api/src/huggy/crud/offer.py +++ b/apps/recommendation/api/src/huggy/crud/offer.py @@ -10,6 +10,16 @@ class Offer: + @staticmethod + async def parse_offer_list( + db: AsyncSession, offer_list: list[str] + ) -> list[o.Offer]: + offers = [ + await Offer().get_offer_characteristics(db, offer_id) + for offer_id in offer_list + ] + return [o for o in offers if o.found] + async def get_item(self, db: AsyncSession, offer_id: str) -> t.Optional[ItemIds]: try: item_table: ItemIds = await ItemIds().get_available_table(db) diff --git a/apps/recommendation/api/src/huggy/crud/recommendable_offer.py b/apps/recommendation/api/src/huggy/crud/recommendable_offer.py index c8167014..0ac06e81 100644 --- a/apps/recommendation/api/src/huggy/crud/recommendable_offer.py +++ b/apps/recommendation/api/src/huggy/crud/recommendable_offer.py @@ -12,11 +12,16 @@ class RecommendableOffer: - async def is_geolocated(self, user: UserContext, offer: o.Offer) -> bool: + async def is_geolocated( + self, user: UserContext, input_offers: Optional[list[o.Offer]] + ) -> bool: if user is not None and user.is_geolocated: return True - else: - return bool(offer is not None and offer.is_geolocated) + elif input_offers is not None: + return any( + offer is not None and offer.is_geolocated for offer in input_offers + ) + return False async def get_nearest_offers( self, @@ -24,14 +29,16 @@ async def get_nearest_offers( user: UserContext, recommendable_items_ids: list[RecommendableItem], limit: int = 500, - offer: Optional[o.Offer] = None, + input_offers: Optional[list[o.Offer]] = None, query_order: QueryOrderChoices = QueryOrderChoices.ITEM_RANK, ) -> list[o.OfferDistance]: - if await self.is_geolocated(user, offer): + if await self.is_geolocated(user, input_offers): offer_table: RecommendableOffersRaw = ( await RecommendableOffersRaw().get_available_table(db) ) - user_distance = self.get_st_distance(user, offer_table, offer=offer) + user_distance = self.get_st_distance( + user, offer_table, input_offers=input_offers + ) recommendable_items = self.get_items(recommendable_items_ids) @@ -99,8 +106,13 @@ def get_st_distance( self, user: UserContext, offer_table: RecommendableOffersRaw, - offer: o.Offer = None, + input_offers: Optional[list[o.Offer]] = None, ): + if input_offers: + geolocated_offers = [offer for offer in input_offers if offer.is_geolocated] + else: + geolocated_offers = [] + if user is not None and user.is_geolocated: user_point = func.ST_GeographyFromText( f"POINT({user.longitude} {user.latitude})" @@ -108,10 +120,15 @@ def get_st_distance( return func.ST_Distance(user_point, offer_table.venue_geo).label( "user_distance" ) - elif offer is not None and offer.is_geolocated: - offer_point = func.ST_GeographyFromText( - f"POINT({offer.longitude} {offer.latitude})" + elif geolocated_offers: + longitude = sum([offer.longitude for offer in geolocated_offers]) / len( + geolocated_offers + ) + latitude = sum([offer.latitude for offer in geolocated_offers]) / len( + geolocated_offers ) + + offer_point = func.ST_GeographyFromText(f"POINT({longitude} {latitude})") return func.ST_Distance(offer_point, offer_table.venue_geo).label( "user_distance" ) diff --git a/apps/recommendation/api/src/huggy/schemas/playlist_params.py b/apps/recommendation/api/src/huggy/schemas/playlist_params.py index a2378b96..c1cffb06 100644 --- a/apps/recommendation/api/src/huggy/schemas/playlist_params.py +++ b/apps/recommendation/api/src/huggy/schemas/playlist_params.py @@ -4,11 +4,9 @@ from dateutil.parser import parse from fastapi import Query -from huggy.crud.offer import Offer -from pydantic import BaseModel, ConfigDict, Field, field_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator from pydantic.alias_generators import to_camel from pydantic_core.core_schema import ValidationInfo -from sqlalchemy.ext.asyncio import AsyncSession under_pat = re.compile(r"_([a-z])") @@ -31,6 +29,8 @@ class PlaylistParams(BaseModel): model_config = ConfigDict( alias_generator=to_camel, populate_by_name=True, protected_namespaces=() ) + input_offers: Optional[list[str]] = None + user_id: Optional[str] = None model_endpoint: Optional[str] = None start_date: Optional[datetime] = None end_date: Optional[datetime] = None @@ -39,7 +39,7 @@ class PlaylistParams(BaseModel): price_max: Optional[float] = None price_min: Optional[float] = None is_reco_shuffled: Optional[bool] = None - is_restrained: Optional[str] = None + is_restrained: Optional[bool] = None is_digital: Optional[bool] = None categories: Optional[list[str]] = None subcategories: Optional[list[str]] = None @@ -50,7 +50,6 @@ class PlaylistParams(BaseModel): gtl_l3: Optional[list[str]] = None gtl_l4: Optional[list[str]] = None submixing_feature_dict: Optional[dict] = None - offers: Optional[list[str]] = None @field_validator("start_date", "end_date", mode="before") def parse_datetime(cls, value, info: ValidationInfo) -> datetime: @@ -61,6 +60,20 @@ def parse_datetime(cls, value, info: ValidationInfo) -> datetime: raise ValueError("Datetime format not recognized.") return None + @model_validator(mode="before") + def validate_is_restrained(cls, values): + is_restrained = values.get("is_restrained") + if is_restrained is None: + values["is_restrained"] = True + return values + + @model_validator(mode="before") + def validate_offers(cls, values): + input_offers = values.get("input_offers") + if input_offers is None: + values["input_offers"] = [] + return values + def playlist_type(self): if self.categories and len(self.categories) > 1: return "multipleCategoriesRecommendations" @@ -72,15 +85,16 @@ def playlist_type(self): return "singleSubCategoryRecommendations" return "GenericRecommendations" - async def parse_offers(self, db: AsyncSession) -> list[Offer]: - if self.offers and len(self.offers) > 0: - offer_list = [] - for offer_id in self.offers: - offer = await Offer().get_offer_characteristics(db, offer_id) - offer_list.append(offer) - self.offers = offer_list + def add_offer(self, offer_id: str) -> None: + if self.input_offers is None: + self.input_offers = [] + self.input_offers.append(offer_id) + + def add_model_endpoint(self, model_endpoint: str) -> None: + if model_endpoint is not None: + self.model_endpoint = model_endpoint - async def to_dict(self): + async def to_dict(self) -> dict: return self.dict() @@ -100,7 +114,3 @@ def playlist_type(self): if len(self.subcategories) == 1: return "sameSubCategorySimilarOffers" return "GenericSimilarOffers" - - -class PostSimilarOfferPlaylistParams(PlaylistParams): - user_id: str = None diff --git a/apps/recommendation/api/src/huggy/utils/mixing.py b/apps/recommendation/api/src/huggy/utils/mixing.py index 46477d84..b4ee4d41 100644 --- a/apps/recommendation/api/src/huggy/utils/mixing.py +++ b/apps/recommendation/api/src/huggy/utils/mixing.py @@ -7,7 +7,7 @@ def order_offers_by_score_and_diversify_features( - offers: list[RankedOffer], + scored_offers: list[RankedOffer], score_column="offer_rank", score_order_ascending=True, shuffle_recommendation=None, @@ -17,7 +17,7 @@ def order_offers_by_score_and_diversify_features( submixing_feature_dict=None, ) -> list[RankedOffer]: """ - Group offers by feature. + Group scored_offers by feature. Order offer groups by decreasing number of offers in each group and decreasing maximal score. Order each offers within a group by increasing score. Sort offers by taking the last offer of each group (maximum score), by decreasing size of group. @@ -28,11 +28,11 @@ def order_offers_by_score_and_diversify_features( diversified_offers = [] if shuffle_recommendation: - for recommendation in offers: + for recommendation in scored_offers: setattr(recommendation, score_column, random.random()) offers_by_feature = _get_offers_grouped_by_feature( - offers, feature + scored_offers, feature ) # here we group offers by cat (and score) to_submixed_data = {} @@ -90,11 +90,11 @@ def order_offers_by_score_and_diversify_features( def _get_offers_grouped_by_feature( - offers: list[RankedOffer], feature="subcategory_id" + scored_offers: list[RankedOffer], feature="subcategory_id" ) -> dict: offers_by_feature = {} product_ids = set() - for offer in offers: + for offer in scored_offers: offer_feature = getattr(offer, feature) offer_product_id = offer.item_id if offer_feature in offers_by_feature: # Here we filter subcat diff --git a/apps/recommendation/api/src/huggy/views/home.py b/apps/recommendation/api/src/huggy/views/home.py index 2101d6fd..3f6ee441 100644 --- a/apps/recommendation/api/src/huggy/views/home.py +++ b/apps/recommendation/api/src/huggy/views/home.py @@ -3,11 +3,10 @@ import huggy.schemas.playlist_params as p from fastapi import APIRouter, Depends from fastapi.encoders import jsonable_encoder -from huggy.core.model_engine.recommendation import Recommendation -from huggy.core.model_engine.similar_offer import SimilarOffer +from huggy.core.model_engine.factory import ModelEngineFactory, ModelEngineOut +from huggy.crud.offer import Offer from huggy.crud.user import UserContextDB from huggy.database.session import get_db -from huggy.utils.cloud_logging import logger from huggy.views.common import check_token, get_call_id, setup_trace from sqlalchemy.ext.asyncio import AsyncSession @@ -28,34 +27,28 @@ async def playlist_recommendation( db: AsyncSession = Depends(get_db), call_id: str = Depends(get_call_id), ): - user = await UserContextDB().get_user_context(db, user_id, latitude, longitude) - if modelEndpoint is not None: - playlist_params.model_endpoint = modelEndpoint - if playlist_params.is_restrained is None: - playlist_params.is_restrained = True + # legacy: force modelEndpoint input + playlist_params.add_model_endpoint(modelEndpoint) - if playlist_params.offers: - await playlist_params.parse_offers(db) - logger.info(f"playlist_recommendation: {playlist_params.offers}") - scoring = SimilarOffer( - user, - playlist_params, - call_id=call_id, - context="hybrid_recommendation", - ) - else: - scoring = Recommendation( - user, params_in=playlist_params, call_id=call_id, context="recommendation" - ) + user = await UserContextDB().get_user_context(db, user_id, latitude, longitude) + input_offers = await Offer.parse_offer_list(db, playlist_params.input_offers) - user_recommendations = await scoring.get_scoring(db) + model_engine_out: ModelEngineOut = await ModelEngineFactory.handle_prediction( + db, + user=user, + params_in=playlist_params, + call_id=call_id, + context="recommendation", + input_offers=input_offers, + use_fallback=True, + ) return jsonable_encoder( { - "playlist_recommended_offers": user_recommendations, + "playlist_recommended_offers": model_engine_out.results, "params": { - "reco_origin": scoring.reco_origin, - "model_origin": scoring.model_origin, + "reco_origin": model_engine_out.model.reco_origin, + "model_origin": model_engine_out.model.model_origin, "call_id": call_id, }, } diff --git a/apps/recommendation/api/src/huggy/views/offer.py b/apps/recommendation/api/src/huggy/views/offer.py index cdfe14e7..7afc41d3 100644 --- a/apps/recommendation/api/src/huggy/views/offer.py +++ b/apps/recommendation/api/src/huggy/views/offer.py @@ -3,8 +3,7 @@ import huggy.schemas.playlist_params as p from fastapi import APIRouter, Depends from fastapi.encoders import jsonable_encoder -from huggy.core.model_engine.recommendation import Recommendation -from huggy.core.model_engine.similar_offer import SimilarOffer +from huggy.core.model_engine.factory import ModelEngineFactory, ModelEngineOut from huggy.crud.offer import Offer from huggy.crud.user import UserContextDB from huggy.database.session import get_db @@ -22,36 +21,30 @@ async def __similar_offers( longitude: t.Optional[float], call_id: str, ): - offer_recommendations = [] + # legacy: include main offer_id in the list of offers + playlist_params.add_offer(offer_id) + user = await UserContextDB().get_user_context( db, playlist_params.user_id, latitude, longitude ) + input_offers = await Offer().parse_offer_list(db, playlist_params.input_offers) - offer = await Offer().get_offer_characteristics(db, offer_id) - - scoring = SimilarOffer( - user, playlist_params, call_id=call_id, context="similar_offer", offer=offer + model_engine_out: ModelEngineOut = await ModelEngineFactory.handle_prediction( + db, + user=user, + params_in=playlist_params, + call_id=call_id, + context="similar_offer", + input_offers=input_offers, + use_fallback=True, ) - if not offer.is_sensitive: - offer_recommendations = await scoring.get_scoring(db) - - # fallback to reco - if len(offer_recommendations) == 0: - scoring = Recommendation( - user, - params_in=playlist_params, - call_id=call_id, - context="recommendation_fallback", - ) - offer_recommendations = await scoring.get_scoring(db) - return jsonable_encoder( { - "results": offer_recommendations, + "results": model_engine_out.results, "params": { - "reco_origin": scoring.reco_origin, - "model_origin": scoring.model_origin, + "reco_origin": model_engine_out.model.reco_origin, + "model_origin": model_engine_out.model.model_origin, "call_id": call_id, }, } @@ -64,7 +57,7 @@ async def __similar_offers( ) async def post_similar_offers( offer_id: str, - playlist_params: p.PostSimilarOfferPlaylistParams, + playlist_params: p.PlaylistParams, token: t.Optional[str] = None, latitude: t.Optional[float] = None, longitude: t.Optional[float] = None, diff --git a/pyproject.toml b/pyproject.toml index 825b58be..4d6c8cbd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ lint.ignore = [ convention = "google" [tool.ruff.lint.pycodestyle] -max-doc-length = 100 +max-doc-length = 150 [tool.ruff.lint.flake8-tidy-imports] ban-relative-imports = "all"