Skip to content

Commit

Permalink
Merge pull request #264 from pymc-labs/kink
Browse files Browse the repository at this point in the history
Add ability to analyse Regression Kink Analysis Designs
  • Loading branch information
drbenvincent authored Nov 9, 2023
2 parents 7ebab10 + 4386f6b commit 24ad46e
Show file tree
Hide file tree
Showing 15 changed files with 3,404 additions and 15 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repos:
exclude_types: [svg]
- id: check-yaml
- id: check-added-large-files
args: ['--maxkb=1500']
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.1.4
hooks:
Expand Down
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,25 @@ Regression discontinuity designs are used when treatment is applied to units acc

> The data, model fit, and counterfactual are plotted (top). Frequentist analysis shows the causal impact with the blue shaded region, but this is not shown in the Bayesian analysis to avoid a cluttered chart. Instead, the Bayesian analysis shows shaded Bayesian credible regions of the model fits. The Frequentist analysis visualises the point estimate of the causal impact, but the Bayesian analysis also plots the posterior distribution of the regression discontinuity effect (bottom).
### Regression kink designs

Regression discontinuity designs are used when treatment is applied to units according to a cutoff on a running variable, which is typically not time. By looking for the presence of a discontinuity at the precise point of the treatment cutoff then we can make causal claims about the potential impact of the treatment.

| Running variable | Outcome |
|-----------|-----------|
| $x_0$ | $y_0$ |
| $x_1$ | $y_0$ |
| $\ldots$ | $\ldots$ |
| $x_{N-1}$ | $y_{N-1}$ |
| $x_N$ | $y_N$ |


| Frequentist | Bayesian |
|--|--|
| coming soon | ![](docs/source/_static/regression_kink_pymc.svg) |

> The data and model fit. The Bayesian analysis shows the posterior mean with credible intervals (shaded regions). We also report the Bayesian $R^2$ on the data along with the posterior mean and credible intervals of the change in gradient at the kink point.
### Interrupted time series

Interrupted time series analysis is appropriate when you have a time series of observations which undergo treatment at a particular point in time. This kind of analysis has no control group and looks for the presence of a change in the outcome measure at or soon after the treatment time. Multiple predictors can be included.
Expand Down
230 changes: 222 additions & 8 deletions causalpy/pymc_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""

import warnings
from typing import Optional, Union
from typing import Union

import arviz as az
import matplotlib.pyplot as plt
Expand Down Expand Up @@ -328,7 +328,7 @@ def plot(self, counterfactual_label="Counterfactual", **kwargs):
fontsize=LEGEND_FONT_SIZE,
)

return (fig, ax)
return fig, ax

def summary(self) -> None:
"""
Expand Down Expand Up @@ -424,7 +424,7 @@ def plot(self, plot_predictors=False, **kwargs):
ax[0].plot(
self.datapost.index, self.post_X, "-", c=[0.8, 0.8, 0.8], zorder=1
)
return (fig, ax)
return fig, ax


class DifferenceInDifferences(ExperimentalDesign):
Expand Down Expand Up @@ -794,7 +794,7 @@ def __init__(
model=None,
running_variable_name: str = "x",
epsilon: float = 0.001,
bandwidth: Optional[float] = None,
bandwidth: float = np.inf,
**kwargs,
):
super().__init__(model=model, **kwargs)
Expand All @@ -807,7 +807,7 @@ def __init__(
self.bandwidth = bandwidth
self._input_validation()

if self.bandwidth is not None:
if self.bandwidth is not np.inf:
fmin = self.treatment_threshold - self.bandwidth
fmax = self.treatment_threshold + self.bandwidth
filtered_data = self.data.query(f"{fmin} <= x <= {fmax}")
Expand Down Expand Up @@ -836,7 +836,7 @@ def __init__(
self.score = self.model.score(X=self.X, y=self.y)

# get the model predictions of the observed data
if self.bandwidth is not None:
if self.bandwidth is not np.inf:
xi = np.linspace(fmin, fmax, 200)
else:
xi = np.linspace(
Expand Down Expand Up @@ -903,7 +903,7 @@ def plot(self):
self.data,
x=self.running_variable_name,
y=self.outcome_variable_name,
c="k", # hue="treated",
c="k",
ax=ax,
)

Expand Down Expand Up @@ -939,7 +939,7 @@ def plot(self):
labels=labels,
fontsize=LEGEND_FONT_SIZE,
)
return (fig, ax)
return fig, ax

def summary(self) -> None:
"""
Expand All @@ -957,6 +957,220 @@ def summary(self) -> None:
self.print_coefficients()


class RegressionKink(ExperimentalDesign):
"""
A class to analyse sharp regression kink experiments.
:param data:
A pandas dataframe
:param formula:
A statistical model formula
:param kink_point:
A scalar threshold value at which there is a change in the first derivative of
the assignment function
:param model:
A PyMC model
:param running_variable_name:
The name of the predictor variable that the kink_point is based upon
:param epsilon:
A small scalar value which determines how far above and below the kink point to
evaluate the causal impact.
:param bandwidth:
Data outside of the bandwidth (relative to the discontinuity) is not used to fit
the model.
"""

def __init__(
self,
data: pd.DataFrame,
formula: str,
kink_point: float,
model=None,
running_variable_name: str = "x",
epsilon: float = 0.001,
bandwidth: float = np.inf,
**kwargs,
):
super().__init__(model=model, **kwargs)
self.expt_type = "Regression Kink"
self.data = data
self.formula = formula
self.running_variable_name = running_variable_name
self.kink_point = kink_point
self.epsilon = epsilon
self.bandwidth = bandwidth
self._input_validation()

