Streamline IO types (#114)
stinodego authored Apr 18, 2024
1 parent 9876026 commit 17de3a7
Showing 9 changed files with 101 additions and 96 deletions.
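The gist of the change: the old `include_io` boolean and separate `file_type` setting are folded into a single `io_type` run setting, with the value `"skip"` taking over the old `include_io = False` behavior (pre-load the data so timings exclude IO). The updated `settings.py` is not among the files shown, so the following is only a sketch of its presumed shape, inferred from the call sites below (`settings.run.io_type`, `from settings import IoType`):

```python
# Hypothetical sketch: only `IoType` and `run.io_type` are confirmed by this
# diff; the surrounding structure and defaults are assumptions.
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal

IoType = Literal["skip", "parquet", "csv", "feather"]


@dataclass
class RunSettings:
    # "skip" = load data up front so queries are timed without IO
    io_type: IoType = "parquet"


@dataclass
class Settings:
    run: RunSettings = field(default_factory=RunSettings)
    dataset_base_dir: Path = Path("data/tables")  # assumed default
```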
12 changes: 8 additions & 4 deletions queries/common_utils.py
@@ -20,16 +20,20 @@
 settings = Settings()


+def get_table_path(table_name: str) -> Path:
+    """Return the path to the given table."""
+    ext = settings.run.io_type if settings.run.io_type != "skip" else "parquet"
+    return settings.dataset_base_dir / f"{table_name}.{ext}"
+
+
 def log_query_timing(
     solution: str, version: str, query_number: int, time: float
 ) -> None:
     settings.paths.timings.mkdir(parents=True, exist_ok=True)

     with (settings.paths.timings / settings.paths.timings_filename).open("a") as f:
         if f.tell() == 0:
-            f.write(
-                "solution,version,query_number,duration[s],include_io,scale_factor\n"
-            )
+            f.write("solution,version,query_number,duration[s],io_type,scale_factor\n")

         line = (
             ",".join(
@@ -38,7 +42,7 @@ def log_query_timing(
                 version,
                 str(query_number),
                 str(time),
-                str(settings.run.include_io),
+                settings.run.io_type,
                 str(settings.scale_factor),
             ]
         )
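With path resolution centralized here, every backend resolves table paths the same way. Note the Parquet fallback for `"skip"`: the data still has to be pre-loaded from somewhere, and Parquet is the format used for that. An illustration, assuming a hypothetical `data/tables` base directory:

```python
# Illustrative only; the actual dataset_base_dir is configured elsewhere.
settings.run.io_type = "csv"
get_table_path("lineitem")  # -> data/tables/lineitem.csv

settings.run.io_type = "skip"
get_table_path("lineitem")  # -> data/tables/lineitem.parquet (fallback)
```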
19 changes: 10 additions & 9 deletions queries/dask/utils.py
@@ -7,6 +7,7 @@

 from queries.common_utils import (
     check_query_result_pd,
+    get_table_path,
     on_second_call,
     run_query_generic,
 )
@@ -23,26 +24,26 @@


 def read_ds(table_name: str) -> DataFrame:
-    # TODO: Load into memory before returning the Dask DataFrame.
-    # Code below is tripped up by date types
-    # df = pd.read_parquet(path, dtype_backend="pyarrow")
-    # return dd.from_pandas(df, npartitions=os.cpu_count())
-    if not settings.run.include_io:
+    if settings.run.io_type == "skip":
+        # TODO: Load into memory before returning the Dask DataFrame.
+        # Code below is tripped up by date types
+        # df = pd.read_parquet(path, dtype_backend="pyarrow")
+        # return dd.from_pandas(df, npartitions=os.cpu_count())
         msg = "cannot run Dask starting from an in-memory representation"
         raise RuntimeError(msg)

-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)

-    if settings.run.file_type == "parquet":
+    if settings.run.io_type == "parquet":
         return dd.read_parquet(path, dtype_backend="pyarrow")  # type: ignore[attr-defined,no-any-return]
-    elif settings.run.file_type == "csv":
+    elif settings.run.io_type == "csv":
         df = dd.read_csv(path, dtype_backend="pyarrow")  # type: ignore[attr-defined]
         for c in df.columns:
             if c.endswith("date"):
                 df[c] = df[c].astype("date32[day][pyarrow]")
         return df  # type: ignore[no-any-return]
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
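Dask still refuses `io_type == "skip"`; the TODO comments moved into that branch sketch the intended fix. Spelled out, it would look roughly like the following. This is not working code in this commit: per the comment, it is currently tripped up by date types.

```python
# Rough sketch of the TODO above, under the same assumptions.
import os

import dask.dataframe as dd
import pandas as pd

from queries.common_utils import get_table_path


def _read_ds_in_memory(table_name: str) -> dd.DataFrame:
    # Materialize with pandas first, then hand Dask an in-memory frame.
    df = pd.read_parquet(get_table_path(table_name), dtype_backend="pyarrow")
    return dd.from_pandas(df, npartitions=os.cpu_count())
```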
46 changes: 19 additions & 27 deletions queries/duckdb/utils.py
@@ -1,43 +1,35 @@
 import duckdb
 from duckdb import DuckDBPyRelation

-from queries.common_utils import check_query_result_pl, run_query_generic
+from queries.common_utils import (
+    check_query_result_pl,
+    get_table_path,
+    run_query_generic,
+)
 from settings import Settings

 settings = Settings()


 def _scan_ds(table_name: str) -> str:
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)
     path_str = str(path)

-    if settings.run.file_type == "parquet":
-        if settings.run.include_io:
-            duckdb.read_parquet(path_str)
-            return f"'{path_str}'"
-        else:
-            name = path_str.replace("/", "_").replace(".", "_").replace("-", "_")
-            duckdb.sql(
-                f"create temp table if not exists {name} as select * from read_parquet('{path_str}');"
-            )
-            return name
-    if settings.run.file_type == "csv":
-        if settings.run.include_io:
-            duckdb.read_csv(path_str)
-            return f"'{path_str}'"
-        else:
-            name = path_str.replace("/", "_").replace(".", "_").replace("-", "_")
-            duckdb.sql(
-                f"create temp table if not exists {name} as select * from read_csv('{path_str}');"
-            )
-            return name
-    elif settings.run.file_type == "feather":
-        msg = "duckdb does not support feather for now"
-        raise ValueError(msg)
+    if settings.run.io_type == "skip":
+        name = path_str.replace("/", "_").replace(".", "_").replace("-", "_")
+        duckdb.sql(
+            f"create temp table if not exists {name} as select * from read_parquet('{path_str}');"
+        )
+        return name
+    elif settings.run.io_type == "parquet":
+        duckdb.read_parquet(path_str)
+        return f"'{path_str}'"
+    elif settings.run.io_type == "csv":
+        duckdb.read_csv(path_str)
+        return f"'{path_str}'"
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
-    return path_str


 def get_line_item_ds() -> str:
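The flattened branches make the contract of `_scan_ds` easier to see: for `"skip"` it materializes a temp table (always from Parquet, matching `get_table_path`) and returns a bare identifier; for `"parquet"` and `"csv"` it returns a quoted file path for DuckDB to scan directly. Either string slots into SQL the same way, e.g.:

```python
# Usage sketch; the table name and path shown are illustrative.
tbl = _scan_ds("lineitem")
duckdb.sql(f"select count(*) from {tbl}").show()
# io_type == "skip"    -> ... from data_tables_lineitem_parquet   (temp table)
# io_type == "parquet" -> ... from 'data/tables/lineitem.parquet' (direct scan)
```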
12 changes: 7 additions & 5 deletions queries/modin/utils.py
@@ -7,6 +7,7 @@

 from queries.common_utils import (
     check_query_result_pd,
+    get_table_path,
     on_second_call,
     run_query_generic,
 )
@@ -23,20 +24,21 @@


 def _read_ds(table_name: str) -> pd.DataFrame:
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)

-    if settings.run.file_type == "parquet":
+    if settings.run.io_type in ("parquet", "skip"):
         return pd.read_parquet(path, dtype_backend="pyarrow")
-    elif settings.run.file_type == "csv":
+    elif settings.run.io_type == "csv":
         df = pd.read_csv(path, dtype_backend="pyarrow")
         # TODO: This is slow - we should use the known schema to read dates directly
         for c in df.columns:
             if c.endswith("date"):
                 df[c] = df[c].astype("date32[day][pyarrow]")
         return df
-    elif settings.run.file_type == "feather":
+    elif settings.run.io_type == "feather":
         return pd.read_feather(path, dtype_backend="pyarrow")
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
12 changes: 7 additions & 5 deletions queries/pandas/utils.py
@@ -6,6 +6,7 @@

 from queries.common_utils import (
     check_query_result_pd,
+    get_table_path,
     on_second_call,
     run_query_generic,
 )
@@ -20,20 +21,21 @@


 def _read_ds(table_name: str) -> pd.DataFrame:
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)

-    if settings.run.file_type == "parquet":
+    if settings.run.io_type in ("parquet", "skip"):
         return pd.read_parquet(path, dtype_backend="pyarrow")
-    elif settings.run.file_type == "csv":
+    elif settings.run.io_type == "csv":
         df = pd.read_csv(path, dtype_backend="pyarrow")
         # TODO: This is slow - we should use the known schema to read dates directly
         for c in df.columns:
             if c.endswith("date"):
                 df[c] = df[c].astype("date32[day][pyarrow]")  # type: ignore[call-overload]
         return df
-    elif settings.run.file_type == "feather":
+    elif settings.run.io_type == "feather":
         return pd.read_feather(path, dtype_backend="pyarrow")
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
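Both the Modin and pandas readers keep the hand-rolled date cast after `read_csv`, since the pyarrow backend reads those columns as strings. In isolation, the fix-up looks like this:

```python
# Minimal, self-contained illustration of the date cast used above.
import pandas as pd

df = pd.DataFrame({"l_shipdate": ["1996-03-13", "1996-04-12"]}).convert_dtypes(
    dtype_backend="pyarrow"
)
df["l_shipdate"] = df["l_shipdate"].astype("date32[day][pyarrow]")
print(df.dtypes)  # l_shipdate: date32[day][pyarrow]
```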
31 changes: 16 additions & 15 deletions queries/polars/utils.py
@@ -2,30 +2,31 @@

 import polars as pl

-from queries.common_utils import check_query_result_pl, run_query_generic
+from queries.common_utils import (
+    check_query_result_pl,
+    get_table_path,
+    run_query_generic,
+)
 from settings import Settings

 settings = Settings()


 def _scan_ds(table_name: str) -> pl.LazyFrame:
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
-
-    if settings.run.file_type == "parquet":
-        scan = pl.scan_parquet(path)
-    elif settings.run.file_type == "feather":
-        scan = pl.scan_ipc(path)
-    elif settings.run.file_type == "csv":
-        scan = pl.scan_csv(path, try_parse_dates=True)
+    path = get_table_path(table_name)
+
+    if settings.run.io_type == "skip":
+        return pl.read_parquet(path, rechunk=True).lazy()
+    if settings.run.io_type == "parquet":
+        return pl.scan_parquet(path)
+    elif settings.run.io_type == "feather":
+        return pl.scan_ipc(path)
+    elif settings.run.io_type == "csv":
+        return pl.scan_csv(path, try_parse_dates=True)
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
-
-    if settings.run.include_io:
-        return scan
-    else:
-        return scan.collect().rechunk().lazy()


 def get_line_item_ds() -> pl.LazyFrame:
     return _scan_ds("lineitem")
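For Polars, `"skip"` now reads the Parquet file eagerly, rechunks, and wraps the result back into a `LazyFrame`, so a subsequent query times only the compute:

```python
# With io_type == "skip", the frame below is already in memory, so
# collect() measures query execution rather than disk reads.
lf = get_line_item_ds()
result = lf.filter(pl.col("l_shipdate") <= pl.date(1998, 9, 2)).collect()
```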
18 changes: 11 additions & 7 deletions queries/pyspark/utils.py
@@ -4,7 +4,11 @@

 from pyspark.sql import SparkSession

-from queries.common_utils import check_query_result_pd, run_query_generic
+from queries.common_utils import (
+    check_query_result_pd,
+    get_table_path,
+    run_query_generic,
+)
 from settings import Settings

 if TYPE_CHECKING:
@@ -26,19 +30,19 @@ def get_or_create_spark() -> SparkSession:


 def _read_ds(table_name: str) -> DataFrame:
-    # TODO: Persist data in memory before query
-    if not settings.run.include_io:
+    if settings.run.io_type == "skip":
+        # TODO: Persist data in memory before query
         msg = "cannot run PySpark starting from an in-memory representation"
         raise RuntimeError(msg)

-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)

-    if settings.run.file_type == "parquet":
+    if settings.run.io_type == "parquet":
         df = get_or_create_spark().read.parquet(str(path))
-    elif settings.run.file_type == "csv":
+    elif settings.run.io_type == "csv":
         df = get_or_create_spark().read.csv(str(path), header=True, inferSchema=True)
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)

     df.createOrReplaceTempView(table_name)
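Because each table is registered as a temp view under its own name, queries can run as plain SQL once the reader has been called:

```python
# Usage sketch: register the view, then query it by name.
_read_ds("lineitem")
spark = get_or_create_spark()
spark.sql("select count(*) from lineitem").show()
```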
37 changes: 16 additions & 21 deletions scripts/plot_bars.py
@@ -19,15 +19,10 @@
 if TYPE_CHECKING:
     from plotly.graph_objects import Figure

-    from settings import FileType
+    from settings import IoType

 settings = Settings()

-if settings.run.include_io:
-    LIMIT = settings.plot.limit_with_io
-else:
-    LIMIT = settings.plot.limit_without_io
-

 COLORS = {
     "polars": "#0075FF",
@@ -49,6 +44,14 @@
     "pyspark": "PySpark",
 }

+Y_LIMIT_MAP = {
+    "skip": 15.0,
+    "parquet": 20.0,
+    "csv": 25.0,
+    "feather": 20.0,
+}
+LIMIT = Y_LIMIT_MAP[settings.run.io_type]
+

 def main() -> None:
     pl.Config.set_tbl_rows(100)
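The y-axis limit is now looked up per IO type from this hardcoded map, replacing the `limit_with_io` / `limit_without_io` plot settings that the deleted block above consulted.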
@@ -62,12 +65,8 @@ def prep_data() -> pl.DataFrame:
     # Scale factor not used at the moment
     lf = lf.drop("scale_factor")

-    # Select timings either with or without IO
-    if settings.run.include_io:
-        io = pl.col("include_io")
-    else:
-        io = ~pl.col("include_io")
-    lf = lf.filter(io).drop("include_io")
+    # Select timings with the right IO type
+    lf = lf.filter(pl.col("io_type") == settings.run.io_type).drop("io_type")

     # Select relevant queries
     lf = lf.filter(pl.col("query_number") <= settings.plot.n_queries)
@@ -118,7 +117,7 @@ def plot(df: pl.DataFrame) -> Figure:

     fig.update_layout(
         title={
-            "text": get_title(settings.run.include_io, settings.run.file_type),
+            "text": get_title(settings.run.io_type),
             "y": 0.95,
             "yanchor": "top",
         },
@@ -147,12 +146,12 @@
     fig.show()


-def get_title(include_io: bool, file_type: FileType) -> str:
-    if not include_io:
+def get_title(io_type: IoType) -> str:
+    if io_type == "skip":
         title = "Runtime excluding data read from disk"
     else:
         file_type_map = {"parquet": "Parquet", "csv": "CSV", "feather": "Feather"}
-        file_type_formatted = file_type_map[file_type]
+        file_type_formatted = file_type_map[io_type]
         title = f"Runtime including data read from disk ({file_type_formatted})"

     subtitle = "(lower is better)"
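With the single-argument signature, the title follows directly from `io_type` (the `subtitle` handling below is unchanged):

```python
# Expected title text per branch:
get_title("skip")     # "Runtime excluding data read from disk"
get_title("feather")  # "Runtime including data read from disk (Feather)"
```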
@@ -219,11 +218,7 @@ def write_plot_image(fig: Any) -> None:
     if not path.exists():
         path.mkdir()

-    if settings.run.include_io:
-        file_name = f"plot-io-{settings.run.file_type}.html"
-    else:
-        file_name = "plot-no-io.html"
-
+    file_name = f"plot-io-{settings.run.io_type}.html"
     print(path / file_name)

     fig.write_html(path / file_name)
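A small side effect worth noting: every plot file name now embeds the IO type (`plot-io-skip.html`, `plot-io-parquet.html`, ...), so the old `plot-no-io.html` name is gone.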