feat: finish up trakt integration (#333)
Co-authored-by: Spoked <Spoked@localhost>
dreulavelle and Spoked authored Jun 4, 2024
1 parent 25cd6b2 commit 5ca02a4
Showing 12 changed files with 246 additions and 158 deletions.
@@ -2,7 +2,6 @@
from datetime import datetime

import pydantic
import requests
from fastapi import APIRouter, HTTPException, Request
from program.indexers.trakt import get_imdbid_from_tmdb
from program.settings.manager import settings_manager
5 changes: 4 additions & 1 deletion backend/program/cache.py
@@ -9,7 +9,7 @@
class HashCache:
"""A class for caching hashes with additional metadata and a time-to-live (TTL) mechanism."""

def __init__(self, ttl: int = 420, maxsize: int = 2000):
def __init__(self, ttl: int = 900, maxsize: int = 10000):
"""
Initializes the HashCache with a specified TTL and maximum size.
@@ -78,3 +78,6 @@ def clear_cache(self) -> None:
def _get_cache_entry(self, infohash: str) -> dict:
"""Helper function to get a cache entry or create a new one if it doesn't exist."""
return self.cache.get(infohash, {"blacklisted": False, "downloaded": False, "added_at": datetime.now()})


hash_cache = HashCache()
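
Editor's note: the TTL mechanism referenced above is typically backed by a cachetools-style TTLCache. A minimal standalone sketch of the pattern, assuming that backing store — the blacklist helpers here are illustrative, not part of this commit:

from datetime import datetime

from cachetools import TTLCache


class SketchHashCache:
    """Illustrative TTL cache for infohashes; mirrors the entry shape above."""

    def __init__(self, ttl: int = 900, maxsize: int = 10000):
        # Entries expire ttl seconds after insertion; entries are evicted
        # once maxsize is reached.
        self.cache = TTLCache(maxsize=maxsize, ttl=ttl)

    def blacklist(self, infohash: str) -> None:
        entry = self._get_cache_entry(infohash)
        entry["blacklisted"] = True
        self.cache[infohash] = entry

    def is_blacklisted(self, infohash: str) -> bool:
        return self.cache.get(infohash, {}).get("blacklisted", False)

    def _get_cache_entry(self, infohash: str) -> dict:
        return self.cache.get(infohash, {"blacklisted": False, "downloaded": False, "added_at": datetime.now()})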
193 changes: 124 additions & 69 deletions backend/program/content/trakt.py
@@ -1,8 +1,8 @@
"""Trakt content module"""
import time
from types import SimpleNamespace
from urllib.parse import urlparse

from urllib.parse import urlencode, urlparse
from utils.request import RateLimiter, post
import regex

from program.media.item import MediaItem, Movie, Show
@@ -27,8 +27,8 @@ def __init__(self):
if not self.initialized:
return
self.next_run_time = 0
self.items_already_seen = set() # Use a set for faster lookups
self.items_to_yield = {}
self.items_already_seen = set()
self.missing()
logger.success("Trakt initialized!")

def validate(self) -> bool:
@@ -51,79 +51,84 @@ def validate(self) -> bool:
return False
return True

def missing(self):
"""Log missing items from Trakt"""
if not self.settings.watchlist:
logger.log("TRAKT", "No watchlist configured.")
if not self.settings.user_lists:
logger.log("TRAKT", "No user lists configured.")
if not self.settings.fetch_trending:
logger.log("TRAKT", "Trending fetching is disabled.")
if not self.settings.fetch_popular:
logger.log("TRAKT", "Popular fetching is disabled.")

def run(self):
"""Fetch media from Trakt and yield Movie or Show instances."""
"""Fetch media from Trakt and yield Movie, Show, or MediaItem instances."""
current_time = time.time()
if current_time < self.next_run_time:
return

self.next_run_time = current_time + self.settings.update_interval
watchlist_ids = self._get_watchlist(self.settings.watchlist)
collection_ids = self._get_collections(self.settings.collections)
user_list_ids = self._get_list(self.settings.user_lists)
trending_ids = self._get_trending_items() if self.settings.fetch_trending else []
popular_ids = self._get_popular_items() if self.settings.fetch_popular else []

# Combine all IMDb IDs and types
all_items = watchlist_ids + collection_ids + user_list_ids + trending_ids + popular_ids
all_ids = set(all_items)
logger.log("TRAKT", f"Fetched {len(all_ids)} unique IMDb IDs from Trakt.")
all_items = {
"Watchlist": watchlist_ids,
"User Lists": user_list_ids,
"Trending": trending_ids,
"Popular": popular_ids
}

for imdb_id, item_type in all_ids:
if imdb_id in self.items_already_seen or not imdb_id:
continue
self.items_already_seen.add(imdb_id)

if item_type == "movie":
media_item = Movie({
"imdb_id": imdb_id,
"requested_by": self.key
})
else:
media_item = Show({
"imdb_id": imdb_id,
"requested_by": self.key
})

yield media_item
self.items_to_yield.clear()

def _get_watchlist(self, watchlist_items: list) -> list:
total_new_items = 0

for source, items in all_items.items():
new_items_count = 0
for imdb_id, item_type in items:
if imdb_id in self.items_already_seen or not imdb_id:
continue
self.items_already_seen.add(imdb_id)
new_items_count += 1

if source == "Popular":
media_item = MediaItem({
"imdb_id": imdb_id,
"requested_by": self.key
})
elif item_type == "movie":
media_item = Movie({
"imdb_id": imdb_id,
"requested_by": self.key
})
else:
media_item = Show({
"imdb_id": imdb_id,
"requested_by": self.key
})

yield media_item

if new_items_count > 0:
logger.log("TRAKT", f"New items fetched from {source}: {new_items_count}")
total_new_items += new_items_count
if total_new_items > 0:
logger.log("TRAKT", f"Total new items fetched: {total_new_items}")

def _get_watchlist(self, watchlist_users: list) -> list:
"""Get IMDb IDs from Trakt watchlist"""
if not watchlist_items:
logger.warning("No watchlist items configured.")
if not watchlist_users:
return []
imdb_ids = []
for url in watchlist_items:
match = regex.match(r'https://trakt.tv/users/([^/]+)/watchlist', url)
if not match:
logger.error(f"Invalid watchlist URL: {url}")
continue
user = match.group(1)
for user in watchlist_users:
items = get_watchlist_items(self.api_url, self.headers, user)
imdb_ids.extend(self._extract_imdb_ids(items))
return imdb_ids

def _get_collections(self, collection_items: list) -> list:
"""Get IMDb IDs from Trakt collections"""
if not collection_items:
logger.warning("No collection items configured.")
return []
imdb_ids = []
for url in collection_items:
match = regex.match(r'https://trakt.tv/users/([^/]+)/collection', url)
if not match:
logger.error(f"Invalid collection URL: {url}")
continue
user = match.group(1)
items = get_user_list(self.api_url, self.headers, user, "collection")
imdb_ids.extend(self._extract_imdb_ids(items))
return imdb_ids

def _get_list(self, list_items: list) -> list:
"""Get IMDb IDs from Trakt user list"""
if not list_items:
logger.warning("No user list items configured.")
if not list_items or not any(list_items):
return []
imdb_ids = []
for url in list_items:
@@ -134,7 +139,15 @@ def _get_list(self, list_items: list) -> list:
user, list_name = match.groups()
list_name = urlparse(url).path.split('/')[-1]
items = get_user_list(self.api_url, self.headers, user, list_name)
imdb_ids.extend(self._extract_imdb_ids(items))
for item in items:
if hasattr(item, "movie"):
imdb_id = getattr(item.movie.ids, "imdb", None)
if imdb_id:
imdb_ids.append((imdb_id, "movie"))
elif hasattr(item, "show"):
imdb_id = getattr(item.show.ids, "imdb", None)
if imdb_id:
imdb_ids.append((imdb_id, "show"))
return imdb_ids

def _get_trending_items(self) -> list:
@@ -147,37 +160,81 @@ def _get_popular_items(self) -> list:
"""Get IMDb IDs from Trakt popular items"""
popular_movies = get_popular_items(self.api_url, self.headers, "movies", self.settings.popular_count)
popular_shows = get_popular_items(self.api_url, self.headers, "shows", self.settings.popular_count)
return self._extract_imdb_ids(popular_movies + popular_shows)
return self._extract_imdb_ids_with_none_type(popular_movies + popular_shows)

def _extract_imdb_ids(self, items: list) -> list:
"""Extract IMDb IDs and types from a list of items"""
imdb_ids = []
for item in items:
show = getattr(item, "show", None)
if show:
ids = getattr(show, "ids", None)
if hasattr(item, "show"):
ids = getattr(item.show, "ids", None)
if ids:
imdb_id = getattr(ids, "imdb", None)
if imdb_id:
imdb_ids.append((imdb_id, "show"))
else:
ids = getattr(item, "ids", None)
elif hasattr(item, "movie"):
ids = getattr(item.movie, "ids", None)
if ids:
imdb_id = getattr(ids, "imdb", None)
if imdb_id:
imdb_ids.append((imdb_id, "movie"))
return imdb_ids

def _extract_imdb_ids_with_none_type(self, items: list) -> list:
"""Extract IMDb IDs from a list of items, returning None for type"""
imdb_ids = []
for item in items:
ids = getattr(item, "ids", None)
if ids:
imdb_id = getattr(ids, "imdb", None)
if imdb_id:
imdb_ids.append((imdb_id, None))
return imdb_ids
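
Editor's note: to make the extractor logic concrete, these are the object shapes it expects — the request layer in this codebase wraps JSON responses in SimpleNamespace objects, and the sample IDs below are made up:

from types import SimpleNamespace

show_entry = SimpleNamespace(show=SimpleNamespace(ids=SimpleNamespace(imdb="tt0000001")))
movie_entry = SimpleNamespace(movie=SimpleNamespace(ids=SimpleNamespace(imdb="tt0000002")))
flat_entry = SimpleNamespace(ids=SimpleNamespace(imdb="tt0000003"))  # popular items are flat

# _extract_imdb_ids([show_entry, movie_entry])      -> [("tt0000001", "show"), ("tt0000002", "movie")]
# _extract_imdb_ids_with_none_type([flat_entry])    -> [("tt0000003", None)]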

def perform_oauth_flow(self) -> str:
"""Initiate the OAuth flow and return the authorization URL."""
params = {
"response_type": "code",
"client_id": self.settings.oauth_client_id,
"redirect_uri": self.settings.oauth_redirect_uri,
}
auth_url = f"{self.api_url}/oauth/authorize?{urlencode(params)}"
return auth_url

def handle_oauth_callback(self, code: str) -> bool:
"""Handle the OAuth callback and exchange the code for an access token."""
token_url = f"{self.api_url}/oauth/token"
payload = {
"code": code,
"client_id": self.settings.oauth_client_id,
"client_secret": self.settings.oauth_client_secret,
"redirect_uri": self.settings.oauth_redirect_uri,
"grant_type": "authorization_code",
}
response = post(token_url, data=payload, additional_headers=self.headers)
if response.is_ok:
token_data = response.data
self.settings.access_token = token_data.get("access_token")
self.settings.refresh_token = token_data.get("refresh_token")
settings_manager.save() # Save the tokens to settings
return True
else:
logger.error(f"Failed to obtain OAuth token: {response.status_code}")
return False
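
Editor's note: a hedged sketch of how these two methods could be wired into FastAPI routes — the paths, handler names, and TraktContent instance are assumptions; this commit only adds the methods themselves:

from fastapi import APIRouter, HTTPException

router = APIRouter()
trakt = TraktContent()  # assumed instance of this module's content class

@router.get("/trakt/oauth/initiate")
async def initiate_trakt_oauth():
    # Send the user to Trakt's consent page.
    return {"auth_url": trakt.perform_oauth_flow()}

@router.get("/trakt/oauth/callback")
async def trakt_oauth_callback(code: str):
    # Trakt redirects back here with ?code=...; exchange it for tokens.
    if not trakt.handle_oauth_callback(code):
        raise HTTPException(status_code=400, detail="Failed to obtain Trakt access token")
    return {"message": "Trakt OAuth completed"}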

## API functions for Trakt

rate_limiter = RateLimiter(max_calls=1000, period=300)

def _fetch_data(url, headers, params):
"""Fetch paginated data from Trakt API."""
"""Fetch paginated data from Trakt API with rate limiting."""
all_data = []
page = 1

while True:
try:
response = get(url, params={**params, "page": page}, additional_headers=headers)
with rate_limiter:
response = get(url, params={**params, "page": page}, additional_headers=headers)
if response.is_ok:
data = response.data
if not data:
@@ -186,6 +243,9 @@ def _fetch_data(url, headers, params):
if len(data) <= params["limit"]:
break
page += 1
elif response.status_code == 429:
logger.warning("Rate limit exceeded. Retrying after rate limit period.")
rate_limiter.limit_hit()
else:
logger.error(f"Failed to fetch data: {response.status_code}")
break
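
Editor's note: the RateLimiter imported from utils.request is used both as a context manager and via limit_hit() on HTTP 429. Its real implementation is not shown in this diff; a minimal compatible sketch, for illustration only:

import threading
import time


class SketchRateLimiter:
    """Allows max_calls per period; limit_hit() exhausts the current window."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.calls = 0
        self.window_start = time.monotonic()
        self.lock = threading.Lock()

    def __enter__(self):
        with self.lock:
            now = time.monotonic()
            if now - self.window_start >= self.period:
                self.window_start, self.calls = now, 0
            if self.calls >= self.max_calls:
                # Sleep out the rest of the window, then start a new one.
                time.sleep(max(0.0, self.period - (now - self.window_start)))
                self.window_start, self.calls = time.monotonic(), 0
            self.calls += 1
        return self

    def __exit__(self, *exc):
        return False

    def limit_hit(self):
        # Called on a 429: treat the window as used up so callers back off.
        with self.lock:
            self.calls = self.max_calls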
@@ -209,11 +269,6 @@ def get_liked_lists(api_url, headers, limit=10):
url = f"{api_url}/users/likes/lists"
return _fetch_data(url, headers, {"limit": limit})

def get_recommendations(api_url, headers, media_type, limit=10):
"""Get recommendations from Trakt with pagination support."""
url = f"{api_url}/recommendations/{media_type}"
return _fetch_data(url, headers, {"limit": limit})

def get_trending_items(api_url, headers, media_type, limit=10):
"""Get trending items from Trakt with pagination support."""
url = f"{api_url}/{media_type}/trending"
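
Editor's note: a hypothetical call into the paginated helpers above. The header keys follow Trakt's documented API convention, but the exact contents of self.headers are an assumption, and the client id is a placeholder:

headers = {
    "Content-Type": "application/json",
    "trakt-api-version": "2",
    "trakt-api-key": "YOUR_CLIENT_ID",  # placeholder
}
trending = get_trending_items("https://api.trakt.tv", headers, "movies", limit=10)
# _fetch_data keeps requesting pages until a page returns <= limit items.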
6 changes: 3 additions & 3 deletions backend/program/downloaders/realdebrid.py
@@ -176,9 +176,9 @@ def _is_wanted_movie(self, container: dict, item: Movie) -> bool:
)

# lets create a regex pattern to remove deleted scenes and samples and trailers from the filenames list
unwanted_regex = regex.compile(r"\b(?:deleted.scene|sample|trailer|featurette)\b", regex.IGNORECASE)
filenames = [file for file in filenames if not unwanted_regex.search(file["filename"])]
# unwanted_regex = regex.compile(r"\b(?:deleted.scene|sample|trailer|featurette)\b", regex.IGNORECASE)
# filenames = [file for file in filenames if not unwanted_regex.search(file["filename"])]

if not filenames:
return False

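
Editor's note: since the unwanted-file filter above is disabled rather than removed, here is a quick standalone check of what the pattern matched (regex is the third-party package the module imports; stdlib re behaves the same for this pattern):

import regex

unwanted = regex.compile(r"\b(?:deleted.scene|sample|trailer|featurette)\b", regex.IGNORECASE)
assert unwanted.search("Movie.2019.Trailer.mkv")
assert unwanted.search("movie.sample.mkv")
assert not unwanted.search("Movie.2019.1080p.BluRay.mkv")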
20 changes: 12 additions & 8 deletions backend/program/libraries/plex.py
@@ -111,16 +111,20 @@ def run(self):

# Gather all results
for future in concurrent.futures.as_completed(futures):
chunk_results = future.result()
items.extend(chunk_results)
with self.lock:
self.last_fetch_times[section.key] = datetime.now()
processed_sections.add(section.key)
try:
chunk_results = future.result(timeout=2) # Add timeout to speed up shutdown
items.extend(chunk_results)
with self.lock:
self.last_fetch_times[section.key] = datetime.now()
processed_sections.add(section.key)
except concurrent.futures.TimeoutError:
logger.warning("Timeout while waiting for chunk processing result.")
except Exception as e:
logger.exception(f"Failed to get chunk result: {e}")

if not processed_sections:
logger.error("Failed to process any sections. Check your library_path settings.")

logger.log("PLEX", f"Processed {len(items)} items.")
return []

return items
except Exception as e:
logger.exception(f"Unexpected error occurred: {e}")
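
Editor's note: a self-contained sketch of the result-with-timeout pattern adopted above; the worker and inputs are placeholders. Futures yielded by as_completed are already finished, so result() normally returns immediately and the timeout acts as a guard:

import concurrent.futures
import time


def process_chunk(chunk):  # placeholder worker
    time.sleep(0.1)
    return [x * 2 for x in chunk]


chunks = [[1, 2], [3, 4], [5, 6]]
items = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(process_chunk, c) for c in chunks]
    for future in concurrent.futures.as_completed(futures):
        try:
            items.extend(future.result(timeout=2))
        except concurrent.futures.TimeoutError:
            print("Timed out waiting for a chunk result")
        except Exception as e:
            print(f"Chunk failed: {e}")
print(items)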
8 changes: 4 additions & 4 deletions backend/program/media/container.py
@@ -116,7 +116,7 @@ def upsert(self, item: MediaItem) -> None:
finally:
self.lock.release_write()

def _merge_items(self, existing_item, new_item):
def _merge_items(self, existing_item: MediaItem, new_item: MediaItem) -> None:
"""Merge new item data into existing item without losing existing state."""
if existing_item.state == States.Completed and new_item.state != States.Completed:
return
@@ -215,7 +215,7 @@ def get_incomplete_items(self) -> Dict[ItemId, MediaItem]:
finally:
self.lock.release_read()

def save(self, filename):
def save(self, filename: str) -> None:
if not self._items:
return

@@ -241,12 +241,12 @@ def save(self, filename):
except OSError as remove_error:
logger.error(f"Failed to remove temporary file: {remove_error}")

def load(self, filename):
def load(self, filename: str) -> None:
try:
with open(filename, "rb") as file:
from_disk: MediaItemContainer = dill.load(file) # noqa: S301
except FileNotFoundError:
logger.error(f"Cannot find cached media data at {filename}")
logger.error(f"Unable to find the media library file. Starting fresh.")
return
except (EOFError, dill.UnpicklingError) as e:
logger.error(f"Failed to unpickle media data: {e}. Starting fresh.")
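
Editor's note: the save/load pair above follows the usual write-to-temp-then-replace pattern with dill (the temporary-file cleanup in the diff implies as much). A minimal standalone sketch, assuming that pattern:

import os
import tempfile

import dill


def save_atomically(obj, filename: str) -> None:
    # Write to a temp file in the same directory, then atomically swap it
    # in, so a crash mid-write never corrupts the existing file.
    dirname = os.path.dirname(os.path.abspath(filename))
    fd, tmp_path = tempfile.mkstemp(dir=dirname)
    try:
        with os.fdopen(fd, "wb") as tmp:
            dill.dump(obj, tmp)
        os.replace(tmp_path, filename)
    except Exception:
        os.remove(tmp_path)
        raise


def load_or_fresh(filename: str, default):
    try:
        with open(filename, "rb") as file:
            return dill.load(file)  # noqa: S301
    except (FileNotFoundError, EOFError, dill.UnpicklingError):
        # Missing or corrupt cache: start fresh, as the module above does.
        return default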
