diff --git a/PACKAGE_README.md b/PACKAGE_README.md
index 3448d03..1ab8e27 100644
--- a/PACKAGE_README.md
+++ b/PACKAGE_README.md
@@ -29,7 +29,7 @@ This project would not have been possible without the incredible work that [@and
 
 ## Prerequisites
 
-It helps to have some familiarity with how the [OpenAI Python API library](https://github.com/openai/openai-python) works. If you have used it before, then the code in `aoai.py` here will look very familiar to you.
+It helps to have some familiarity with how the [OpenAI Python API library](https://github.com/openai/openai-python) works. If you have used it before, then the code in the [aoai.py](https://github.com/simonkurtz-MSFT/python-openai-loadbalancer/blob/main/aoai.py) test harness for this package will look very familiar to you.
 It's also good to have some knowledge of authentication and identities.
 
 ## Getting Started
diff --git a/src/openai_priority_loadbalancer/openai_priority_loadbalancer.py b/src/openai_priority_loadbalancer/openai_priority_loadbalancer.py
index 17ee1c8..c7dfc43 100644
--- a/src/openai_priority_loadbalancer/openai_priority_loadbalancer.py
+++ b/src/openai_priority_loadbalancer/openai_priority_loadbalancer.py
@@ -87,6 +87,53 @@ def _get_soonest_retry_after(self):
         self._log.info(f"Soonest Retry After: {soonest_backend} - {str(delay)} second(s)")
         return delay
 
+    def _handle_200_399_response(self, request, response, backend_index):
+        """Log a successful (200-399) response and count it for the backend that served it."""
+        self._log.info(f"Request sent to server: {request.url}, Status code: {response.status_code}")
+        self.backends[backend_index].successful_call_count += 1
+
+    def _handle_429_5xx_response(self, request, response, backend_index):
+        """Mark the backend as throttling after a 429 or 5xx response so that the caller can retry with a different server."""
+        self._log.info(f"Request sent to server: {request.url}, Status Code: {response.status_code} - FAIL")
+
+        # Prefer Retry-After, then x-ratelimit-reset-requests. A value that is not a whole number of seconds
+        # (e.g. an HTTP-date or "1s") is treated like a missing header instead of letting int() raise.
+        retry_after = -1
+
+        for header in ('Retry-After', 'x-ratelimit-reset-requests'):
+            try:
+                retry_after = int(response.headers.get(header, '-1'))
+            except ValueError:
+                retry_after = -1
+
+            if retry_after != -1:
+                break
+
+        if retry_after == -1:
+            # Neither header supplied a usable delay, so fall back to 10 seconds.
+            retry_after = 10
+
+        self._log.info(f"Backend {self.backends[backend_index].host} is throttling. Retry after {retry_after} second(s).")
+
+        backend = self.backends[backend_index]
+        backend.is_throttling = True
+        backend.retry_after = datetime.now(tzutc()) + timedelta(seconds = retry_after)
+        self._get_remaining_backends()
+
+    def _handle_4xx_response(self, request, response):
+        """Log a response that is neither successful nor retryable (likely a 4xx error other than 429)."""
+        # The caller's final branch is also reached when sending raised and response is still None.
+        status_code = response.status_code if response is not None else None
+        self._log.warning(f"Request sent to server: {request.url}, Status code: {status_code} - FAIL")
+
+    def modify_request(self, request, backend_index):
+        """Point the intercepted request at the selected backend."""
+        # Modify the request. Note that only the URL and Host header are being modified on the original request object. We make the smallest incision possible to avoid side effects.
+        # Update URL and host header as both must match the backend server.
+        request.url = request.url.copy_with(host = self.backends[backend_index].host)
+        request.headers = request.headers.copy()  # Create a mutable copy of the headers
+        request.headers['host'] = self.backends[backend_index].host
+
     def _return_429(self):
         self._log.warning("No backend available!")
         retry_after = str(self._get_soonest_retry_after())
@@ -105,52 +152,32 @@ async def handle_async_request(self, request):
         response = None
 
         while True and self._remaining_backends > 0:
+            # 1) Determine the appropriate backend to use
             backend_index = self._get_backend_index()
 
             if backend_index == -1:
                 return self._return_429()
 
-            # Modify the request. Note that only the URL and Host header are being modified on the original request object. We make the smallest incision possible to avoid side effects.
-            # Update URL and host header as both must match the backend server.
-            request.url = request.url.copy_with(host = self.backends[backend_index].host)
-            request.headers = request.headers.copy()  # Create a mutable copy of the headers
-            request.headers['host'] = self.backends[backend_index].host
+            # 2) Modify the intercepted request
+            self.modify_request(request, backend_index)
 
-            # Send the request to the backend
+            # 3) Send the request to the selected backend (via async)
             try:
                 response = await self._transport.send(request)
             except Exception as e:
                 self._log.error(traceback.print_exc())
 
+            # 4) Evaluate the response from the backend
             if response is not None and (response.status_code == 429 or response.status_code >= 500):
-                # If the server is throttling or there's a server error, retry with a different server
-                self._log.info(f"Request sent to server: {request.url}, Status Code: {response.status_code} - FAIL")
-
-                retry_after = int(response.headers.get('Retry-After', '-1'))
-
-                if retry_after == -1:
-                    retry_after = int(response.headers.get('x-ratelimit-reset-requests', '-1'))
-
-                if retry_after == -1:
-                    retry_after = int(response.headers.get('x-ratelimit-reset-requests', '10'))
-
-                self._log.info(f"Backend {self.backends[backend_index].host} is throttling. Retry after {retry_after} second(s).")
-
-                backend = self.backends[backend_index]
-                backend.is_throttling = True
-                backend.retry_after = datetime.now(tzutc()) + timedelta(seconds = retry_after)
-                self._get_remaining_backends()
+                self._handle_429_5xx_response(request, response, backend_index)
                 continue
 
             elif response is not None and (response.status_code >= 200 and response.status_code <= 399):
-                # Successful requests
-                self._log.info(f"Request sent to server: {request.url}, Status code: {response.status_code}")
-                self.backends[backend_index].successful_call_count += 1
+                self._handle_200_399_response(request, response, backend_index)
                 break
 
             else:
-                # Would likely be a 4xx error other than 429
-                self._log.warning(f"Request sent to server: {request.url}, Status code: {response.status_code} - FAIL")
+                self._handle_4xx_response(request, response)
                 break
 
         if self._remaining_backends == 0:
@@ -170,53 +197,32 @@ def handle_request(self, request):
         response = None
 
         while True and self._remaining_backends > 0:
+            # 1) Determine the appropriate backend to use
             backend_index = self._get_backend_index()
 
             if backend_index == -1:
                 return self._return_429()
 
-            # Modify the request. Note that only the URL and Host header are being modified on the original request object. We make the smallest incision possible to avoid side effects.
-            # Update URL and host header as both must match the backend server.
-            request.url = request.url.copy_with(host = self.backends[backend_index].host)
-            headers = request.headers.copy()  # Create a mutable copy of the headers
-            headers['host'] = self.backends[backend_index].host
-            request.headers = headers  # Assign the modified headers back to request.headers
+            # 2) Modify the intercepted request
+            self.modify_request(request, backend_index)
 
-            # Send the request to the backend
+            # 3) Send the request to the selected backend
             try:
                 response = self._transport.send(request)
             except Exception as e:
                 self._log.error(traceback.print_exc())
 
+            # 4) Evaluate the response from the backend
             if response is not None and (response.status_code == 429 or response.status_code >= 500):
-                # If the server is throttling or there's a server error, retry with a different server
-                self._log.warning(f"Request sent to server: {request.url}, Status Code: {response.status_code} - FAIL")
-
-                retry_after = int(response.headers.get('Retry-After', '-1'))
-
-                if retry_after == -1:
-                    retry_after = int(response.headers.get('x-ratelimit-reset-requests', '-1'))
-
-                if retry_after == -1:
-                    retry_after = int(response.headers.get('x-ratelimit-reset-requests', '10'))
-
-                self._log.info(f"Backend {self.backends[backend_index].host} is throttling. Retry after {retry_after} second(s).")
-
-                backend = self.backends[backend_index]
-                backend.is_throttling = True
-                backend.retry_after = datetime.now(tzutc()) + timedelta(seconds = retry_after)
-                self._get_remaining_backends()
+                self._handle_429_5xx_response(request, response, backend_index)
                 continue
 
             elif response is not None and (response.status_code >= 200 and response.status_code <= 399):
-                # Successful requests
-                self._log.info(f"Request sent to server: {request.url}, Status code: {response.status_code}")
-                self.backends[backend_index].successful_call_count += 1
+                self._handle_200_399_response(request, response, backend_index)
                 break
 
             else:
-                # Would likely be a 4xx error other than 429
-                self._log.warning(f"Request sent to server: {request.url}, Status code: {response.status_code} - FAIL")
+                self._handle_4xx_response(request, response)
                 break
 
         if self._remaining_backends == 0: