From 2cbc0a6250f301c37bf68a29457d66ccb42b4b33 Mon Sep 17 00:00:00 2001 From: Henrijs Date: Sat, 1 Jun 2024 20:32:16 +0300 Subject: [PATCH 1/2] Implement Trakt lists as sensors --- README.md | 24 ++++++ custom_components/trakt_tv/apis/trakt.py | 83 ++++++++++++++++++++- custom_components/trakt_tv/configuration.py | 6 ++ custom_components/trakt_tv/models/kind.py | 6 ++ custom_components/trakt_tv/models/media.py | 11 ++- custom_components/trakt_tv/schema.py | 14 +++- custom_components/trakt_tv/sensor.py | 26 ++++++- 7 files changed, 166 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7b9d479..ad09fbb 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,18 @@ trakt_tv: - friends only_upcoming: max_medias: 5 + lists: + - friendly_name: "Christmas Watchlist" + private_list: True # Set to True if the list is your own private list + list_id: "christmas-watchlist" # Can be the slug, because it's a private list + max_medias: 5 + - friendly_name: "2024 Academy Awards" + list_id: 26885014 + max_medias: 5 + - friendly_name: "Star Trek Movies" + list_id: 967660 + media_type: "movie" # Filters the list to only show movies + max_medias: 5 ``` #### Integration Settings @@ -175,6 +187,18 @@ There are three parameters for each sensor: - `exclude` should be a list of shows you'd like to exclude, since it's based on your watched history. To find keys to put there, go on trakt.tv, search for a show, click on it, notice the url slug, copy/paste it. So, if I want to hide "Friends", I'll do the steps mentioned above, then land on https://trakt.tv/shows/friends, I'll just have to copy/paste the last part, `friends`, that's it You can also use the Trakt.tv "hidden" function to hide a show from [your calendar](https://trakt.tv/calendars/my/shows) or the [progress page](https://trakt.tv/users//progress) +##### Lists sensor + +Lists sensor allows you to fetch both public and private lists from Trakt, each list will be a sensor. The items in the list will be sorted by their rank on Trakt. + +There are four parameters for each sensor: + + - `friendly_name` (MANDATORY) should be a string for the name of the sensor. This has to be unique for each list. + - `list_id` (MANDATORY) should be the Trakt list ID. For public lists the ID has to be numeric, for private lists the ID can be either the numeric ID or the slug from the URL. To get the numeric ID of a public list, copy the link address of the list before opening it. This will give you a URL like `https://trakt.tv/lists/2142753`. The `2142753` part is the numeric ID you need to use. + - `private_list` (OPTIONAL) has to be set to `true` if using your own private list. Default is `false` + - `media_type` (OPTIONAL) can be used to filter the media type within the list, possible values are `show`, `movie`, `episode`. Default is blank, which will show all media types + - `max_medias` (OPTIONAL) should be a positive number for how many items to grab. Default is `3` + #### Example For example, adding only the following to `configuration.yaml` will create two sensors. diff --git a/custom_components/trakt_tv/apis/trakt.py b/custom_components/trakt_tv/apis/trakt.py index 60f5560..77bf450 100644 --- a/custom_components/trakt_tv/apis/trakt.py +++ b/custom_components/trakt_tv/apis/trakt.py @@ -17,7 +17,7 @@ from ..const import API_HOST, DOMAIN from ..exception import TraktException from ..models.kind import BASIC_KINDS, UPCOMING_KINDS, TraktKind -from ..models.media import Medias +from ..models.media import Medias, Show, Movie, Episode from ..utils import cache_insert, cache_retrieve, deserialize_json LOGGER = logging.getLogger(__name__) @@ -390,6 +390,79 @@ async def fetch_recommendations(self, configured_kinds: list[TraktKind]): return res + async def fetch_list(self, path: str, list_id: str, user_path: bool, max_items: int, media_type: str): + """ Fetch the list, if user_path is True, the list will be fetched from the user end-point """ + # Add the user path if needed + if user_path: + path = f"users/me/{path}" + + # Replace the list_id in the path + path = path.replace("{list_id}", list_id) + + # Add media type filter to the path + if media_type: + # Check if the media type is supported + if Medias.trakt_to_class(media_type): + path = f"{path}/{media_type}" + else: + LOGGER.warn(f"Filtering list on {media_type} is not supported") + return None + + # Add the limit to the path + path = f"{path}?limit={max_items}" + + return await self.request( + "get", path + ) + + async def fetch_lists(self, configured_kind: TraktKind): + + # Get config for all lists + configuration = Configuration(data=self.hass.data) + lists = configuration.get_sensor_config(configured_kind.value.identifier) + + # Fetch the lists + data = await gather( + *[ + self.fetch_list( + configured_kind.value.path, + list_config['list_id'], + list_config['private_list'], + list_config['max_medias'], + list_config['media_type'] + ) + for list_config in lists + ] + ) + + # Process the results + language = configuration.get_language() + + res = {} + for list_config, raw_medias in zip(lists, data): + if raw_medias is not None: + medias = [] + for media in raw_medias: + # Get model based on media type in data + media_type = media.get('type') + model = Medias.trakt_to_class(media_type) + + if model: + medias.append(model.from_trakt(media)) + else: + LOGGER.warn(f"Media type {media_type} in {list_config['friendly_name']} is not supported") + + if not medias: + LOGGER.warn(f"No entries found for list {list_config['friendly_name']}") + continue + + await gather( + *[media.get_more_information(language) for media in medias] + ) + res[list_config['friendly_name']] = Medias(medias) + + return {configured_kind: res} + async def retrieve_data(self): async with timeout(1800): configuration = Configuration(data=self.hass.data) @@ -420,6 +493,9 @@ async def retrieve_data(self): configured_kind=TraktKind.NEXT_TO_WATCH_UPCOMING, only_upcoming=True, ), + "lists": lambda: self.fetch_lists( + configured_kind=TraktKind.LIST, + ), } """First, let's configure which sensors we need depending on configuration""" @@ -443,6 +519,11 @@ async def retrieve_data(self): sources.append(sub_source) coroutine_sources_data.append(source_function.get(sub_source)()) + """Finally let's add the lists sensors if needed""" + if configuration.source_exists("lists"): + sources.append("lists") + coroutine_sources_data.append(source_function.get("lists")()) + sources_data = await gather(*coroutine_sources_data) return { diff --git a/custom_components/trakt_tv/configuration.py b/custom_components/trakt_tv/configuration.py index 72a897b..9db2fdf 100644 --- a/custom_components/trakt_tv/configuration.py +++ b/custom_components/trakt_tv/configuration.py @@ -78,6 +78,12 @@ def recommendation_identifier_exists(self, identifier: str) -> bool: def get_recommendation_max_medias(self, identifier: str) -> int: return self.get_max_medias(identifier, "recommendation") + def get_sensor_config(self, identifier: str) -> list: + try: + return self.conf["sensors"][identifier] + except KeyError: + return [] + def source_exists(self, source: str) -> bool: try: self.conf["sensors"][source] diff --git a/custom_components/trakt_tv/models/kind.py b/custom_components/trakt_tv/models/kind.py index b098dba..876f6db 100644 --- a/custom_components/trakt_tv/models/kind.py +++ b/custom_components/trakt_tv/models/kind.py @@ -11,6 +11,11 @@ class CalendarInformation: path: str model: Media +@dataclass +class ListInformation: + identifier: str + name: str + path: str class TraktKind(Enum): SHOW = CalendarInformation("show", "Shows", "shows", Show) @@ -23,6 +28,7 @@ class TraktKind(Enum): NEXT_TO_WATCH_UPCOMING = CalendarInformation( "only_upcoming", "Only Upcoming", "shows", Show ) + LIST = ListInformation("lists", "", "lists/{list_id}/items") @classmethod def from_string(cls, string): diff --git a/custom_components/trakt_tv/models/media.py b/custom_components/trakt_tv/models/media.py index ae866ad..d189a01 100644 --- a/custom_components/trakt_tv/models/media.py +++ b/custom_components/trakt_tv/models/media.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod, abstractstaticmethod from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Type from custom_components.trakt_tv.apis.tmdb import get_movie_data, get_show_data @@ -295,3 +295,12 @@ def to_homeassistant(self) -> Dict[str, Any]: medias = sorted(self.items, key=lambda media: media.released) medias = [media.to_homeassistant() for media in medias] return [first_item] + medias + + @staticmethod + def trakt_to_class(trakt_type: str) -> Type[Show] | Type[Movie] | Type[Episode] | None: + type_to_class = { + 'show': Show, + 'episode': Show, + 'movie': Movie + } + return type_to_class.get(trakt_type, None) \ No newline at end of file diff --git a/custom_components/trakt_tv/schema.py b/custom_components/trakt_tv/schema.py index 066de18..6f60abb 100644 --- a/custom_components/trakt_tv/schema.py +++ b/custom_components/trakt_tv/schema.py @@ -4,7 +4,7 @@ import pytz from dateutil.tz import tzlocal from homeassistant.helpers import config_validation as cv -from voluptuous import ALLOW_EXTRA, PREVENT_EXTRA, In, Required, Schema +from voluptuous import ALLOW_EXTRA, PREVENT_EXTRA, In, Required, Schema, All from .const import DOMAIN, LANGUAGE_CODES from .models.kind import BASIC_KINDS, NEXT_TO_WATCH_KINDS, TraktKind @@ -41,6 +41,7 @@ def sensors_schema() -> Dict[str, Any]: "all_upcoming": upcoming_schema(), "next_to_watch": next_to_watch_schema(), "recommendation": recommendation_schema(), + "lists": All([lists_schema()]), } @@ -75,5 +76,16 @@ def recommendation_schema() -> Dict[str, Any]: return subschemas +def lists_schema() -> dict[Required, Any]: + schema = { + Required("list_id"): cv.string, + Required("friendly_name"): cv.string, + Required("max_medias", default=3): cv.positive_int, + Required("private_list", default=False): cv.boolean, + Required("media_type", default=""): cv.string, + } + + return schema + configuration_schema = dictionary_to_schema(domain_schema(), extra=ALLOW_EXTRA) diff --git a/custom_components/trakt_tv/sensor.py b/custom_components/trakt_tv/sensor.py index 7cb21d9..3c23c77 100644 --- a/custom_components/trakt_tv/sensor.py +++ b/custom_components/trakt_tv/sensor.py @@ -69,6 +69,25 @@ async def async_setup_entry(hass, config_entry, async_add_entities): ) sensors.append(sensor) + for trakt_kind in TraktKind: + if trakt_kind != TraktKind.LIST: + continue + + identifier = trakt_kind.value.identifier + + if configuration.source_exists(identifier): + for list_entry in configuration.get_sensor_config(identifier): + sensor = TraktSensor( + hass=hass, + config_entry=list_entry, + coordinator=coordinator, + trakt_kind=trakt_kind, + source=identifier, + prefix=f"Trakt List {list_entry['friendly_name']}", + mdi_icon="mdi:view-list", + ) + sensors.append(sensor) + async_add_entities(sensors) @@ -104,7 +123,10 @@ def name(self): @property def medias(self): if self.coordinator.data: - return self.coordinator.data.get(self.source, {}).get(self.trakt_kind, None) + medias = self.coordinator.data.get(self.source, {}).get(self.trakt_kind, None) + if self.trakt_kind == TraktKind.LIST: + return medias.get(self.config_entry['friendly_name'], None) + return medias return None @property @@ -119,6 +141,8 @@ def configuration(self): @property def data(self): if self.medias: + if self.trakt_kind == TraktKind.LIST: + return self.medias.to_homeassistant() max_medias = self.configuration["max_medias"] return self.medias.to_homeassistant()[0 : max_medias + 1] return [] From 4ef47c2649d79868fc5d780d3f82db86362b6acc Mon Sep 17 00:00:00 2001 From: Henrijs Date: Sat, 1 Jun 2024 21:31:07 +0300 Subject: [PATCH 2/2] Fix linting --- custom_components/trakt_tv/apis/trakt.py | 32 ++++++++++++---------- custom_components/trakt_tv/models/kind.py | 2 ++ custom_components/trakt_tv/models/media.py | 12 ++++---- custom_components/trakt_tv/schema.py | 3 +- custom_components/trakt_tv/sensor.py | 6 ++-- 5 files changed, 31 insertions(+), 24 deletions(-) diff --git a/custom_components/trakt_tv/apis/trakt.py b/custom_components/trakt_tv/apis/trakt.py index 77bf450..6a77f35 100644 --- a/custom_components/trakt_tv/apis/trakt.py +++ b/custom_components/trakt_tv/apis/trakt.py @@ -17,7 +17,7 @@ from ..const import API_HOST, DOMAIN from ..exception import TraktException from ..models.kind import BASIC_KINDS, UPCOMING_KINDS, TraktKind -from ..models.media import Medias, Show, Movie, Episode +from ..models.media import Episode, Medias, Movie, Show from ..utils import cache_insert, cache_retrieve, deserialize_json LOGGER = logging.getLogger(__name__) @@ -390,8 +390,10 @@ async def fetch_recommendations(self, configured_kinds: list[TraktKind]): return res - async def fetch_list(self, path: str, list_id: str, user_path: bool, max_items: int, media_type: str): - """ Fetch the list, if user_path is True, the list will be fetched from the user end-point """ + async def fetch_list( + self, path: str, list_id: str, user_path: bool, max_items: int, media_type: str + ): + """Fetch the list, if user_path is True, the list will be fetched from the user end-point""" # Add the user path if needed if user_path: path = f"users/me/{path}" @@ -411,9 +413,7 @@ async def fetch_list(self, path: str, list_id: str, user_path: bool, max_items: # Add the limit to the path path = f"{path}?limit={max_items}" - return await self.request( - "get", path - ) + return await self.request("get", path) async def fetch_lists(self, configured_kind: TraktKind): @@ -426,10 +426,10 @@ async def fetch_lists(self, configured_kind: TraktKind): *[ self.fetch_list( configured_kind.value.path, - list_config['list_id'], - list_config['private_list'], - list_config['max_medias'], - list_config['media_type'] + list_config["list_id"], + list_config["private_list"], + list_config["max_medias"], + list_config["media_type"], ) for list_config in lists ] @@ -444,22 +444,26 @@ async def fetch_lists(self, configured_kind: TraktKind): medias = [] for media in raw_medias: # Get model based on media type in data - media_type = media.get('type') + media_type = media.get("type") model = Medias.trakt_to_class(media_type) if model: medias.append(model.from_trakt(media)) else: - LOGGER.warn(f"Media type {media_type} in {list_config['friendly_name']} is not supported") + LOGGER.warn( + f"Media type {media_type} in {list_config['friendly_name']} is not supported" + ) if not medias: - LOGGER.warn(f"No entries found for list {list_config['friendly_name']}") + LOGGER.warn( + f"No entries found for list {list_config['friendly_name']}" + ) continue await gather( *[media.get_more_information(language) for media in medias] ) - res[list_config['friendly_name']] = Medias(medias) + res[list_config["friendly_name"]] = Medias(medias) return {configured_kind: res} diff --git a/custom_components/trakt_tv/models/kind.py b/custom_components/trakt_tv/models/kind.py index 876f6db..8da74c2 100644 --- a/custom_components/trakt_tv/models/kind.py +++ b/custom_components/trakt_tv/models/kind.py @@ -11,12 +11,14 @@ class CalendarInformation: path: str model: Media + @dataclass class ListInformation: identifier: str name: str path: str + class TraktKind(Enum): SHOW = CalendarInformation("show", "Shows", "shows", Show) NEW_SHOW = CalendarInformation("new_show", "New Shows", "shows/new", Show) diff --git a/custom_components/trakt_tv/models/media.py b/custom_components/trakt_tv/models/media.py index d189a01..594486c 100644 --- a/custom_components/trakt_tv/models/media.py +++ b/custom_components/trakt_tv/models/media.py @@ -297,10 +297,8 @@ def to_homeassistant(self) -> Dict[str, Any]: return [first_item] + medias @staticmethod - def trakt_to_class(trakt_type: str) -> Type[Show] | Type[Movie] | Type[Episode] | None: - type_to_class = { - 'show': Show, - 'episode': Show, - 'movie': Movie - } - return type_to_class.get(trakt_type, None) \ No newline at end of file + def trakt_to_class( + trakt_type: str, + ) -> Type[Show] | Type[Movie] | Type[Episode] | None: + type_to_class = {"show": Show, "episode": Show, "movie": Movie} + return type_to_class.get(trakt_type, None) diff --git a/custom_components/trakt_tv/schema.py b/custom_components/trakt_tv/schema.py index 6f60abb..0a3f814 100644 --- a/custom_components/trakt_tv/schema.py +++ b/custom_components/trakt_tv/schema.py @@ -4,7 +4,7 @@ import pytz from dateutil.tz import tzlocal from homeassistant.helpers import config_validation as cv -from voluptuous import ALLOW_EXTRA, PREVENT_EXTRA, In, Required, Schema, All +from voluptuous import ALLOW_EXTRA, PREVENT_EXTRA, All, In, Required, Schema from .const import DOMAIN, LANGUAGE_CODES from .models.kind import BASIC_KINDS, NEXT_TO_WATCH_KINDS, TraktKind @@ -76,6 +76,7 @@ def recommendation_schema() -> Dict[str, Any]: return subschemas + def lists_schema() -> dict[Required, Any]: schema = { Required("list_id"): cv.string, diff --git a/custom_components/trakt_tv/sensor.py b/custom_components/trakt_tv/sensor.py index 3c23c77..293786d 100644 --- a/custom_components/trakt_tv/sensor.py +++ b/custom_components/trakt_tv/sensor.py @@ -123,9 +123,11 @@ def name(self): @property def medias(self): if self.coordinator.data: - medias = self.coordinator.data.get(self.source, {}).get(self.trakt_kind, None) + medias = self.coordinator.data.get(self.source, {}).get( + self.trakt_kind, None + ) if self.trakt_kind == TraktKind.LIST: - return medias.get(self.config_entry['friendly_name'], None) + return medias.get(self.config_entry["friendly_name"], None) return medias return None