Skip to content

Commit

Permalink
Feature/refactor loadbalancer (#8)
Browse files Browse the repository at this point in the history
* Refactor and add comments

* Fix link
  • Loading branch information
simonkurtz-MSFT authored May 15, 2024
1 parent eb8b88e commit b156dfe
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 58 deletions.
2 changes: 1 addition & 1 deletion PACKAGE_README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ This project would not have been possible without the incredible work that [@and

## Prerequisites

It helps to have some familiarity with how the [OpenAI Python API library](https://github.com/openai/openai-python) works. If you have used it before, then the code in `aoai.py` here will look very familiar to you.
It helps to have some familiarity with how the [OpenAI Python API library](https://github.com/openai/openai-python) works. If you have used it before, then the code in the [aoai.py](https://github.com/simonkurtz-MSFT/python-openai-loadbalancer/blob/main/aoai.py) test harness for this package will look very familiar to you.
It's also good to have some knowledge of authentication and identities.

## Getting Started
Expand Down
108 changes: 51 additions & 57 deletions src/openai_priority_loadbalancer/openai_priority_loadbalancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,41 @@ def _get_soonest_retry_after(self):
self._log.info(f"Soonest Retry After: {soonest_backend} - {str(delay)} second(s)")
return delay

def _handle_200_399_response(self, request, response, backend_index):
# Successful requests
self._log.info(f"Request sent to server: {request.url}, Status code: {response.status_code}")
self.backends[backend_index].successful_call_count += 1

def _handle_429_5xx_response(self, request, response, backend_index):
# If the server is throttling or there's a server error, retry with a different server
self._log.info(f"Request sent to server: {request.url}, Status Code: {response.status_code} - FAIL")

retry_after = int(response.headers.get('Retry-After', '-1'))

if retry_after == -1:
retry_after = int(response.headers.get('x-ratelimit-reset-requests', '-1'))

if retry_after == -1:
retry_after = int(response.headers.get('x-ratelimit-reset-requests', '10'))

self._log.info(f"Backend {self.backends[backend_index].host} is throttling. Retry after {retry_after} second(s).")

backend = self.backends[backend_index]
backend.is_throttling = True
backend.retry_after = datetime.now(tzutc()) + timedelta(seconds = retry_after)
self._get_remaining_backends()

def _handle_4xx_response(self, request, response):
# Would likely be a 4xx error other than 429
self._log.warning(f"Request sent to server: {request.url}, Status code: {response.status_code} - FAIL")

def modify_request(self, request, backend_index):
# Modify the request. Note that only the URL and Host header are being modified on the original request object. We make the smallest incision possible to avoid side effects.
# Update URL and host header as both must match the backend server.
request.url = request.url.copy_with(host = self.backends[backend_index].host)
request.headers = request.headers.copy() # Create a mutable copy of the headers
request.headers['host'] = self.backends[backend_index].host

def _return_429(self):
self._log.warning("No backend available!")
retry_after = str(self._get_soonest_retry_after())
Expand All @@ -105,52 +140,32 @@ async def handle_async_request(self, request):
response = None

while True and self._remaining_backends > 0:
# 1) Determine the appropriate backend to use
backend_index = self._get_backend_index()

if backend_index == -1:
return self._return_429()

# Modify the request. Note that only the URL and Host header are being modified on the original request object. We make the smallest incision possible to avoid side effects.
# Update URL and host header as both must match the backend server.
request.url = request.url.copy_with(host = self.backends[backend_index].host)
request.headers = request.headers.copy() # Create a mutable copy of the headers
request.headers['host'] = self.backends[backend_index].host
# 2) Modify the intercepted request
self.modify_request(request, backend_index)

# Send the request to the backend
# 3) Send the request to the selected backend (via async)
try:
response = await self._transport.send(request)
except Exception as e:
self._log.error(traceback.print_exc())

# 4) Evaluate the response from the backend
if response is not None and (response.status_code == 429 or response.status_code >= 500):
# If the server is throttling or there's a server error, retry with a different server
self._log.info(f"Request sent to server: {request.url}, Status Code: {response.status_code} - FAIL")

retry_after = int(response.headers.get('Retry-After', '-1'))

if retry_after == -1:
retry_after = int(response.headers.get('x-ratelimit-reset-requests', '-1'))

if retry_after == -1:
retry_after = int(response.headers.get('x-ratelimit-reset-requests', '10'))

self._log.info(f"Backend {self.backends[backend_index].host} is throttling. Retry after {retry_after} second(s).")

backend = self.backends[backend_index]
backend.is_throttling = True
backend.retry_after = datetime.now(tzutc()) + timedelta(seconds = retry_after)
self._get_remaining_backends()
self._handle_429_5xx_response(request, response, backend_index)
continue

elif response is not None and (response.status_code >= 200 and response.status_code <= 399):
# Successful requests
self._log.info(f"Request sent to server: {request.url}, Status code: {response.status_code}")
self.backends[backend_index].successful_call_count += 1
self._handle_200_399_response(request, response, backend_index)
break

else:
# Would likely be a 4xx error other than 429
self._log.warning(f"Request sent to server: {request.url}, Status code: {response.status_code} - FAIL")
self._handle_4xx_response(request, response)
break

if self._remaining_backends == 0:
Expand All @@ -170,53 +185,32 @@ def handle_request(self, request):
response = None

while True and self._remaining_backends > 0:
# 1) Determine the appropriate backend to use
backend_index = self._get_backend_index()

if backend_index == -1:
return self._return_429()

# Modify the request. Note that only the URL and Host header are being modified on the original request object. We make the smallest incision possible to avoid side effects.
# Update URL and host header as both must match the backend server.
request.url = request.url.copy_with(host = self.backends[backend_index].host)
headers = request.headers.copy() # Create a mutable copy of the headers
headers['host'] = self.backends[backend_index].host
request.headers = headers # Assign the modified headers back to request.headers
# 2) Modify the intercepted request
self.modify_request(request, backend_index)

# Send the request to the backend
# 3) Send the request to the selected backend
try:
response = self._transport.send(request)
except Exception as e:
self._log.error(traceback.print_exc())

# 4) Evaluate the response from the backend
if response is not None and (response.status_code == 429 or response.status_code >= 500):
# If the server is throttling or there's a server error, retry with a different server
self._log.warning(f"Request sent to server: {request.url}, Status Code: {response.status_code} - FAIL")

retry_after = int(response.headers.get('Retry-After', '-1'))

if retry_after == -1:
retry_after = int(response.headers.get('x-ratelimit-reset-requests', '-1'))

if retry_after == -1:
retry_after = int(response.headers.get('x-ratelimit-reset-requests', '10'))

self._log.info(f"Backend {self.backends[backend_index].host} is throttling. Retry after {retry_after} second(s).")

backend = self.backends[backend_index]
backend.is_throttling = True
backend.retry_after = datetime.now(tzutc()) + timedelta(seconds = retry_after)
self._get_remaining_backends()
self._handle_429_5xx_response(request, response, backend_index)
continue

elif response is not None and (response.status_code >= 200 and response.status_code <= 399):
# Successful requests
self._log.info(f"Request sent to server: {request.url}, Status code: {response.status_code}")
self.backends[backend_index].successful_call_count += 1
self._handle_200_399_response(request, response, backend_index)
break

else:
# Would likely be a 4xx error other than 429
self._log.warning(f"Request sent to server: {request.url}, Status code: {response.status_code} - FAIL")
self._handle_4xx_response(request, response)
break

if self._remaining_backends == 0:
Expand Down

0 comments on commit b156dfe

Please sign in to comment.