if self.bandwidth is not np.inf:
fmin = self.kink_point - self.bandwidth
fmax = self.kink_point + self.bandwidth
filtered_data = self.data.query(f"{fmin} <= x <= {fmax}")
if len(filtered_data) <= 10:
warnings.warn(
f"Choice of bandwidth parameter has lead to only {len(filtered_data)} remaining datapoints. Consider increasing the bandwidth parameter.", # noqa: E501
UserWarning,
)
y, X = dmatrices(formula, filtered_data)
else:
y, X = dmatrices(formula, self.data)

self._y_design_info = y.design_info
self._x_design_info = X.design_info
self.labels = X.design_info.column_names
self.y, self.X = np.asarray(y), np.asarray(X)
self.outcome_variable_name = y.design_info.column_names[0]

COORDS = {"coeffs": self.labels, "obs_indx": np.arange(self.X.shape[0])}
self.model.fit(X=self.X, y=self.y, coords=COORDS)

# score the goodness of fit to all data
self.score = self.model.score(X=self.X, y=self.y)

# get the model predictions of the observed data
if self.bandwidth is not np.inf:
xi = np.linspace(fmin, fmax, 200)
else:
xi = np.linspace(
np.min(self.data[self.running_variable_name]),
np.max(self.data[self.running_variable_name]),
200,
)
self.x_pred = pd.DataFrame(
{self.running_variable_name: xi, "treated": self._is_treated(xi)}
)
(new_x,) = build_design_matrices([self._x_design_info], self.x_pred)
self.pred = self.model.predict(X=np.asarray(new_x))

# evaluate gradient change around kink point
mu_kink_left, mu_kink, mu_kink_right = self._probe_kink_point()
self.gradient_change = self._eval_gradient_change(
mu_kink_left, mu_kink, mu_kink_right, epsilon
)

@staticmethod
def _eval_gradient_change(mu_kink_left, mu_kink, mu_kink_right, epsilon):
"""Evaluate the gradient change at the kink point.
It works by evaluating the model below the kink point, at the kink point,
and above the kink point.
This is a static method for ease of testing.
"""
gradient_left = (mu_kink - mu_kink_left) / epsilon
gradient_right = (mu_kink_right - mu_kink) / epsilon
gradient_change = gradient_right - gradient_left
return gradient_change

def _probe_kink_point(self):
# Create a dataframe to evaluate predicted outcome at the kink point and either
# side
x_predict = pd.DataFrame(
{
self.running_variable_name: np.array(
[
self.kink_point - self.epsilon,
self.kink_point,
self.kink_point + self.epsilon,
]
),
"treated": np.array([0, 1, 1]),
}
)
(new_x,) = build_design_matrices([self._x_design_info], x_predict)
predicted = self.model.predict(X=np.asarray(new_x))
# extract predicted mu values
mu_kink_left = predicted["posterior_predictive"].sel(obs_ind=0)["mu"]
mu_kink = predicted["posterior_predictive"].sel(obs_ind=1)["mu"]
mu_kink_right = predicted["posterior_predictive"].sel(obs_ind=2)["mu"]
return mu_kink_left, mu_kink, mu_kink_right

def _input_validation(self):
"""Validate the input data and model formula for correctness"""
if "treated" not in self.formula:
raise FormulaException(
"A predictor called `treated` should be in the formula"
)

if _is_variable_dummy_coded(self.data["treated"]) is False:
raise DataException(
"""The treated variable should be dummy coded. Consisting of 0's and 1's only.""" # noqa: E501
)

if self.bandwidth <= 0:
raise ValueError("The bandwidth must be greater than zero.")

if self.epsilon <= 0:
raise ValueError("Epsilon must be greater than zero.")

def _is_treated(self, x):
"""Returns ``True`` if `x` is greater than or equal to the treatment threshold.""" # noqa: E501
return np.greater_equal(x, self.kink_point)

def plot(self):
"""
Plot the results
"""
fig, ax = plt.subplots()
# Plot raw data
sns.scatterplot(
self.data,
x=self.running_variable_name,
y=self.outcome_variable_name,
c="k", # hue="treated",
ax=ax,
)

# Plot model fit to data
h_line, h_patch = plot_xY(
self.x_pred[self.running_variable_name],
self.pred["posterior_predictive"].mu,
ax=ax,
plot_hdi_kwargs={"color": "C1"},
)
handles = [(h_line, h_patch)]
labels = ["Posterior mean"]

# create strings to compose title
title_info = f"{self.score.r2:.3f} (std = {self.score.r2_std:.3f})"
r2 = f"Bayesian $R^2$ on all data = {title_info}"
percentiles = self.gradient_change.quantile([0.03, 1 - 0.03]).values
ci = r"$CI_{94\%}$" + f"[{percentiles[0]:.2f}, {percentiles[1]:.2f}]"
grad_change = f"""
Change in gradient = {self.gradient_change.mean():.2f},
"""
ax.set(title=r2 + "\n" + grad_change + ci)
# Intervention line
ax.axvline(
x=self.kink_point,
ls="-",
lw=3,
color="r",
label="treatment threshold",
)
ax.legend(
handles=(h_tuple for h_tuple in handles),
labels=labels,
fontsize=LEGEND_FONT_SIZE,
)
return fig, ax

def summary(self) -> None:
"""
Print text output summarising the results
"""

print(
f"""
{self.expt_type:=^80}
Formula: {self.formula}
Running variable: {self.running_variable_name}
Kink point on running variable: {self.kink_point}
Results:
Change in slope at kink point = {self.gradient_change.mean():.2f}
"""
)
self.print_coefficients()


class PrePostNEGD(ExperimentalDesign):
"""
A class to analyse data from pretest/posttest designs
Expand Down
Loading

0 comments on commit 24ad46e

Please sign in to comment.