From 17de3a747709e3e5a94a0ba2d7acbc09379d2b8b Mon Sep 17 00:00:00 2001
From: Stijn de Gooijer
Date: Thu, 18 Apr 2024 11:32:02 +0200
Subject: [PATCH] Streamline IO types (#114)

---
 queries/common_utils.py  | 12 ++++++++----
 queries/dask/utils.py    | 19 ++++++++++---------
 queries/duckdb/utils.py  | 46 +++++++++++++++++++---------------------------
 queries/modin/utils.py   | 12 +++++++-----
 queries/pandas/utils.py  | 12 +++++++-----
 queries/polars/utils.py  | 31 ++++++++++++++++---------------
 queries/pyspark/utils.py | 18 +++++++++++-------
 scripts/plot_bars.py     | 37 ++++++++++++++++---------------------
 settings.py              | 10 +++++++---
 9 files changed, 101 insertions(+), 96 deletions(-)

diff --git a/queries/common_utils.py b/queries/common_utils.py
index f392f5d..9fc0d1d 100644
--- a/queries/common_utils.py
+++ b/queries/common_utils.py
@@ -20,6 +20,12 @@
 settings = Settings()
 
 
+def get_table_path(table_name: str) -> Path:
+    """Return the path to the given table."""
+    ext = settings.run.io_type if settings.run.include_io else "parquet"
+    return settings.dataset_base_dir / f"{table_name}.{ext}"
+
+
 def log_query_timing(
     solution: str, version: str, query_number: int, time: float
 ) -> None:
@@ -27,9 +33,7 @@ def log_query_timing(
 
     with (settings.paths.timings / settings.paths.timings_filename).open("a") as f:
         if f.tell() == 0:
-            f.write(
-                "solution,version,query_number,duration[s],include_io,scale_factor\n"
-            )
+            f.write("solution,version,query_number,duration[s],io_type,scale_factor\n")
 
         line = (
             ",".join(
@@ -38,7 +42,7 @@ def log_query_timing(
                 version,
                 str(query_number),
                 str(time),
-                str(settings.run.include_io),
+                settings.run.io_type,
                 str(settings.scale_factor),
             ]
         )
diff --git a/queries/dask/utils.py b/queries/dask/utils.py
index a7666b1..dbc99cf 100644
--- a/queries/dask/utils.py
+++ b/queries/dask/utils.py
@@ -7,6 +7,7 @@
 
 from queries.common_utils import (
     check_query_result_pd,
+    get_table_path,
     on_second_call,
     run_query_generic,
 )
@@ -23,26 +24,26 @@
 
 
 def read_ds(table_name: str) -> DataFrame:
-    # TODO: Load into memory before returning the Dask DataFrame.
-    # Code below is tripped up by date types
-    # df = pd.read_parquet(path, dtype_backend="pyarrow")
-    # return dd.from_pandas(df, npartitions=os.cpu_count())
-    if not settings.run.include_io:
+    if settings.run.io_type == "skip":
+        # TODO: Load into memory before returning the Dask DataFrame.
+        # Code below is tripped up by date types
+        # df = pd.read_parquet(path, dtype_backend="pyarrow")
+        # return dd.from_pandas(df, npartitions=os.cpu_count())
         msg = "cannot run Dask starting from an in-memory representation"
         raise RuntimeError(msg)
 
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)
 
-    if settings.run.file_type == "parquet":
+    if settings.run.io_type == "parquet":
         return dd.read_parquet(path, dtype_backend="pyarrow")  # type: ignore[attr-defined,no-any-return]
-    elif settings.run.file_type == "csv":
+    elif settings.run.io_type == "csv":
         df = dd.read_csv(path, dtype_backend="pyarrow")  # type: ignore[attr-defined]
         for c in df.columns:
             if c.endswith("date"):
                 df[c] = df[c].astype("date32[day][pyarrow]")
         return df  # type: ignore[no-any-return]
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
 
 
diff --git a/queries/duckdb/utils.py b/queries/duckdb/utils.py
index 8f88f4b..3f15b58 100644
--- a/queries/duckdb/utils.py
+++ b/queries/duckdb/utils.py
@@ -1,43 +1,35 @@
 import duckdb
 from duckdb import DuckDBPyRelation
 
-from queries.common_utils import check_query_result_pl, run_query_generic
+from queries.common_utils import (
+    check_query_result_pl,
+    get_table_path,
+    run_query_generic,
+)
 from settings import Settings
 
 settings = Settings()
 
 
 def _scan_ds(table_name: str) -> str:
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)
     path_str = str(path)
-    if settings.run.file_type == "parquet":
-        if settings.run.include_io:
-            duckdb.read_parquet(path_str)
-            return f"'{path_str}'"
-        else:
-            name = path_str.replace("/", "_").replace(".", "_").replace("-", "_")
-            duckdb.sql(
-                f"create temp table if not exists {name} as select * from read_parquet('{path_str}');"
-            )
-            return name
-    if settings.run.file_type == "csv":
-        if settings.run.include_io:
-            duckdb.read_csv(path_str)
-            return f"'{path_str}'"
-        else:
-            name = path_str.replace("/", "_").replace(".", "_").replace("-", "_")
-            duckdb.sql(
-                f"create temp table if not exists {name} as select * from read_csv('{path_str}');"
-            )
-            return name
-    elif settings.run.file_type == "feather":
-        msg = "duckdb does not support feather for now"
-        raise ValueError(msg)
+    if settings.run.io_type == "skip":
+        name = path_str.replace("/", "_").replace(".", "_").replace("-", "_")
+        duckdb.sql(
+            f"create temp table if not exists {name} as select * from read_parquet('{path_str}');"
+        )
+        return name
+    elif settings.run.io_type == "parquet":
+        duckdb.read_parquet(path_str)
+        return f"'{path_str}'"
+    elif settings.run.io_type == "csv":
+        duckdb.read_csv(path_str)
+        return f"'{path_str}'"
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
-    return path_str
 
 
 def get_line_item_ds() -> str:
diff --git a/queries/modin/utils.py b/queries/modin/utils.py
index 3a8d7a6..28d0b9c 100644
--- a/queries/modin/utils.py
+++ b/queries/modin/utils.py
@@ -7,6 +7,7 @@
 
 from queries.common_utils import (
     check_query_result_pd,
+    get_table_path,
     on_second_call,
     run_query_generic,
 )
@@ -23,20 +24,21 @@
 
 
 def _read_ds(table_name: str) -> pd.DataFrame:
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)
 
