"""This module contains the Estimator abstract class, as well as its concrete extensions: LogisticRegressionEstimator,
LinearRegressionEstimator and CausalForestEstimator"""
import logging
from abc import ABC, abstractmethod
from typing import Any
from math import ceil
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf
from econml.dml import CausalForestDML
from patsy import dmatrix # pylint: disable = no-name-in-module
from patsy import ModelDesc
from sklearn.ensemble import GradientBoostingRegressor
from statsmodels.regression.linear_model import RegressionResultsWrapper
from statsmodels.tools.sm_exceptions import PerfectSeparationError
from causal_testing.specification.variable import Variable
logger = logging.getLogger(__name__)
class Estimator(ABC):
# pylint: disable=too-many-instance-attributes
"""An estimator contains all of the information necessary to compute a causal estimate for the effect of changing
a set of treatment variables to a set of values.
All estimators must implement the following two methods:
1) add_modelling_assumptions: The validity of a model-assisted causal inference result depends on whether
the modelling assumptions imposed by a model actually hold. Therefore, for each model, is important to state
the modelling assumption upon which the validity of the results depend. To achieve this, the estimator object
maintains a list of modelling assumptions (as strings). If a user wishes to implement their own estimator, they
must implement this method and add all assumptions to the list of modelling assumptions.
2) estimate_ate: All estimators must be capable of returning the average treatment effect as a minimum. That is, the
average effect of the intervention (changing treatment from control to treated value) on the outcome of interest
adjusted for all confounders.
"""
def __init__(
# pylint: disable=too-many-arguments
self,
treatment: str,
treatment_value: float,
control_value: float,
adjustment_set: set,
outcome: str,
df: pd.DataFrame = None,
        effect_modifiers: dict[str, Any] = None,
alpha: float = 0.05,
query: str = "",
):
self.treatment = treatment
self.treatment_value = treatment_value
self.control_value = control_value
self.adjustment_set = adjustment_set
self.outcome = outcome
self.alpha = alpha
self.df = df.query(query) if query else df
if effect_modifiers is None:
self.effect_modifiers = {}
elif isinstance(effect_modifiers, dict):
self.effect_modifiers = effect_modifiers
else:
raise ValueError(f"Unsupported type for effect_modifiers {effect_modifiers}. Expected iterable")
self.modelling_assumptions = []
if query:
self.modelling_assumptions.append(query)
self.add_modelling_assumptions()
logger.debug("Effect Modifiers: %s", self.effect_modifiers)
@abstractmethod
def add_modelling_assumptions(self):
"""
Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that
must hold if the resulting causal inference is to be considered valid.
"""
    def compute_confidence_intervals(self) -> list[float]:
"""
Estimate the 95% Wald confidence intervals for the effect of changing the treatment from control values to
treatment values on the outcome.
:return: 95% Wald confidence intervals.
"""
class LogisticRegressionEstimator(Estimator):
"""A Logistic Regression Estimator is a parametric estimator which restricts the variables in the data to a linear
combination of parameters and functions of the variables (note these functions need not be linear). It is designed
for estimating categorical outcomes.
"""
def __init__(
# pylint: disable=too-many-arguments
self,
treatment: str,
treatment_value: float,
control_value: float,
adjustment_set: set,
outcome: str,
df: pd.DataFrame = None,
        effect_modifiers: dict[str, Any] = None,
formula: str = None,
query: str = "",
):
super().__init__(
treatment=treatment,
treatment_value=treatment_value,
control_value=control_value,
adjustment_set=adjustment_set,
outcome=outcome,
df=df,
effect_modifiers=effect_modifiers,
query=query,
)
self.model = None
if formula is not None:
self.formula = formula
else:
            terms = [treatment] + sorted(list(adjustment_set)) + sorted(list(self.effect_modifiers))
            self.formula = f"{outcome} ~ {'+'.join(terms)}"
def add_modelling_assumptions(self):
"""
Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that
must hold if the resulting causal inference is to be considered valid.
"""
        self.modelling_assumptions.append(
            "The variables in the data must fit a shape which can be expressed as a linear "
            "combination of parameters and functions of variables. Note that these functions "
            "do not need to be linear."
        )
self.modelling_assumptions.append("The outcome must be binary.")
self.modelling_assumptions.append("Independently and identically distributed errors.")
def _run_logistic_regression(self, data) -> RegressionResultsWrapper:
"""Run logistic regression of the treatment and adjustment set against the outcome and return the model.
:return: The model after fitting to data.
"""
model = smf.logit(formula=self.formula, data=data).fit(disp=0)
self.model = model
return model
    def estimate(self, data: pd.DataFrame, adjustment_config: dict = None) -> pd.Series:
        """Add terms to the dataframe and estimate the outcome from the data.
        :param data: A pandas dataframe containing execution data from the system-under-test.
        :param adjustment_config: Dictionary containing the values of the adjustment set variables.
        :return: The estimated outcomes under treatment and control (rows 0 and 1, respectively).
        """
if adjustment_config is None:
adjustment_config = {}
if set(self.adjustment_set) != set(adjustment_config):
raise ValueError(
f"Invalid adjustment configuration {adjustment_config}. Must specify values for {self.adjustment_set}"
)
model = self._run_logistic_regression(data)
        x = pd.DataFrame(columns=self.df.columns)
        x[self.treatment] = [self.treatment_value, self.control_value]
        x["Intercept"] = 1  # assigned after the treatment column so the frame already has its two rows
for k, v in adjustment_config.items():
x[k] = v
for k, v in self.effect_modifiers.items():
x[k] = v
x = dmatrix(self.formula.split("~")[1], x, return_type="dataframe")
for col in x:
if str(x.dtypes[col]) == "object":
x = pd.get_dummies(x, columns=[col], drop_first=True)
# x = x[model.params.index]
return model.predict(x)
    def estimate_control_treatment(
        self, adjustment_config: dict = None, bootstrap_size: int = 100
    ) -> tuple[pd.Series, pd.Series]:
        """Estimate the outcomes under control and treatment.
        :return: The estimated control and treatment outcomes and their bootstrap samples in the form
        ((control_outcome, control_bootstraps), (treatment_outcome, treatment_bootstraps)).
        """
if adjustment_config is None:
adjustment_config = {}
y = self.estimate(self.df, adjustment_config=adjustment_config)
try:
bootstrap_samples = [
self.estimate(self.df.sample(len(self.df), replace=True), adjustment_config=adjustment_config)
for _ in range(bootstrap_size)
]
control, treatment = zip(*[(x.iloc[1], x.iloc[0]) for x in bootstrap_samples])
except PerfectSeparationError:
logger.warning(
"Perfect separation detected, results not available. Cannot calculate confidence intervals for such "
"a small dataset."
)
return (y.iloc[1], None), (y.iloc[0], None)
except np.linalg.LinAlgError:
logger.warning("Singular matrix detected. Confidence intervals not available. Try with a larger data set")
return (y.iloc[1], None), (y.iloc[0], None)
# Delta method confidence intervals from
# https://stackoverflow.com/questions/47414842/confidence-interval-of-probability-prediction-from-logistic-regression-statsmode
# cov = model.cov_params()
# gradient = (y * (1 - y) * x.T).T # matrix of gradients for each observation
# std_errors = np.array([np.sqrt(np.dot(np.dot(g, cov), g)) for g in gradient.to_numpy()])
# c = 1.96 # multiplier for confidence interval
# upper = np.maximum(0, np.minimum(1, y + std_errors * c))
# lower = np.maximum(0, np.minimum(1, y - std_errors * c))
return (y.iloc[1], np.array(control)), (y.iloc[0], np.array(treatment))
    def estimate_ate(self, adjustment_config: dict = None, bootstrap_size: int = 100) -> tuple[float, tuple[float, float]]:
        """Estimate the average treatment effect of the treatment on the outcome. That is, the change in outcome
        caused by changing the treatment variable from the control value to the treatment value. Here, we actually
        calculate the expected outcomes under control and treatment and subtract one from the other. This allows
        custom terms, such as squares, inverses and products, to be included.
        :return: The estimated average treatment effect and 95% confidence intervals.
        """
if adjustment_config is None:
adjustment_config = {}
(control_outcome, control_bootstraps), (
treatment_outcome,
treatment_bootstraps,
) = self.estimate_control_treatment(bootstrap_size=bootstrap_size, adjustment_config=adjustment_config)
estimate = treatment_outcome - control_outcome
if control_bootstraps is None or treatment_bootstraps is None:
return estimate, (None, None)
bootstraps = sorted(list(treatment_bootstraps - control_bootstraps))
        bound = ceil((bootstrap_size * self.alpha) / 2)  # ceil, as in estimate_risk_ratio, so the bound is never zero
ci_low = bootstraps[bound]
ci_high = bootstraps[bootstrap_size - bound]
logger.info(
f"Changing {self.treatment} from {self.control_value} to {self.treatment_value} gives an estimated "
f"ATE of {ci_low} < {estimate} < {ci_high}"
)
assert ci_low < estimate < ci_high, f"Expecting {ci_low} < {estimate} < {ci_high}"
return estimate, (ci_low, ci_high)
    def estimate_risk_ratio(self, adjustment_config: dict = None, bootstrap_size: int = 100) -> tuple[float, tuple[float, float]]:
        """Estimate the risk ratio of the treatment on the outcome. That is, the ratio of outcomes obtained by
        changing the treatment variable from the control value to the treatment value. Here, we actually
        calculate the expected outcomes under control and treatment and divide one by the other. This allows
        custom terms, such as squares, inverses and products, to be included.
        :return: The estimated risk ratio and 95% confidence intervals.
        """
if adjustment_config is None:
adjustment_config = {}
(control_outcome, control_bootstraps), (
treatment_outcome,
treatment_bootstraps,
) = self.estimate_control_treatment(bootstrap_size=bootstrap_size, adjustment_config=adjustment_config)
estimate = treatment_outcome / control_outcome
if control_bootstraps is None or treatment_bootstraps is None:
return estimate, (None, None)
bootstraps = sorted(list(treatment_bootstraps / control_bootstraps))
bound = ceil((bootstrap_size * self.alpha) / 2)
ci_low = bootstraps[bound]
ci_high = bootstraps[bootstrap_size - bound]
logger.info(
f"Changing {self.treatment} from {self.control_value} to {self.treatment_value} gives an estimated "
f"risk ratio of {ci_low} < {estimate} < {ci_high}"
)
assert ci_low < estimate < ci_high, f"Expecting {ci_low} < {estimate} < {ci_high}"
return estimate, (ci_low, ci_high)
def estimate_unit_odds_ratio(self) -> float:
"""Estimate the odds ratio of increasing the treatment by one. In logistic regression, this corresponds to the
coefficient of the treatment of interest.
:return: The odds ratio. Confidence intervals are not yet supported.
"""
model = self._run_logistic_regression(self.df)
return np.exp(model.params[self.treatment])
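
# Example usage of LogisticRegressionEstimator (illustrative; "df" and the column names "x", "z" and
# the binary outcome "y" are hypothetical):
#
#   estimator = LogisticRegressionEstimator(
#       treatment="x", treatment_value=1.0, control_value=0.0,
#       adjustment_set={"z"}, outcome="y", df=df,
#   )
#   # adjustment_config must supply a value for every variable in the adjustment set
#   ate, (ci_low, ci_high) = estimator.estimate_ate(adjustment_config={"z": 2})
#   risk_ratio, _ = estimator.estimate_risk_ratio(adjustment_config={"z": 2})
#   odds_ratio = estimator.estimate_unit_odds_ratio()
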
class LinearRegressionEstimator(Estimator):
"""A Linear Regression Estimator is a parametric estimator which restricts the variables in the data to a linear
combination of parameters and functions of the variables (note these functions need not be linear).
"""
def __init__(
# pylint: disable=too-many-arguments
self,
treatment: str,
treatment_value: float,
control_value: float,
adjustment_set: set,
outcome: str,
df: pd.DataFrame = None,
        effect_modifiers: dict[Variable, Any] = None,
formula: str = None,
alpha: float = 0.05,
query: str = "",
):
super().__init__(
treatment,
treatment_value,
control_value,
adjustment_set,
outcome,
df,
effect_modifiers,
alpha=alpha,
query=query,
)
self.model = None
        if formula is not None:
            self.formula = formula
        else:
            terms = [treatment] + sorted(list(adjustment_set)) + sorted(list(self.effect_modifiers))
            self.formula = f"{outcome} ~ {'+'.join(terms)}"
        for term in self.effect_modifiers:
            self.adjustment_set.add(term)
def add_modelling_assumptions(self):
"""
Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that
must hold if the resulting causal inference is to be considered valid.
"""
        self.modelling_assumptions.append(
            "The variables in the data must fit a shape which can be expressed as a linear "
            "combination of parameters and functions of variables. Note that these functions "
            "do not need to be linear."
        )
    def estimate_coefficient(self) -> tuple[pd.Series, list[pd.Series]]:
"""Estimate the unit average treatment effect of the treatment on the outcome. That is, the change in outcome
caused by a unit change in treatment.
:return: The unit average treatment effect and the 95% Wald confidence intervals.
"""
model = self._run_linear_regression()
newline = "\n"
patsy_md = ModelDesc.from_formula(self.treatment)
if any((self.df.dtypes[factor.name()] == 'object' for factor in patsy_md.rhs_termlist[1].factors)):
design_info = dmatrix(self.formula.split("~")[1], self.df).design_info
treatment = design_info.column_names[design_info.term_name_slices[self.treatment]]
else:
treatment = [self.treatment]
assert set(treatment).issubset(
model.params.index.tolist()
), f"{treatment} not in\n{' ' + str(model.params.index).replace(newline, newline + ' ')}"
unit_effect = model.params[treatment] # Unit effect is the coefficient of the treatment
[ci_low, ci_high] = self._get_confidence_intervals(model, treatment)
return unit_effect, [ci_low, ci_high]
    def estimate_ate(self) -> tuple[pd.Series, list[pd.Series]]:
"""Estimate the average treatment effect of the treatment on the outcome. That is, the change in outcome caused
by changing the treatment variable from the control value to the treatment value.
:return: The average treatment effect and the 95% Wald confidence intervals.
"""
model = self._run_linear_regression()
# Create an empty individual for the control and treated
individuals = pd.DataFrame(1, index=["control", "treated"], columns=model.params.index)
# It is ABSOLUTELY CRITICAL that these go last, otherwise we can't index
# the effect with "ate = t_test_results.effect[0]"
individuals.loc["control", [self.treatment]] = self.control_value
individuals.loc["treated", [self.treatment]] = self.treatment_value
# Perform a t-test to compare the predicted outcome of the control and treated individual (ATE)
t_test_results = model.t_test(individuals.loc["treated"] - individuals.loc["control"])
ate = pd.Series(t_test_results.effect[0])
confidence_intervals = list(t_test_results.conf_int(alpha=self.alpha).flatten())
confidence_intervals = [pd.Series(interval) for interval in confidence_intervals]
return ate, confidence_intervals
def estimate_control_treatment(self, adjustment_config: dict = None) -> tuple[pd.Series, pd.Series]:
"""Estimate the outcomes under control and treatment.
:return: The estimated outcome under control and treatment in the form
(control_outcome, treatment_outcome).
"""
if adjustment_config is None:
adjustment_config = {}
model = self._run_linear_regression()
x = pd.DataFrame(columns=self.df.columns)
x[self.treatment] = [self.treatment_value, self.control_value]
x["Intercept"] = 1 # self.intercept
for k, v in adjustment_config.items():
x[k] = v
for k, v in self.effect_modifiers.items():
x[k] = v
x = dmatrix(self.formula.split("~")[1], x, return_type="dataframe")
for col in x:
if str(x.dtypes[col]) == "object":
x = pd.get_dummies(x, columns=[col], drop_first=True)
x = x[model.params.index]
y = model.get_prediction(x).summary_frame()
return y.iloc[1], y.iloc[0]
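    # Note: the two rows returned above come from statsmodels' get_prediction(...).summary_frame(), whose
    # columns include "mean", "mean_ci_lower" and "mean_ci_upper". estimate_risk_ratio and
    # estimate_ate_calculated below rely on exactly these column names.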
    def estimate_risk_ratio(self, adjustment_config: dict = None) -> tuple[pd.Series, list[pd.Series]]:
        """Estimate the risk ratio of the treatment on the outcome. That is, the ratio of outcomes obtained by
        changing the treatment variable from the control value to the treatment value.
        :return: The risk ratio and the 95% Wald confidence intervals.
        """
if adjustment_config is None:
adjustment_config = {}
control_outcome, treatment_outcome = self.estimate_control_treatment(adjustment_config=adjustment_config)
ci_low = pd.Series(treatment_outcome["mean_ci_lower"] / control_outcome["mean_ci_upper"])
ci_high = pd.Series(treatment_outcome["mean_ci_upper"] / control_outcome["mean_ci_lower"])
return pd.Series(treatment_outcome["mean"] / control_outcome["mean"]), [ci_low, ci_high]
    def estimate_ate_calculated(self, adjustment_config: dict = None) -> tuple[pd.Series, list[pd.Series]]:
        """Estimate the average treatment effect of the treatment on the outcome. That is, the change in outcome
        caused by changing the treatment variable from the control value to the treatment value. Here, we actually
        calculate the expected outcomes under control and treatment and subtract one from the other. This allows
        custom terms, such as squares, inverses and products, to be included.
        :return: The average treatment effect and the 95% Wald confidence intervals.
        """
if adjustment_config is None:
adjustment_config = {}
control_outcome, treatment_outcome = self.estimate_control_treatment(adjustment_config=adjustment_config)
ci_low = pd.Series(treatment_outcome["mean_ci_lower"] - control_outcome["mean_ci_upper"])
ci_high = pd.Series(treatment_outcome["mean_ci_upper"] - control_outcome["mean_ci_lower"])
return pd.Series(treatment_outcome["mean"] - control_outcome["mean"]), [ci_low, ci_high]
def _run_linear_regression(self) -> RegressionResultsWrapper:
"""Run linear regression of the treatment and adjustment set against the outcome and return the model.
:return: The model after fitting to data.
"""
model = smf.ols(formula=self.formula, data=self.df).fit()
self.model = model
return model
def _get_confidence_intervals(self, model, treatment):
confidence_intervals = model.conf_int(alpha=self.alpha, cols=None)
ci_low, ci_high = (
pd.Series(confidence_intervals[0].loc[treatment]),
pd.Series(confidence_intervals[1].loc[treatment]),
)
return [ci_low, ci_high]
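
# Example usage of LinearRegressionEstimator (illustrative; a sketch on synthetic data where the true
# effect of "x" on "y" is 2, not a test of the library):
#
#   rng = np.random.default_rng(0)
#   df = pd.DataFrame({"x": rng.normal(size=100), "z": rng.normal(size=100)})
#   df["y"] = 2 * df["x"] + df["z"] + rng.normal(size=100)
#   estimator = LinearRegressionEstimator(
#       treatment="x", treatment_value=1.0, control_value=0.0,
#       adjustment_set={"z"}, outcome="y", df=df,
#   )
#   ate, [ci_low, ci_high] = estimator.estimate_ate()  # expected to be close to 2
#   coefficient, [low, high] = estimator.estimate_coefficient()
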
class CubicSplineRegressionEstimator(LinearRegressionEstimator):
"""A Cubic Spline Regression Estimator is a parametric estimator which restricts the variables in the data to a
combination of parameters and basis functions of the variables.
"""
def __init__(
# pylint: disable=too-many-arguments
self,
treatment: str,
treatment_value: float,
control_value: float,
adjustment_set: set,
outcome: str,
basis: int,
df: pd.DataFrame = None,
        effect_modifiers: dict[Variable, Any] = None,
formula: str = None,
alpha: float = 0.05,
expected_relationship=None,
):
super().__init__(
treatment, treatment_value, control_value, adjustment_set, outcome, df, effect_modifiers, formula, alpha
)
self.expected_relationship = expected_relationship
if effect_modifiers is None:
effect_modifiers = []
if formula is None:
terms = [treatment] + sorted(list(adjustment_set)) + sorted(list(effect_modifiers))
self.formula = f"{outcome} ~ cr({'+'.join(terms)}, df={basis})"
    def estimate_ate_calculated(self, adjustment_config: dict = None) -> pd.Series:
        """Estimate the average treatment effect by predicting the outcome under treatment and control and taking
        the difference. Note that this method does not return confidence intervals.
        :param adjustment_config: Dictionary containing the values of the adjustment set variables.
        :return: The estimated average treatment effect.
        """
        model = self._run_linear_regression()
        x = {"Intercept": 1, self.treatment: self.treatment_value}
if adjustment_config is not None:
for k, v in adjustment_config.items():
x[k] = v
if self.effect_modifiers is not None:
for k, v in self.effect_modifiers.items():
x[k] = v
treatment = model.predict(x).iloc[0]
x[self.treatment] = self.control_value
control = model.predict(x).iloc[0]
return pd.Series(treatment - control)
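
# Example usage of CubicSplineRegressionEstimator (illustrative; column names are hypothetical). The
# default formula expands the treatment into a natural cubic spline basis via patsy's cr(), e.g.
# "y ~ cr(x, df=5)" for basis=5:
#
#   estimator = CubicSplineRegressionEstimator(
#       treatment="x", treatment_value=2.0, control_value=0.0,
#       adjustment_set=set(), outcome="y", basis=5, df=df,
#   )
#   ate = estimator.estimate_ate_calculated()  # pd.Series; no confidence intervals
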
class InstrumentalVariableEstimator(Estimator):
"""
Carry out estimation using instrumental variable adjustment rather than conventional adjustment. This means we do
not need to observe all confounders in order to adjust for them. A key assumption here is linearity.
"""
def __init__(
# pylint: disable=too-many-arguments
self,
treatment: str,
treatment_value: float,
control_value: float,
adjustment_set: set,
outcome: str,
instrument: str,
df: pd.DataFrame = None,
intercept: int = 1,
effect_modifiers: dict = None, # Not used (yet?). Needed for compatibility
alpha: float = 0.05,
query: str = "",
):
super().__init__(
treatment=treatment,
treatment_value=treatment_value,
control_value=control_value,
adjustment_set=adjustment_set,
outcome=outcome,
df=df,
effect_modifiers=None,
alpha=alpha,
query=query,
)
self.intercept = intercept
self.model = None
self.instrument = instrument
def add_modelling_assumptions(self):
"""
Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that
must hold if the resulting causal inference is to be considered valid.
"""
self.modelling_assumptions.append(
"""The instrument and the treatment, and the treatment and the outcome must be
related linearly in the form Y = aX + b."""
)
self.modelling_assumptions.append(
"""The three IV conditions must hold
(i) Instrument is associated with treatment
(ii) Instrument does not affect outcome except through its potential effect on treatment
(iii) Instrument and outcome do not share causes
"""
)
    def estimate_iv_coefficient(self, df) -> float:
        """
        Estimate the linear regression coefficient of the treatment on the outcome using the Wald (ratio) estimator.
        """
        # Estimate the total effect of instrument I on outcome Y = abI + c1
        ab = sm.OLS(df[self.outcome], df[[self.instrument]]).fit().params[self.instrument]
        # Estimate the direct effect of instrument I on treatment X = aI + c2
        a = sm.OLS(df[self.treatment], df[[self.instrument]]).fit().params[self.instrument]
        # Estimate the effect of X on Y by cancelling the instrument's effect: ab / a = b
        return ab / a
    def estimate_coefficient(self, bootstrap_size=100) -> tuple[pd.Series, list[pd.Series]]:
        """
        Estimate the unit average treatment effect (i.e. the coefficient) of the treatment on the outcome, with
        bootstrap confidence intervals.
        """
bootstraps = sorted(
[self.estimate_iv_coefficient(self.df.sample(len(self.df), replace=True)) for _ in range(bootstrap_size)]
)
bound = ceil((bootstrap_size * self.alpha) / 2)
ci_low = pd.Series(bootstraps[bound])
ci_high = pd.Series(bootstraps[bootstrap_size - bound])
return pd.Series(self.estimate_iv_coefficient(self.df)), [ci_low, ci_high]
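
# Example usage of InstrumentalVariableEstimator (illustrative; column names are hypothetical). The
# Wald (ratio) estimator divides the instrument's effect on the outcome by its effect on the treatment:
# with X = a*I + noise and Y = b*X + confounding, estimate_coefficient recovers b = (a*b) / a.
#
#   estimator = InstrumentalVariableEstimator(
#       treatment="x", treatment_value=1.0, control_value=0.0,
#       adjustment_set=set(), outcome="y", instrument="i", df=df,
#   )
#   coefficient, [ci_low, ci_high] = estimator.estimate_coefficient()
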
class CausalForestEstimator(Estimator):
"""A causal random forest estimator is a non-parametric estimator which recursively partitions the covariate space
to learn a low-dimensional representation of treatment effect heterogeneity. This form of estimator is best suited
to the estimation of heterogeneous treatment effects i.e. the estimated effect for every sample rather than the
population average.
"""
def add_modelling_assumptions(self):
"""Add any modelling assumptions to the estimator.
:return self: Update self.modelling_assumptions
"""
self.modelling_assumptions.append("Non-parametric estimator: no restrictions imposed on the data.")
    def estimate_ate(self) -> tuple[pd.Series, list[pd.Series]]:
"""Estimate the average treatment effect.
:return ate, confidence_intervals: The average treatment effect and 95% confidence intervals.
"""
# Remove any NA containing rows
reduced_df = self.df.copy()
necessary_cols = [self.treatment] + list(self.adjustment_set) + [self.outcome]
missing_rows = reduced_df[necessary_cols].isnull().any(axis=1)
reduced_df = reduced_df[~missing_rows]
# Split data into effect modifiers (X), confounders (W), treatments (T), and outcome (Y)
if self.effect_modifiers:
effect_modifier_df = reduced_df[list(self.effect_modifiers)]
else:
effect_modifier_df = reduced_df[list(self.adjustment_set)]
confounders_df = reduced_df[list(self.adjustment_set)]
treatment_df = np.ravel(reduced_df[[self.treatment]])
outcome_df = np.ravel(reduced_df[[self.outcome]])
# Fit the model to the data using a gradient boosting regressor for both the treatment and outcome model
model = CausalForestDML(
model_y=GradientBoostingRegressor(),
model_t=GradientBoostingRegressor(),
)
model.fit(outcome_df, treatment_df, X=effect_modifier_df, W=confounders_df)
# Obtain the ATE and 95% confidence intervals
ate = pd.Series(model.ate(effect_modifier_df, T0=self.control_value, T1=self.treatment_value))
ate_interval = model.ate_interval(effect_modifier_df, T0=self.control_value, T1=self.treatment_value)
ci_low, ci_high = pd.Series(ate_interval[0]), pd.Series(ate_interval[1])
return ate, [ci_low, ci_high]
    def estimate_cates(self) -> tuple[pd.DataFrame, None]:
        """Estimate the conditional average treatment effect (CATE) for each sample in the data as a function of a set
        of covariates (X), i.e. effect modifiers. That is, the predicted change in outcome caused by the intervention
        (change in treatment from control to treatment value) for every execution of the system-under-test, taking into
        account the value of each effect modifier X. As a result, for every unique setting of the set of covariates X,
        we expect a different CATE.
        :return: A tuple of (results_df, None), where results_df contains a conditional average treatment effect, 95%
        confidence intervals, and the covariate (effect modifier) values for each sample.
        """
# Remove any NA containing rows
reduced_df = self.df.copy()
necessary_cols = [self.treatment] + list(self.adjustment_set) + [self.outcome]
missing_rows = reduced_df[necessary_cols].isnull().any(axis=1)
reduced_df = reduced_df[~missing_rows]
# Split data into effect modifiers (X), confounders (W), treatments (T), and outcome (Y)
if self.effect_modifiers:
effect_modifier_df = reduced_df[list(self.effect_modifiers)]
else:
raise ValueError("CATE requires the user to define a set of effect modifiers.")
if self.adjustment_set:
confounders_df = reduced_df[list(self.adjustment_set)]
else:
confounders_df = None
treatment_df = reduced_df[[self.treatment]]
outcome_df = reduced_df[[self.outcome]]
# Fit a model to the data
model = CausalForestDML(model_y=GradientBoostingRegressor(), model_t=GradientBoostingRegressor())
model.fit(outcome_df, treatment_df, X=effect_modifier_df, W=confounders_df)
# Obtain CATES and confidence intervals
conditional_ates = model.effect(effect_modifier_df, T0=self.control_value, T1=self.treatment_value).flatten()
[ci_low, ci_high] = model.effect_interval(
effect_modifier_df, T0=self.control_value, T1=self.treatment_value, alpha=self.alpha
)
# Merge results into a dataframe (CATE, confidence intervals, and effect modifier values)
results_df = pd.DataFrame(columns=["cate", "ci_low", "ci_high"])
results_df["cate"] = list(conditional_ates)
results_df["ci_low"] = list(ci_low.flatten())
results_df["ci_high"] = list(ci_high.flatten())
effect_modifier_df.reset_index(drop=True, inplace=True)
results_df[list(self.effect_modifiers)] = effect_modifier_df
results_df.sort_values(by=list(self.effect_modifiers), inplace=True)
        # None stands in for aggregate confidence intervals, keeping the (estimate, intervals)
        # return shape used by the other estimation methods
        return results_df, None
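
if __name__ == "__main__":
    # Minimal smoke-test sketch (illustrative, not part of the library): fit a LinearRegressionEstimator
    # on synthetic data where the true ATE of "x" on "y" is 2. The column names are hypothetical.
    rng = np.random.default_rng(0)
    demo_df = pd.DataFrame({"x": rng.normal(size=500), "z": rng.normal(size=500)})
    demo_df["y"] = 2 * demo_df["x"] + demo_df["z"] + rng.normal(size=500)
    demo_estimator = LinearRegressionEstimator(
        treatment="x",
        treatment_value=1.0,
        control_value=0.0,
        adjustment_set={"z"},
        outcome="y",
        df=demo_df,
    )
    demo_ate, (demo_ci_low, demo_ci_high) = demo_estimator.estimate_ate()
    print(f"Estimated ATE: {demo_ate[0]:.3f} (95% CI {demo_ci_low[0]:.3f} to {demo_ci_high[0]:.3f})")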