init: m5 forecasting FE benchmark #136

Draft · wants to merge 2 commits into main
27 changes: 27 additions & 0 deletions m5-forecasting-feature-engineering/README.md
@@ -0,0 +1,27 @@
# M5 Forecasting Feature Engineering

The [M5 Forecasting Competition](https://www.sciencedirect.com/science/article/pii/S0169207021001874) was held on Kaggle in 2020,
and the top solutions generally relied on heavy feature engineering.

Participants typically used pandas (Polars was only just getting started at the time), so here we benchmark how long the same
feature engineering would have taken with Polars (and, coming soon, DuckDB).

We believe this to be a useful task to benchmark, because:
Member: I think we can remove L9-L12.

I think this can serve as a basis for more time-series-related benchmarks on this dataset. I don't think we have to strictly limit ourselves to what was used in the Kaggle competition.

- the competition was run on real-world Walmart data
- the operations we're benchmarking are from the winning solution, so evidently they were doing something right

The original code can be found at https://github.com/Mcompetitions/M5-methods. We run part of the preprocessing
functions from the top solution, "A1". The code is generally kept as-is, with some minor modifications to account
for pandas API changes from the last two years.
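
As a rough illustration, the features being computed are shifted lags and lagged rolling statistics. This is a minimal sketch on a toy series with shrunken shift and window sizes, not the benchmark code itself:

```python
import pandas as pd

s = pd.Series([3, 1, 4, 1, 5, 9, 2, 6], name="sales")

lag = s.shift(2)                        # sales as of 2 days ago
rolling = s.shift(2).rolling(3).mean()  # 3-day rolling mean, lagged 2 days
```

In the benchmark itself the shift is 28 days (`SHIFT_DAY`) and the windows range from 7 to 180 days, computed per product `id`.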

## Running the benchmark

**Data**: download the output files from https://www.kaggle.com/code/marcogorelli/winning-solution-preprocessing
and place them in a `data` folder in this directory.

**Run**:

- `python pandas_queries.py`
- `python polars_queries.py`

1 change: 1 addition & 0 deletions m5-forecasting-feature-engineering/duckdb_queries.py
@@ -0,0 +1 @@
# coming soon ;)
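# Until the DuckDB queries land, here is a hedged sketch (an assumption, not
# this PR's code) of what q1 could look like. It presumes the grid exposes
# "id", "sales" and a sortable day column "d".
import duckdb

Q1_SQL = """
SELECT
    *,
    LAG(sales, 28) OVER (PARTITION BY id ORDER BY d) AS sales_lag_28
FROM read_parquet('data/grid_part_1_small.parquet')
"""

if __name__ == "__main__":
    duckdb.sql(Q1_SQL).show()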
73 changes: 73 additions & 0 deletions m5-forecasting-feature-engineering/pandas_queries.py
@@ -0,0 +1,73 @@
import time
from pathlib import Path

import numpy as np
import pandas as pd
import pyarrow

print("pandas version", pd.__version__)
print("numpy version", np.__version__)
print("pyarrow version", pyarrow.__version__)

pd.options.mode.copy_on_write = True
pd.options.future.infer_string = True

PROCESSED_DATA_DIR = "data"

TARGET = "sales"
SHIFT_DAY = 28

# Set this to True if you just want to test that everything runs
SMALL = True
if SMALL:
    PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1_small.parquet"
else:
    PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1.parquet"

LAG_DAYS = list(range(SHIFT_DAY, SHIFT_DAY + 15))


def q1_pandas(df):
    # One shifted lag column per lag in 28..42, computed per-"id" group.
    return df.assign(
        **{
            f"{TARGET}_lag_{lag}": df.groupby(["id"], observed=True)[TARGET].transform(
                lambda x: x.shift(lag)  # noqa: B023
            )
            for lag in LAG_DAYS
        }
    )


def q2_pandas(df):
    # Rolling mean/std over 7..180-day windows, each lagged by 28 days.
    for i in [7, 14, 30, 60, 180]:
        df["rolling_mean_" + str(i)] = df.groupby(["id"], observed=True)[
            TARGET
        ].transform(lambda x: x.shift(SHIFT_DAY).rolling(i).mean())  # noqa: B023
    for i in [7, 14, 30, 60, 180]:
        df["rolling_std_" + str(i)] = df.groupby(["id"], observed=True)[
            TARGET
        ].transform(lambda x: x.shift(SHIFT_DAY).rolling(i).std())  # noqa: B023
    return df


def q3_pandas(df):
    # Rolling means for shorter shifts (1, 7, 14 days) and 7..60-day windows.
    for d_shift in [1, 7, 14]:
        for d_window in [7, 14, 30, 60]:
            col_name = "rolling_mean_" + str(d_shift) + "_" + str(d_window)
            df[col_name] = df.groupby(["id"], observed=True)[TARGET].transform(
                lambda x: x.shift(d_shift).rolling(d_window).mean()  # noqa: B023
            )
    return df
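

# Note on the `noqa: B023` markers above: flake8-bugbear's B023 warns that
# the lambdas close over loop variables (`lag`, `i`, `d_shift`, `d_window`).
# That is harmless here because each lambda is consumed immediately by
# `.transform(...)` within the same loop iteration, before the variable
# changes.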


start_time = time.perf_counter()
q1_pandas(pd.read_parquet(PATH, engine="pyarrow"))
print(f"q1 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q2_pandas(pd.read_parquet(PATH, engine="pyarrow"))
print(f"q2 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q3_pandas(pd.read_parquet(PATH, engine="pyarrow"))
print(f"q2 took: {time.perf_counter() - start_time}")
90 changes: 90 additions & 0 deletions m5-forecasting-feature-engineering/polars_queries.py
@@ -0,0 +1,90 @@
import time
from pathlib import Path

import polars as pl

print("polars version", pl.__version__)

PROCESSED_DATA_DIR = "data"

TARGET = "sales"
SHIFT_DAY = 28


# Set this to True if you just want to test that everything runs
SMALL = True
if SMALL:
    PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1_small.parquet"
else:
    PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1.parquet"

LAG_DAYS = list(range(SHIFT_DAY, SHIFT_DAY + 15))


def q1_polars(df):
    return df.with_columns(
        pl.col(TARGET).shift(lag).over("id").alias(f"{TARGET}_lag_{lag}")
        for lag in LAG_DAYS
    )
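

# `.over("id")` is Polars' window context: the expression is evaluated per
# "id" group and the result is mapped back onto the original rows, the
# counterpart of pandas' groupby("id", observed=True)[...].transform(...)
# in pandas_queries.py.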


def q2_polars(df):
    return df.with_columns(

Member: Can we use the select + explode mapping here?

        *[
            pl.col(TARGET)
            .shift(SHIFT_DAY)
            .rolling_mean(window_size=i)
            .over("id")
            .alias(f"rolling_mean_{i}")
            for i in [7, 14, 30, 60, 180]
        ],
        *[
            pl.col(TARGET)
            .shift(SHIFT_DAY)
            .rolling_std(window_size=i)
            .over("id")
            .alias(f"rolling_std_{i}")
            for i in [7, 14, 30, 60, 180]
        ],
    )
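

# One possible reading of the review comment above, as a hedged sketch (an
# assumption, not this PR's code): with mapping_strategy="explode", the
# window result comes back in exploded group order instead of being mapped
# back row-by-row, which can skip work. The output only lines up with the
# input if the frame is already sorted by "id", which this sketch assumes.
def q2_polars_explode(df):
    return df.select(
        pl.all(),
        *[
            pl.col(TARGET)
            .shift(SHIFT_DAY)
            .rolling_mean(window_size=i)
            .over("id", mapping_strategy="explode")
            .alias(f"rolling_mean_{i}")
            for i in [7, 14, 30, 60, 180]
        ],
        *[
            pl.col(TARGET)
            .shift(SHIFT_DAY)
            .rolling_std(window_size=i)
            .over("id", mapping_strategy="explode")
            .alias(f"rolling_std_{i}")
            for i in [7, 14, 30, 60, 180]
        ],
    )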


def q3_polars(df):
    return df.with_columns(
        pl.col(TARGET)
        .shift(d_shift)
        .rolling_mean(window_size=d_window)
        .over("id")
        .alias(f"rolling_mean_{d_shift}_{d_window}")
        for d_shift in [1, 7, 14]
        for d_window in [7, 14, 30, 60]
    )


print("*** polars lazy ***")

start_time = time.perf_counter()
q1_polars(pl.scan_parquet(PATH)).collect()
print(f"q1 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q2_polars(pl.scan_parquet(PATH)).collect()
print(f"q2 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q3_polars(pl.scan_parquet(PATH)).collect()
print(f"q2 took: {time.perf_counter() - start_time}")

print("*** polars eager ***")

start_time = time.perf_counter()
q1_polars(pl.read_parquet(PATH))
print(f"q1 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q2_polars(pl.read_parquet(PATH))
print(f"q2 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q3_polars(pl.read_parquet(PATH))
print(f"q2 took: {time.perf_counter() - start_time}")