diff --git a/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py b/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py index d46a6f4f9..80dc832ed 100644 --- a/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py +++ b/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py @@ -37,6 +37,54 @@ tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request') active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed') +class ErrorsReport(): + ClientConnectorErrors: int + TimeoutErrors: int + ContentTypeErrors: int + ClientOSErrors: int + ServerDisconnectedErrors: int + unknown_errors: int + + def __init__(self): + self.ClientConnectorErrors = 0 + self.TimeoutErrors = 0 + self.ContentTypeErrors = 0 + self.ClientOSErrors = 0 + self.ServerDisconnectedErrors = 0 + self.unknown_errors = 0 + + + def to_dict(self) -> dict: + return {k: v for k, v in self.__dict__.items() if isinstance(v, int)} + + def record_error(self, error: Exception): + if isinstance(error, aiohttp.client_exceptions.ClientConnectorError): + self.ClientConnectorErrors += 1 + print(f"ClientConnectorError: {error}") + elif isinstance(error, asyncio.TimeoutError): + self.TimeoutErrors += 1 + print(f"TimeoutError: {error}") + elif isinstance(error, aiohttp.client_exceptions.ContentTypeError): + self.ContentTypeErrors += 1 + print(f"ContentTypeError: {error}") + elif isinstance(error, aiohttp.client_exceptions.ClientOSError): + self.ClientOSErrors += 1 + print(f"ClientOSError: {error}") + elif isinstance(error, aiohttp.client_exceptions.ServerDisconnectedError): + self.ServerDisconnectedErrors += 1 + print(f"ServerDisconnectedError: {error}") + else: + self.unknown_errors += 1 + print(f"Unknown error: {error}") + + def append_report(self, report: "ErrorsReport"): + self.ClientConnectorErrors += report.ClientConnectorErrors + self.TimeoutErrors += report.TimeoutErrors + self.ContentTypeErrors += report.ContentTypeErrors + self.ClientOSErrors += report.ClientOSErrors + self.ServerDisconnectedErrors += report.ServerDisconnectedErrors + self.unknown_errors += report.unknown_errors + # Add trace config for monitoring in flight requests async def on_request_start(session, trace_config_ctx, params): active_requests_metric.inc() @@ -128,17 +176,6 @@ async def get_request( # The next request will be sent after the interval. await asyncio.sleep(interval) -def init_errors_map() -> Dict[str, int]: - errors = { - "ClientConnectorError": 0, - "TimeoutError": 0, - "ContentTypeError": 0, - "ClientOSError": 0, - "ServerDisconnectedError": 0, - "unknown_error": 0, - } - return errors - async def send_stream_request( backend: str, api_url: str, @@ -151,10 +188,10 @@ async def send_stream_request( tokenizer: PreTrainedTokenizerBase, sax_model: str, model: str, -) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]: +) -> Tuple[Tuple[int, int, float], float, Optional[ErrorsReport]]: """Sends stream request to server""" request_start_time = time.time() - errors = init_errors_map() + errors = ErrorsReport() headers = {"User-Agent": "Benchmark Client"} if backend == "vllm": @@ -192,29 +229,8 @@ async def send_stream_request( if chunk_bytes.decode("utf-8")[6:] != "[DONE]": if backend == "vllm": output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"] - except aiohttp.client_exceptions.ClientConnectorError as client_err: - errors["ClientConnectorError"] += 1 - print(f"ClientConnectorError: {client_err}") - return None, None, errors - except asyncio.TimeoutError as timeout_err: - errors["TimeoutError"] += 1 - print(f"TimeoutError: {timeout_err}") - return None, None, errors - except aiohttp.client_exceptions.ClientOSError as e: - errors["ClientOSError"] += 1 - print(f"ClientOSError: {e}") - return None, None, errors - except aiohttp.client_exceptions.ContentTypeError as e: - print(f"ContentTypeError: {e}, response: {response}") - errors["ContentTypeError"] += 1 - return None, None, errors - except aiohttp.client_exceptions.ServerDisconnectedError as e: - errors["ServerDisconnectedError"] += 1 - print(f"ServerDisconnectedError: {e}") - return None, None, errors - except Exception as e: - print(f"Unknown error {e}") - errors["unknown_error"] += 1 + except Exception as e: + errors.record_error(e) return None, None, errors request_end_time = time.time() output_token_ids = tokenizer(output).input_ids @@ -234,10 +250,10 @@ async def send_request( tokenizer: PreTrainedTokenizerBase, sax_model: str, model: str, -) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]: +) -> Tuple[Tuple[int, int, float], float, Optional[ErrorsReport]]: """Sends request to server.""" request_start_time = time.time() - errors = init_errors_map() + errors = ErrorsReport() headers = {"User-Agent": "Benchmark Client"} if backend == "vllm": @@ -317,29 +333,8 @@ async def send_request( # Re-send the request if it failed. if "error" not in output: break - except aiohttp.client_exceptions.ClientConnectorError as client_err: - errors["ClientConnectorError"] += 1 - print(f"ClientConnectorError: {client_err}") - return None, None, errors - except asyncio.TimeoutError as timeout_err: - errors["TimeoutError"] += 1 - print(f"TimeoutError: {timeout_err}") - return None, None, errors - except aiohttp.client_exceptions.ClientOSError as e: - errors["ClientOSError"] += 1 - print(f"ClientOSError: {e}") - return None, None, errors - except aiohttp.client_exceptions.ContentTypeError as e: - print(f"ContentTypeError: {e}, response: {response}") - errors["ContentTypeError"] += 1 - return None, None, errors - except aiohttp.client_exceptions.ServerDisconnectedError as e: - errors["ServerDisconnectedError"] += 1 - print(f"ServerDisconnectedError: {e}") - return None, None, errors - except Exception as e: - print(f"Unknown error {e}") - errors["unknown_error"] += 1 + except Exception as e: + errors.record_error(e) return None, None, errors request_end_time = time.time() @@ -381,7 +376,7 @@ async def benchmark( api_url: str, tokenizer: PreTrainedTokenizerBase, model: str, -) -> Tuple[List[Tuple[int, int, float]], List[float], Dict[str, int]]: +) -> Tuple[List[Tuple[int, int, float]], List[float], Optional[ErrorsReport]]: """Runs benchmark with asynchronous requests.""" input_requests = sample_requests( args.dataset, @@ -431,13 +426,12 @@ async def benchmark( results = await asyncio.gather(*tasks) combined_latencies = [] combined_ttfts = [] - combined_errors = init_errors_map() + combined_errors = ErrorsReport() for latency, ttft, errors in results: if latency: combined_latencies.append(latency) if errors: - for err, count in errors.items(): - combined_errors[err] = combined_errors[err] + count + combined_errors.append_report(errors) if ttft: combined_ttfts.append(ttft) @@ -461,7 +455,7 @@ def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics **server_metrics }, **benchmark_result, - **errors, + **(errors.to_dict()), }, # dimensions values are strings "dimensions": { @@ -650,7 +644,7 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re benchmark_result = {} print(f"====Result for Model: {model}====") - print(f"Errors: {errors}") + print(f"Errors: {errors.to_dict()}") print(f"Total time: {benchmark_duration:.2f} s") print(f"Successful/total requests: {len(request_latencies)}/{total_requests}") print(f"Requests/min: {60 * total_requests / benchmark_duration:.2f}") @@ -754,19 +748,12 @@ async def main(args: argparse.Namespace): # Summarize results combined_latencies = [] combined_ttfts = [] - combined_errors = { - "ClientConnectorError": 0, - "TimeoutError": 0, - "ContentTypeError": 0, - "ClientOSError": 0, - "unknown_error": 0, - "ServerDisconnectedError": 0, - } + combined_errors = ErrorsReport() for latencies, ttfts, errors in results: combined_latencies.extend(latencies) combined_ttfts.extend(ttfts) - for k, v in errors.items(): - combined_errors[k] = combined_errors[k] + v + if errors: + combined_errors.append_report(errors) benchmark_duration_all_models = time.time() - benchmark_start_time if args.save_aggregated_result: