diff --git a/lib/connection/requester.py b/lib/connection/requester.py index 260cd2201..df3ac7605 100755 --- a/lib/connection/requester.py +++ b/lib/connection/requester.py @@ -58,24 +58,6 @@ socket.getaddrinfo = cached_getaddrinfo -class HTTPBearerAuth(AuthBase): - def __init__(self, token): - self.token = token - - def __call__(self, request): - request.headers["Authorization"] = f"Bearer {self.token}" - return request - - -class HTTPXBearerAuth(httpx.Auth): - def __init__(self, token: str) -> None: - self.token = token - - def auth_flow(self, request: httpx.Request) -> Generator: - request.headers["Authorization"] = f"Bearer {self.token}" - yield request - - class BaseRequester: def __init__(self): self._url = None @@ -151,6 +133,15 @@ def rate(self): return self._rate +class HTTPBearerAuth(AuthBase): + def __init__(self, token): + self.token = token + + def __call__(self, request): + request.headers["Authorization"] = f"Bearer {self.token}" + return request + + class Requester(BaseRequester): def __init__(self): super().__init__() @@ -275,25 +266,42 @@ def request(self, path, proxy=None): raise RequestException(err_msg) +class HTTPXBearerAuth(httpx.Auth): + def __init__(self, token: str) -> None: + self.token = token + + def auth_flow(self, request: httpx.Request) -> Generator: + request.headers["Authorization"] = f"Bearer {self.token}" + yield request + + +class ProxyRoatingTransport(httpx.AsyncBaseTransport): + def __init__(self, proxies, **kwargs) -> None: + self._transports = [ + httpx.AsyncHTTPTransport(proxy=proxy, **kwargs) for proxy in proxies + ] + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + transport = random.choice(self._transports) + return await transport.handle_async_request(request) + + class AsyncRequester(BaseRequester): def __init__(self): super().__init__() - proxy = None - try: - proxy = self.parse_proxy(random.choice(options["proxies"])) - except IndexError: - pass - - transport = httpx.AsyncHTTPTransport( - verify=False, - cert=self._cert, - limits=httpx.Limits(max_connections=options["thread_count"]), - # httpx doesn't let you choose different proxy for each request - # https://github.com/encode/httpx/discussions/3183 - proxy=proxy, - retries=options["max_retries"], - socket_options=self._socket_options, + tpargs = { + "verify": False, + "cert": self._cert, + "limits": httpx.Limits(max_connections=options["thread_count"]), + "socket_options": self._socket_options, + } + transport = ( + ProxyRoatingTransport( + list(map(self.parse_proxy, options["proxies"])), **tpargs + ) + if options["proxies"] + else httpx.AsyncHTTPTransport(**tpargs) ) self.session = httpx.AsyncClient( @@ -344,59 +352,60 @@ async def request(self, path: str) -> AsyncResponse: url = safequote(self._url + path if self._url else path) parsed_url = urlparse(url) - try: - if self.agents: - self.set_header("user-agent", random.choice(self.agents)) + for _ in range(options["max_retries"] + 1): + try: + if self.agents: + self.set_header("user-agent", random.choice(self.agents)) - # Use "target" extension to avoid the URL path from being normalized - request = self.session.build_request( - options["http_method"], - url, - headers=self.headers, - data=options["data"], - ) - if p := parsed_url.path: - request.extensions = {"target": p.encode()} + # Use "target" extension to avoid the URL path from being normalized + request = self.session.build_request( + options["http_method"], + url, + headers=self.headers, + data=options["data"], + ) + if p := parsed_url.path: + request.extensions = {"target": p.encode()} - xresponse = await self.session.send( - request, - stream=True, - follow_redirects=options["follow_redirects"], - ) - response = await AsyncResponse.create(xresponse) - await xresponse.aclose() + xresponse = await self.session.send( + request, + stream=True, + follow_redirects=options["follow_redirects"], + ) + response = await AsyncResponse.create(xresponse) + await xresponse.aclose() - log_msg = f'"{options["http_method"]} {response.url}" {response.status} - {response.length}B' + log_msg = f'"{options["http_method"]} {response.url}" {response.status} - {response.length}B' - if response.redirect: - log_msg += f" - LOCATION: {response.redirect}" + if response.redirect: + log_msg += f" - LOCATION: {response.redirect}" - logger.info(log_msg) + logger.info(log_msg) - return response + return response - except Exception as e: - logger.exception(e) + except Exception as e: + logger.exception(e) - if isinstance(e, httpx.ConnectError): - if str(e).startswith("[Errno -2]"): - err_msg = "Couldn't resolve DNS" + if isinstance(e, httpx.ConnectError): + if str(e).startswith("[Errno -2]"): + err_msg = "Couldn't resolve DNS" + else: + err_msg = f"Cannot connect to: {urlparse(url).netloc}" + elif isinstance(e, SSLError): + err_msg = "Unexpected SSL error" + elif isinstance(e, httpx.TooManyRedirects): + err_msg = f"Too many redirects: {url}" + elif isinstance(e, httpx.ProxyError): + err_msg = "Cannot establish the proxy connection" + elif isinstance(e, httpx.InvalidURL): + err_msg = f"Invalid URL: {url}" + elif isinstance(e, httpx.TimeoutException): + err_msg = f"Request timeout: {url}" + elif isinstance(e, httpx.ReadError) or isinstance(e, httpx.DecodingError): # not sure + err_msg = f"Failed to read response body: {url}" else: - err_msg = f"Cannot connect to: {urlparse(url).netloc}" - elif isinstance(e, SSLError): - err_msg = "Unexpected SSL error" - elif isinstance(e, httpx.TooManyRedirects): - err_msg = f"Too many redirects: {url}" - elif isinstance(e, httpx.ProxyError): - err_msg = "Cannot establish the proxy connection" - elif isinstance(e, httpx.InvalidURL): - err_msg = f"Invalid URL: {url}" - elif isinstance(e, httpx.TimeoutException): - err_msg = f"Request timeout: {url}" - elif isinstance(e, httpx.ReadError) or isinstance(e, httpx.DecodingError): # not sure - err_msg = f"Failed to read response body: {url}" - else: - err_msg = f"There was a problem in the request to: {url}" + err_msg = f"There was a problem in the request to: {url}" raise RequestException(err_msg)