Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encapsulate error collection in ErrorsReport class #882

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,54 @@
tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request')
active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed')

class ErrorsReport():
    """Aggregates counts of known aiohttp/asyncio error types observed while
    benchmarking, plus a catch-all bucket for anything unrecognized.

    All per-counter logic is driven by ``_COUNTER_NAMES`` so that adding a
    new error type only requires touching ``_COUNTER_NAMES`` and the
    classification table in ``record_error``.
    """
    # Counter attributes; their names double as the keys emitted by to_dict().
    ClientConnectorErrors: int
    TimeoutErrors: int
    ContentTypeErrors: int
    ClientOSErrors: int
    ServerDisconnectedErrors: int
    unknown_errors: int

    # All integer counters, in reporting order.  Plain strings only, so the
    # class body never references aiohttp at import time.
    _COUNTER_NAMES = (
        "ClientConnectorErrors",
        "TimeoutErrors",
        "ContentTypeErrors",
        "ClientOSErrors",
        "ServerDisconnectedErrors",
        "unknown_errors",
    )

    def __init__(self):
        # Start every counter at zero.
        for name in self._COUNTER_NAMES:
            setattr(self, name, 0)

    def to_dict(self) -> dict:
        """Return a plain {counter_name: count} dict for printing/JSON output."""
        return {name: getattr(self, name) for name in self._COUNTER_NAMES}

    def record_error(self, error: Exception):
        """Classify *error* into the matching counter and log it.

        Check order matters: aiohttp's ClientConnectorError is a subclass of
        ClientOSError, so the more specific types must be tested first.
        """
        # (exception type, counter attribute, label used in the log line)
        known_errors = (
            (aiohttp.client_exceptions.ClientConnectorError,
             "ClientConnectorErrors", "ClientConnectorError"),
            (asyncio.TimeoutError, "TimeoutErrors", "TimeoutError"),
            (aiohttp.client_exceptions.ContentTypeError,
             "ContentTypeErrors", "ContentTypeError"),
            (aiohttp.client_exceptions.ClientOSError,
             "ClientOSErrors", "ClientOSError"),
            (aiohttp.client_exceptions.ServerDisconnectedError,
             "ServerDisconnectedErrors", "ServerDisconnectedError"),
        )
        for exc_type, attr, label in known_errors:
            if isinstance(error, exc_type):
                setattr(self, attr, getattr(self, attr) + 1)
                print(f"{label}: {error}")
                return
        self.unknown_errors += 1
        print(f"Unknown error: {error}")

    def append_report(self, report: "ErrorsReport"):
        """Add every counter from *report* into this report, in place."""
        for name in self._COUNTER_NAMES:
            setattr(self, name, getattr(self, name) + getattr(report, name))

# Add trace config for monitoring in flight requests
async def on_request_start(session, trace_config_ctx, params):
active_requests_metric.inc()
Expand Down Expand Up @@ -128,17 +176,6 @@ async def get_request(
# The next request will be sent after the interval.
await asyncio.sleep(interval)

def init_errors_map() -> Dict[str, int]:
    """Build a fresh error-name -> count map with every counter at zero."""
    error_names = (
        "ClientConnectorError",
        "TimeoutError",
        "ContentTypeError",
        "ClientOSError",
        "ServerDisconnectedError",
        "unknown_error",
    )
    return {name: 0 for name in error_names}

async def send_stream_request(
backend: str,
api_url: str,
Expand All @@ -151,10 +188,10 @@ async def send_stream_request(
tokenizer: PreTrainedTokenizerBase,
sax_model: str,
model: str,
) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]:
) -> Tuple[Tuple[int, int, float], float, Optional[ErrorsReport]]:
"""Sends stream request to server"""
request_start_time = time.time()
errors = init_errors_map()
errors = ErrorsReport()

headers = {"User-Agent": "Benchmark Client"}
if backend == "vllm":
Expand Down Expand Up @@ -192,29 +229,8 @@ async def send_stream_request(
if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
if backend == "vllm":
output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
except Exception as e:
errors.record_error(e)
return None, None, errors
request_end_time = time.time()
output_token_ids = tokenizer(output).input_ids
Expand All @@ -234,10 +250,10 @@ async def send_request(
tokenizer: PreTrainedTokenizerBase,
sax_model: str,
model: str,
) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]:
) -> Tuple[Tuple[int, int, float], float, Optional[ErrorsReport]]:
"""Sends request to server."""
request_start_time = time.time()
errors = init_errors_map()
errors = ErrorsReport()

headers = {"User-Agent": "Benchmark Client"}
if backend == "vllm":
Expand Down Expand Up @@ -317,29 +333,8 @@ async def send_request(
# Re-send the request if it failed.
if "error" not in output:
break
except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
except Exception as e:
errors.record_error(e)
return None, None, errors

request_end_time = time.time()
Expand Down Expand Up @@ -381,7 +376,7 @@ async def benchmark(
api_url: str,
tokenizer: PreTrainedTokenizerBase,
model: str,
) -> Tuple[List[Tuple[int, int, float]], List[float], Dict[str, int]]:
) -> Tuple[List[Tuple[int, int, float]], List[float], Optional[ErrorsReport]]:
"""Runs benchmark with asynchronous requests."""
input_requests = sample_requests(
args.dataset,
Expand Down Expand Up @@ -431,13 +426,12 @@ async def benchmark(
results = await asyncio.gather(*tasks)
combined_latencies = []
combined_ttfts = []
combined_errors = init_errors_map()
combined_errors = ErrorsReport()
for latency, ttft, errors in results:
if latency:
combined_latencies.append(latency)
if errors:
for err, count in errors.items():
combined_errors[err] = combined_errors[err] + count
combined_errors.append_report(errors)
if ttft:
combined_ttfts.append(ttft)

Expand All @@ -461,7 +455,7 @@ def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics
**server_metrics
},
**benchmark_result,
**errors,
**(errors.to_dict()),
},
# dimensions values are strings
"dimensions": {
Expand Down Expand Up @@ -650,7 +644,7 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re
benchmark_result = {}

print(f"====Result for Model: {model}====")
print(f"Errors: {errors}")
print(f"Errors: {errors.to_dict()}")
print(f"Total time: {benchmark_duration:.2f} s")
print(f"Successful/total requests: {len(request_latencies)}/{total_requests}")
print(f"Requests/min: {60 * total_requests / benchmark_duration:.2f}")
Expand Down Expand Up @@ -754,19 +748,12 @@ async def main(args: argparse.Namespace):
# Summarize results
combined_latencies = []
combined_ttfts = []
combined_errors = {
"ClientConnectorError": 0,
"TimeoutError": 0,
"ContentTypeError": 0,
"ClientOSError": 0,
"unknown_error": 0,
"ServerDisconnectedError": 0,
}
combined_errors = ErrorsReport()
for latencies, ttfts, errors in results:
combined_latencies.extend(latencies)
combined_ttfts.extend(ttfts)
for k, v in errors.items():
combined_errors[k] = combined_errors[k] + v
if errors:
combined_errors.append_report(errors)

benchmark_duration_all_models = time.time() - benchmark_start_time
if args.save_aggregated_result:
Expand Down