-    if settings.run.file_type == "parquet":
+    if settings.run.io_type in ("parquet", "skip"):
         return pd.read_parquet(path, dtype_backend="pyarrow")
-    elif settings.run.file_type == "csv":
+    elif settings.run.io_type == "csv":
         df = pd.read_csv(path, dtype_backend="pyarrow")
+        # TODO: This is slow - we should use the known schema to read dates directly
         for c in df.columns:
             if c.endswith("date"):
                 df[c] = df[c].astype("date32[day][pyarrow]")
         return df
-    elif settings.run.file_type == "feather":
+    elif settings.run.io_type == "feather":
         return pd.read_feather(path, dtype_backend="pyarrow")
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
 
 
diff --git a/queries/pandas/utils.py b/queries/pandas/utils.py
index f29c071..e7b8d70 100644
--- a/queries/pandas/utils.py
+++ b/queries/pandas/utils.py
@@ -6,6 +6,7 @@
 
 from queries.common_utils import (
     check_query_result_pd,
+    get_table_path,
     on_second_call,
     run_query_generic,
 )
@@ -20,20 +21,21 @@
 
 
 def _read_ds(table_name: str) -> pd.DataFrame:
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)
 
-    if settings.run.file_type == "parquet":
+    if settings.run.io_type in ("parquet", "skip"):
         return pd.read_parquet(path, dtype_backend="pyarrow")
