diff --git a/ambrosia/spark_tools/constants.py b/ambrosia/spark_tools/constants.py
new file mode 100644
index 0000000..b75233a
--- /dev/null
+++ b/ambrosia/spark_tools/constants.py
@@ -0,0 +1,15 @@
+# Copyright 2022 MTS (Mobile Telesystems)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+EMPTY_VALUE_PARTITION: int = 0
diff --git a/ambrosia/spark_tools/split_tools.py b/ambrosia/spark_tools/split_tools.py
index dfd58cf..87e16e9 100644
--- a/ambrosia/spark_tools/split_tools.py
+++ b/ambrosia/spark_tools/split_tools.py
@@ -16,6 +16,7 @@
 import ambrosia.spark_tools.stratification as strat_pkg
 from ambrosia import types
+from ambrosia.spark_tools.constants import EMPTY_VALUE_PARTITION
 from ambrosia.tools import split_tools
 from ambrosia.tools.import_tools import spark_installed
 
@@ -26,7 +27,6 @@
 HASH_COLUMN_NAME: str = "__hashed_ambrosia_column"
 GROUPS_COLUMN: str = "group"
 ROW_NUMBER: str = "__row_number"
-EMPTY_VALUE: int = 0
 
 
 def unite_spark_tables(*dataframes: types.SparkDataFrame) -> types.SparkDataFrame:
@@ -90,7 +90,7 @@ def udf_make_labels(row_number: int) -> str:
         label_ind = (row_number - 1) // groups_size
         return labels[label_ind]
 
-    window = Window.orderBy(HASH_COLUMN_NAME).partitionBy(spark_funcs.lit(EMPTY_VALUE))
+    window = Window.orderBy(HASH_COLUMN_NAME).partitionBy(spark_funcs.lit(EMPTY_VALUE_PARTITION))
     result = hashed_dataframe.withColumn(ROW_NUMBER, spark_funcs.row_number().over(window)).withColumn(
         GROUPS_COLUMN, spark_funcs.udf(udf_make_labels)(spark_funcs.col(ROW_NUMBER))
     )
@@ -128,7 +128,9 @@ def udf_make_labels_with_find(row_number: int):
         not_used_ids.withColumn(
             ROW_NUMBER,
             spark_funcs.row_number().over(
-                Window.orderBy(spark_funcs.lit(EMPTY_VALUE)).partitionBy(spark_funcs.lit(EMPTY_VALUE))
+                Window.orderBy(spark_funcs.lit(EMPTY_VALUE_PARTITION)).partitionBy(
+                    spark_funcs.lit(EMPTY_VALUE_PARTITION)
+                )
             ),
         )
         .withColumn(GROUPS_COLUMN, spark_funcs.udf(udf_make_labels_with_find)(ROW_NUMBER))
diff --git a/ambrosia/spark_tools/stat_criteria.py b/ambrosia/spark_tools/stat_criteria.py
index 072ec37..6e35501 100644
--- a/ambrosia/spark_tools/stat_criteria.py
+++ b/ambrosia/spark_tools/stat_criteria.py
@@ -19,15 +19,19 @@
 import ambrosia.tools.pvalue_tools as pvalue_pkg
 import ambrosia.tools.theoretical_tools as theory_pkg
+import ambrosia.tools.type_checks as cast_pkg
 from ambrosia import types
+from ambrosia.spark_tools.constants import EMPTY_VALUE_PARTITION
 from ambrosia.spark_tools.theory import get_stats_from_table
 from ambrosia.tools.ab_abstract_component import ABStatCriterion
 from ambrosia.tools.configs import Effects
 from ambrosia.tools.import_tools import spark_installed
+from ambrosia.tools.stat_criteria import TtestRelHelpful
 
 if spark_installed():
     import pyspark.sql.functions as F
-    from pyspark.sql.functions import col, row_number
+    from pyspark.sql.dataframe import DataFrame
+    from pyspark.sql.functions import col, mean, row_number, variance
     from pyspark.sql.window import Window
 
 
@@ -88,8 +92,7 @@ class TtestIndCriterionSpark(ABSparkCriterion):
     """
     Unit for pyspark independent T-test.
     """
 
-    __implemented_effect_types: List = ["absolute", "relative"]
-    __type_error_msg: str = f"Choose effect type from {__implemented_effect_types}"
+    implemented_effect_types: List = ["absolute", "relative"]
     __data_parameters = ["mean_group_a", "mean_group_b", "std_group_a", "std_group_b", "nobs_group_a", "nobs_group_b"]
 
     def __calc_and_cache_data_parameters(
@@ -127,18 +130,18 @@ def calculate_pvalue(
         effect_type: str = "absolute",
         **kwargs,
     ):
-        if effect_type not in TtestIndCriterionSpark.__implemented_effect_types:
-            raise ValueError(TtestIndCriterionSpark.__type_error_msg)
+        if effect_type not in self.implemented_effect_types:
+            raise ValueError(self._send_type_error_msg())
         if not self.parameters_are_cached:
             self.__calc_and_cache_data_parameters(group_a, group_b, column)
         if effect_type == "absolute":
             p_value = sps.ttest_ind_from_stats(
-                self.data_stats["mean_group_b"],
-                self.data_stats["std_group_b"],
-                self.data_stats["nobs_group_b"],
                 self.data_stats["mean_group_a"],
                 self.data_stats["std_group_a"],
                 self.data_stats["nobs_group_a"],
+                self.data_stats["mean_group_b"],
+                self.data_stats["std_group_b"],
+                self.data_stats["nobs_group_b"],
                 **kwargs,
             ).pvalue
         elif effect_type == "relative":
@@ -163,7 +166,7 @@ def calculate_effect(
                 "mean_group_a"
             ]
         else:
-            raise ValueError(TtestIndCriterionSpark.__type_error_msg)
+            raise ValueError(self._send_type_error_msg())
         return effect
 
     def calculate_conf_interval(
@@ -175,10 +178,12 @@ def calculate_conf_interval(
         effect_type: str = "absolute",
         **kwargs,
     ):
+        alpha = cast_pkg.transform_alpha_np(alpha)
         if self.parameters_are_cached is not True:
             self.__calc_and_cache_data_parameters(group_a, group_b, column)
         if effect_type == "absolute":
-            alpha_corrected: float = pvalue_pkg.corrected_alpha(alpha, kwargs["alternative"])
+            alternative = "two-sided" if "alternative" not in kwargs else kwargs["alternative"]
+            alpha_corrected: float = pvalue_pkg.corrected_alpha(alpha, alternative)
             quantiles, sd = theory_pkg.get_ttest_info_from_stats(
                 var_a=self.data_stats["std_group_a"] ** 2,
                 var_b=self.data_stats["std_group_b"] ** 2,
@@ -189,15 +194,15 @@
             mean = self.data_stats["mean_group_b"] - self.data_stats["mean_group_a"]
             left_ci: np.ndarray = mean - quantiles * sd
             right_ci: np.ndarray = mean + quantiles * sd
-            return self._make_ci(left_ci, right_ci, kwargs["alternative"])
+            return self._make_ci(left_ci, right_ci, alternative)
         elif effect_type == "relative":
             conf_interval = self._apply_delta_method(alpha, **kwargs)[0]
             return conf_interval
         else:
-            raise ValueError(TtestIndCriterionSpark.__type_error_msg)
+            raise ValueError(self._send_type_error_msg())
 
 
-class TtestRelativeCriterionSpark(ABSparkCriterion):
+class TtestRelativeCriterionSpark(ABSparkCriterion, TtestRelHelpful):
     """
     Relative ttest for spark
     """
@@ -213,15 +218,23 @@ def _rename_col(column: str, group: str) -> str:
     def _calc_and_cache_data_parameters(
         self, group_a: types.SparkDataFrame, group_b: types.SparkDataFrame, column: types.ColumnNameType
     ) -> None:
-        a_ = (
+        col_a: str = self._rename_col(column, "a")
+        col_b: str = self._rename_col(column, "b")
+        a_: DataFrame = (
             group_a.withColumn(self.__ord_col, F.lit(1))
-            .withColumn(self.__add_index_name, row_number().over(Window().orderBy(self.__ord_col)))
-            .withColumnRenamed(column, self._rename_col(column, "a"))
+            .withColumn(
+                self.__add_index_name,
+                row_number().over(Window().orderBy(self.__ord_col).partitionBy(F.lit(EMPTY_VALUE_PARTITION))),
+            )
+            .withColumnRenamed(column, col_a)
         )
-        b_ = (
+        b_: DataFrame = (
             group_b.withColumn(self.__ord_col, F.lit(1))
-            .withColumn(self.__add_index_name, row_number().over(Window().orderBy(self.__ord_col)))
-            .withColumnRenamed(column, self._rename_col(column, "b"))
+            .withColumn(
+                self.__add_index_name,
+                row_number().over(Window().orderBy(self.__ord_col).partitionBy(F.lit(EMPTY_VALUE_PARTITION))),
+            )
+            .withColumnRenamed(column, col_b)
         )
 
         n_a_obs: int = group_a.count()
@@ -230,11 +243,25 @@ def _calc_and_cache_data_parameters(
         if n_a_obs != n_b_obs:
             raise ValueError("Size of group A and B must be equal")
 
-        both = a_.join(b_, self.__add_index_name, "inner").withColumn(
-            self.__diff, col(self._rename_col(column, "b")) - col(self._rename_col(column, "a"))
-        )
+        both: DataFrame = a_.join(b_, self.__add_index_name, "inner").withColumn(self.__diff, col(col_b) - col(col_a))
+
+        cov: float = both.stat.cov(col_a, col_b)
+        stats = both.select(
+            variance(col_a).alias("__var_a"),
+            variance(col_b).alias("__var_b"),
+            mean(col_a).alias("__mean_a"),
+            mean(col_b).alias("__mean_b"),
+        ).first()
+        var_a: float = theory_pkg.unbiased_to_sufficient(stats["__var_a"], n_a_obs, is_std=False)
+        var_b: float = theory_pkg.unbiased_to_sufficient(stats["__var_b"], n_b_obs, is_std=False)
+
         self.data_stats["mean"], self.data_stats["std"] = get_stats_from_table(both, self.__diff)
         self.data_stats["n_obs"] = n_a_obs
+        self.data_stats["cov"] = cov
+        self.data_stats["var_a"] = var_a
+        self.data_stats["var_b"] = var_b
+        self.data_stats["mean_a"] = stats["__mean_a"]
+        self.data_stats["mean_b"] = stats["__mean_b"]
         self.parameters_are_cached = True
 
     def calculate_pvalue(
@@ -247,11 +274,24 @@
     ):
         self._recalc_cache(group_a, group_b, column)
         if effect_type == Effects.abs.value:
+            if "alternative" in kwargs:
+                kwargs["alternative"] = theory_pkg.switch_alternative(kwargs["alternative"])
             p_value = theory_pkg.ttest_1samp_from_stats(
                 mean=self.data_stats["mean"], std=self.data_stats["std"], n_obs=self.data_stats["n_obs"], **kwargs
-            )
+            )[1]  # ttest_1samp_from_stats returns (statistic, pvalue)
         elif effect_type == Effects.rel.value:
-            raise NotImplementedError("Will be implemented later")
+            _, p_value = theory_pkg.apply_delta_method_by_stats(
+                size=self.data_stats["n_obs"],
+                mean_group_a=self.data_stats["mean_a"],
+                mean_group_b=self.data_stats["mean_b"],
+                var_group_a=self.data_stats["var_a"],
+                var_group_b=self.data_stats["var_b"],
+                cov_groups=self.data_stats["cov"],
+                transformation="fraction",
+                **kwargs
+            )
+        else:
+            raise ValueError(self._send_type_error_msg())
         self._check_clear_cache()
         return p_value
 
     def calculate_conf_interval(
@@ -259,18 +301,49 @@
         self,
         group_a: types.SparkDataFrame,
         group_b: types.SparkDataFrame,
-        alpha: types.StatErrorType,
-        effect_type: str,
+        column: str,
+        alpha: types.StatErrorType = np.array([0.05]),
+        effect_type: str = Effects.abs.value,
         **kwargs,
     ) -> List[Tuple]:
-        raise NotImplementedError("Will be implemented later")
+        self._recalc_cache(group_a, group_b, column)
+        alpha = cast_pkg.transform_alpha_np(alpha)
+        if effect_type == Effects.abs.value:
+            confidence_intervals = self._build_intervals_absolute_from_stats(
+                center=self.data_stats["mean"],
+                sd_1=self.data_stats["std"],
+                n_obs=self.data_stats["n_obs"],
+                alpha=alpha,
+                **kwargs,
+            )
+        elif effect_type == Effects.rel.value:
+            confidence_intervals, _ = theory_pkg.apply_delta_method_by_stats(
+                size=self.data_stats["n_obs"],
+                mean_group_a=self.data_stats["mean_a"],
+                mean_group_b=self.data_stats["mean_b"],
+                var_group_a=self.data_stats["var_a"],
+                var_group_b=self.data_stats["var_b"],
+                cov_groups=self.data_stats["cov"],
+                alpha=alpha,
+                transformation="fraction",
+                **kwargs,
+            )
+        else:
+            raise ValueError(self._send_type_error_msg())
+        return confidence_intervals
 
     def calculate_effect(
-        self, group_a: types.SparkDataFrame, group_b: types.SparkDataFrame, column: str, effect_type: str
+        self,
+        group_a: types.SparkDataFrame,
+        group_b: types.SparkDataFrame,
+        column: str,
+        effect_type: str = Effects.abs.value,
     ) -> float:
         self._recalc_cache(group_a, group_b, column)
         if effect_type == Effects.abs.value:
             effect: float = self.data_stats["mean"]
+        elif effect_type == Effects.rel.value:
+            effect: float = (self.data_stats["mean_b"] - self.data_stats["mean_a"]) / self.data_stats["mean_a"]
         else:
-            raise NotImplementedError("Will be implemented later")
+            raise ValueError(self._send_type_error_msg())
         return effect
diff --git a/ambrosia/spark_tools/stratification.py b/ambrosia/spark_tools/stratification.py
index 50e53d4..9b12a32 100644
--- a/ambrosia/spark_tools/stratification.py
+++ b/ambrosia/spark_tools/stratification.py
@@ -16,6 +16,7 @@
 import ambrosia.tools.ab_abstract_component as ab_abstract
 from ambrosia import types
+from ambrosia.spark_tools.constants import EMPTY_VALUE_PARTITION
 from ambrosia.tools.import_tools import spark_installed
 
 if spark_installed():
@@ -23,7 +24,6 @@
     from pyspark.sql import Window
 
 
-EMPTY_VALUE: int = 0
 STRAT_GROUPS: str = "__ambrosia_strat"
 
 
@@ -38,7 +38,7 @@ def fit(self, dataframe: types.SparkDataFrame, columns: Optional[Iterable[types.
             self.strats = {ab_abstract.EmptyStratValue.NO_STRATIFICATION: dataframe}
             return
 
-        window = Window.orderBy(*columns).partitionBy(spark_funcs.lit(EMPTY_VALUE))
+        window = Window.orderBy(*columns).partitionBy(spark_funcs.lit(EMPTY_VALUE_PARTITION))
         with_groups = dataframe.withColumn(STRAT_GROUPS, spark_funcs.dense_rank().over(window))
         amount_of_strats: int = with_groups.select(spark_funcs.max(STRAT_GROUPS)).collect()[0][0]
diff --git a/ambrosia/tester/handlers.py b/ambrosia/tester/handlers.py
index 7d65c35..144bf0d 100644
--- a/ambrosia/tester/handlers.py
+++ b/ambrosia/tester/handlers.py
@@ -44,7 +44,7 @@ class PandasCriteria(enum.Enum):
 
 class SparkCriteria(enum.Enum):
     ttest: StatCriterion = spark_crit_pkg.TtestIndCriterionSpark
-    ttest_rel: StatCriterion = None  # spark_crit_pkg.TtestRelativeCriterionSpark it's in development now
+    ttest_rel: StatCriterion = spark_crit_pkg.TtestRelativeCriterionSpark
     mw: StatCriterion = None
     wilcoxon: StatCriterion = None
diff --git a/ambrosia/tools/configs.py b/ambrosia/tools/configs.py
index 72730a2..e006df1 100644
--- a/ambrosia/tools/configs.py
+++ b/ambrosia/tools/configs.py
@@ -39,7 +39,7 @@ def get_all_enum_values(cls) -> tp.List[str]:
 
     @classmethod
     def raise_if_value_incorrect_enum(cls, value: tp.Any) -> None:
         if not cls.check_value_in_enum(value):
-            msg: str = f"Choose value from " + ", ".join(cls.get_all_enum_values())
+            msg: str = f"Choose value from {', '.join(cls.get_all_enum_values())}, got: {value}"
             raise ValueError(msg)
diff --git a/ambrosia/tools/pvalue_tools.py b/ambrosia/tools/pvalue_tools.py
index f9feb38..25c8220 100644
--- a/ambrosia/tools/pvalue_tools.py
+++ b/ambrosia/tools/pvalue_tools.py
@@ -114,9 +114,9 @@ def calculate_pvalue_by_delta_method(
         raise ValueError(f"Got unknown random variable transformation: {ADMISSIBLE_TRANSFORMATIONS}")
 
     if alternative == "less":
-        pvalue: float = sps.norm.cdf(statistic)
-    elif alternative == "greater":
         pvalue: float = sps.norm.sf(statistic)
+    elif alternative == "greater":
+        pvalue: float = sps.norm.cdf(statistic)
     elif alternative == "two-sided":
         pvalue: float = 2 * min(sps.norm.cdf(statistic), sps.norm.sf(statistic))
     else:
@@ -156,9 +156,9 @@ def choose_from_bounds(
     """
     cond_many: bool = isinstance(left_ci, Iterable)
     amount: int = len(left_ci) if cond_many else 1
-    if alternative == "greater":
-        right_ci = np.ones(amount) * right_bound if cond_many else right_bound
     if alternative == "less":
+        right_ci = np.ones(amount) * right_bound if cond_many else right_bound
+    if alternative == "greater":
         left_ci = np.ones(amount) * left_bound if cond_many else left_bound
     return left_ci, right_ci
diff --git a/ambrosia/tools/stat_criteria.py b/ambrosia/tools/stat_criteria.py
index 622904a..cdc4468 100644
--- a/ambrosia/tools/stat_criteria.py
+++ b/ambrosia/tools/stat_criteria.py
@@ -20,6 +20,7 @@
 import ambrosia.tools.pvalue_tools as pvalue_pkg
 import ambrosia.tools.theoretical_tools as theory_pkg
+import ambrosia.tools.type_checks as cast_pkg
 from ambrosia import types
 from ambrosia.tools.ab_abstract_component import ABStatCriterion, StatCriterion
 
@@ -52,7 +53,7 @@ class TtestIndCriterion(ABStatCriterion):
     def calculate_pvalue(self, group_a: np.ndarray, group_b: np.ndarray, effect_type: str = "absolute", **kwargs):
         if effect_type == "absolute":
-            return sps.ttest_ind(a=group_b, b=group_a, equal_var=False, **kwargs).pvalue
+            return sps.ttest_ind(a=group_a, b=group_b, equal_var=False, **kwargs).pvalue
         elif effect_type == "relative":
             _, pvalue = theory_pkg.apply_delta_method(group_a, group_b, "fraction", **kwargs)
             return pvalue
@@ -89,8 +90,7 @@ def calculate_conf_interval(
         effect_type: str = "absolute",
         **kwargs,
     ):
-        if isinstance(alpha, float):
-            alpha = np.array([alpha])
+        alpha = cast_pkg.transform_alpha_np(alpha)
         if effect_type == "absolute":
             difference_estimation: float = group_b.mean() - group_a.mean()
             conf_intervals = self._build_intervals_absolute(difference_estimation, group_a, group_b, alpha, **kwargs)
@@ -118,7 +118,30 @@ def get_results(
         return super().get_results(group_a, group_b, alpha, effect_type, **kwargs)
 
 
-class TtestRelCriterion(ABStatCriterion):
+class TtestRelHelpful:
+    def _build_intervals_absolute_from_stats(
+        self,
+        center: float,
+        sd_1: float,
+        n_obs: int,
+        alpha: types.StatErrorType = np.array([0.05]),
+        alternative: str = "two-sided",
+    ):
+        """
+        Helps handle different alternatives and build confidence interval
+        for related samples
+        """
+        alpha_corrected: float = pvalue_pkg.corrected_alpha(alpha, alternative)
+        std_error = sd_1 / np.sqrt(n_obs)
+        quantiles = sps.t.ppf(1 - alpha_corrected / 2, df=n_obs - 1)
+        left_ci: float = center - quantiles * std_error
+        right_ci: float = center + quantiles * std_error
+        left_ci, right_ci = pvalue_pkg.choose_from_bounds(left_ci, right_ci, alternative)
+        conf_intervals = list(zip(left_ci, right_ci))
+        return conf_intervals
+
+
+class TtestRelCriterion(ABStatCriterion, TtestRelHelpful):
     """
     Unit for relative paired T-test.
""" @@ -127,7 +150,7 @@ class TtestRelCriterion(ABStatCriterion): def calculate_pvalue(self, group_a: np.ndarray, group_b: np.ndarray, effect_type: str = "absolute", **kwargs): if effect_type == "absolute": - return sps.ttest_rel(a=group_b, b=group_a, **kwargs).pvalue + return sps.ttest_rel(a=group_a, b=group_b, **kwargs).pvalue elif effect_type == "relative": _, pvalue = theory_pkg.apply_delta_method(group_a, group_b, "fraction", dependent=True, **kwargs) return pvalue @@ -149,14 +172,10 @@ def _build_intervals_absolute( Helps handle different alternatives and build confidence interval for related sampels """ - alpha_corrected: float = pvalue_pkg.corrected_alpha(alpha, alternative) - std_error = np.sqrt(np.var(group_b - group_a, ddof=1) / len(group_a)) - quantiles = sps.t.ppf(1 - alpha_corrected / 2, df=len(group_a) - 1) - left_ci: float = center - quantiles * std_error - right_ci: float = center + quantiles * std_error - left_ci, right_ci = pvalue_pkg.choose_from_bounds(left_ci, right_ci, alternative) - conf_intervals = list(zip(left_ci, right_ci)) - return conf_intervals + sd_1: float = np.sqrt(np.var(group_b - group_a, ddof=1)) + return self._build_intervals_absolute_from_stats( + center=center, sd_1=sd_1, n_obs=len(group_a), alpha=alpha, alternative=alternative + ) def calculate_conf_interval( self, @@ -166,8 +185,7 @@ def calculate_conf_interval( effect_type: str = "absolute", **kwargs, ): - if isinstance(alpha, float): - alpha = np.array([alpha]) + alpha = cast_pkg.transform_alpha_np(alpha) if effect_type == "absolute": difference_estimation: float = np.mean(group_b - group_a) conf_intervals = self._build_intervals_absolute(difference_estimation, group_a, group_b, alpha, **kwargs) diff --git a/ambrosia/tools/theoretical_tools.py b/ambrosia/tools/theoretical_tools.py index f44dfc3..976a777 100644 --- a/ambrosia/tools/theoretical_tools.py +++ b/ambrosia/tools/theoretical_tools.py @@ -34,6 +34,15 @@ ROUND_DIGITS_PERCENT: int = 1 +def switch_alternative(alternative: str) -> str: + Alternatives.raise_if_value_incorrect_enum(alternative) + if alternative == Alternatives.ts.value: + return alternative + if alternative == Alternatives.less.value: + return Alternatives.gr.value + return Alternatives.less.value + + def get_stats(values: Iterable[float], ddof: int = 1) -> Tuple[float, float]: """ Calculate the mean and standard value for a list of values. 
@@ -60,12 +69,13 @@ def check_encode_alternative(alternative: str) -> str:
     return statsmodels_alternatives_encoding[alternative]
 
 
-def unbiased_to_sufficient(std: float, size: int) -> float:
+def unbiased_to_sufficient(std: float, size: int, is_std: bool = True) -> float:
     """
     Transforms unbiased estimation of standard deviation to sufficient
     (ddof = 1) => (ddof = 0)
+    If is_std is True, transform a standard deviation, otherwise a variance
     """
-    return std * np.sqrt((size - 1) / size)
+    return std * np.sqrt((size - 1) / size) if is_std else std * (size - 1) / size
 
 
 def check_target_type(
diff --git a/ambrosia/tools/type_checks.py b/ambrosia/tools/type_checks.py
index d29a9b2..bdd7f1c 100644
--- a/ambrosia/tools/type_checks.py
+++ b/ambrosia/tools/type_checks.py
@@ -16,6 +16,7 @@
 from typing import Iterable, List
 
+import numpy as np
 import pandas as pd
 
 from ambrosia import types
 
@@ -171,3 +172,12 @@ def check_norm_value(norm: str) -> None:
             raise ValueError(f'Choose correct norm, from {", ".join(norm_list)}')
     else:
         raise TypeError(f'norm variable must be a string and from {", ".join(norm_list)}')
+
+
+def transform_alpha_np(alpha: types.StatErrorType) -> np.ndarray:
+    """Cast a single float or an iterable of error levels to np.ndarray."""
+    if isinstance(alpha, float):
+        alpha = [alpha]
+    if isinstance(alpha, Iterable):
+        alpha = np.array(alpha)
+    return alpha
diff --git a/tests/conftest.py b/tests/conftest.py
index d9c5953..9b8bfb5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -91,12 +91,19 @@ def results_ltv_retention_conversions() -> pd.DataFrame:
 
 
 @pytest.fixture()
 @pytest.mark.tester()
-def tester_spark_ltv_ret(local_spark_session, results_ltv_retention_conversions):
+def results_ltv_retention_conversions_spark(results_ltv_retention_conversions, local_spark_session):
+    table = local_spark_session.createDataFrame(results_ltv_retention_conversions)
+    return table
+
+
+@pytest.fixture()
+@pytest.mark.tester()
+def tester_spark_ltv_ret(results_ltv_retention_conversions_spark):
     """
     Spark tester
     """
-    table = local_spark_session.createDataFrame(results_ltv_retention_conversions)
-    tester = Tester(dataframe=table, metrics=["retention", "conversions", "ltv"], column_groups="group")
+    tester = Tester(dataframe=results_ltv_retention_conversions_spark,
+                    metrics=["retention", "conversions", "ltv"], column_groups="group")
     return tester
diff --git a/tests/test_tester.py b/tests/test_tester.py
index c4ba892..f09bceb 100644
--- a/tests/test_tester.py
+++ b/tests/test_tester.py
@@ -2,10 +2,13 @@
 import numpy as np
 import pandas as pd
+from pyspark.sql.functions import col
+import scipy.stats as sps
 import pytest
 
 from ambrosia.tester import Tester, test
 from ambrosia.tools.stat_criteria import TtestIndCriterion, TtestRelCriterion
+from ambrosia.spark_tools.stat_criteria import TtestRelativeCriterionSpark, TtestIndCriterionSpark
 
 
 def check_eq(a: float, b: float, eps: float = 1e-5) -> bool:
@@ -212,7 +215,8 @@ def test_criteria_ttest_different(effect_type):
     ) != ttest_rel.calculate_conf_interval(group_a, group_b, effect_type=effect_type)
 
 
-@pytest.mark.parametrize("criterion", ["ttest", "ttest_rel", "mw", "wilcoxon"])
+# mw and wilcoxon are excluded: rejecting "exists x: F(x) != G(x)" is easier than "forall x: F(x) < G(x)"
+@pytest.mark.parametrize("criterion", ["ttest", "ttest_rel"])
 @pytest.mark.parametrize("metrics, alternative", [("retention", "greater"), ("conversions", "less"), ("ltv", "less")])
 def test_kwargs_passing_theory(criterion, metrics, alternative, tester_on_ltv_retention):
     """
@@ -222,7 +226,7 @@
     alternative_pvalue = tester_on_ltv_retention.run(
         criterion=criterion, metrics=metrics, as_table=False, alternative=alternative
     )[0]["pvalue"]
-    assert old_pvalue >= alternative_pvalue
+    assert old_pvalue <= alternative_pvalue
 
 
 @pytest.mark.parametrize("metrics, alternative", [("retention", "greater"), ("conversions", "less")])
@@ -244,7 +248,7 @@ def test_kwargs_passing_empiric(metrics, alternative, tester_on_ltv_retention):
         random_seed=random_seed,
         alternative=alternative,
     )[0]["pvalue"]
-    assert old_pvalue >= alternative_pvalue
+    assert old_pvalue <= alternative_pvalue
 
 
 @pytest.mark.parametrize("interval_type", ["yule", "yule_modif", "newcombe", "jeffrey", "agresti"])
@@ -285,10 +289,10 @@ def check_bound_intervals(int_center, int_less, int_gr, left_bound: float = -np.
     """
     Check bound of intervals for different alternatives
     """
-    assert int_less[0] == left_bound
-    assert int_gr[1] == right_bound
-    assert int_gr[0] > int_center[0]
-    assert int_less[1] < int_center[1]
+    assert int_less[1] == right_bound
+    assert int_gr[0] == left_bound
+    assert int_gr[1] < int_center[1]
+    assert int_less[0] > int_center[0]
 
 
 @pytest.mark.parametrize("effect_type", ["absolute"])
@@ -302,8 +306,8 @@ def test_alternative_change_binary(effect_type, interval_type, tester_on_ltv_ret
     )
     # mean retention A - 0.303
    # mean retention B - 0.399
-    assert pvalue_less > pvalue_center
-    assert pvalue_center > pvalue_gr
+    assert pvalue_less < pvalue_center
+    assert pvalue_center < pvalue_gr
     # Check intervals
     check_bound_intervals(int_center, int_less, int_gr, -1, 1)
@@ -323,8 +327,8 @@ def test_alternative_change_th(effect_type, criterion, tester_on_ltv_retention):
         as_table=False,
     )
     # Mean(group_a) > Mean(group_b) in this table
-    assert pvalue_less < pvalue_center
-    assert pvalue_center < pvalue_gr
+    assert pvalue_less > pvalue_center
+    assert pvalue_center > pvalue_gr
     # Check intervals
     check_bound_intervals(int_center, int_less, int_gr)
@@ -347,7 +351,7 @@ def test_spark_tester(tester_spark_ltv_ret, tester_on_ltv_retention, alternative
 
 @pytest.mark.parametrize("effect_type", ["absolute", "relative"])
-@pytest.mark.parametrize("alternative", ["two-sided", "greater"])
+@pytest.mark.parametrize("alternative", ["two-sided", "less"])
 def test_paired_bootstrap(effect_type, alternative):
     """
     Compare pvalues and confidence intervals between paired and regular bootstrap
@@ -384,3 +388,70 @@
     )
     assert test_results_dep[0]["pvalue"] < test_results_ind[0]["pvalue"]
     assert test_results_dep[0]["confidence_interval"][0] > test_results_ind[0]["confidence_interval"][0]
+
+
+def _test_criteria(spark_criterion, pandas_criterion, a_sp, b_sp, a_gr, b_gr, effect_type, alternative, sps_method):
+    pvalue_sp = spark_criterion.calculate_pvalue(a_sp, b_sp, column="ltv",
+                                                 effect_type=effect_type, alternative=alternative)
+    pvalue_pd = pandas_criterion.calculate_pvalue(a_gr, b_gr, effect_type=effect_type, alternative=alternative)
+
+    assert check_eq(pvalue_sp, pvalue_pd)
+
+    pvalue_sps = sps_method(a_gr, b_gr, alternative=alternative).pvalue
+
+    if effect_type == "absolute":
+        assert check_eq(pvalue_sp, pvalue_sps)
+
+    # p-value consistency: both estimates land on the same side of 0.5
+    assert (pvalue_sps - 0.5) * (pvalue_sp - 0.5) >= 0
+
+    effect_sp = spark_criterion.calculate_effect(a_sp, b_sp, column="ltv", effect_type=effect_type)
+    effect_pd = pandas_criterion.calculate_effect(a_gr, b_gr, effect_type=effect_type)
+    assert check_eq(effect_sp, effect_pd)
+
+    conf_sp = spark_criterion.calculate_conf_interval(
+        a_sp, b_sp, column="ltv", alpha=0.05, effect_type=effect_type, alternative=alternative
+    )
+    conf_pd = pandas_criterion.calculate_conf_interval(
+        a_gr, b_gr, alpha=0.05, effect_type=effect_type, alternative=alternative
+    )
+
+    assert check_eq_int(conf_sp[0], conf_pd[0])
+
+
+def _get_groups(spark_data, pandas_data):
+    a_sp = spark_data.where(col("group") == "A")
+    b_sp = spark_data.where(col("group") == "B")
+    a_gr = pandas_data[pandas_data.group == "A"].ltv.values
+    b_gr = pandas_data[pandas_data.group == "B"].ltv.values
+    return a_sp, b_sp, a_gr, b_gr
+
+
+@pytest.mark.parametrize("effect_type", ["absolute", "relative"])
+@pytest.mark.parametrize("alternative", ["two-sided", "greater", "less"])
+def test_ttest_ind_spark(results_ltv_retention_conversions,
+                         results_ltv_retention_conversions_spark,
+                         effect_type, alternative):
+    a_sp, b_sp, a_gr, b_gr = _get_groups(
+        results_ltv_retention_conversions_spark, results_ltv_retention_conversions
+    )
+
+    spark_criterion = TtestIndCriterionSpark(cache_parameters=True)
+    pandas_criterion = TtestIndCriterion()
+
+    _test_criteria(spark_criterion, pandas_criterion, a_sp, b_sp, a_gr, b_gr, effect_type, alternative, sps.ttest_ind)
+
+
+@pytest.mark.parametrize("effect_type", ["absolute", "relative"])
+@pytest.mark.parametrize("alternative", ["two-sided", "greater", "less"])
+def test_ttest_rel_spark(results_ltv_retention_conversions,
+                         results_ltv_retention_conversions_spark,
+                         effect_type, alternative):
+    a_sp, b_sp, a_gr, b_gr = _get_groups(
+        results_ltv_retention_conversions_spark, results_ltv_retention_conversions
+    )
+
+    spark_criterion = TtestRelativeCriterionSpark(cache_parameters=True)
+    pandas_criterion = TtestRelCriterion()
+
+    _test_criteria(spark_criterion, pandas_criterion, a_sp, b_sp, a_gr, b_gr, effect_type, alternative, sps.ttest_rel)
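Reviewer note (not part of the diff): below is a minimal, hedged usage sketch of the relative paired t-test this change wires up for Spark. The SparkSession setup and the synthetic data are assumptions made purely for illustration; the only API assumed is what tests/test_tester.py itself exercises (TtestRelativeCriterionSpark(cache_parameters=True) and its calculate_pvalue / calculate_effect / calculate_conf_interval methods).

# Hedged sketch: exercising TtestRelativeCriterionSpark from this diff.
# Session setup, data, and sizes are hypothetical; the calls mirror the new tests.
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from ambrosia.spark_tools.stat_criteria import TtestRelativeCriterionSpark

spark = SparkSession.builder.master("local[1]").getOrCreate()

rng = np.random.default_rng(2022)
size = 200  # the criterion raises ValueError unless both groups have equal size
frame = pd.DataFrame(
    {
        "group": ["A"] * size + ["B"] * size,
        "ltv": np.concatenate([rng.normal(10.0, 2.0, size), rng.normal(10.4, 2.0, size)]),
    }
)
table = spark.createDataFrame(frame)
group_a = table.where(col("group") == "A")
group_b = table.where(col("group") == "B")

criterion = TtestRelativeCriterionSpark(cache_parameters=True)
# effect_type="relative" now routes through theory_pkg.apply_delta_method_by_stats
# instead of raising NotImplementedError
pvalue = criterion.calculate_pvalue(group_a, group_b, column="ltv", effect_type="relative")
effect = criterion.calculate_effect(group_a, group_b, column="ltv", effect_type="relative")
interval = criterion.calculate_conf_interval(
    group_a, group_b, column="ltv", alpha=0.05, effect_type="relative", alternative="two-sided"
)
print(pvalue, effect, interval)

Design note: pairing is positional. Both groups receive a row_number over a window that orders by a constant column and partitions by the shared EMPTY_VALUE_PARTITION literal, so rows are matched in whatever order Spark materializes them; callers who need a specific pairing should sort or key the rows before calling the criterion.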