From a4ae8a75068ab5d0e149b082f1352b74384e78c6 Mon Sep 17 00:00:00 2001
From: KX79WQ
Date: Sat, 18 Apr 2020 14:39:31 +0200
Subject: [PATCH 1/2] Two speedups of the phik code

- phik value of each variable-pair is calculated in parallel
- code to calculate phik is twice as fast: the number of scipy.stats.mvn.mvnun
  evaluations has been reduced by a factor of two, thanks to symmetry.
- Precision of the brentq optimization has been slightly reduced.
- Bump up version to 0.9.11, turn off test requirements in setup.py
- Remove unused imports
- Also allow a large number of bins in the phik calculation.
---
 README.rst                     |  2 +-
 python/phik/binning.py         | 10 +++---
 python/phik/bivariate.py       | 55 ++++++++++++++++++++--------
 python/phik/config.py          |  4 +++
 python/phik/data_quality.py    |  1 -
 python/phik/phik.py            | 65 ++++++++++++++++++++--------------
 python/phik/resources.py       |  3 --
 python/phik/significance.py    |  2 +-
 python/phik/simulation.py      |  5 ++-
 python/phik/version.py         |  4 +--
 setup.py                       |  5 +--
 tests/phik_python/test_phik.py |  7 ----
 12 files changed, 96 insertions(+), 67 deletions(-)
 create mode 100644 python/phik/config.py

diff --git a/README.rst b/README.rst
index bbefc5f..ea3376d 100644
--- a/README.rst
+++ b/README.rst
@@ -2,7 +2,7 @@
 Phi_K Correlation Analyzer Library
 ==================================
 
-* Version: 0.9.10. Released: Feb 2020
+* Version: 0.9.11. Released: April 2020
 * Documentation: https://phik.readthedocs.io
 * Repository: https://github.com/kaveio/phik
 * Publication: https://arxiv.org/abs/1811.11440
diff --git a/python/phik/binning.py b/python/phik/binning.py
index f082ed3..cd73b82 100644
--- a/python/phik/binning.py
+++ b/python/phik/binning.py
@@ -64,9 +64,7 @@ def bin_array(arr, bin_edges):
     bin_labels = []
     bin_indices = pd.Series(binned_arr).value_counts().index
     for i in range(1, len(bin_edges)):
-        if i not in bin_indices:
-            warnings.warn('Empty bin with bin-edges {0:s} - {1:s}'.format(str(bin_edges[i-1]), str(bin_edges[i])))
-        else:
+        if i in bin_indices:
             bin_labels.append((bin_edges[i-1], bin_edges[i]))
 
     # NaN values are added to the overflow bin.
Restore NaN values: @@ -175,11 +173,11 @@ def hist2d_from_rebinned_df(data_binned:pd.DataFrame, dropna:bool=True, drop_und assert len(data_binned.columns) == 2, 'DataFrame should contain only two columns' if not dropna: - data_binned = data_binned.fillna(defs.NaN).copy() + data_binned.fillna(defs.NaN, inplace=True) if drop_underflow: - data_binned = data_binned.replace(defs.UF, np.nan).copy() + data_binned.replace(defs.UF, np.nan, inplace=True) if drop_overflow: - data_binned = data_binned.replace(defs.OF, np.nan).copy() + data_binned.replace(defs.OF, np.nan, inplace=True) # create a contingency table c0, c1 = data_binned.columns diff --git a/python/phik/bivariate.py b/python/phik/bivariate.py index 9d7f937..a185195 100644 --- a/python/phik/bivariate.py +++ b/python/phik/bivariate.py @@ -18,6 +18,8 @@ import numpy as np from scipy.stats import mvn from scipy import optimize +# from joblib import Parallel, delayed +# from phik.config import ncores as NCORES import warnings @@ -33,8 +35,8 @@ def _mvn_un(rho: float, lower: tuple, upper: tuple) -> float: :returns float: integral value ''' mu = np.array([0., 0.]) - S = np.array([[1.,rho],[rho,1.0]]) - p,i = mvn.mvnun(lower,upper,mu,S) + S = np.array([[1., rho], [rho, 1.]]) + p, i = mvn.mvnun(lower, upper, mu, S) return p @@ -48,16 +50,40 @@ def _mvn_array(rho: float, sx: np.ndarray, sy: np.ndarray) -> list: :param np.ndarray sy: bin edges array of y-axis :returns list: list of integral values ''' - corr = [] - for i in range(len(sx)-1): - for j in range(len(sy)-1): - lower = [sx[i],sy[j]] - upper = [sx[i+1],sy[j+1]] - p = _mvn_un(rho,lower,upper) - corr.append(p) + # ranges = [([sx[i], sy[j]], [sx[i+1], sy[j+1]]) for i in range(len(sx) - 1) for j in range(len(sy) - 1)] + # corr = [mvn.mvnun(lower, upper, mu, S)[0] for lower, upper in ranges] + # return corr + + # mean and covariance + mu = np.array([0., 0.]) + S = np.array([[1., rho], [rho, 1.]]) + + # add half block, which is symmetric in x + odd_odd = False + ranges = [([sx[i], sy[j]], [sx[i+1], sy[j+1]]) for i in range((len(sx)-1)//2) for j in range(len(sy) - 1)] + # add odd middle row, which is symmetric in y + if (len(sx)-1) % 2 == 1: + i = (len(sx)-1)//2 + ranges += [([sx[i], sy[j]], [sx[i+1], sy[j+1]]) for j in range((len(sy)-1)//2)] + # add center point, add this only once + if (len(sy)-1) % 2 == 1: + j = (len(sy)-1)//2 + ranges.append(([sx[i], sy[j]], [sx[i+1], sy[j+1]])) + odd_odd = True + + corr = [_calc_mvnun(lower, upper, mu, S) for lower, upper in ranges] + # corr = Parallel(n_jobs=NCORES, prefer="threads")(delayed(_calc_mvnun)(lower, upper, mu, S) for lower, upper in ranges) + + # add second half, exclude center + corr += corr if not odd_odd else corr[:-1] + return corr +def _calc_mvnun(lower, upper, mu, S): + return mvn.mvnun(lower, upper, mu, S)[0] + + def bivariate_normal_theory(rho: float, nx:int=-1, ny:int=-1, n:int=1, sx:np.ndarray=None, sy:np.ndarray=None) -> np.ndarray: """Return binned pdf of bivariate normal distribution. @@ -171,18 +197,19 @@ def phik_from_chi2(chi2:float, n:int, nx:int, ny:int, sx:np.ndarray=None, sy:np. # scale ensures that for rho=1, chi2 is the maximum possible value corr1 = _mvn_array(1, sx, sy) if 0 in corr0 and len(corr0)>10000: - warnings.warn('Too many unique values. Are you sure that interval variables are set correctly?') - return np.nan + warnings.warn('Many cells: {0:d}. 
Are interval variables set correctly?'.format(len(corr0))) chi2_one = n * sum([((c1-c0)*(c1-c0)) / c0 for c0,c1 in zip(corr0,corr1)]) chi2_max = n * min(nx-1, ny-1) scale = (chi2_max - pedestal) / chi2_one - if chi2_max < chi2 and np.isclose(chi2, chi2_max, atol=1E-14): + if chi2 > chi2_max and np.isclose(chi2, chi2_max, atol=1E-14): chi2 = chi2_max # only solve for rho if chi2 exceeds noise pedestal if chi2 <= pedestal: - return 0 + return 0. + if chi2 >= chi2_max: + return 1. - rho = optimize.brentq(chi2_from_phik, 0, 1, args=(n, chi2, corr0, scale, sx, sy, pedestal)) + rho = optimize.brentq(chi2_from_phik, 0, 1, args=(n, chi2, corr0, scale, sx, sy, pedestal), xtol=1e-5) return rho diff --git a/python/phik/config.py b/python/phik/config.py new file mode 100644 index 0000000..864846f --- /dev/null +++ b/python/phik/config.py @@ -0,0 +1,4 @@ +import multiprocessing + +# number of cores to use in parallel processing +ncores = multiprocessing.cpu_count() diff --git a/python/phik/data_quality.py b/python/phik/data_quality.py index eab5607..a97547e 100644 --- a/python/phik/data_quality.py +++ b/python/phik/data_quality.py @@ -15,7 +15,6 @@ import warnings import copy -import pandas as pd def dq_check_nunique_values(df, interval_cols, dropna=True): diff --git a/python/phik/phik.py b/python/phik/phik.py index f4aaf28..3dcbdd7 100644 --- a/python/phik/phik.py +++ b/python/phik/phik.py @@ -16,7 +16,8 @@ import numpy as np import itertools import pandas as pd -import warnings +from joblib import Parallel, delayed +from phik.config import ncores as NCORES from phik import definitions as defs from .bivariate import phik_from_chi2 @@ -83,37 +84,47 @@ def phik_from_rebinned_df(data_binned:pd.DataFrame, noise_correction:bool=True, if not dropna: # if not dropna replace the NaN values with the string NaN. Otherwise the rows with NaN are dropped # by the groupby. - data_binned = data_binned.replace(np.nan, defs.NaN).copy() + data_binned.replace(np.nan, defs.NaN, inplace=True) if drop_underflow: - data_binned = data_binned.replace(defs.UF, np.nan).copy() + data_binned.replace(defs.UF, np.nan, inplace=True) if drop_overflow: - data_binned = data_binned.replace(defs.OF, np.nan).copy() - - - phiks = {} - for i, comb in enumerate(itertools.combinations_with_replacement(data_binned.columns.values, 2)): - c0, c1 = comb - if c0 == c1: - phiks[':'.join(comb)] = 1 - continue - datahist = data_binned.groupby([c0, c1])[c0].count().to_frame().unstack().fillna(0) - - # If 0 or only 1 values for one of the two variables, it is not possible to calculate phik. - # This check needs to be done after creation of OF, UF and NaN bins. 
- if 1 in datahist.shape or 0 in datahist.shape: - phiks[':'.join(comb)] = np.nan - warnings.warn('Too few unique values for variable {0:s} ({1:d}) or {2:s} ({3:d}) to calculate phik'.format( - c0, datahist.shape[0], c1, datahist.shape[1])) - continue - - datahist.columns = datahist.columns.droplevel() - phikvalue = phik_from_hist2d(datahist.values, noise_correction=noise_correction) - phiks[':'.join(comb)] = phikvalue - - phik_overview = create_correlation_overview_table(phiks) + data_binned.replace(defs.OF, np.nan, inplace=True) + + # phik_list = [_calc_phik(co, data_binned[list(co)], noise_correction) + # for co in itertools.combinations_with_replacement(data_binned.columns.values, 2)] + + phik_list = Parallel(n_jobs=NCORES)(delayed(_calc_phik)(co, data_binned[list(co)], noise_correction) + for co in itertools.combinations_with_replacement(data_binned.columns.values, 2)) + + phik_overview = create_correlation_overview_table(dict(phik_list)) return phik_overview +def _calc_phik(comb, data_binned, noise_correction): + """Split off calculation of phik for parallel processing + + :param tuple comb: union of two string columns + :param pd.DataFrame data_binned: input data where interval variables have been binned + :param bool noise_correction: apply noise correction in phik calculation + :return: + """ + c0, c1 = comb + combi = ':'.join(comb) + if c0 == c1: + return (combi, 1.0) + + datahist = data_binned.groupby([c0, c1])[c0].count().to_frame().unstack().fillna(0) + + # If 0 or only 1 values for one of the two variables, it is not possible to calculate phik. + # This check needs to be done after creation of OF, UF and NaN bins. + if any([v in datahist.shape for v in [0, 1]]): + return (combi, np.nan) + + datahist.columns = datahist.columns.droplevel() + phikvalue = phik_from_hist2d(datahist.values, noise_correction=noise_correction) + return (combi, phikvalue) + + def phik_matrix(df:pd.DataFrame, interval_cols:list=None, bins=10, quantile:bool=False, noise_correction:bool=True, dropna:bool=True, drop_underflow:bool=True, drop_overflow:bool=True) -> pd.DataFrame: """ diff --git a/python/phik/resources.py b/python/phik/resources.py index 5844fd6..e6ca1f6 100644 --- a/python/phik/resources.py +++ b/python/phik/resources.py @@ -15,10 +15,7 @@ """ import pathlib -import sys - from pkg_resources import resource_filename - import phik # Fixtures diff --git a/python/phik/significance.py b/python/phik/significance.py index 1eb2425..4055f6e 100644 --- a/python/phik/significance.py +++ b/python/phik/significance.py @@ -27,7 +27,7 @@ from phik import definitions as defs from .binning import bin_data, create_correlation_overview_table from .statistics import get_chi2_using_dependent_frequency_estimates -from .statistics import estimate_ndof, estimate_simple_ndof, theoretical_ndof +from .statistics import estimate_ndof, theoretical_ndof from .simulation import sim_chi2_distribution from .data_quality import dq_check_nunique_values, dq_check_hist2d diff --git a/python/phik/simulation.py b/python/phik/simulation.py index ed1c7c8..aa51b62 100644 --- a/python/phik/simulation.py +++ b/python/phik/simulation.py @@ -18,8 +18,8 @@ import pandas as pd from numba import jit from joblib import Parallel, delayed -import multiprocessing +from phik.config import ncores as NCORES from .statistics import get_dependent_frequency_estimates from .statistics import get_chi2_using_dependent_frequency_estimates @@ -326,8 +326,7 @@ def sim_chi2_distribution(values, nsim:int=1000, lambda_:str='log-likelihood', s exp_dep = 
get_dependent_frequency_estimates(values)
 
-    num_cores = multiprocessing.cpu_count()
-    chi2s = Parallel(n_jobs=num_cores)(delayed(simulate)(exp_dep, simulation_method, lambda_) for i in range(nsim))
+    chi2s = Parallel(n_jobs=NCORES)(delayed(simulate)(exp_dep, simulation_method, lambda_) for i in range(nsim))
 
     return chi2s
 
diff --git a/python/phik/version.py b/python/phik/version.py
index 63737e3..48aa262 100644
--- a/python/phik/version.py
+++ b/python/phik/version.py
@@ -1,6 +1,6 @@
 """THIS FILE IS AUTO-GENERATED BY PHIK SETUP.PY."""
 
 name = 'phik'
-version = '0.9.10'
-full_version = '0.9.10'
+version = '0.9.11'
+full_version = '0.9.11'
 release = True
diff --git a/setup.py b/setup.py
index 3b06508..1583c28 100644
--- a/setup.py
+++ b/setup.py
@@ -20,7 +20,7 @@
 MAJOR = 0
 REVISION = 9
-PATCH = 10
+PATCH = 11
 DEV = False
 
 # note: also update README.rst
@@ -45,7 +45,8 @@
     'joblib>=0.14.1'
 ]
 
-REQUIREMENTS = REQUIREMENTS + TEST_REQUIREMENTS
+if DEV:
+    REQUIREMENTS += TEST_REQUIREMENTS
 
 CMD_CLASS = dict()
 COMMAND_OPTIONS = dict()
diff --git a/tests/phik_python/test_phik.py b/tests/phik_python/test_phik.py
index e95f436..59cbf4b 100644
--- a/tests/phik_python/test_phik.py
+++ b/tests/phik_python/test_phik.py
@@ -15,7 +15,6 @@
 """
 
 import unittest
-import unittest.mock as mock
 
 import pytest
 
@@ -40,7 +39,6 @@ def test_phik_matrix(self):
 
         import numpy as np
         import pandas as pd
-        import phik
         from phik import resources
 
         # open fake car insurance data
@@ -58,7 +56,6 @@ def test_global_phik(self):
 
         import numpy as np
         import pandas as pd
-        import phik
         from phik import resources
 
         # open fake car insurance data
@@ -76,7 +73,6 @@ def test_significance_matrix(self):
 
         import numpy as np
         import pandas as pd
-        import phik
         from phik import resources
 
         # open fake car insurance data
@@ -92,9 +88,7 @@ def test_hist2d(self):
         """Test the calculation of global Phi_K values"""
 
-        import numpy as np
         import pandas as pd
-        import phik
         from phik import resources
 
         # open fake car insurance data
@@ -113,7 +107,6 @@ def test_outlier_significance_matrix(self):
 
         import numpy as np
         import pandas as pd
-        import phik
         from phik import resources
 
         # open fake car insurance data

From 1ae2642579f5716a6b1f1c896c2271b741cbbcb7 Mon Sep 17 00:00:00 2001
From: KX79WQ
Date: Sat, 18 Apr 2020 20:26:28 +0200
Subject: [PATCH 2/2] Added example notebook and helper function to calculate phik matrix with spark

- Updated spark instructions in README
---
 README.rst                                    |   2 +
 docs/source/tutorials.rst                     |   4 +-
 .../phik/notebooks/phik_tutorial_spark.ipynb  | 195 ++++++++++++++++++
 python/phik/phik.py                           |  50 +++++
 4 files changed, 249 insertions(+), 2 deletions(-)
 create mode 100644 python/phik/notebooks/phik_tutorial_spark.ipynb

diff --git a/README.rst b/README.rst
index ea3376d..f266e96 100644
--- a/README.rst
+++ b/README.rst
@@ -25,6 +25,8 @@ Documentation
 =============
 
 The entire Phi_K documentation including tutorials can be found at `read-the-docs `_.
+See the tutorials for detailed examples on how to run the code with pandas. We also have one example on how to
+calculate the Phi_K correlation matrix for a spark dataframe.
 
 
 Check it out
diff --git a/docs/source/tutorials.rst b/docs/source/tutorials.rst
index 892f3e2..86a9185 100644
--- a/docs/source/tutorials.rst
+++ b/docs/source/tutorials.rst
@@ -10,9 +10,9 @@
 The tutorials are available in the ``python/phik/notebooks`` directory. We have:
 
 * A basic tutorial: this covers the basics of calculating Phi_K, the statistical significance, and interpreting the correlation.
* An advanced tutorial: this shows how to use the advanced features of the ``PhiK`` library. +* A spark tutorial: this shows how to calculate the Phi_K correlation matrix for a spark dataframe. You can open these notebooks directly: * Run them interactively at `MyBinder `_. -* View them statically: `basic tutorial `_ and `advanced tutorial `_. - +* View them statically: `basic tutorial `_ and the `advanced tutorial `_ and the `spark tutorial `_. diff --git a/python/phik/notebooks/phik_tutorial_spark.ipynb b/python/phik/notebooks/phik_tutorial_spark.ipynb new file mode 100644 index 0000000..5d67cc5 --- /dev/null +++ b/python/phik/notebooks/phik_tutorial_spark.ipynb @@ -0,0 +1,195 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Phi_K spark tutorial\n", + "\n", + "This notebook shows you how to obtain the Phi_K correlation matrix for a spark dataframe.\n", + "Calculating the Phi_K matrix consists of two steps:\n", + "\n", + "- Obtain the 2d contingency tables for all variable pairs. To make these we use the `popmon` package, which relies on the `spark histogrammar` package.\n", + "- Calculate the Phi_K value for each variable pair from its contingency table.\n", + "\n", + "Make sure you install the popmon package to make the 2d histograms, that are then used to calculate phik." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "!pip install popmon" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import itertools\n", + "import pandas as pd\n", + "import phik\n", + "from phik import resources\n", + "from phik.phik import spark_phik_matrix_from_hist2d_dict\n", + "import popmon\n", + "from popmon.analysis.hist_numpy import get_2dgrid\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# histogramming in popmon is done using the histogrammar library" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "spark = SparkSession.builder.config('spark.jars.packages','org.diana-hep:histogrammar-sparksql_2.11:1.0.4').getOrCreate()\n", + "sc = spark.sparkContext" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Load data\n", + "\n", + "A simulated dataset is part of the phik-package. The dataset concerns fake car insurance data. 
Load the dataset here:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data = pd.read_csv( resources.fixture('fake_insurance_data.csv.gz') )\n", + "sdf = spark.createDataFrame(data)\n", + "sdf.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "combis = itertools.combinations_with_replacement(sdf.columns, 2)\n", + "combis = [list(c) for c in combis]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print (combis)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# step 1: create histograms (this runs spark histogrammar in the background)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# see the doc-string of pm_make_histograms() for binning options.\n", + "hists = sdf.pm_make_histograms(combis)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "grids = {k:get_2dgrid(h) for k,h in hists.items()}\n", + "print (grids)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# we can store the histograms if we want to\n", + "if False:\n", + " import pickle\n", + "\n", + " with open('grids.pkl', 'wb') as outfile:\n", + " pickle.dump(grids, outfile)\n", + "\n", + " with open('grids.pkl', 'rb') as handle:\n", + " grids = pickle.load(handle)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# step 2: calculate phik matrix (runs rdd parallellization over all 2d histograms)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "phik_matrix = spark_phik_matrix_from_hist2d_dict(sc, grids)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "phik_matrix" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/python/phik/phik.py b/python/phik/phik.py index 3dcbdd7..501f409 100644 --- a/python/phik/phik.py +++ b/python/phik/phik.py @@ -26,6 +26,56 @@ from .data_quality import dq_check_nunique_values, dq_check_hist2d +def spark_phik_matrix_from_hist2d_dict(spark_context, hist_dict): + """Correlation matrix of bivariate gaussian using spark parallellization over variable-pair 2d histograms + + See spark notebook phik_tutorial_spark.ipynb as example. + + Each input histogram gets converted into correlation coefficient of bivariate gauss + with correlation value rho, assuming giving binning and number of records. + Correlation coefficient value is between 0 and 1. + + :param spark_context: spark context + :param hist_dict: dict of 2d numpy grids with value-counts. keys are histogram names. 
+    :return: phik correlation matrix
+    """
+    if not isinstance(hist_dict, dict):
+        raise TypeError('hist_dict should be a dictionary')
+    for k, v in hist_dict.items():
+        if not isinstance(v, np.ndarray):
+            raise TypeError('hist_dict should be a dictionary of 2d numpy arrays.')
+    hist_list = list(hist_dict.items())
+    hist_rdd = spark_context.parallelize(hist_list)
+    phik_rdd = hist_rdd.map(_phik_from_row)
+    phik_list = phik_rdd.collect()
+    phik_overview = create_correlation_overview_table(dict(phik_list))
+    return phik_overview
+
+
+def _phik_from_row(row):
+    """Helper function for spark parallel processing
+
+    :param row: rdd row, where row[0] is the histogram key and row[1] is the 2d numpy grid
+    :return: tuple of (key, phik-value)
+    """
+    if not len(row) >= 2:
+        raise RuntimeError('row should have at least two elements.')
+    key = row[0]
+    grid = row[1]
+    if not isinstance(key, str):
+        raise TypeError('key is not a string.')
+    if not isinstance(grid, np.ndarray):
+        raise TypeError('grid is not a numpy array.')
+    c = key.split(':')
+    if len(c) == 2 and c[0] == c[1]:
+        return key, 1.0
+    try:
+        phik_value = phik_from_hist2d(grid)
+    except:
+        phik_value = None
+    return key, phik_value
+
+
 def phik_from_hist2d(observed:np.ndarray, noise_correction:bool=True) -> float:
     """ correlation coefficient of bivariate gaussian derived from chi2-value
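
For reference, below is a minimal usage sketch of the new spark helper, condensed from the tutorial notebook added in this patch. It is not part of the patch itself: the variable names and bin counts in the toy grids are made up, and it assumes pyspark is installed and a SparkContext can be created. In practice the 2d grids would come from popmon's `pm_make_histograms` / `get_2dgrid`, as shown in the notebook.

```python
import numpy as np
from pyspark.sql import SparkSession

from phik.phik import spark_phik_matrix_from_hist2d_dict

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# toy 2d contingency grids (bin value-counts), keyed as 'var1:var2';
# the variable names and counts here are invented for illustration
grids = {
    'driver_age:car_size': np.array([[10., 4., 1.],
                                     [3., 12., 5.],
                                     [0., 6., 9.]]),
    'driver_age:driver_age': np.ones((3, 3)),  # same-variable pairs are mapped to phik = 1.0 directly
    'car_size:car_size': np.ones((3, 3)),
}

# each (key, grid) pair is shipped to the executors and converted into a phik value;
# the collected results are arranged into a correlation matrix (pandas DataFrame)
phik_matrix = spark_phik_matrix_from_hist2d_dict(sc, grids)
print(phik_matrix)
```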