Commit 20126a2

- trying to improve threaded downloading, so that if the server doesn't respond, it doesn't block the thread. Testing this now...
EchterAlsFake committed Feb 16, 2024
1 parent 22cc643 commit 20126a2
Showing 1 changed file with 56 additions and 58 deletions.
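
For context on the approach: after this change every requests.get call carries a timeout and runs inside a bounded retry loop with exponential backoff, so a server that stops responding raises requests.Timeout (a RequestException) and frees the worker thread instead of blocking it indefinitely. A minimal sketch of that pattern, independent of this commit (fetch_with_retries is a hypothetical name, not part of the codebase):

import time
import requests

def fetch_with_retries(url: str, timeout: int = 10, retries: int = 3, backoff_factor: float = 0.3) -> bytes:
    # Each attempt is capped by `timeout`, so a stalled server fails fast
    # instead of hanging the calling thread.
    for attempt in range(retries):
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response.content
        except requests.RequestException:
            # Exponential backoff: with the defaults above, sleeps 0.3s, then 0.6s, then 1.2s.
            time.sleep(backoff_factor * (2 ** attempt))
    return b''  # All retries failed; the caller decides how to handle the gap

With backoff_factor=0.3 and retries=3, a permanently dead URL costs at most about 3 * timeout plus roughly 2.1 seconds of sleep per segment, rather than an unbounded wait.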
114 changes: 56 additions & 58 deletions xnxx_api/modules/download.py
@@ -1,8 +1,9 @@
 # Thanks to: https://github.com/EchterAlsFake/PHUB/blob/master/src/phub/modules/download.py
 # oh and of course ChatGPT lol
 
-from ffmpeg_progress_yield import FfmpegProgress
+import time
 import requests
+from ffmpeg_progress_yield import FfmpegProgress
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from typing import Callable, List
 
@@ -14,67 +15,64 @@
 """
 
 
-def _thread(url: str, timeout: int) -> bytes:
-    '''
-    Download a single segment using requests.
-    '''
-    try:
-        response = requests.get(url, timeout=timeout)
-        response.raise_for_status()  # This will raise an exception for HTTP errors
-        return response.content
-    except requests.RequestException as e:
-        print(f"Failed to download segment {url}: {e}")
-        return b''
-
-
-def _base_threaded(segments: List[str],
-                   callback: CallbackType,
-                   max_workers: int = 50,
-                   timeout: int = 10) -> dict[str, bytes]:
-    '''
-    Base threaded downloader for threaded backends.
-    '''
-    length = len(segments)
-
-    with ThreadPoolExecutor(max_workers=max_workers) as executor:
-        buffer = {}
-        future_to_url = {executor.submit(_thread, url, timeout): url for url in segments}
-
-        completed = 0
-        for future in as_completed(future_to_url):
-            url = future_to_url[future]
-            try:
-                segment_data = future.result()
-                if segment_data:
-                    buffer[url] = segment_data
-                completed += 1
-                callback(completed, length)
-            except Exception as e:
-                print(f"Error downloading segment {url}: {e}")
-
-    return buffer
-
-
-def threaded(max_workers: int = 100,
-             timeout: int = 30) -> Callable:
-    def wrapper(video,
-                quality,
-                callback: CallbackType,
-                path: str) -> None:
-        segments = list(video.get_segments(quality))
-
-        buffer = _base_threaded(
-            segments=segments,
-            callback=callback,
-            max_workers=max_workers,
-            timeout=timeout
-        )
+def download_segment(url: str, timeout: int, retries: int = 3, backoff_factor: float = 0.3) -> tuple[str, bytes, bool]:
+    """
+    Attempt to download a single segment, retrying on failure.
+    Returns a tuple of the URL, content (empty if failed after retries), and a success flag.
+    """
+    for attempt in range(retries):
+        try:
+            response = requests.get(url, timeout=timeout)
+            response.raise_for_status()  # Raises stored HTTPError, if one occurred.
+            return (url, response.content, True)  # Success
+        except requests.RequestException as e:
+            print(f"Retry {attempt + 1} for {url}: {e}")
+            time.sleep(backoff_factor * (2 ** attempt))  # Exponential backoff
+
+    # After all retries have failed
+    return (url, b'', False)  # Failed download
+
+
+def improved_threaded(max_workers: int = 50, timeout: int = 10, retries: int = 3):
+    """
+    Creates a wrapper function for the actual download process, with retry logic.
+    """
+    def wrapper(segments, callback, path):
+        """
+        Download video segments in parallel, with retries for failures, and write to a file.
+        """
+        length = len(segments)
+        completed, successful_downloads = 0, 0
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            future_to_segment = {executor.submit(download_segment, url, timeout, retries): url for url in segments}
+
+            for future in as_completed(future_to_segment):
+                segment_url = future_to_segment[future]
+                try:
+                    _, data, success = future.result()
+                    completed += 1
+                    if success:
+                        successful_downloads += 1
+                    callback(completed, length)  # Update callback regardless of success to reflect progress
+                except Exception as e:
+                    print(f"Unhandled error downloading segment {segment_url}: {e}")
+
+        print(f"Completed downloading {completed} out of {length} segments, with {successful_downloads} successful.")
 
         # Writing only successful downloads to the file
         with open(path, 'wb') as file:
-            for url in segments:
-                file.write(buffer.get(url, b''))
-
-        print(f'Successfully wrote file to {path}')
+            for segment_url in segments:
+                if segment_url in future_to_segment:
+                    future = future_to_segment[segment_url]
+                    try:
+                        _, data, success = future.result()
+                        if success:
+                            file.write(data)
+                    except:
+                        pass  # This block could further handle or log missing data scenarios
+
+        print(f"Successfully wrote file to {path}")
 
     return wrapper

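Note that the wrapper's calling convention changes with this commit: the old threaded() wrapper took (video, quality, callback, path) and resolved the segment list itself, while the new improved_threaded() wrapper takes (segments, callback, path) and expects the caller to supply the segment URLs. A hypothetical usage sketch under that assumption (the URLs, callback, and output path below are placeholders, not part of the commit):

segments = [
    "https://example.com/video/seg-001.ts",
    "https://example.com/video/seg-002.ts",
]

def progress(done: int, total: int) -> None:
    print(f"{done}/{total} segments")

downloader = improved_threaded(max_workers=20, timeout=10, retries=3)
downloader(segments, progress, "video.ts")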
