Skip to content

Commit 5d9614a

Browse files
Add TPC-H query 8 for Dask (#1180)
Co-authored-by: Hendrik Makait <[email protected]>
1 parent 8c9db00 commit 5d9614a

File tree

2 files changed

+61
-1
lines changed

2 files changed

+61
-1
lines changed

tests/tpch/test_correctness.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def verify_result(result: pd.DataFrame, query: int, answer_dir: pathlib.Path):
7777
5,
7878
6,
7979
7,
80-
pytest.param(8, marks=pytest.mark.skip(reason="Not implemented")),
80+
8,
8181
9,
8282
pytest.param(10, marks=pytest.mark.xfail(reason="Result is wrong")),
8383
11,

tests/tpch/test_dask.py

+60
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,66 @@ def test_query_7(client, dataset_path, fs):
327327
return result
328328

329329

330+
def test_query_8(client, dataset_path, fs):
    """Run TPC-H query 8 (national market share) with Dask.

    For every order year with orders dated in ``[1995-01-01, 1997-01-01)``,
    compute which fraction of the volume of ``ECONOMY ANODIZED STEEL`` parts
    bought by customers located in the ``AMERICA`` region was supplied by
    suppliers from ``BRAZIL``. Volume is
    ``l_extendedprice * (1 - l_discount)``.

    Parameters
    ----------
    client
        Test fixture; not referenced in the body (presumably the distributed
        client the computation runs on -- confirm against conftest).
    dataset_path
        Path prefix of the dataset; each table name is appended to it as-is.
    fs
        Filesystem object forwarded to ``dd.read_parquet``.

    Returns
    -------
    The computed result with the columns ``o_year`` and ``mkt_share``,
    sorted by ``o_year`` ascending.
    """
    date_from = datetime.strptime("1995-01-01", "%Y-%m-%d")
    date_to = datetime.strptime("1997-01-01", "%Y-%m-%d")  # exclusive bound

    supplier = dd.read_parquet(dataset_path + "supplier", filesystem=fs)
    lineitem = dd.read_parquet(dataset_path + "lineitem", filesystem=fs)
    orders = dd.read_parquet(dataset_path + "orders", filesystem=fs)
    customer = dd.read_parquet(dataset_path + "customer", filesystem=fs)
    nation = dd.read_parquet(dataset_path + "nation", filesystem=fs)
    region = dd.read_parquet(dataset_path + "region", filesystem=fs)
    part = dd.read_parquet(dataset_path + "part", filesystem=fs)

    # Start from the most selective table: only parts of the requested type.
    part = part[part["p_type"] == "ECONOMY ANODIZED STEEL"][["p_partkey"]]
    lineitem["volume"] = lineitem["l_extendedprice"] * (1.0 - lineitem["l_discount"])
    total = part.merge(lineitem, left_on="p_partkey", right_on="l_partkey", how="inner")

    total = total.merge(
        supplier, left_on="l_suppkey", right_on="s_suppkey", how="inner"
    )

    orders = orders[
        (orders["o_orderdate"] >= date_from) & (orders["o_orderdate"] < date_to)
    ]
    orders["o_year"] = orders["o_orderdate"].dt.year
    total = total.merge(
        orders, left_on="l_orderkey", right_on="o_orderkey", how="inner"
    )

    total = total.merge(
        customer, left_on="o_custkey", right_on="c_custkey", how="inner"
    )

    # First use of `nation`: the customer's nation, needed only for its region.
    customer_nation = nation[["n_nationkey", "n_regionkey"]]
    total = total.merge(
        customer_nation, left_on="c_nationkey", right_on="n_nationkey", how="inner"
    )

    # Second use of `nation`: the supplier's nation name, exposed as "nation".
    supplier_nation = nation[["n_nationkey", "n_name"]].rename(
        columns={"n_name": "nation"}
    )
    total = total.merge(
        supplier_nation, left_on="s_nationkey", right_on="n_nationkey", how="inner"
    )

    region = region[region["r_name"] == "AMERICA"][["r_regionkey"]]
    total = total.merge(
        region, left_on="n_regionkey", right_on="r_regionkey", how="inner"
    )

    # Conditional volume instead of a separate BRAZIL-only aggregate joined
    # back with an inner merge: a year without any BRAZIL rows keeps its row
    # (share 0) rather than vanishing from the result, and only one group-by
    # is needed.
    total["volume_brazil"] = total["volume"].where(total["nation"] == "BRAZIL", 0.0)
    final = total.groupby("o_year")[["volume", "volume_brazil"]].sum().reset_index()

    final["mkt_share"] = final["volume_brazil"] / final["volume"]
    final = final.sort_values(by=["o_year"], ascending=[True])[["o_year", "mkt_share"]]
    result = final.compute()

    return result
388+
389+
330390
@pytest.mark.shuffle_p2p
331391
def test_query_9(client, dataset_path, fs):
332392
"""

0 commit comments

Comments
 (0)