Merge pull request #9 from KaveIO/speedups
Speedups + spark example
mbaak authored Apr 18, 2020
2 parents 3f51b85 + 1ae2642 commit 9dfdf2b
Showing 14 changed files with 345 additions and 69 deletions.
4 changes: 3 additions & 1 deletion README.rst
@@ -2,7 +2,7 @@
Phi_K Correlation Analyzer Library
==================================

-* Version: 0.9.10. Released: Feb 2020
+* Version: 0.9.11. Released: April 2020
* Documentation: https://phik.readthedocs.io
* Repository: https://github.com/kaveio/phik
* Publication: https://arxiv.org/abs/1811.11440
@@ -25,6 +25,8 @@ Documentation
=============

The entire Phi_K documentation including tutorials can be found at `read-the-docs <https://phik.readthedocs.io>`_.
+See the tutorials for detailed examples of how to run the code with pandas. We also have an example of how to
+calculate the Phi_K correlation matrix for a Spark dataframe.
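As a minimal sketch of the pandas usage described above (the dataset and the ``phik_matrix()`` accessor come from the tutorials; treat the snippet as illustrative rather than canonical)::

    import pandas as pd
    import phik                      # importing phik registers phik_matrix() on pandas DataFrames
    from phik import resources

    # the fake car-insurance dataset that ships with the package
    df = pd.read_csv(resources.fixture('fake_insurance_data.csv.gz'))

    # full Phi_K correlation matrix; interval (numeric) columns are auto-detected,
    # or can be passed explicitly via the interval_cols argument
    phik_overview = df.phik_matrix()
    print(phik_overview)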


Check it out
4 changes: 2 additions & 2 deletions docs/source/tutorials.rst
@@ -10,9 +10,9 @@ The tutorials are available in the ``python/phik/notebooks`` directory. We have:

* A basic tutorial: this covers the basics of calculating Phi_K, the statistical significance, and interpreting the correlation.
* An advanced tutorial: this shows how to use the advanced features of the ``PhiK`` library.
+* A spark tutorial: this shows how to calculate the Phi_K correlation matrix for a spark dataframe.

You can open these notebooks directly:

* Run them interactively at `MyBinder <https://mybinder.org/v2/gh/KaveIO/PhiK/master?filepath=python%2Fphik%2Fnotebooks>`_.
-* View them statically: `basic tutorial <http://nbviewer.ipython.org/urls/raw.github.com/kaveio/phik/master/python/phik/notebooks/phik_tutorial_basic.ipynb>`_ and `advanced tutorial <http://nbviewer.ipython.org/urls/raw.github.com/kaveio/phik/master/python/phik/notebooks/phik_tutorial_advanced.ipynb>`_.

+* View them statically: the `basic tutorial <http://nbviewer.ipython.org/urls/raw.github.com/kaveio/phik/master/python/phik/notebooks/phik_tutorial_basic.ipynb>`_, the `advanced tutorial <http://nbviewer.ipython.org/urls/raw.github.com/kaveio/phik/master/python/phik/notebooks/phik_tutorial_advanced.ipynb>`_, and the `spark tutorial <http://nbviewer.ipython.org/urls/raw.github.com/kaveio/phik/master/python/phik/notebooks/phik_tutorial_spark.ipynb>`_.
10 changes: 4 additions & 6 deletions python/phik/binning.py
@@ -64,9 +64,7 @@ def bin_array(arr, bin_edges):
     bin_labels = []
     bin_indices = pd.Series(binned_arr).value_counts().index
     for i in range(1, len(bin_edges)):
-        if i not in bin_indices:
-            warnings.warn('Empty bin with bin-edges {0:s} - {1:s}'.format(str(bin_edges[i-1]), str(bin_edges[i])))
-        else:
+        if i in bin_indices:
             bin_labels.append((bin_edges[i-1], bin_edges[i]))
 
     # NaN values are added to the overflow bin. Restore NaN values:
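With the change above, bin labels are built only for occupied bins and the empty-bin warning is gone. A small stand-alone sketch of that filtering logic (the binning via np.searchsorted is a hypothetical stand-in, purely for illustration):

    import numpy as np
    import pandas as pd

    bin_edges = [0., 1., 2., 3.]
    arr = np.array([0.5, 0.7, 2.5])               # nothing falls in (1, 2]
    binned_arr = np.searchsorted(bin_edges, arr)  # stand-in for the library's binning step
    bin_indices = pd.Series(binned_arr).value_counts().index

    # keep labels only for bins that are actually occupied
    bin_labels = [(bin_edges[i-1], bin_edges[i]) for i in range(1, len(bin_edges)) if i in bin_indices]
    print(bin_labels)  # [(0.0, 1.0), (2.0, 3.0)] -- the empty (1, 2] bin is skipped silently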
@@ -175,11 +173,11 @@ def hist2d_from_rebinned_df(data_binned:pd.DataFrame, dropna:bool=True, drop_und
     assert len(data_binned.columns) == 2, 'DataFrame should contain only two columns'
 
     if not dropna:
-        data_binned = data_binned.fillna(defs.NaN).copy()
+        data_binned.fillna(defs.NaN, inplace=True)
     if drop_underflow:
-        data_binned = data_binned.replace(defs.UF, np.nan).copy()
+        data_binned.replace(defs.UF, np.nan, inplace=True)
     if drop_overflow:
-        data_binned = data_binned.replace(defs.OF, np.nan).copy()
+        data_binned.replace(defs.OF, np.nan, inplace=True)
 
     # create a contingency table
     c0, c1 = data_binned.columns
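The refactor above avoids intermediate copies: fillna/replace now mutate the already-rebinned frame in place instead of reassigning a fresh copy at each step. A toy sketch of the pattern (not the library code itself; the numeric sentinel stands in for the defs.NaN/defs.UF/defs.OF values used above):

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({'x': [1.0, np.nan, 3.0], 'y': [4.0, 5.0, np.nan]})

    # before: df = df.fillna(-999.).copy() allocated and copied a new frame per step
    # after: the existing frame is mutated, avoiding the intermediate copies
    df.fillna(-999., inplace=True)
    df.replace(-999., np.nan, inplace=True)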
55 changes: 41 additions & 14 deletions python/phik/bivariate.py
@@ -18,6 +18,8 @@
 import numpy as np
 from scipy.stats import mvn
 from scipy import optimize
+# from joblib import Parallel, delayed
+# from phik.config import ncores as NCORES
 
 import warnings
 
@@ -33,8 +35,8 @@ def _mvn_un(rho: float, lower: tuple, upper: tuple) -> float:
     :returns float: integral value
     '''
     mu = np.array([0., 0.])
-    S = np.array([[1.,rho],[rho,1.0]])
-    p,i = mvn.mvnun(lower,upper,mu,S)
+    S = np.array([[1., rho], [rho, 1.]])
+    p, i = mvn.mvnun(lower, upper, mu, S)
     return p


@@ -48,16 +50,40 @@ def _mvn_array(rho: float, sx: np.ndarray, sy: np.ndarray) -> list:
     :param np.ndarray sy: bin edges array of y-axis
     :returns list: list of integral values
     '''
-    corr = []
-    for i in range(len(sx)-1):
-        for j in range(len(sy)-1):
-            lower = [sx[i],sy[j]]
-            upper = [sx[i+1],sy[j+1]]
-            p = _mvn_un(rho,lower,upper)
-            corr.append(p)
+    # ranges = [([sx[i], sy[j]], [sx[i+1], sy[j+1]]) for i in range(len(sx) - 1) for j in range(len(sy) - 1)]
+    # corr = [mvn.mvnun(lower, upper, mu, S)[0] for lower, upper in ranges]
+    # return corr
+
+    # mean and covariance
+    mu = np.array([0., 0.])
+    S = np.array([[1., rho], [rho, 1.]])
+
+    # add half block, which is symmetric in x
+    odd_odd = False
+    ranges = [([sx[i], sy[j]], [sx[i+1], sy[j+1]]) for i in range((len(sx)-1)//2) for j in range(len(sy) - 1)]
+    # add odd middle row, which is symmetric in y
+    if (len(sx)-1) % 2 == 1:
+        i = (len(sx)-1)//2
+        ranges += [([sx[i], sy[j]], [sx[i+1], sy[j+1]]) for j in range((len(sy)-1)//2)]
+        # add center point, add this only once
+        if (len(sy)-1) % 2 == 1:
+            j = (len(sy)-1)//2
+            ranges.append(([sx[i], sy[j]], [sx[i+1], sy[j+1]]))
+            odd_odd = True
+
+    corr = [_calc_mvnun(lower, upper, mu, S) for lower, upper in ranges]
+    # corr = Parallel(n_jobs=NCORES, prefer="threads")(delayed(_calc_mvnun)(lower, upper, mu, S) for lower, upper in ranges)
+
+    # add second half, exclude center
+    corr += corr if not odd_odd else corr[:-1]
 
     return corr
 
 
+def _calc_mvnun(lower, upper, mu, S):
+    return mvn.mvnun(lower, upper, mu, S)[0]
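The speedup above relies on the point symmetry of the centred bivariate normal (together with bin edges that are symmetric about zero): the probability mass of a cell equals that of the cell reflected through the origin, so only half of the grid has to be integrated and the results are mirrored. A quick numerical check of that underlying assumption (requires a SciPy version that still provides scipy.stats.mvn, as used in this module):

    import numpy as np
    from scipy.stats import mvn  # same (older) SciPy interface as imported above

    mu = np.array([0., 0.])
    S = np.array([[1., 0.6], [0.6, 1.]])

    # a rectangular cell and its point reflection through the origin
    p_cell, _ = mvn.mvnun([0.2, -1.0], [0.8, -0.3], mu, S)
    p_mirror, _ = mvn.mvnun([-0.8, 0.3], [-0.2, 1.0], mu, S)

    assert np.isclose(p_cell, p_mirror)  # symmetry under (x, y) -> (-x, -y)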


def bivariate_normal_theory(rho: float, nx:int=-1, ny:int=-1, n:int=1,
sx:np.ndarray=None, sy:np.ndarray=None) -> np.ndarray:
"""Return binned pdf of bivariate normal distribution.
@@ -171,18 +197,19 @@ def phik_from_chi2(chi2:float, n:int, nx:int, ny:int, sx:np.ndarray=None, sy:np.
     # scale ensures that for rho=1, chi2 is the maximum possible value
     corr1 = _mvn_array(1, sx, sy)
     if 0 in corr0 and len(corr0)>10000:
-        warnings.warn('Too many unique values. Are you sure that interval variables are set correctly?')
-        return np.nan
+        warnings.warn('Many cells: {0:d}. Are interval variables set correctly?'.format(len(corr0)))
 
     chi2_one = n * sum([((c1-c0)*(c1-c0)) / c0 for c0,c1 in zip(corr0,corr1)])
     chi2_max = n * min(nx-1, ny-1)
     scale = (chi2_max - pedestal) / chi2_one
-    if chi2_max < chi2 and np.isclose(chi2, chi2_max, atol=1E-14):
+    if chi2 > chi2_max and np.isclose(chi2, chi2_max, atol=1E-14):
         chi2 = chi2_max
 
     # only solve for rho if chi2 exceeds noise pedestal
     if chi2 <= pedestal:
-        return 0
+        return 0.
+    if chi2 >= chi2_max:
+        return 1.
 
-    rho = optimize.brentq(chi2_from_phik, 0, 1, args=(n, chi2, corr0, scale, sx, sy, pedestal))
+    rho = optimize.brentq(chi2_from_phik, 0, 1, args=(n, chi2, corr0, scale, sx, sy, pedestal), xtol=1e-5)
     return rho
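phik_from_chi2 inverts a monotone chi2(rho) relation with Brent's method; the new xtol=1e-5 stops the root search earlier, trading a little precision in rho for fewer (expensive) chi2 evaluations. A toy sketch of the same inversion pattern, with a hypothetical stand-in for chi2_from_phik:

    import numpy as np
    from scipy import optimize

    def chi2_minus_target(rho, target):
        """Hypothetical monotone chi2(rho) curve minus the observed chi2 (illustration only)."""
        return 50. * rho**2 - target

    observed_chi2 = 20.
    rho = optimize.brentq(chi2_minus_target, 0, 1, args=(observed_chi2,), xtol=1e-5)
    print(rho)  # ~0.6325, accurate to roughly 1e-5 under the looser tolerance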
4 changes: 4 additions & 0 deletions python/phik/config.py
@@ -0,0 +1,4 @@
import multiprocessing

# number of cores to use in parallel processing
ncores = multiprocessing.cpu_count()
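The new config module simply exposes the machine's core count. A minimal sketch of how it could drive the joblib parallelisation that is left commented out in bivariate.py (joblib would be an extra dependency; this path is not enabled in this commit):

    from joblib import Parallel, delayed
    from phik.config import ncores

    def work(x):
        return x * x

    # fan the work out over all available cores, mirroring the commented-out Parallel call above
    results = Parallel(n_jobs=ncores, prefer="threads")(delayed(work)(i) for i in range(8))
    print(results)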
1 change: 0 additions & 1 deletion python/phik/data_quality.py
@@ -15,7 +15,6 @@

import warnings
import copy
-import pandas as pd


def dq_check_nunique_values(df, interval_cols, dropna=True):
195 changes: 195 additions & 0 deletions python/phik/notebooks/phik_tutorial_spark.ipynb
@@ -0,0 +1,195 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Phi_K spark tutorial\n",
"\n",
"This notebook shows you how to obtain the Phi_K correlation matrix for a spark dataframe.\n",
"Calculating the Phi_K matrix consists of two steps:\n",
"\n",
"- Obtain the 2d contingency tables for all variable pairs. To make these we use the `popmon` package, which relies on the `spark histogrammar` package.\n",
"- Calculate the Phi_K value for each variable pair from its contingency table.\n",
"\n",
"Make sure you install the popmon package to make the 2d histograms, that are then used to calculate phik."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"!pip install popmon"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import itertools\n",
"import pandas as pd\n",
"import phik\n",
"from phik import resources\n",
"from phik.phik import spark_phik_matrix_from_hist2d_dict\n",
"import popmon\n",
"from popmon.analysis.hist_numpy import get_2dgrid\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# histogramming in popmon is done using the histogrammar library"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.config('spark.jars.packages','org.diana-hep:histogrammar-sparksql_2.11:1.0.4').getOrCreate()\n",
"sc = spark.sparkContext"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Load data\n",
"\n",
"A simulated dataset is part of the phik-package. The dataset concerns fake car insurance data. Load the dataset here:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data = pd.read_csv( resources.fixture('fake_insurance_data.csv.gz') )\n",
"sdf = spark.createDataFrame(data)\n",
"sdf.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"combis = itertools.combinations_with_replacement(sdf.columns, 2)\n",
"combis = [list(c) for c in combis]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print (combis)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# step 1: create histograms (this runs spark histogrammar in the background)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# see the doc-string of pm_make_histograms() for binning options.\n",
"hists = sdf.pm_make_histograms(combis)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"grids = {k:get_2dgrid(h) for k,h in hists.items()}\n",
"print (grids)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# we can store the histograms if we want to\n",
"if False:\n",
" import pickle\n",
"\n",
" with open('grids.pkl', 'wb') as outfile:\n",
" pickle.dump(grids, outfile)\n",
"\n",
" with open('grids.pkl', 'rb') as handle:\n",
" grids = pickle.load(handle)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# step 2: calculate phik matrix (runs rdd parallellization over all 2d histograms)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"phik_matrix = spark_phik_matrix_from_hist2d_dict(sc, grids)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"phik_matrix"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}