-    elif settings.run.file_type == "csv":
+    elif settings.run.io_type == "csv":
         df = pd.read_csv(path, dtype_backend="pyarrow")
+        # TODO: This is slow - we should use the known schema to read dates directly
        for c in df.columns:
             if c.endswith("date"):
                 df[c] = df[c].astype("date32[day][pyarrow]")  # type: ignore[call-overload]
         return df
-    elif settings.run.file_type == "feather":
+    elif settings.run.io_type == "feather":
         return pd.read_feather(path, dtype_backend="pyarrow")
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
 
 
diff --git a/queries/polars/utils.py b/queries/polars/utils.py
index 4ddbac0..19db862 100644
--- a/queries/polars/utils.py
+++ b/queries/polars/utils.py
@@ -2,30 +2,31 @@
 
 import polars as pl
 
-from queries.common_utils import check_query_result_pl, run_query_generic
+from queries.common_utils import (
+    check_query_result_pl,
+    get_table_path,
+    run_query_generic,
+)
 from settings import Settings
 
 settings = Settings()
 
 
 def _scan_ds(table_name: str) -> pl.LazyFrame:
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
-
-    if settings.run.file_type == "parquet":
-        scan = pl.scan_parquet(path)
-    elif settings.run.file_type == "feather":
-        scan = pl.scan_ipc(path)
-    elif settings.run.file_type == "csv":
-        scan = pl.scan_csv(path, try_parse_dates=True)
+    path = get_table_path(table_name)
+
+    if settings.run.io_type == "skip":
+        return pl.read_parquet(path, rechunk=True).lazy()
+    if settings.run.io_type == "parquet":
+        return pl.scan_parquet(path)
+    elif settings.run.io_type == "feather":
+        return pl.scan_ipc(path)
+    elif settings.run.io_type == "csv":
+        return pl.scan_csv(path, try_parse_dates=True)
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
 
-    if settings.run.include_io:
-        return scan
-    else:
-        return scan.collect().rechunk().lazy()
-
 
 def get_line_item_ds() -> pl.LazyFrame:
     return _scan_ds("lineitem")
diff --git a/queries/pyspark/utils.py b/queries/pyspark/utils.py
index 711bc97..ad3558c 100644
--- a/queries/pyspark/utils.py
+++ b/queries/pyspark/utils.py
@@ -4,7 +4,11 @@
 
 from pyspark.sql import SparkSession
 
-from queries.common_utils import check_query_result_pd, run_query_generic
+from queries.common_utils import (
+    check_query_result_pd,
+    get_table_path,
+    run_query_generic,
+)
 from settings import Settings
 
 if TYPE_CHECKING:
@@ -26,19 +30,19 @@ def get_or_create_spark() -> SparkSession:
 
 
 def _read_ds(table_name: str) -> DataFrame:
-    # TODO: Persist data in memory before query
-    if not settings.run.include_io:
+    if settings.run.io_type == "skip":
+        # TODO: Persist data in memory before query
         msg = "cannot run PySpark starting from an in-memory representation"
         raise RuntimeError(msg)
 
-    path = settings.dataset_base_dir / f"{table_name}.{settings.run.file_type}"
+    path = get_table_path(table_name)
 
-    if settings.run.file_type == "parquet":
+    if settings.run.io_type == "parquet":
         df = get_or_create_spark().read.parquet(str(path))
-    elif settings.run.file_type == "csv":
+    elif settings.run.io_type == "csv":
         df = get_or_create_spark().read.csv(str(path), header=True, inferSchema=True)
     else:
-        msg = f"unsupported file type: {settings.run.file_type!r}"
+        msg = f"unsupported file type: {settings.run.io_type!r}"
         raise ValueError(msg)
 
     df.createOrReplaceTempView(table_name)
diff --git a/scripts/plot_bars.py b/scripts/plot_bars.py
index 99f8d87..97f17ed 100644
--- a/scripts/plot_bars.py
+++ b/scripts/plot_bars.py
@@ -19,15 +19,10 @@
 
 if TYPE_CHECKING:
     from plotly.graph_objects import Figure
