Update Dask queries
stinodego committed Apr 12, 2024
1 parent 636891d commit 1781ba7
Showing 7 changed files with 143 additions and 317 deletions.
34 changes: 14 additions & 20 deletions queries/dask/q1.py
@@ -10,36 +10,30 @@


 def q() -> None:
-    VAR1 = date(1998, 9, 2)
-
-    lineitem = utils.get_line_item_ds
+    line_item_ds = utils.get_line_item_ds
     # first call one time to cache in case we don't include the IO times
-    lineitem()
+    line_item_ds()

     def query() -> pd.DataFrame:
-        nonlocal lineitem
-        lineitem = lineitem()
+        nonlocal line_item_ds
+        line_item_ds = line_item_ds()

-        sel = lineitem.l_shipdate <= VAR1
-        lineitem_filtered = lineitem[sel]
+        var1 = date(1998, 9, 2)
+
+        filt = line_item_ds[line_item_ds["l_shipdate"] <= var1]

         # This is lenient towards pandas as normally an optimizer should decide
         # that this could be computed before the groupby aggregation.
         # Other implementations don't enjoy this benefit.
-        lineitem_filtered["disc_price"] = lineitem_filtered.l_extendedprice * (
-            1 - lineitem_filtered.l_discount
-        )
-        lineitem_filtered["charge"] = (
-            lineitem_filtered.l_extendedprice
-            * (1 - lineitem_filtered.l_discount)
-            * (1 + lineitem_filtered.l_tax)
+        filt["disc_price"] = filt.l_extendedprice * (1.0 - filt.l_discount)
+        filt["charge"] = (
+            filt.l_extendedprice * (1.0 - filt.l_discount) * (1.0 + filt.l_tax)
         )

         # `groupby(as_index=False)` is not yet implemented by Dask:
         # https://github.com/dask/dask/issues/5834
-        gb = lineitem_filtered.groupby(["l_returnflag", "l_linestatus"])
-
-        total = gb.agg(
+        gb = filt.groupby(["l_returnflag", "l_linestatus"])
+        agg = gb.agg(
             sum_qty=pd.NamedAgg(column="l_quantity", aggfunc="sum"),
             sum_base_price=pd.NamedAgg(column="l_extendedprice", aggfunc="sum"),
             sum_disc_price=pd.NamedAgg(column="disc_price", aggfunc="sum"),
@@ -48,9 +42,9 @@ def query() -> pd.DataFrame:
             avg_price=pd.NamedAgg(column="l_extendedprice", aggfunc="mean"),
             avg_disc=pd.NamedAgg(column="l_discount", aggfunc="mean"),
             count_order=pd.NamedAgg(column="l_orderkey", aggfunc="size"),
-        )
+        ).reset_index()

-        result_df = total.reset_index().sort_values(["l_returnflag", "l_linestatus"])
+        result_df = agg.sort_values(["l_returnflag", "l_linestatus"])

         return result_df.compute()  # type: ignore[no-any-return]

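A note on the pattern visible in the q1 diff above: Dask does not implement `groupby(as_index=False)` (dask/dask#5834), so the rewritten query aggregates with the group keys in the index and appends `.reset_index()` to the `agg(...)` call. A minimal sketch of the same idiom, assuming `dask` and `pandas` are installed; the toy column names and values are illustrative only, not the benchmark tables:

import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame(
        {
            "l_returnflag": ["A", "A", "N"],
            "l_linestatus": ["F", "F", "O"],
            "l_quantity": [17.0, 36.0, 8.0],
        }
    ),
    npartitions=1,
)

# The group keys land in the index because `as_index=False` is unavailable;
# reset_index() afterwards turns them back into ordinary columns.
agg = (
    df.groupby(["l_returnflag", "l_linestatus"])
    .agg(sum_qty=pd.NamedAgg(column="l_quantity", aggfunc="sum"))
    .reset_index()
)
print(agg.compute())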
133 changes: 24 additions & 109 deletions queries/dask/q2.py
@@ -11,10 +11,6 @@


 def q() -> None:
-    var1 = 15
-    var2 = "BRASS"
-    var3 = "EUROPE"
-
     region_ds = utils.get_region_ds
     nation_ds = utils.get_nation_ds
     supplier_ds = utils.get_supplier_ds
@@ -40,99 +36,26 @@ def query() -> pd.DataFrame:
         part_ds = part_ds()
         part_supp_ds = part_supp_ds()

-        nation_filtered = nation_ds.loc[:, ["n_nationkey", "n_name", "n_regionkey"]]
-        region_filtered = region_ds[(region_ds["r_name"] == var3)]
-        region_filtered = region_filtered.loc[:, ["r_regionkey"]]
-        r_n_merged = nation_filtered.merge(
-            region_filtered, left_on="n_regionkey", right_on="r_regionkey", how="inner"
-        )
-        r_n_merged = r_n_merged.loc[:, ["n_nationkey", "n_name"]]
-        supplier_filtered = supplier_ds.loc[
-            :,
-            [
-                "s_suppkey",
-                "s_name",
-                "s_address",
-                "s_nationkey",
-                "s_phone",
-                "s_acctbal",
-                "s_comment",
-            ],
-        ]
-        s_r_n_merged = r_n_merged.merge(
-            supplier_filtered,
-            left_on="n_nationkey",
-            right_on="s_nationkey",
-            how="inner",
-        )
-        s_r_n_merged = s_r_n_merged.loc[
-            :,
-            [
-                "n_name",
-                "s_suppkey",
-                "s_name",
-                "s_address",
-                "s_phone",
-                "s_acctbal",
-                "s_comment",
-            ],
-        ]
-        partsupp_filtered = part_supp_ds.loc[
-            :, ["ps_partkey", "ps_suppkey", "ps_supplycost"]
-        ]
-        ps_s_r_n_merged = s_r_n_merged.merge(
-            partsupp_filtered, left_on="s_suppkey", right_on="ps_suppkey", how="inner"
-        )
-        ps_s_r_n_merged = ps_s_r_n_merged.loc[
-            :,
-            [
-                "n_name",
-                "s_name",
-                "s_address",
-                "s_phone",
-                "s_acctbal",
-                "s_comment",
-                "ps_partkey",
-                "ps_supplycost",
-            ],
-        ]
-        part_filtered = part_ds.loc[:, ["p_partkey", "p_mfgr", "p_size", "p_type"]]
-        part_filtered = part_filtered[
-            (part_filtered["p_size"] == var1)
-            # & (part_filtered["p_type"].astype(str).str.endswith(var2))
-            & (part_filtered["p_type"].str.endswith(var2))
-        ]
-        part_filtered = part_filtered.loc[:, ["p_partkey", "p_mfgr"]]
-        merged_df = part_filtered.merge(
-            ps_s_r_n_merged, left_on="p_partkey", right_on="ps_partkey", how="inner"
+        var1 = 15
+        var2 = "BRASS"
+        var3 = "EUROPE"
+
+        jn = (
+            part_ds.merge(part_supp_ds, left_on="p_partkey", right_on="ps_partkey")
+            .merge(supplier_ds, left_on="ps_suppkey", right_on="s_suppkey")
+            .merge(nation_ds, left_on="s_nationkey", right_on="n_nationkey")
+            .merge(region_ds, left_on="n_regionkey", right_on="r_regionkey")
         )
-        merged_df = merged_df.loc[
-            :,
-            [
-                "n_name",
-                "s_name",
-                "s_address",
-                "s_phone",
-                "s_acctbal",
-                "s_comment",
-                "ps_supplycost",
-                "p_partkey",
-                "p_mfgr",
-            ],
-        ]

-        # `groupby(as_index=False)` is not yet implemented by Dask:
-        # https://github.com/dask/dask/issues/5834
-        min_values = merged_df.groupby("p_partkey")["ps_supplycost"].min().reset_index()
+        jn = jn[jn["p_size"] == var1]
+        jn = jn[jn["p_type"].str.endswith(var2)]
+        jn = jn[jn["r_name"] == var3]

-        min_values.columns = ["P_PARTKEY_CPY", "MIN_SUPPLYCOST"]
-        merged_df = merged_df.merge(
-            min_values,
-            left_on=["p_partkey", "ps_supplycost"],
-            right_on=["P_PARTKEY_CPY", "MIN_SUPPLYCOST"],
-            how="inner",
-        )
-        result_df = merged_df.loc[
+        gb = jn.groupby("p_partkey")
+        agg = gb["ps_supplycost"].min().reset_index()
+        jn2 = agg.merge(jn, on=["p_partkey", "ps_supplycost"])
+
+        sel = jn2.loc[
             :,
             [
                 "s_acctbal",
@@ -145,22 +68,14 @@ def query() -> pd.DataFrame:
"s_comment",
],
]
-        result_df = result_df.sort_values(
-            by=[
-                "s_acctbal",
-                "n_name",
-                "s_name",
-                "p_partkey",
-            ],
-            ascending=[
-                False,
-                True,
-                True,
-                True,
-            ],
-        ).head(100, compute=False)

-        return result_df.compute()  # type: ignore[no-any-return]
+        sort = sel.sort_values(
+            by=["s_acctbal", "n_name", "s_name", "p_partkey"],
+            ascending=[False, True, True, True],
+        )
+        result_df = sort.head(100)
+
+        return result_df  # type: ignore[no-any-return]

     utils.run_query(Q_NUM, query)

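The rewritten q2 above selects, for each part, the supplier rows whose supply cost equals the per-part minimum: it aggregates the minimum per `p_partkey`, then inner-merges that result back onto the joined table on both keys. A small self-contained sketch of that groupby-min-then-merge idiom, assuming `dask` and `pandas`; the toy data and names are illustrative, not the TPC-H tables:

import pandas as pd
import dask.dataframe as dd

jn = dd.from_pandas(
    pd.DataFrame(
        {
            "p_partkey": [1, 1, 2, 2],
            "s_name": ["s1", "s2", "s3", "s4"],
            "ps_supplycost": [10.0, 7.0, 5.0, 5.0],
        }
    ),
    npartitions=2,
)

# Per-part minimum supply cost, restored to a regular column via reset_index().
agg = jn.groupby("p_partkey")["ps_supplycost"].min().reset_index()

# Inner merge on both keys keeps only rows hitting the per-part minimum;
# ties (part 2 here) are all retained, as in the q2 query.
winners = agg.merge(jn, on=["p_partkey", "ps_supplycost"])
print(winners.compute())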
50 changes: 20 additions & 30 deletions queries/dask/q3.py
@@ -12,9 +12,6 @@


 def q() -> None:
-    var1 = var2 = date(1995, 3, 15)
-    var3 = "BUILDING"
-
     customer_ds = utils.get_customer_ds
     line_item_ds = utils.get_line_item_ds
     orders_ds = utils.get_orders_ds
@@ -32,35 +29,28 @@ def query() -> pd.DataFrame:
         line_item_ds = line_item_ds()
         orders_ds = orders_ds()

-        lineitem_filtered = line_item_ds.loc[
-            :, ["l_orderkey", "l_extendedprice", "l_discount", "l_shipdate"]
-        ]
-        orders_filtered = orders_ds.loc[
-            :, ["o_orderkey", "o_custkey", "o_orderdate", "o_shippriority"]
-        ]
-        customer_filtered = customer_ds.loc[:, ["c_mktsegment", "c_custkey"]]
-        lsel = lineitem_filtered.l_shipdate > var1
-        osel = orders_filtered.o_orderdate < var2
-        csel = customer_filtered.c_mktsegment == var3
-        flineitem = lineitem_filtered[lsel]
-        forders = orders_filtered[osel]
-        fcustomer = customer_filtered[csel]
-        jn1 = fcustomer.merge(forders, left_on="c_custkey", right_on="o_custkey")
-        jn2 = jn1.merge(flineitem, left_on="o_orderkey", right_on="l_orderkey")
+        var1 = "BUILDING"
+        var2 = date(1995, 3, 15)
+
+        fcustomer = customer_ds[customer_ds["c_mktsegment"] == var1]
+
+        jn1 = fcustomer.merge(orders_ds, left_on="c_custkey", right_on="o_custkey")
+        jn2 = jn1.merge(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
+
+        jn2 = jn2[jn2["o_orderdate"] < var2]
+        jn2 = jn2[jn2["l_shipdate"] > var2]
         jn2["revenue"] = jn2.l_extendedprice * (1 - jn2.l_discount)

-        # `groupby(as_index=False)` is not yet implemented by Dask:
-        # https://github.com/dask/dask/issues/5834
-        total = (
-            jn2.groupby(["l_orderkey", "o_orderdate", "o_shippriority"])["revenue"]
-            .sum()
-            .reset_index()
-            .sort_values(["revenue"], ascending=False)
-        )
-        result_df = total.head(10, compute=False).loc[
-            :, ["l_orderkey", "revenue", "o_orderdate", "o_shippriority"]
-        ]
-        return result_df.compute()  # type: ignore[no-any-return]
+        gb = jn2.groupby(["o_orderkey", "o_orderdate", "o_shippriority"])
+        agg = gb["revenue"].sum().reset_index()
+
+        sel = agg.loc[:, ["o_orderkey", "revenue", "o_orderdate", "o_shippriority"]]
+        sel = sel.rename(columns={"o_orderkey": "l_orderkey"})
+
+        sorted = sel.sort_values(by=["revenue", "o_orderdate"], ascending=[False, True])
+        result_df = sorted.head(10)
+
+        return result_df  # type: ignore[no-any-return]

     utils.run_query(Q_NUM, query)

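One behavioural detail in the q2 and q3 rewrites above: Dask's `head(n)` computes eagerly by default, so `sorted.head(10)` already returns a pandas DataFrame and the trailing `.compute()` is dropped, whereas the old code passed `compute=False` to stay lazy. A minimal illustration, assuming `dask` and `pandas` (toy data only):

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"revenue": [3.0, 1.0, 2.0]}), npartitions=1)

lazy = ddf.sort_values("revenue", ascending=False).head(2, compute=False)
eager = ddf.sort_values("revenue", ascending=False).head(2)

print(type(lazy))   # still a lazy Dask DataFrame
print(type(eager))  # an already-computed pandas DataFrame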
49 changes: 20 additions & 29 deletions queries/dask/q4.py
@@ -1,24 +1,19 @@
 from __future__ import annotations

 from datetime import date
-from typing import TYPE_CHECKING

-from queries.dask import utils
+import pandas as pd

-if TYPE_CHECKING:
-    import pandas as pd
+from queries.dask import utils

 Q_NUM = 4


 def q() -> None:
-    date1 = date(1993, 10, 1)
-    date2 = date(1993, 7, 1)
-
     line_item_ds = utils.get_line_item_ds
     orders_ds = utils.get_orders_ds

-    # First call one time to cache in case we don't include the IO times
+    # first call one time to cache in case we don't include the IO times
     line_item_ds()
     orders_ds()

@@ -28,27 +23,23 @@ def query() -> pd.DataFrame:
         line_item_ds = line_item_ds()
         orders_ds = orders_ds()

-        lsel = line_item_ds.l_commitdate < line_item_ds.l_receiptdate
-        osel = (orders_ds.o_orderdate < date1) & (orders_ds.o_orderdate >= date2)
-        flineitem = line_item_ds[lsel]
-        forders = orders_ds[osel]
-
-        # `isin(Series)` is not yet implemented by Dask.
-        # https://github.com/dask/dask/issues/4227
-        forders = forders[["o_orderkey", "o_orderpriority"]]
-        jn = forders.merge(
-            flineitem, left_on="o_orderkey", right_on="l_orderkey"
-        ).drop_duplicates(subset=["o_orderkey"])[["o_orderpriority", "o_orderkey"]]
-
-        # `groupby(as_index=False)` is not yet implemented by Dask:
-        # https://github.com/dask/dask/issues/5834
-        result_df = (
-            jn.groupby("o_orderpriority")["o_orderkey"]
-            .count()
-            .reset_index()
-            .sort_values(["o_orderpriority"])
-            .rename(columns={"o_orderkey": "order_count"})
-        )
+        var1 = date(1993, 7, 1)
+        var2 = date(1993, 10, 1)
+
+        jn = line_item_ds.merge(orders_ds, left_on="l_orderkey", right_on="o_orderkey")
+
+        jn = jn[(jn["o_orderdate"] >= var1) & (jn["o_orderdate"] < var2)]
+        jn = jn[jn["l_commitdate"] < jn["l_receiptdate"]]
+
+        jn = jn.drop_duplicates(subset=["o_orderpriority", "l_orderkey"])
+
+        gb = jn.groupby("o_orderpriority")
+        agg = gb.agg(
+            order_count=pd.NamedAgg(column="o_orderkey", aggfunc="count")
+        ).reset_index()
+
+        result_df = agg.sort_values(["o_orderpriority"])
+
         return result_df.compute()  # type: ignore[no-any-return]

     utils.run_query(Q_NUM, query)
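The q4 rewrite above expresses the EXISTS semi-join as a plain merge followed by `drop_duplicates` on the order key (the removed code's comment notes that `isin(Series)` is not implemented in Dask, dask/dask#4227), then counts orders per priority with a named aggregation. A compact sketch of that merge, dedupe, count pattern, assuming `dask` and `pandas`; toy data and values are illustrative only:

import pandas as pd
import dask.dataframe as dd

orders = dd.from_pandas(
    pd.DataFrame({"o_orderkey": [1, 2], "o_orderpriority": ["1-URGENT", "2-HIGH"]}),
    npartitions=1,
)
lineitem = dd.from_pandas(
    pd.DataFrame({"l_orderkey": [1, 1, 2]}),  # order 1 matches two line items
    npartitions=1,
)

jn = lineitem.merge(orders, left_on="l_orderkey", right_on="o_orderkey")

# Deduplicate so each order is counted once, however many line items matched.
jn = jn.drop_duplicates(subset=["o_orderpriority", "l_orderkey"])

agg = (
    jn.groupby("o_orderpriority")
    .agg(order_count=pd.NamedAgg(column="o_orderkey", aggfunc="count"))
    .reset_index()
)
print(agg.compute())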
[Diffs for the remaining three changed files are not shown here.]
