Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: downloaders rework #903

Merged
merged 13 commits into from
Nov 26, 2024
54 changes: 39 additions & 15 deletions src/program/services/downloaders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from .alldebrid import AllDebridDownloader
from .realdebrid import RealDebridDownloader
# from .torbox import TorBoxDownloader
from .torbox import TorBoxDownloader

class InvalidFileSizeException(Exception):
pass
Expand All @@ -30,7 +30,7 @@ def __init__(self):
self.services = {
RealDebridDownloader: RealDebridDownloader(),
AllDebridDownloader: AllDebridDownloader(),
# TorBoxDownloader: TorBoxDownloader()
TorBoxDownloader: TorBoxDownloader()
}
self.service = next(
(service for service in self.services.values() if service.initialized), None
Expand All @@ -48,25 +48,49 @@ def validate(self):

def run(self, item: MediaItem):
logger.debug(f"Running downloader for {item.log_string}")
for stream in item.streams:
download_result = None
try:
download_result = self.download_cached_stream(item, stream)
if download_result:
self.validate_filesize(item, download_result)
# for stream in item.streams:
# download_result = None
# try:
# download_result = self.download_cached_stream(item, stream)
# if download_result:
# self.validate_filesize(item, download_result)
# if not self.update_item_attributes(item, download_result):
# raise Exception("No matching files found!")
# break
# except Exception as e:
# if download_result and download_result.torrent_id:
# self.service.delete_torrent(download_result.torrent_id)
# logger.debug(f"Invalid stream: {stream.infohash} - reason: {e}")
# item.blacklist_stream(stream)

# Chunk streams into groups of 10
chunk_size = 10
for i in range(0, len(item.streams), chunk_size):
logger.debug(f"Processing chunk {i} to {i + chunk_size}")
chunk = item.streams[i:i + chunk_size]
instant_availability = self.get_instant_availability([stream.infohash for stream in chunk])
# Filter out streams that aren't cached
available_streams = [stream for stream in chunk if instant_availability.get(stream.infohash, None)]
if not available_streams:
continue
for stream in available_streams:
download_result = None
try:
download_result = self.download_cached_stream(item, stream, instant_availability[stream.infohash])
if download_result:
self.validate_filesize(item, download_result)
if not self.update_item_attributes(item, download_result):
raise Exception("No matching files found!")
break
except Exception as e:
if download_result and download_result.torrent_id:
self.service.delete_torrent(download_result.torrent_id)
logger.debug(f"Invalid stream: {stream.infohash} - reason: {e}")
item.blacklist_stream(stream)
except Exception as e:
if download_result and download_result.torrent_id:
self.service.delete_torrent(download_result.torrent_id)
logger.debug(f"Invalid stream: {stream.infohash} - reason: {e}")
item.blacklist_stream(stream)
yield item


def download_cached_stream(self, item: MediaItem, stream: Stream) -> DownloadCachedStreamResult:
cached_containers = self.get_instant_availability([stream.infohash]).get(stream.infohash, None)
def download_cached_stream(self, item: MediaItem, stream: Stream, cached_containers: list[dict]) -> DownloadCachedStreamResult:
if not cached_containers:
raise Exception("Not cached!")
the_container = cached_containers[0]
Expand Down
261 changes: 261 additions & 0 deletions src/program/services/downloaders/torbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,264 @@
# response_type=dict,
# )
# return response.data["data"]

import time
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Union

from loguru import logger
from pydantic import BaseModel
from requests import Session

from program.settings.manager import settings_manager
from program.utils.request import (
BaseRequestHandler,
HttpMethod,
ResponseType,
create_service_session,
get_rate_limit_params,
)

from .shared import VIDEO_EXTENSIONS, DownloaderBase, FileFinder, premium_days_left


class TBTorrentStatus(str, Enum):
"""Real-Debrid torrent status enumeration"""
MAGNET_ERROR = "magnet_error"
MAGNET_CONVERSION = "magnet_conversion"
WAITING_FILES = "waiting_files_selection"
DOWNLOADING = "downloading"
DOWNLOADED = "downloaded"
ERROR = "error"
SEEDING = "seeding"
DEAD = "dead"
UPLOADING = "uploading"
COMPRESSING = "compressing"

class TBTorrent(BaseModel):
"""Real-Debrid torrent model"""
Copy link
Member

@dreulavelle dreulavelle Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😁

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uuuups 😆

id: str
hash: str
filename: str
bytes: int
status: TBTorrentStatus
added: datetime
links: List[str]
ended: Optional[datetime] = None
speed: Optional[int] = None
seeders: Optional[int] = None

class TorBoxError(Exception):
"""Base exception for Real-Debrid related errors"""

class TorBoxRequestHandler(BaseRequestHandler):
def __init__(self, session: Session, base_url: str, request_logging: bool = False):
super().__init__(session, response_type=ResponseType.DICT, base_url=base_url, custom_exception=TorBoxError, request_logging=request_logging)

def execute(self, method: HttpMethod, endpoint: str, **kwargs) -> Union[dict, list]:
response = super()._request(method, endpoint, **kwargs)
if response.status_code == 204:
return {}
if not response.data and not response.is_ok:
raise TorBoxError("Invalid JSON response from TorBox")
return response.data

class TorBoxAPI:
"""Handles TorBox API communication"""
BASE_URL = "https://api.torbox.app/v1/api"

def __init__(self, api_key: str, proxy_url: Optional[str] = None):
self.api_key = api_key
rate_limit_params = get_rate_limit_params(per_second=5)
self.session = create_service_session(rate_limit_params=rate_limit_params)
self.session.headers.update({"Authorization": f"Bearer {api_key}"})
if proxy_url:
self.session.proxies = {"http": proxy_url, "https": proxy_url}
self.request_handler = TorBoxRequestHandler(self.session, self.BASE_URL)

class TorBoxDownloader(DownloaderBase):
"""Main Torbox downloader class implementing DownloaderBase"""
MAX_RETRIES = 3
RETRY_DELAY = 1.0

def __init__(self):
self.key = "torbox"
self.settings = settings_manager.settings.downloaders.torbox
self.api = None
self.file_finder = None
self.initialized = self.validate()

def validate(self) -> bool:
"""
Validate Real-Torbox and premium status
Required by DownloaderBase
"""
if not self._validate_settings():
return False

self.api = TorBoxAPI(
api_key=self.settings.api_key,
# proxy_url=self.settings.proxy_url if self.settings.proxy_enabled else None
)
self.file_finder = FileFinder("short_name", "size")

return self._validate_premium()

def _validate_settings(self) -> bool:
"""Validate configuration settings"""
if not self.settings.enabled:
return False
if not self.settings.api_key:
logger.warning("TorBox API key is not set")
return False
# if self.settings.proxy_enabled and not self.settings.proxy_url:
# logger.error("Proxy is enabled but no proxy URL is provided")
# return False
return True

def _validate_premium(self) -> bool:
"""Validate premium status"""
try:
response = self.api.request_handler.execute(HttpMethod.GET, "user/me")
user_info = response["data"]
if not user_info.get("plan") or user_info["plan"] == 0:
logger.error("Premium membership required")
return False

expiration = datetime.fromisoformat(
user_info["premium_expires_at"]
).replace(tzinfo=None)
logger.info(premium_days_left(expiration))
return True
except Exception as e:
logger.error(f"Failed to validate premium status: {e}")
return False

Comment on lines +103 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use specific exception handling for better error clarity

Catching all exceptions with except Exception as e can make debugging harder and may suppress important error information. Consider catching specific exceptions that are likely to occur in this context.

Update the exception handling to catch specific exceptions:

-            except Exception as e:
-                logger.error(f"Failed to validate premium status: {e}")
+            except requests.exceptions.RequestException as e:
+                logger.error(f"Network error during premium status validation: {e}")
+                return False
+            except KeyError as e:
+                logger.error(f"Unexpected response format: missing key {e}")
+                return False
+            except ValueError as e:
+                logger.error(f"Invalid date format in 'premium_expires_at': {e}")
+                return False

Committable suggestion skipped: line range outside the PR's diff.

# TODO
def get_instant_availability(self, infohashes: List[str]) -> Dict[str, list]:
"""
Get instant availability for multiple infohashes with retry logic
Required by DownloaderBase
"""

if len(infohashes) == 0:
return {}

for attempt in range(self.MAX_RETRIES):
try:
response = self.api.request_handler.execute(
HttpMethod.GET,
f"torrents/checkcached?hash={','.join(infohashes)}&format=list&list_files=true"
)

data = response.get("data")

if not data:
return {}

# Return early if data is not a dict
if not isinstance(data, list):
logger.warning(f"Invalid instant availability data from TorBox, expected list, got {type(data)}")
return {}

return {
entry['hash']: [{i: file for i, file in enumerate(entry['files'])}]
#entry['hash']: [{"1": entry['files']}]
for entry in data
if self._contains_valid_video_files(entry['files'])
# if isinstance(entry, dict)
}

except Exception as e:
logger.debug(f"Failed to get instant availability (attempt {attempt + 1}/{self.MAX_RETRIES}): {e}")
if attempt < self.MAX_RETRIES - 1:
time.sleep(self.RETRY_DELAY)
continue
Comment on lines +146 to +150
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use specific exception handling

The broad exception catch could mask important errors. Consider catching specific exceptions for better error handling.

-            except Exception as e:
+            except TorBoxError as e:
+                logger.debug(f"TorBox API error (attempt {attempt + 1}/{self.MAX_RETRIES}): {e}")
+                if attempt < self.MAX_RETRIES - 1:
+                    time.sleep(self.RETRY_DELAY)
+                continue
+            except (ValueError, KeyError) as e:
+                logger.debug(f"Response parsing error (attempt {attempt + 1}/{self.MAX_RETRIES}): {e}")
+                if attempt < self.MAX_RETRIES - 1:
+                    time.sleep(self.RETRY_DELAY)
+                continue

Committable suggestion skipped: line range outside the PR's diff.


logger.debug("All retry attempts failed for instant availability")
return {}

# def _filter_valid_containers(self, containers: List[dict]) -> List[dict]:
# """Filter and sort valid video containers"""
# valid_containers = [
# container for container in containers
# if self._contains_valid_video_files(container)
# ]
# return sorted(valid_containers, key=len, reverse=True)

def _contains_valid_video_files(self, container: dict) -> bool:
"""Check if container has valid video files"""
return all(
any(
file["name"].endswith(ext) and "sample" not in file["name"].lower()
for ext in VIDEO_EXTENSIONS
)
for file in container
)

def add_torrent(self, infohash: str) -> str:
"""
Add a torrent by infohash
Required by DownloaderBase
"""
if not self.initialized:
raise TorBoxError("Downloader not properly initialized")

try:
magnet = f"magnet:?xt=urn:btih:{infohash}"
response = self.api.request_handler.execute(
HttpMethod.POST,
"torrents/createtorrent",
data={"magnet": magnet.lower()}
)
return response["data"]["torrent_id"]
except Exception as e:
logger.error(f"Failed to add torrent {infohash}: {e}")
raise

# TODO
def select_files(self, torrent_id: str, files: List[str]):
"""
Select files from a torrent
Required by DownloaderBase
"""
if not self.initialized:
raise TorBoxError("Downloader not properly initialized")

# I think that's not required for TorBox

# TODO
def get_torrent_info(self, torrent_id: str) -> dict:
"""
Get information about a torrent
Required by DownloaderBase
"""
if not self.initialized:
raise TorBoxError("Downloader not properly initialized")

# Does TorBox have a method to get torrent info?

# try:
# return self.api.request_handler.execute(HttpMethod.GET, f"torrents/torrentinfo/{torrent_id}")['data']
# except Exception as e:
# logger.error(f"Failed to get torrent info for {torrent_id}: {e}")
# raise

# TODO
def delete_torrent(self, torrent_id: str):
"""
Delete a torrent
Required by DownloaderBase
"""

if not self.initialized:
raise TorBoxError("Downloader not properly initialized")

logger.debug(f"Deleting torrent {torrent_id}")

try:
self.api.request_handler.execute(HttpMethod.POST, f"torrents/controltorrent", data={"torrent_id": torrent_id, "operation": "delete"})
except Exception as e:
logger.error(f"Failed to delete torrent {torrent_id}: {e}")
raise
Loading