Skip to content

Commit

Permalink
Encapsulate error collection in ErrorsReport class
Browse files Browse the repository at this point in the history
  • Loading branch information
Bslabe123 committed Nov 15, 2024
1 parent abdc036 commit f0ff475
Showing 1 changed file with 64 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,54 @@
tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request')
active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed')

class ErrorsReport():
    """Per-request-type error counters for the benchmark run.

    Counts request failures bucketed by exception type. Attribute names
    double as the report keys emitted by ``to_dict`` (and therefore as the
    metric names saved in results), so they must stay stable.
    """

    # Counter attributes; PascalCase names are kept for backward
    # compatibility with existing result consumers.
    ClientConnectorErrors: int
    TimeoutErrors: int
    ContentTypeErrors: int
    ClientOSErrors: int
    ServerDisconnectedErrors: int
    unknown_errors: int

    def __init__(self):
        self.ClientConnectorErrors = 0
        self.TimeoutErrors = 0
        self.ContentTypeErrors = 0
        self.ClientOSErrors = 0
        self.ServerDisconnectedErrors = 0
        self.unknown_errors = 0

    def to_dict(self) -> dict:
        """Return a {counter_name: count} mapping of all integer counters."""
        return {k: v for k, v in self.__dict__.items() if isinstance(v, int)}

    def record_error(self, error: Exception):
        """Classify *error*, increment the matching counter, and log it.

        The (exception type, counter, label) table is checked in order:
        ClientConnectorError must be tested before ClientOSError because it
        is a subclass of it in aiohttp's exception hierarchy.
        """
        # Built lazily inside the method so that importing this module does
        # not require aiohttp to be resolvable at class-definition time.
        buckets = (
            (aiohttp.client_exceptions.ClientConnectorError,
             "ClientConnectorErrors", "ClientConnectorError"),
            (asyncio.TimeoutError, "TimeoutErrors", "TimeoutError"),
            (aiohttp.client_exceptions.ContentTypeError,
             "ContentTypeErrors", "ContentTypeError"),
            (aiohttp.client_exceptions.ClientOSError,
             "ClientOSErrors", "ClientOSError"),
            (aiohttp.client_exceptions.ServerDisconnectedError,
             "ServerDisconnectedErrors", "ServerDisconnectedError"),
        )
        for exc_type, attr, label in buckets:
            if isinstance(error, exc_type):
                setattr(self, attr, getattr(self, attr) + 1)
                print(f"{label}: {error}")
                return
        self.unknown_errors += 1
        print(f"Unknown error: {error}")

    def append_report(self, report: "ErrorsReport"):
        """Fold every counter from *report* into this report.

        Iterates the counters dynamically (via ``to_dict``) instead of
        naming each field, so a newly added counter cannot be silently
        dropped during aggregation.
        """
        for name, count in report.to_dict().items():
            setattr(self, name, getattr(self, name) + count)

# Add trace config for monitoring in flight requests
async def on_request_start(session, trace_config_ctx, params):
active_requests_metric.inc()
Expand Down Expand Up @@ -128,17 +176,6 @@ async def get_request(
# The next request will be sent after the interval.
await asyncio.sleep(interval)

def init_errors_map() -> Dict[str, int]:
  """Return a fresh error-name -> count mapping with every counter at zero."""
  return {
      "ClientConnectorError": 0,
      "TimeoutError": 0,
      "ContentTypeError": 0,
      "ClientOSError": 0,
      "ServerDisconnectedError": 0,
      "unknown_error": 0,
  }

async def send_stream_request(
backend: str,
api_url: str,
Expand All @@ -151,10 +188,10 @@ async def send_stream_request(
tokenizer: PreTrainedTokenizerBase,
sax_model: str,
model: str,
) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]:
) -> Tuple[Tuple[int, int, float], float, Optional[ErrorsReport]]:
"""Sends stream request to server"""
request_start_time = time.time()
errors = init_errors_map()
errors = ErrorsReport()

headers = {"User-Agent": "Benchmark Client"}
if backend == "vllm":
Expand Down Expand Up @@ -192,29 +229,8 @@ async def send_stream_request(
if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
if backend == "vllm":
output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
except Exception as e:
errors.record_error(e)
return None, None, errors
request_end_time = time.time()
output_token_ids = tokenizer(output).input_ids
Expand All @@ -234,10 +250,10 @@ async def send_request(
tokenizer: PreTrainedTokenizerBase,
sax_model: str,
model: str,
) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]:
) -> Tuple[Tuple[int, int, float], float, Optional[ErrorsReport]]:
"""Sends request to server."""
request_start_time = time.time()
errors = init_errors_map()
errors = ErrorsReport()

headers = {"User-Agent": "Benchmark Client"}
if backend == "vllm":
Expand Down Expand Up @@ -317,29 +333,8 @@ async def send_request(
# Re-send the request if it failed.
if "error" not in output:
break
except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
except Exception as e:
errors.record_error(e)
return None, None, errors

request_end_time = time.time()
Expand Down Expand Up @@ -381,7 +376,7 @@ async def benchmark(
api_url: str,
tokenizer: PreTrainedTokenizerBase,
model: str,
) -> Tuple[List[Tuple[int, int, float]], List[float], Dict[str, int]]:
) -> Tuple[List[Tuple[int, int, float]], List[float], Optional[ErrorsReport]]:
"""Runs benchmark with asynchronous requests."""
input_requests = sample_requests(
args.dataset,
Expand Down Expand Up @@ -431,13 +426,12 @@ async def benchmark(
results = await asyncio.gather(*tasks)
combined_latencies = []
combined_ttfts = []
combined_errors = init_errors_map()
combined_errors = ErrorsReport()
for latency, ttft, errors in results:
if latency:
combined_latencies.append(latency)
if errors:
for err, count in errors.items():
combined_errors[err] = combined_errors[err] + count
combined_errors.append_report(errors)
if ttft:
combined_ttfts.append(ttft)

Expand All @@ -461,7 +455,7 @@ def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics
**server_metrics
},
**benchmark_result,
**errors,
**(errors.to_dict()),
},
# dimensions values are strings
"dimensions": {
Expand Down Expand Up @@ -650,7 +644,7 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re
benchmark_result = {}

print(f"====Result for Model: {model}====")
print(f"Errors: {errors}")
print(f"Errors: {errors.to_dict()}")
print(f"Total time: {benchmark_duration:.2f} s")
print(f"Successful/total requests: {len(request_latencies)}/{total_requests}")
print(f"Requests/min: {60 * total_requests / benchmark_duration:.2f}")
Expand Down Expand Up @@ -754,19 +748,12 @@ async def main(args: argparse.Namespace):
# Summarize results
combined_latencies = []
combined_ttfts = []
combined_errors = {
"ClientConnectorError": 0,
"TimeoutError": 0,
"ContentTypeError": 0,
"ClientOSError": 0,
"unknown_error": 0,
"ServerDisconnectedError": 0,
}
combined_errors = ErrorsReport()
for latencies, ttfts, errors in results:
combined_latencies.extend(latencies)
combined_ttfts.extend(ttfts)
for k, v in errors.items():
combined_errors[k] = combined_errors[k] + v
if errors:
combined_errors.append_report(errors)

benchmark_duration_all_models = time.time() - benchmark_start_time
if args.save_aggregated_result:
Expand Down

0 comments on commit f0ff475

Please sign in to comment.