-    from settings import FileType
+    from settings import IoType
 
 settings = Settings()
 
-if settings.run.include_io:
-    LIMIT = settings.plot.limit_with_io
-else:
-    LIMIT = settings.plot.limit_without_io
-
 
 
 COLORS = {
@@ -49,6 +44,14 @@
     "pyspark": "PySpark",
 }
 
+Y_LIMIT_MAP = {
+    "skip": 15.0,
+    "parquet": 20.0,
+    "csv": 25.0,
+    "feather": 20.0,
+}
+LIMIT = Y_LIMIT_MAP[settings.run.io_type]
+
 
 def main() -> None:
     pl.Config.set_tbl_rows(100)
@@ -62,12 +65,8 @@ def prep_data() -> pl.DataFrame:
     # Scale factor not used at the moment
     lf = lf.drop("scale_factor")
 
-    # Select timings either with or without IO
-    if settings.run.include_io:
-        io = pl.col("include_io")
-    else:
-        io = ~pl.col("include_io")
-    lf = lf.filter(io).drop("include_io")
+    # Select timings with the right IO type
+    lf = lf.filter(pl.col("io_type") == settings.run.io_type).drop("io_type")
 
     # Select relevant queries
     lf = lf.filter(pl.col("query_number") <= settings.plot.n_queries)
@@ -118,7 +117,7 @@ def plot(df: pl.DataFrame) -> Figure:
 
     fig.update_layout(
         title={
-            "text": get_title(settings.run.include_io, settings.run.file_type),
+            "text": get_title(settings.run.io_type),
             "y": 0.95,
             "yanchor": "top",
         },
@@ -147,12 +146,12 @@ def plot(df: pl.DataFrame) -> Figure:
     fig.show()
 
 
-def get_title(include_io: bool, file_type: FileType) -> str:
-    if not include_io:
+def get_title(io_type: IoType) -> str:
+    if io_type == "skip":
         title = "Runtime excluding data read from disk"
     else:
         file_type_map = {"parquet": "Parquet", "csv": "CSV", "feather": "Feather"}
-        file_type_formatted = file_type_map[file_type]
+        file_type_formatted = file_type_map[io_type]
         title = f"Runtime including data read from disk ({file_type_formatted})"
 
     subtitle = "(lower is better)"
@@ -219,11 +218,7 @@ def write_plot_image(fig: Any) -> None:
     if not path.exists():
         path.mkdir()
 
-    if settings.run.include_io:
-        file_name = f"plot-io-{settings.run.file_type}.html"
-    else:
-        file_name = "plot-no-io.html"
-
+    file_name = f"plot-io-{settings.run.io_type}.html"
     print(path / file_name)
     fig.write_html(path / file_name)
 
diff --git a/settings.py b/settings.py
index fe1ac17..4cc2c4f 100644
--- a/settings.py
+++ b/settings.py
@@ -4,7 +4,7 @@
 from pydantic import computed_field
 from pydantic_settings import BaseSettings, SettingsConfigDict
 
-FileType: TypeAlias = Literal["parquet", "feather", "csv"]
+IoType: TypeAlias = Literal["skip", "parquet", "feather", "csv"]
 
 
 class Paths(BaseSettings):
@@ -22,8 +22,7 @@ class Paths(BaseSettings):
 
 
 class Run(BaseSettings):
-    include_io: bool = True
-    file_type: FileType = "parquet"
+    io_type: IoType = "parquet"
     log_timings: bool = False
     show_results: bool = False
 
@@ -39,6 +38,11 @@ class Run(BaseSettings):
     spark_executor_memory: str = "1g"  # Tune as needed for optimal performance
     spark_log_level: str = "ERROR"
 
+    @computed_field  # type: ignore[misc]
+    @property
+    def include_io(self) -> bool:
+        return self.io_type != "skip"
+
     model_config = SettingsConfigDict(
         env_prefix="run_", env_file=".env", extra="ignore"
     )