diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 0f8a6eb0b..af0eb1cbc 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -78,6 +78,10 @@ where `YOUR-GITHUB-USERNAME` will be your GitHub user name.
 
 Here's how you can set up your local development environment to contribute.
 
+#### Prerequisites for PySpark tests
+
+If you want to run PySpark-related tests, you'll need to have Java installed. Refer to the [Spark documentation](https://spark.apache.org/docs/latest/#downloading) for more information.
+
 #### Option 1: Use UV (recommended)
 
 1. Make sure you have Python3.12 installed, create a virtual environment,
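The contributing guide only states the Java requirement; a quick way to confirm a local setup before running the PySpark tests is to spin up a throwaway local session. This is a minimal sketch, not part of the guide, and the app name and session options are illustrative:

```python
# Smoke test for the PySpark prerequisites: if Java is missing or misconfigured,
# building the local session below fails immediately. Assumes `pyspark` is installed.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[1]")
    .appName("narwhals-prereq-check")  # illustrative name
    .getOrCreate()
)
spark.createDataFrame([(1,), (2,)], ["a"]).show()
spark.stop()
```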
diff --git a/narwhals/_spark_like/expr.py b/narwhals/_spark_like/expr.py
index d190b5667..66826a6ab 100644
--- a/narwhals/_spark_like/expr.py
+++ b/narwhals/_spark_like/expr.py
@@ -160,6 +160,11 @@ def __gt__(self, other: SparkLikeExpr) -> Self:
             returns_scalar=False,
         )
 
+    def abs(self) -> Self:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._from_call(F.abs, "abs", returns_scalar=self._returns_scalar)
+
     def alias(self, name: str) -> Self:
         def _alias(df: SparkLikeLazyFrame) -> list[Column]:
             return [col.alias(name) for col in self._call(df)]
@@ -179,44 +184,42 @@ def _alias(df: SparkLikeLazyFrame) -> list[Column]:
         )
 
     def count(self) -> Self:
-        def _count(_input: Column) -> Column:
-            from pyspark.sql import functions as F  # noqa: N812
+        from pyspark.sql import functions as F  # noqa: N812
 
-            return F.count(_input)
-
-        return self._from_call(_count, "count", returns_scalar=True)
+        return self._from_call(F.count, "count", returns_scalar=True)
 
     def max(self) -> Self:
-        def _max(_input: Column) -> Column:
-            from pyspark.sql import functions as F  # noqa: N812
+        from pyspark.sql import functions as F  # noqa: N812
 
-            return F.max(_input)
-
-        return self._from_call(_max, "max", returns_scalar=True)
+        return self._from_call(F.max, "max", returns_scalar=True)
 
     def mean(self) -> Self:
-        def _mean(_input: Column) -> Column:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._from_call(F.mean, "mean", returns_scalar=True)
+
+    def median(self) -> Self:
+        def _median(_input: Column) -> Column:
+            import pyspark  # ignore-banned-import
             from pyspark.sql import functions as F  # noqa: N812
 
-            return F.mean(_input)
+            if parse_version(pyspark.__version__) < (3, 4):
+                # Use percentile_approx with default accuracy parameter (10000)
+                return F.percentile_approx(_input.cast("double"), 0.5)
 
-        return self._from_call(_mean, "mean", returns_scalar=True)
+            return F.median(_input)
 
-    def min(self) -> Self:
-        def _min(_input: Column) -> Column:
-            from pyspark.sql import functions as F  # noqa: N812
+        return self._from_call(_median, "median", returns_scalar=True)
 
-            return F.min(_input)
+    def min(self) -> Self:
+        from pyspark.sql import functions as F  # noqa: N812
 
-        return self._from_call(_min, "min", returns_scalar=True)
+        return self._from_call(F.min, "min", returns_scalar=True)
 
     def sum(self) -> Self:
-        def _sum(_input: Column) -> Column:
-            from pyspark.sql import functions as F  # noqa: N812
-
-            return F.sum(_input)
+        from pyspark.sql import functions as F  # noqa: N812
 
-        return self._from_call(_sum, "sum", returns_scalar=True)
+        return self._from_call(F.sum, "sum", returns_scalar=True)
 
     def std(self: Self, ddof: int) -> Self:
         from functools import partial
@@ -239,3 +242,133 @@ def var(self: Self, ddof: int) -> Self:
         func = partial(_var, ddof=ddof, np_version=parse_version(np.__version__))
 
         return self._from_call(func, "var", returns_scalar=True, ddof=ddof)
+
+    def clip(
+        self,
+        lower_bound: Any | None = None,
+        upper_bound: Any | None = None,
+    ) -> Self:
+        def _clip(_input: Column, lower_bound: Any, upper_bound: Any) -> Column:
+            from pyspark.sql import functions as F  # noqa: N812
+
+            result = _input
+            if lower_bound is not None:
+                # Convert lower_bound to a literal Column
+                result = F.when(result < lower_bound, F.lit(lower_bound)).otherwise(
+                    result
+                )
+            if upper_bound is not None:
+                # Convert upper_bound to a literal Column
+                result = F.when(result > upper_bound, F.lit(upper_bound)).otherwise(
+                    result
+                )
+            return result
+
+        return self._from_call(
+            _clip,
+            "clip",
+            lower_bound=lower_bound,
+            upper_bound=upper_bound,
+            returns_scalar=self._returns_scalar,
+        )
+
+    def is_between(
+        self,
+        lower_bound: Any,
+        upper_bound: Any,
+        closed: str,
+    ) -> Self:
+        def _is_between(_input: Column, lower_bound: Any, upper_bound: Any) -> Column:
+            if closed == "both":
+                return (_input >= lower_bound) & (_input <= upper_bound)
+            if closed == "none":
+                return (_input > lower_bound) & (_input < upper_bound)
+            if closed == "left":
+                return (_input >= lower_bound) & (_input < upper_bound)
+            return (_input > lower_bound) & (_input <= upper_bound)
+
+        return self._from_call(
+            _is_between,
+            "is_between",
+            lower_bound=lower_bound,
+            upper_bound=upper_bound,
+            returns_scalar=self._returns_scalar,
+        )
+
+    def is_duplicated(self) -> Self:
+        def _is_duplicated(_input: Column) -> Column:
+            from pyspark.sql import Window
+            from pyspark.sql import functions as F  # noqa: N812
+
+            # Create a window spec that treats each value separately.
+            return F.count("*").over(Window.partitionBy(_input)) > 1
+
+        return self._from_call(
+            _is_duplicated, "is_duplicated", returns_scalar=self._returns_scalar
+        )
+
+    def is_finite(self) -> Self:
+        def _is_finite(_input: Column) -> Column:
+            from pyspark.sql import functions as F  # noqa: N812
+
+            # A value is finite if it's not NaN, not NULL, and not infinite
+            return (
+                ~F.isnan(_input)
+                & ~F.isnull(_input)
+                & (_input != float("inf"))
+                & (_input != float("-inf"))
+            )
+
+        return self._from_call(
+            _is_finite, "is_finite", returns_scalar=self._returns_scalar
+        )
+
+    def is_in(self, values: Sequence[Any]) -> Self:
+        def _is_in(_input: Column, values: Sequence[Any]) -> Column:
+            return _input.isin(values)
+
+        return self._from_call(
+            _is_in,
+            "is_in",
+            values=values,
+            returns_scalar=self._returns_scalar,
+        )
+
+    def is_unique(self) -> Self:
+        def _is_unique(_input: Column) -> Column:
+            from pyspark.sql import Window
+            from pyspark.sql import functions as F  # noqa: N812
+
+            # Create a window spec that treats each value separately
+            return F.count("*").over(Window.partitionBy(_input)) == 1
+
+        return self._from_call(
+            _is_unique, "is_unique", returns_scalar=self._returns_scalar
+        )
+
+    def len(self) -> Self:
+        def _len(_input: Column) -> Column:
+            from pyspark.sql import functions as F  # noqa: N812
+
+            # Use count(*) to count all rows including nulls
+            return F.count("*")
+
+        return self._from_call(_len, "len", returns_scalar=True)
+
+    def round(self, decimals: int) -> Self:
+        def _round(_input: Column, decimals: int) -> Column:
+            from pyspark.sql import functions as F  # noqa: N812
+
+            return F.round(_input, decimals)
+
+        return self._from_call(
+            _round,
+            "round",
+            decimals=decimals,
+            returns_scalar=self._returns_scalar,
+        )
+
+    def skew(self) -> Self:
+        from pyspark.sql import functions as F  # noqa: N812
+
+        return self._from_call(F.skewness, "skew", returns_scalar=True)
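For context, the new expressions above reduce to ordinary PySpark column expressions: `clip` chains one `F.when(...).otherwise(...)` per bound, and `median` falls back to `percentile_approx` on PySpark < 3.4. A minimal standalone sketch of what that looks like in plain PySpark (column names and session setup are illustrative only):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # noqa: N812

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1,), (2,), (3,), (-4,), (5,)], ["a"])

col = F.col("a")
# clip(3, 4): one when/otherwise per bound, mirroring _clip above
clipped = F.when(col < 3, F.lit(3)).otherwise(col)
clipped = F.when(clipped > 4, F.lit(4)).otherwise(clipped)
df.select(clipped.alias("a_clipped")).show()

# median() on PySpark < 3.4: approximate median via percentile_approx
df.select(F.percentile_approx(col.cast("double"), 0.5).alias("a_median")).show()
```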
diff --git a/narwhals/utils.py b/narwhals/utils.py
index b8e9830e1..591cd53ae 100644
--- a/narwhals/utils.py
+++ b/narwhals/utils.py
@@ -155,7 +155,11 @@ def is_pandas_like(self) -> bool:
             >>> df.implementation.is_pandas_like()
             True
         """
-        return self in {Implementation.PANDAS, Implementation.MODIN, Implementation.CUDF}
+        return self in {
+            Implementation.PANDAS,
+            Implementation.MODIN,
+            Implementation.CUDF,
+        }
 
     def is_polars(self) -> bool:
         """Return whether implementation is Polars.
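The `is_duplicated` / `is_unique` expressions exercised by the tests below rely on a window partitioned by the column itself, so each value's group size decides the flag. A plain-PySpark sketch of that trick (names are illustrative; narwhals wires this up through `_from_call`):

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F  # noqa: N812

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1,), (1,), (2,), (None,)], "a int")

# Count rows per distinct value of "a"; count("*") also counts nulls.
group_size = F.count("*").over(Window.partitionBy(F.col("a")))
df.select(
    F.col("a"),
    (group_size > 1).alias("is_duplicated"),
    (group_size == 1).alias("is_unique"),
).show()
```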
diff --git a/tests/spark_like_test.py b/tests/spark_like_test.py
index f0c66ab04..c929f4f85 100644
--- a/tests/spark_like_test.py
+++ b/tests/spark_like_test.py
@@ -271,6 +271,14 @@ def test_add(pyspark_constructor: Constructor) -> None:
     assert_equal_data(result, expected)
 
 
+def test_abs(pyspark_constructor: Constructor) -> None:
+    data = {"a": [1, 2, 3, -4, 5]}
+    df = nw.from_native(pyspark_constructor(data))
+    result = df.select(nw.col("a").abs())
+    expected = {"a": [1, 2, 3, 4, 5]}
+    assert_equal_data(result, expected)
+
+
 # copied from tests/expr_and_series/all_horizontal_test.py
 @pytest.mark.parametrize("expr1", ["a", nw.col("a")])
 @pytest.mark.parametrize("expr2", ["b", nw.col("b")])
@@ -569,7 +577,9 @@ def test_drop_nulls(pyspark_constructor: Constructor) -> None:
     ],
 )
 def test_drop_nulls_subset(
-    pyspark_constructor: Constructor, subset: str | list[str], expected: dict[str, float]
+    pyspark_constructor: Constructor,
+    subset: str | list[str],
+    expected: dict[str, float],
 ) -> None:
     data = {
         "a": [1.0, 2.0, None, 4.0],
@@ -720,7 +730,8 @@ def test_cross_join(pyspark_constructor: Constructor) -> None:
     assert_equal_data(result, expected)
 
     with pytest.raises(
-        ValueError, match="Can not pass `left_on`, `right_on` or `on` keys for cross join"
+        ValueError,
+        match="Can not pass `left_on`, `right_on` or `on` keys for cross join",
     ):
         df.join(other, how="cross", left_on="antananarivo")  # type: ignore[arg-type]
 
@@ -940,3 +951,148 @@ def test_left_join_overlapping_column(pyspark_constructor: Constructor) -> None:
         "c": [4.0, 6.0, None],
     }
     assert_equal_data(result, expected)
+
+
+# Copied from tests/expr_and_series/median_test.py
+def test_median(pyspark_constructor: Constructor) -> None:
+    data = {"a": [3, 8, 2, None], "b": [5, 5, None, 7], "z": [7.0, 8, 9, None]}
+    df = nw.from_native(pyspark_constructor(data))
+    result = df.select(
+        a=nw.col("a").median(), b=nw.col("b").median(), z=nw.col("z").median()
+    )
+    expected = {"a": [3.0], "b": [5.0], "z": [8.0]}
+    assert_equal_data(result, expected)
+
+
+# copied from tests/expr_and_series/clip_test.py
+def test_clip(pyspark_constructor: Constructor) -> None:
+    df = nw.from_native(pyspark_constructor({"a": [1, 2, 3, -4, 5]}))
+    result = df.select(
+        lower_only=nw.col("a").clip(lower_bound=3),
+        upper_only=nw.col("a").clip(upper_bound=4),
+        both=nw.col("a").clip(3, 4),
+    )
+    expected = {
+        "lower_only": [3, 3, 3, 3, 5],
+        "upper_only": [1, 2, 3, -4, 4],
+        "both": [3, 3, 3, 3, 4],
+    }
+    assert_equal_data(result, expected)
+
+
+# copied from tests/expr_and_series/is_between_test.py
+@pytest.mark.parametrize(
+    ("closed", "expected"),
+    [
+        ("left", [True, True, True, False]),
+        ("right", [False, True, True, True]),
+        ("both", [True, True, True, True]),
+        ("none", [False, True, True, False]),
+    ],
+)
+def test_is_between(
+    pyspark_constructor: Constructor, closed: str, expected: list[bool]
+) -> None:
+    data = {"a": [1, 4, 2, 5]}
+    df = nw.from_native(pyspark_constructor(data))
+    result = df.select(nw.col("a").is_between(1, 5, closed=closed))
+    expected_dict = {"a": expected}
+    assert_equal_data(result, expected_dict)
+
+
+# copied from tests/expr_and_series/is_duplicated_test.py
+def test_is_duplicated(pyspark_constructor: Constructor) -> None:
+    data = {"a": [1, 1, 2, None], "b": [1, 2, None, None], "level_0": [0, 1, 2, 3]}
+    df = nw.from_native(pyspark_constructor(data))
+    result = df.select(
+        a=nw.col("a").is_duplicated(),
+        b=nw.col("b").is_duplicated(),
+        level_0=nw.col("level_0"),
+    ).sort("level_0")
+    expected = {
+        "a": [True, True, False, False],
+        "b": [False, False, True, True],
+        "level_0": [0, 1, 2, 3],
+    }
+    assert_equal_data(result, expected)
+
+
+# copied from tests/expr_and_series/is_finite_test.py
+def test_is_finite(pyspark_constructor: Constructor) -> None:
+    data = {"a": [float("nan"), float("inf"), 2.0, None]}
+    df = nw.from_native(pyspark_constructor(data))
+    result = df.select(finite=nw.col("a").is_finite())
+    expected = {"finite": [False, False, True, False]}
+    assert_equal_data(result, expected)
+
+
+def test_is_in(pyspark_constructor: Constructor) -> None:
+    data = {"a": [1, 2, 3, 4, 5]}
+    df = nw.from_native(pyspark_constructor(data))
+    result = df.select(in_list=nw.col("a").is_in([2, 4]))
+    expected = {"in_list": [False, True, False, True, False]}
+    assert_equal_data(result, expected)
+
+
+# copied from tests/expr_and_series/is_unique_test.py
+def test_is_unique(pyspark_constructor: Constructor) -> None:
+    data = {"a": [1, 1, 2, None], "b": [1, 2, None, None], "level_0": [0, 1, 2, 3]}
+    df = nw.from_native(pyspark_constructor(data))
+    result = df.select(
+        a=nw.col("a").is_unique(),
+        b=nw.col("b").is_unique(),
+        level_0=nw.col("level_0"),
+    ).sort("level_0")
+    expected = {
+        "a": [False, False, True, True],
+        "b": [True, True, False, False],
+        "level_0": [0, 1, 2, 3],
+    }
+    assert_equal_data(result, expected)
+
+
+def test_len(pyspark_constructor: Constructor) -> None:
+    data = {"a": [1, 2, float("nan"), 4, None], "b": [None, 3, None, 5, None]}
+    df = nw.from_native(pyspark_constructor(data))
+    result = df.select(
+        a=nw.col("a").len(),
+        b=nw.col("b").len(),
+    )
+    expected = {"a": [5], "b": [5]}
+    assert_equal_data(result, expected)
+
+
+# Copied from tests/expr_and_series/round_test.py
+@pytest.mark.parametrize("decimals", [0, 1, 2])
+def test_round(pyspark_constructor: Constructor, decimals: int) -> None:
+    data = {"a": [2.12345, 2.56789, 3.901234]}
+    df = nw.from_native(pyspark_constructor(data))
+
+    expected_data = {k: [round(e, decimals) for e in v] for k, v in data.items()}
+    result_frame = df.select(nw.col("a").round(decimals))
+    assert_equal_data(result_frame, expected_data)
+
+
+# copied from tests/expr_and_series/skew_test.py
+@pytest.mark.parametrize(
+    ("data", "expected"),
+    [
+        pytest.param(
+            [],
+            None,
+            marks=pytest.mark.skip(
+                reason="PySpark cannot infer schema from empty datasets"
+            ),
+        ),
+        ([1], None),
+        ([1, 2], 0.0),
+        ([0.0, 0.0, 0.0], None),
+        ([1, 2, 3, 2, 1], 0.343622),
+    ],
+)
+def test_skew(
+    pyspark_constructor: Constructor, data: list[float], expected: float | None
+) -> None:
+    df = nw.from_native(pyspark_constructor({"a": data}))
+    result = df.select(skew=nw.col("a").skew())
+    assert_equal_data(result, {"skew": [expected]})
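End to end, the new methods are used exactly as in the tests above. A minimal sketch through the public narwhals API, assuming a narwhals version with the PySpark backend available; the session setup and column names are illustrative:

```python
import narwhals as nw
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
native = spark.createDataFrame([(1.0,), (2.0,), (3.0,), (-4.0,), (5.0,)], ["a"])

# from_native on a PySpark DataFrame yields a lazy narwhals frame
df = nw.from_native(native)
result = df.select(
    abs=nw.col("a").abs(),
    clipped=nw.col("a").clip(3, 4),
    in_range=nw.col("a").is_between(1, 5, closed="both"),
)
result.to_native().show()
```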