From eb923d0a71135b501e8f496f23d9ff73f59bf434 Mon Sep 17 00:00:00 2001
From: Johannes Habel
Date: Fri, 16 Feb 2024 21:56:57 +0100
Subject: [PATCH] - improved threaded downloading

---
 xvideos_api/modules/download.py | 114 +++++++++++++++-----------------
 1 file changed, 54 insertions(+), 60 deletions(-)

diff --git a/xvideos_api/modules/download.py b/xvideos_api/modules/download.py
index 8f28ef0..cb9be7b 100644
--- a/xvideos_api/modules/download.py
+++ b/xvideos_api/modules/download.py
@@ -1,8 +1,9 @@
 # Thanks to: https://github.com/EchterAlsFake/PHUB/blob/master/src/phub/modules/download.py
 # oh and of course ChatGPT lol
 
-from ffmpeg_progress_yield import FfmpegProgress
+import time
 import requests
+from ffmpeg_progress_yield import FfmpegProgress
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from typing import Callable, List
 
@@ -14,68 +15,61 @@
 """
 
 
-def _thread(url: str, timeout: int) -> bytes:
-    '''
-    Download a single segment using requests.
-    '''
-    try:
-        response = requests.get(url, timeout=timeout)
-        response.raise_for_status()  # This will raise an exception for HTTP errors
-        return response.content
-    except requests.RequestException as e:
-        print(f"Failed to download segment {url}: {e}")
-        return b''
-
-
-def _base_threaded(segments: List[str],
-                   callback: CallbackType,
-                   max_workers: int = 50,
-                   timeout: int = 10) -> dict[str, bytes]:
-    '''
-    Base threaded downloader for threaded backends.
-    '''
-    length = len(segments)
-
-    with ThreadPoolExecutor(max_workers=max_workers) as executor:
-        buffer = {}
-        future_to_url = {executor.submit(_thread, url, timeout): url for url in segments}
-
-        completed = 0
-        for future in as_completed(future_to_url):
-            url = future_to_url[future]
-            try:
-                segment_data = future.result()
-                if segment_data:
-                    buffer[url] = segment_data
+def download_segment(url: str, timeout: int, retries: int = 3, backoff_factor: float = 0.3) -> tuple[str, bytes, bool]:
+    """
+    Attempt to download a single segment, retrying on failure.
+    Returns a tuple of the URL, the content (empty if every attempt failed) and a success flag.
+    """
+    for attempt in range(retries):
+        try:
+            response = requests.get(url, timeout=timeout)
+            response.raise_for_status()  # Raises the stored HTTPError, if one occurred.
+            return url, response.content, True  # Success
+        except requests.RequestException as e:
+            print(f"Attempt {attempt + 1}/{retries} failed for {url}: {e}")
+            if attempt < retries - 1:
+                time.sleep(backoff_factor * (2 ** attempt))  # Exponential backoff before the next attempt
+
+    # All retries have failed
+    return url, b'', False
+
+
+def threaded(max_workers: int = 20, timeout: int = 10, retries: int = 3) -> Callable:
+    """
+    Creates a wrapper function for the actual download process, with retry logic.
+    """
+    def wrapper(video, quality, callback: CallbackType, path: str) -> None:
+        """
+        Download video segments in parallel, retrying failures, and write them to a file.
+        """
+        segments = list(video.get_segments(quality=quality))
+        length = len(segments)
+        completed, successful_downloads = 0, 0
+        buffer = {}  # Maps segment URL -> downloaded bytes, so segments can be written in playlist order
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            future_to_segment = {executor.submit(download_segment, url, timeout, retries): url for url in segments}
+
+            for future in as_completed(future_to_segment):
+                segment_url = future_to_segment[future]
+                # download_segment never raises; request errors are reported via the success flag.
+                _, data, success = future.result()
                 completed += 1
-                callback(completed, length)
-            except Exception as e:
-                print(f"Error downloading segment {url}: {e}")
-
-    return buffer
-
-
-def threaded(max_workers: int = 100,
-             timeout: int = 30) -> Callable:
-    def wrapper(video,
-                quality,
-                callback: CallbackType,
-                path: str) -> None:
-        segments = list(video.get_segments(quality))
-
-        buffer = _base_threaded(
-            segments=segments,
-            callback=callback,
-            max_workers=max_workers,
-            timeout=timeout
-        )
+                if success:
+                    successful_downloads += 1
+                    buffer[segment_url] = data
+                callback(completed, length)  # Report progress regardless of success
+
+        # Write only the successfully downloaded segments, preserving their original order
         with open(path, 'wb') as file:
-            for url in segments:
-                file.write(buffer.get(url, b''))
-
-        print(f'Successfully wrote file to {path}')
-
+            for segment_url in segments:
+                file.write(buffer.get(segment_url, b''))
+
+        print(f"Wrote {successful_downloads}/{length} segments to {path}")
+
     return wrapper
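
Notes (not part of the patch):

Backoff schedule: with the defaults above (retries=3, backoff_factor=0.3), a
segment that keeps failing sleeps 0.3 * 2^0 = 0.3 s after the first attempt
and 0.3 * 2^1 = 0.6 s after the second; after the third failure,
download_segment returns (url, b'', False) without sleeping again, and the
wrapper simply skips that segment when writing the file.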
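
Usage sketch: a minimal, self-contained example of driving the new backend.
The StubVideo class and its segment URLs are hypothetical stand-ins; only
threaded(), its parameters, and the wrapper signature (video, quality,
callback, path) come from the patch itself.

    from xvideos_api.modules.download import threaded

    class StubVideo:
        """Hypothetical stand-in exposing the one method the wrapper needs."""
        def get_segments(self, quality):
            # A real video would yield its HLS segment URLs here.
            return ["https://example.com/seg0.ts", "https://example.com/seg1.ts"]

    def progress(completed: int, total: int) -> None:
        # Invoked by the wrapper after every finished segment, success or not.
        print(f"\r{completed}/{total} segments", end="", flush=True)

    downloader = threaded(max_workers=20, timeout=10, retries=3)
    downloader(StubVideo(), quality="720p", callback=progress, path="video.ts")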