Skip to content

Commit 94733eb

Browse files
committed
Fixup: refactor / cleanup [skip ci]
1 parent 1717751 commit 94733eb

File tree

1 file changed

+30
-67
lines changed

1 file changed

+30
-67
lines changed

tests/tpch/test_dask.py

+30-67
Original file line numberDiff line numberDiff line change
@@ -304,92 +304,55 @@ def test_query_8(client, dataset_path, fs):
304304
var1 = datetime.strptime("1995-01-01", "%Y-%m-%d")
305305
var2 = datetime.strptime("1997-01-01", "%Y-%m-%d")
306306

307-
supplier_ds = dd.read_parquet(dataset_path + "supplier", filesystem=fs)
308-
lineitem_ds = dd.read_parquet(dataset_path + "lineitem", filesystem=fs)
309-
orders_ds = dd.read_parquet(dataset_path + "orders", filesystem=fs)
310-
customer_ds = dd.read_parquet(dataset_path + "customer", filesystem=fs)
311-
nation_ds = dd.read_parquet(dataset_path + "nation", filesystem=fs)
312-
region_ds = dd.read_parquet(dataset_path + "region", filesystem=fs)
313-
part_ds = dd.read_parquet(dataset_path + "part", filesystem=fs)
314-
315-
part_filtered = part_ds[part_ds["p_type"] == "ECONOMY ANODIZED STEEL"][
316-
["p_partkey"]
317-
]
307+
supplier = dd.read_parquet(dataset_path + "supplier", filesystem=fs)
308+
lineitem = dd.read_parquet(dataset_path + "lineitem", filesystem=fs)
309+
orders = dd.read_parquet(dataset_path + "orders", filesystem=fs)
310+
customer = dd.read_parquet(dataset_path + "customer", filesystem=fs)
311+
nation = dd.read_parquet(dataset_path + "nation", filesystem=fs)
312+
region = dd.read_parquet(dataset_path + "region", filesystem=fs)
313+
part = dd.read_parquet(dataset_path + "part", filesystem=fs)
314+
315+
part = part[part["p_type"] == "ECONOMY ANODIZED STEEL"][["p_partkey"]]
316+
lineitem["volume"] = lineitem["l_extendedprice"] * (1.0 - lineitem["l_discount"])
317+
total = part.merge(lineitem, left_on="p_partkey", right_on="l_partkey", how="inner")
318318

319-
lineitem_filtered = lineitem_ds[["l_partkey", "l_suppkey", "l_orderkey"]]
320-
lineitem_filtered["volume"] = lineitem_ds["l_extendedprice"] * (
321-
1.0 - lineitem_ds["l_discount"]
319+
total = total.merge(
320+
supplier, left_on="l_suppkey", right_on="s_suppkey", how="inner"
322321
)
323-
total = part_filtered.merge(
324-
lineitem_filtered,
325-
left_on="p_partkey",
326-
right_on="l_partkey",
327-
how="inner",
328-
)[["l_suppkey", "l_orderkey", "volume"]]
329322

330-
supplier_filtered = supplier_ds[["s_suppkey", "s_nationkey"]]
323+
orders = orders[(orders["o_orderdate"] >= var1) & (orders["o_orderdate"] < var2)]
324+
orders["o_year"] = orders["o_orderdate"].dt.year
331325
total = total.merge(
332-
supplier_filtered,
333-
left_on="l_suppkey",
334-
right_on="s_suppkey",
335-
how="inner",
336-
)[["l_orderkey", "volume", "s_nationkey"]]
337-
338-
orders_filtered = orders_ds[
339-
(orders_ds["o_orderdate"] >= var1) & (orders_ds["o_orderdate"] < var2)
340-
]
326+
orders, left_on="l_orderkey", right_on="o_orderkey", how="inner"
327+
)
341328

342-
orders_filtered["o_year"] = orders_filtered["o_orderdate"].dt.year
343-
orders_filtered = orders_filtered[["o_orderkey", "o_custkey", "o_year"]]
344329
total = total.merge(
345-
orders_filtered,
346-
left_on="l_orderkey",
347-
right_on="o_orderkey",
348-
how="inner",
349-
)[["volume", "s_nationkey", "o_custkey", "o_year"]]
330+
customer, left_on="o_custkey", right_on="c_custkey", how="inner"
331+
)
350332

351-
customer_filtered = customer_ds[["c_custkey", "c_nationkey"]]
333+
n1_filtered = nation[["n_nationkey", "n_regionkey"]]
352334
total = total.merge(
353-
customer_filtered,
354-
left_on="o_custkey",
355-
right_on="c_custkey",
356-
how="inner",
357-
)[["volume", "s_nationkey", "o_year", "c_nationkey"]]
358-
359-
n1_filtered = nation_ds[["n_nationkey", "n_regionkey"]]
360-
n2_filtered = nation_ds[["n_nationkey", "n_name"]].rename(
361-
columns={"n_name": "nation"}
335+
n1_filtered, left_on="c_nationkey", right_on="n_nationkey", how="inner"
362336
)
363-
total = total.merge(
364-
n1_filtered,
365-
left_on="c_nationkey",
366-
right_on="n_nationkey",
367-
how="inner",
368-
)[["volume", "s_nationkey", "o_year", "n_regionkey"]]
369337

338+
n2_filtered = nation[["n_nationkey", "n_name"]].rename(columns={"n_name": "nation"})
370339
total = total.merge(
371-
n2_filtered,
372-
left_on="s_nationkey",
373-
right_on="n_nationkey",
374-
how="inner",
375-
)[["volume", "o_year", "n_regionkey", "nation"]]
340+
n2_filtered, left_on="s_nationkey", right_on="n_nationkey", how="inner"
341+
)
376342

377-
region_filtered = region_ds[region_ds["r_name"] == "AMERICA"][["r_regionkey"]]
343+
region = region[region["r_name"] == "AMERICA"][["r_regionkey"]]
378344
total = total.merge(
379-
region_filtered,
380-
left_on="n_regionkey",
381-
right_on="r_regionkey",
382-
how="inner",
383-
)[["volume", "o_year", "nation"]]
345+
region, left_on="n_regionkey", right_on="r_regionkey", how="inner"
346+
)
384347

385348
mkt_brazil = (
386349
total[total["nation"] == "BRAZIL"].groupby("o_year").volume.sum().reset_index()
387350
)
388351
mkt_total = total.groupby("o_year").volume.sum().reset_index()
352+
389353
final = mkt_total.merge(
390354
mkt_brazil, left_on="o_year", right_on="o_year", suffixes=("_mkt", "_brazil")
391355
)
392356
final["mkt_share"] = final.volume_brazil / final.volume_mkt
393-
total = total.sort_values(by=["o_year"], ascending=[True])
394-
395-
total.compute()
357+
final = final.sort_values(by=["o_year"], ascending=[True])
358+
final.compute()

0 commit comments

Comments
 (0)