Skip to content

Commit

Permalink
Add merged table for hugepage data (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickkenney9801 authored Dec 11, 2024
1 parent db4069b commit 834e3f3
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from dataclasses import dataclass
from pathlib import Path
from typing import cast

import polars as pl
from bcc import BPF
from data_collection.bpf_instrumentation.bpf_hook import POLL_TIMEOUT_MS, BPFProgram
from data_schema import CollectionTable
from data_schema.huge_pages import (
CollapseHugePageDataTable,
CollapseHugePageDataTableRaw,
TraceMMCollapseHugePageDataTable,
TraceMMKhugepagedScanPMDDataTable,
)
Expand Down Expand Up @@ -79,12 +81,20 @@ def close(self):

def data(self) -> list[CollectionTable]:
return [
CollapseHugePageDataTable.from_df_id(
pl.DataFrame(self.collapse_huge_pages),
collection_id = self.collection_id,),
TraceMMCollapseHugePageDataTable.from_df_id(
pl.DataFrame(self.trace_mm_collapse_huge_pages),
collection_id = self.collection_id,),
CollapseHugePageDataTable.from_tables(
collapse_table=cast(
CollapseHugePageDataTableRaw,
CollapseHugePageDataTableRaw.from_df_id(
pl.DataFrame(self.collapse_huge_pages),
collection_id = self.collection_id,),
),
trace_mm_table=cast(
TraceMMCollapseHugePageDataTable,
TraceMMCollapseHugePageDataTable.from_df_id(
pl.DataFrame(self.trace_mm_collapse_huge_pages),
collection_id = self.collection_id,),
),
),
TraceMMKhugepagedScanPMDDataTable.from_df_id(
pl.DataFrame(self.trace_mm_khugepaged_scan_pmds),
collection_id = self.collection_id,),
Expand Down
2 changes: 2 additions & 0 deletions python/kernmlops/data_schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from data_schema import perf
from data_schema.block_io import BlockIOLatencyTable, BlockIOQueueTable, BlockIOTable
from data_schema.file_data import FileDataTable
from data_schema.huge_pages import CollapseHugePageDataTable
from data_schema.memory_usage import MemoryUsageTable
from data_schema.process_metadata import ProcessMetadataTable
from data_schema.quanta_runtime import QuantaQueuedTable, QuantaRuntimeTable
Expand All @@ -31,6 +32,7 @@
BlockIOLatencyTable,
BlockIOQueueTable,
BlockIOTable,
CollapseHugePageDataTable,
] + list(perf.perf_table_types.values())

def demote(user_id: int | None = None, group_id: int | None = None) -> Callable[[], None]:
Expand Down
55 changes: 52 additions & 3 deletions python/kernmlops/data_schema/huge_pages.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def by_pid(self, pids: int | list[int]) -> pl.DataFrame:
pids = [pids]
return self.filtered_table().filter(pl.col("pid").is_in(pids))

class CollapseHugePageDataTable(CollectionTable):
class CollapseHugePageDataTableRaw(CollectionTable):

@classmethod
def name(cls) -> str:
Expand All @@ -48,8 +48,8 @@ def schema(cls) -> pl.Schema:
return pl.Schema()

@classmethod
def from_df(cls, table: pl.DataFrame) -> "CollapseHugePageDataTable":
return CollapseHugePageDataTable(table=table)
def from_df(cls, table: pl.DataFrame) -> "CollapseHugePageDataTableRaw":
return CollapseHugePageDataTableRaw(table=table)

def __init__(self, table: pl.DataFrame):
self._table = table
Expand Down Expand Up @@ -100,3 +100,52 @@ def by_pid(self, pids: int | list[int]) -> pl.DataFrame:
if isinstance(pids, int):
pids = [pids]
return self.filtered_table().filter(pl.col("pid").is_in(pids))


class CollapseHugePageDataTable(CollectionTable):
    """Best effort merged table of CollapseHugePageDataTableRaw and TraceMMCollapseHugePageDataTable."""

    @classmethod
    def name(cls) -> str:
        return "collapse_hugepage"

    @classmethod
    def schema(cls) -> pl.Schema:
        # Schema left open for now; rows are whatever the merged frames carry.
        return pl.Schema()

    @classmethod
    def from_df(cls, table: pl.DataFrame) -> "CollapseHugePageDataTable":
        return CollapseHugePageDataTable(table=table)

    @classmethod
    def from_tables(
        cls,
        collapse_table: CollapseHugePageDataTableRaw,
        trace_mm_table: TraceMMCollapseHugePageDataTable,
    ) -> "CollapseHugePageDataTable":
        """Zip the raw collapse rows with their trace_mm counterparts, one-to-one.

        Both frames are sorted by start timestamp and then concatenated
        horizontally, so the merge relies on the two event streams pairing
        up row-for-row after an optional realignment of the first row.
        """
        collapses = collapse_table.filtered_table().sort("start_ts_ns", descending=False)
        traces = trace_mm_table.filtered_table().sort("start_ts_ns", descending=False)

        first_trace_ts = traces.row(0, named=True)["start_ts_ns"]
        first_collapse_ts = collapses.row(0, named=True)["start_ts_ns"]
        # If the tracepoint stream begins before the collapse stream, the
        # leading collapse row is treated as unpaired and discarded.
        # NOTE(review): dropping from `collapses` (rather than `traces`) when
        # the *trace* side starts earlier looks surprising — confirm against
        # the BPF probes that emit these events.
        if first_trace_ts < first_collapse_ts:
            collapses = collapses[1:]

        # Best-effort merge only works when the streams pair up exactly.
        assert len(collapses) == len(traces)

        # Keep collapse timing/identity columns; strip the duplicates from the
        # trace side so the horizontal concat does not collide on names.
        collapses = collapses.drop([
            "end_ts_ns",
        ])
        traces = traces.drop([
            "pid",
            "tgid",
            "start_ts_ns",
            "end_ts_ns",
            "mm",
            "collection_id",
        ])
        return cls.from_df(pl.concat([collapses, traces], how="horizontal"))

    def __init__(self, table: pl.DataFrame):
        self._table = table

    @property
    def table(self) -> pl.DataFrame:
        return self._table

    def filtered_table(self) -> pl.DataFrame:
        # No filtering applied for the merged view.
        return self.table

    def graphs(self) -> list[type[CollectionGraph]]:
        return []

0 comments on commit 834e3f3

Please sign in to comment.