@@ -1,7 +1,6 @@
 from datetime import datetime
 
 import dask_expr as dd
-from dask.dataframe import Aggregation
 
 
 def test_query_1(client, dataset_path, fs):
@@ -383,19 +382,14 @@ def test_query_8(client, dataset_path, fs):
         how="inner",
     )[["volume", "o_year", "nation"]]
 
-    def chunk_udf(df):
-        denominator = df["volume"]
-        df = df[df["nation"] == "BRAZIL"]
-        numerator = df["volume"]
-        return (numerator, denominator)
-
-    def agg_udf(x):
-        return round(x[0].sum() / x[1].sum(), 2)
-
-    agg = Aggregation(name="mkt_share", chunk=chunk_udf, agg=agg_udf)
-
-    total = total.groupby(["o_year"]).agg(agg)
-    total = total.rename(columns={"o_year": "o_year", "x": "mkt_share"})
+    mkt_brazil = (
+        total[total["nation"] == "BRAZIL"].groupby("o_year").volume.sum().reset_index()
+    )
+    mkt_total = total.groupby("o_year").volume.sum().reset_index()
+    final = mkt_total.merge(
+        mkt_brazil, left_on="o_year", right_on="o_year", suffixes=("_mkt", "_brazil")
+    )
+    final["mkt_share"] = final.volume_brazil / final.volume_mkt
     total = total.sort_values(by=["o_year"], ascending=[True])
 
     total.compute()
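
A minimal standalone sketch of the merge-based market-share computation that replaces the custom Aggregation above. The column names o_year, nation, and volume come from the diff; the tiny sample frame and the use of plain pandas (instead of the benchmark's dask_expr DataFrame) are illustrative assumptions, but the same chained calls apply to a Dask DataFrame followed by .compute().

    import pandas as pd

    # Synthetic stand-in for the joined "total" frame in the query (made-up values).
    total = pd.DataFrame(
        {
            "o_year": [1995, 1995, 1995, 1996, 1996],
            "nation": ["BRAZIL", "FRANCE", "BRAZIL", "FRANCE", "BRAZIL"],
            "volume": [10.0, 30.0, 20.0, 50.0, 25.0],
        }
    )

    # Numerator: Brazilian volume per order year.
    mkt_brazil = (
        total[total["nation"] == "BRAZIL"].groupby("o_year").volume.sum().reset_index()
    )
    # Denominator: total volume per order year.
    mkt_total = total.groupby("o_year").volume.sum().reset_index()

    # Join the two per-year aggregates and take the ratio.
    final = mkt_total.merge(mkt_brazil, on="o_year", suffixes=("_mkt", "_brazil"))
    final["mkt_share"] = final.volume_brazil / final.volume_mkt

    print(final[["o_year", "mkt_share"]])
    # 1995: 30 / 60 = 0.50; 1996: 25 / 75 = 0.33...

The design trade-off mirrored here: instead of a chunk/agg pair passed to dask.dataframe.Aggregation, the share is expressed as two ordinary groupby-sums joined on o_year, which keeps the whole computation in standard DataFrame operations.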