Add block io graph
patrickkenney9801 committed Nov 27, 2024
1 parent 7505cbc commit edf6cee
Showing 5 changed files with 89 additions and 25 deletions.
67 changes: 65 additions & 2 deletions python/kernmlops/data_schema/block_io.py
@@ -4,6 +4,7 @@
     UPTIME_TIMESTAMP,
     CollectionGraph,
     CollectionTable,
+    GraphEngine,
 )
 
 # from: https://github.com/iovisor/bcc/blob/8d85dcfac86bb7402a20bea5ceba373e5e019b6c/tools/biolatency.py#L328
@@ -202,7 +203,7 @@ def from_tables(cls, queue_table: BlockIOQueueTable, latency_table: BlockIOLaten
             pl.col("measured_latency_us") > 0
         ).filter(
             (pl.col("measured_latency_us") - 100) < pl.col("block_latency_us")
-        )
+        ).sort(UPTIME_TIMESTAMP, descending=False)
         return cls.from_df(block_df)
 
     def __init__(self, table: pl.DataFrame):
@@ -216,4 +217,66 @@ def filtered_table(self) -> pl.DataFrame:
         return self.table
 
     def graphs(self) -> list[type[CollectionGraph]]:
-        return []
+        return [BlockQueueGraph]
+
+
+class BlockQueueGraph(CollectionGraph):
+
+    @classmethod
+    def with_graph_engine(cls, graph_engine: GraphEngine) -> CollectionGraph | None:
+        block_table = graph_engine.collection_data.get(BlockIOTable)
+        if block_table is not None:
+            return BlockQueueGraph(
+                graph_engine=graph_engine,
+                block_table=block_table
+            )
+        return None
+
+    @classmethod
+    def base_name(cls) -> str:
+        return "Block IO Queue Sizes"
+
+    def __init__(
+        self,
+        graph_engine: GraphEngine,
+        block_table: BlockIOTable,
+    ):
+        self.graph_engine = graph_engine
+        self.collection_data = self.graph_engine.collection_data
+        self._block_table = block_table
+
+    def name(self) -> str:
+        return f"{self.base_name()} for Collection {self.collection_data.id}"
+
+    def x_axis(self) -> str:
+        return "Benchmark Runtime (sec)"
+
+    def y_axis(self) -> str:
+        return "Queue Size"
+
+    def plot(self) -> None:
+        block_df = self._block_table.filtered_table()
+
+        # group by and plot by device
+        block_df_by_cpu = block_df.group_by("device")
+        for device, block_df_group in block_df_by_cpu:
+            self.graph_engine.plot(
+                self.collection_data.normalize_uptime_sec(block_df_group),
+                (block_df_group.select("queue_length_segment_ios")).to_series().to_list(),
+                label=f"Device {device[0]} Segment IOs",
+            )
+            self.graph_engine.plot(
+                self.collection_data.normalize_uptime_sec(block_df_group),
+                (block_df_group.select("queue_length_4k_ios")).to_series().to_list(),
+                label=f"Device {device[0]} 4K IOs",
+            )
+            self.graph_engine.plot(
+                self.collection_data.normalize_uptime_sec(block_df_group),
+                (block_df_group.select("block_io_latency_us")).to_series().to_list(),
+                label=f"Device {device[0]} Block IO Latency",
+                y_axis="IO Latency (us)",
+                linestyle="dashed",
+            )
+
+    def plot_trends(self) -> None:
+        return
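The new BlockQueueGraph.plot() leans on a small polars pattern: group the block IO rows by device, then pull each plotted column out as a plain Python list for the graph engine. A minimal standalone sketch of that pattern follows; only the column names come from the diff, while the device ids and queue values are synthetic:

import polars as pl

# Synthetic block IO samples; column names match the diff, values are made up.
block_df = pl.DataFrame({
    "device": [271581184, 271581184, 8388608, 8388608],
    "queue_length_segment_ios": [3, 5, 1, 2],
    "queue_length_4k_ios": [12, 20, 4, 6],
    "block_io_latency_us": [180, 240, 95, 110],
})

for device, block_df_group in block_df.group_by("device"):
    # group_by keys arrive as tuples, hence the device[0] indexing in the labels above
    segment_ios = block_df_group.select("queue_length_segment_ios").to_series().to_list()
    print(f"Device {device[0]} Segment IOs: {segment_ios}")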
5 changes: 1 addition & 4 deletions python/kernmlops/data_schema/memory_usage.py
@@ -111,13 +111,10 @@ def plot(self) -> None:
 
     def plot_trends(self) -> None:
         memory_df = self._memory_usage_table.filtered_table()
-        start_uptime_sec = self.collection_data.start_uptime_sec
 
         for plot_line in self.plot_lines:
             self.graph_engine.plot(
-                (
-                    (memory_df.select(UPTIME_TIMESTAMP) / 1_000_000.0) - start_uptime_sec
-                ).to_series().to_list(),
+                self.collection_data.normalize_uptime_sec(memory_df),
                 (memory_df.select(plot_line) / (1_024.0)**3).to_series().to_list(),
                 label=plot_line.replace("bytes", "gb"),
                 y_axis=self.y_axis(),
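This is the first call site of the normalize_uptime_sec helper that the commit adds to the collection data class in schema.py further down in this diff; the helper just centralizes the microsecond-to-second uptime conversion that each graph previously inlined. A rough sketch of the equivalence, using a hypothetical column name and start time (only the arithmetic mirrors the diff):

import polars as pl

UPTIME_TIMESTAMP = "ts_uptime_us"  # hypothetical column name, for illustration only
start_uptime_sec = 100.0           # hypothetical collection start time

memory_df = pl.DataFrame({UPTIME_TIMESTAMP: [100_500_000, 101_000_000, 101_500_000]})

# Old inline form (removed above):
old_x = (
    (memory_df.select(UPTIME_TIMESTAMP) / 1_000_000.0) - start_uptime_sec
).to_series().to_list()

# New helper form (a free-function stand-in for the method added in schema.py):
def normalize_uptime_sec(table_df: pl.DataFrame) -> list[float]:
    return (
        (table_df.select(UPTIME_TIMESTAMP) / 1_000_000.0) - start_uptime_sec
    ).to_series().to_list()

assert normalize_uptime_sec(memory_df) == old_x == [0.5, 1.0, 1.5]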
10 changes: 2 additions & 8 deletions python/kernmlops/data_schema/perf/perf_schema.py
@@ -163,17 +163,14 @@ def y_axis(self) -> str:
 
     def plot(self) -> None:
         pdf_df = self._perf_table.as_pdf()
-        start_uptime_sec = self.collection_data.start_uptime_sec
         print(f"Total {self._perf_table.component_name()} {self._perf_table.measured_event_name()}: {self._perf_table.total_cumulative()}")
 
         # group by and plot by cpu
         def plot_rate(pdf_df: pl.DataFrame) -> None:
             pdf_df_by_cpu = pdf_df.group_by("cpu")
             for cpu, pdf_df_group in pdf_df_by_cpu:
                 self.graph_engine.plot(
-                    (
-                        (pdf_df_group.select(UPTIME_TIMESTAMP) / 1_000_000.0) - start_uptime_sec
-                    ).to_series().to_list(),
+                    self.collection_data.normalize_uptime_sec(pdf_df_group),
                     (
                         pdf_df_group.select(self._perf_table.name()) / (
                             pdf_df_group.select("span_duration_us") / 1_000.0
@@ -231,16 +228,13 @@
 
     def plot(self) -> None:
         cdf_df = self._perf_table.as_cdf()
-        start_uptime_sec = self.collection_data.start_uptime_sec
 
         # group by and plot by cpu
         def plot_cumulative(cdf_df: pl.DataFrame) -> None:
             cdf_df_by_cpu = cdf_df.group_by("cpu")
             for cpu, cdf_df_group in cdf_df_by_cpu:
                 self.graph_engine.plot(
-                    (
-                        (cdf_df_group.select(UPTIME_TIMESTAMP) / 1_000_000.0) - start_uptime_sec
-                    ).to_series().to_list(),
+                    self.collection_data.normalize_uptime_sec(cdf_df_group),
                     (
                         cdf_df_group.select(self._perf_table.name())
                     ).to_series().to_list(),
10 changes: 2 additions & 8 deletions python/kernmlops/data_schema/quanta_runtime.py
@@ -192,15 +192,12 @@ def y_axis(self) -> str:
 
     def plot(self) -> None:
         quanta_df = self._quanta_table.filtered_table()
-        start_uptime_sec = self.collection_data.start_uptime_sec
 
         # group by and plot by cpu
         quanta_df_by_cpu = quanta_df.group_by("cpu")
         for cpu, quanta_df_group in quanta_df_by_cpu:
             self.graph_engine.scatter(
-                (
-                    (quanta_df_group.select(UPTIME_TIMESTAMP) / 1_000_000.0) - start_uptime_sec
-                ).to_series().to_list(),
+                self.collection_data.normalize_uptime_sec(quanta_df_group),
                 (quanta_df_group.select("quanta_run_length_us") / 1_000.0).to_series().to_list(),
                 label=f"CPU {cpu[0]}",
             )
@@ -290,15 +287,12 @@ def y_axis(self) -> str:
 
     def plot(self) -> None:
         quanta_df = self._quanta_table.filtered_table()
-        start_uptime_sec = self.collection_data.start_uptime_sec
 
         # group by and plot by cpu
         quanta_df_by_cpu = quanta_df.group_by("cpu")
         for cpu, quanta_df_group in quanta_df_by_cpu:
             self.graph_engine.scatter(
-                (
-                    (quanta_df_group.select(UPTIME_TIMESTAMP) / 1_000_000.0) - start_uptime_sec
-                ).to_series().to_list(),
+                self.collection_data.normalize_uptime_sec(quanta_df_group),
                 (quanta_df_group.select("quanta_queued_time_us") / 1_000.0).to_series().to_list(),
                 label=f"CPU {cpu[0]}",
             )
22 changes: 19 additions & 3 deletions python/kernmlops/data_schema/schema.py
@@ -212,6 +212,11 @@ def dump(self, *, output_dir: Path | None, use_matplot: bool, no_trends: bool =
             else:
                 print(f"{name}: {table.table}")
 
+    def normalize_uptime_sec(self, table_df: pl.DataFrame) -> list[int]:
+        return (
+            (table_df.select(UPTIME_TIMESTAMP) / 1_000_000.0) - self.start_uptime_sec
+        ).to_series().to_list()
+
     @classmethod
     def from_tables(
         cls,
@@ -327,14 +332,25 @@ def _show(self) -> None:
     def scatter(self, x_data: list[float], y_data: list[float], *, label: str) -> None:
         self._plt.scatter(x_data, y_data, label=label)
 
-    def plot(self, x_data: list[float], y_data: list[float], *, label: str, y_axis: str | None = None) -> None:
+    def plot(
+        self,
+        x_data: list[float],
+        y_data: list[float],
+        *,
+        label: str,
+        y_axis: str | None = None,
+        linestyle: str | None = None,
+    ) -> None:
         if not y_axis or y_axis == self._y_axis:
-            self._plt.plot(x_data, y_data, label=label)
+            if not linestyle or self._plt is plotext:
+                self._plt.plot(x_data, y_data, label=label)
+            else:
+                self._plt.plot(x_data, y_data, label=label, linestyle=linestyle)
         elif self._ax is not None:
             if self._ax2 is None:
                 self._ax2 = self._ax.twinx()
             self._ax2.set_ylabel(y_axis)
-            self._ax2.plot(x_data, y_data, label=label)
+            self._ax2.plot(x_data, y_data, label=label, linestyle=linestyle)
         elif self._plt is plotext:
             plotext.plot(x_data, y_data, label=label, yside="right")
             plotext.ylabel(label=y_axis, yside="right")
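The widened plot() signature above is what lets BlockQueueGraph push its dashed latency series onto a secondary axis while the queue-size series stay on the primary one. A standalone matplotlib sketch of that routing behavior, as an illustration only (this is not the repo's GraphEngine, and the plotext fallback is omitted):

import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax2 = None

def plot(x, y, *, label, y_axis=None, linestyle=None, primary_y_axis="Queue Size"):
    # Series matching the primary y-axis stay on ax; anything else goes to a
    # lazily created twinned right-hand axis, mirroring the _ax2 handling above.
    global ax2
    if not y_axis or y_axis == primary_y_axis:
        ax.plot(x, y, label=label, linestyle=linestyle or "solid")
    else:
        if ax2 is None:
            ax2 = ax.twinx()
        ax2.set_ylabel(y_axis)
        ax2.plot(x, y, label=label, linestyle=linestyle or "solid")

x = [0.0, 1.0, 2.0]
plot(x, [4, 8, 6], label="Device sda Segment IOs")
plot(x, [120, 340, 210], label="Device sda Block IO Latency",
     y_axis="IO Latency (us)", linestyle="dashed")
ax.set_xlabel("Benchmark Runtime (sec)")
ax.set_ylabel("Queue Size")
plt.show()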
