Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imputation manual construction #186

Merged
merged 19 commits into from
May 23, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
RoM Manual constrction update
vidhyamanisankar committed May 8, 2024
commit 9982a3f9ac0968f175e1323b807bbf4488f16baf
91 changes: 81 additions & 10 deletions statistical_methods_library/imputation/engine.py
Original file line number Diff line number Diff line change
@@ -42,6 +42,12 @@ class Marker(Enum):

FORWARD_IMPUTE_FROM_CONSTRUCTION = "FIC"
"""The value has been forward imputed from a constructed value."""

MANUAL_CONSTRUCTION = "MC"
"""The value is manual construction."""

FORWARD_IMPUTE_FROM_MANUAL_CONSTRUCTION = "FIMC"
"""The value has been forward imputed from a manual construction."""


def impute(
@@ -78,6 +84,7 @@ def impute(
unweighted_forward_link_col: Optional[str] = "forward_unweighted",
unweighted_backward_link_col: Optional[str] = "backward_unweighted",
unweighted_construction_link_col: Optional[str] = "construction_unweighted",
manual_construction_col: Optional[str] = "manual_construction",
**ratio_calculator_params,
) -> DataFrame:
"""
@@ -188,7 +195,7 @@ def impute(
link_cols = [forward_link_col, backward_link_col]
if any(link_cols) and not all(link_cols):
raise TypeError("Either all or no link columns must be specified")

# input_df.show(100)
input_params = {
"ref": reference_col,
"period": period_col,
@@ -225,6 +232,10 @@ def impute(

if construction_link_col in input_df.columns:
input_params["construction"] = construction_link_col





back_input_params = {
"ref": reference_col,
@@ -233,7 +244,14 @@ def impute(
"output": output_col,
"marker": marker_col,
}

# input_df.show(100)
# TODO CHECK in nimpuation wrapper the same column name is avaliable in back data df as well ?!
if manual_construction_col in input_df.columns:
input_params["manual_const"] = manual_construction_col

if back_data_df and manual_construction_col in back_data_df.columns:
back_input_params["manual_const"] = manual_construction_col

if weight is not None:
if not isinstance(weight, Decimal):
raise TypeError("weight must be of type Decimal")
@@ -266,6 +284,7 @@ def impute(
"forward_unweighted": DecimalType,
"backward_unweighted": DecimalType,
"construction_unweighted": DecimalType,
"manual_const": DecimalType,
}

if link_filter:
@@ -289,7 +308,7 @@ def impute(
input_params,
type_mapping,
["ref", "period", "grouping"],
["target"],
["target","manual_const"],
)
.withColumnRenamed("target", "output")
.withColumn("marker", when(~col("output").isNull(), Marker.RESPONSE.value))
@@ -301,19 +320,43 @@ def impute(
prior_period_df = prepared_df.selectExpr(
"min(previous_period) AS prior_period"
).localCheckpoint(eager=False)

# TODO Remove manual construction record from data before the rotio calculation.
print("test1111 input_df.columns")
print(str(input_df.columns))
prepared_df.show(100)
if manual_construction_col in input_df.columns: #TODO check is it needed ?!
df_with_mc_data = prepared_df.withColumn(
"marker",when((col("manual_const").isNotNull()) &
(col("output").isNull()),lit(Marker.MANUAL_CONSTRUCTION.value)).otherwise(col("marker"))).withColumn("output",when((col("manual_const").isNotNull()) &
(col("output").isNull()),col("manual_const")).otherwise(col("output")))
only_mc_data = df_with_mc_data.filter((col("marker") == Marker.MANUAL_CONSTRUCTION.value) | (col("marker") == Marker.FORWARD_IMPUTE_FROM_MANUAL_CONSTRUCTION.value))
print("only_mc_data")
only_mc_data.show(100)
df_with_mc_data.show(100)
# TODO Additionally, after MC data is entered, filter out the immediate missing responses.
prepared_df = prepared_df.filter(~(col("marker") == Marker.MANUAL_CONSTRUCTION.value) | ~(col("marker") == Marker.FORWARD_IMPUTE_FROM_MANUAL_CONSTRUCTION.value))
print("prepared_df_without_mc_fimc :: ")
prepared_df.show()
# ["ref", "period", "grouping"]
# mc_join_cont = [mc_df1.marker_mc != Marker.MANUAL_CONSTRUCTION.value , mc_df1.ref == prepared_df.ref, mc_df1.period == prepared_df.period,mc_df1.grouping == prepared_df.grouping]
# mc_df2 = prepared_df.join(mc_df1,mc_join_cont, "left")
# mc_df2.show(100)


if back_data_df:

validated_back_data_df = validate_dataframe(
back_data_df,
back_input_params,
type_mapping,
["ref", "period", "grouping"],
["manual_const"]
).localCheckpoint(eager=False)
back_data_period_df = (
validated_back_data_df.select(
"ref", "period", "grouping", "output", "marker"
"ref", "period", "grouping", "output", "marker","manual_const"
)
.join(prior_period_df, [col("period") == col("prior_period")])
.join(prior_period_df, [col("period") == col("prior_period")]) # TODO check what is the impact to miss the mc column in prior_period_df
.drop("prior_period")
.filter(((col(marker_col) != lit(Marker.BACKWARD_IMPUTE.value))))
.withColumn(
@@ -325,11 +368,23 @@ def impute(
)
.localCheckpoint(eager=False)
)
# Remove manual construction record from back data before the rotio calculation.
if manual_construction_col in back_data_period_df.columns: #TODO check is it needed ?!
back_data_mc = back_data_period_df.filter((col("marker") == Marker.MANUAL_CONSTRUCTION.value) | (col("marker") == Marker.FORWARD_IMPUTE_FROM_MANUAL_CONSTRUCTION.value))
print("back_data_mc")
back_data_mc.show(100)
# TODO Additionally, after MC data is entered, filter out the immediate missing responses.
back_data_period_df = back_data_period_df.filter(~(col("marker") == Marker.MANUAL_CONSTRUCTION.value) | ~(col("marker") == Marker.FORWARD_IMPUTE_FROM_MANUAL_CONSTRUCTION.value))
print("back_data_period_df_without_mc_fimc :: ")
back_data_period_df.show()

prepared_df = prepared_df.unionByName(
back_data_period_df.filter(col("marker") == lit(Marker.RESPONSE.value)),
back_data_period_df.filter(col("marker") == lit(Marker.RESPONSE.value)),
allowMissingColumns=True,
)




def calculate_ratios():
# This allows us to return early if we have nothing to do
nonlocal prepared_df
@@ -354,6 +409,7 @@ def calculate_ratios():
ratio_calculators.append(construction_ratio_calculator)

if not ratio_calculators:
print("empty return......")
return

# Since we're going to join on to the main df filtering here
@@ -546,6 +602,8 @@ def impute_helper(
) -> DataFrame:
nonlocal imputed_df
nonlocal null_response_df
print("inside impute_helper")
print(f"inside impute_helper :: {marker.value}")
if direction:
# Forward imputation
other_period_col = "previous_period"
@@ -722,16 +780,29 @@ def forward_impute_from_construction(df: DataFrame) -> DataFrame:
)

df = prepared_df
i=0
print(" imputation call ...")
prepared_df.show(100)
for stage in (
forward_impute_from_response,
backward_impute,
construct_values,
forward_impute_from_construction,
):
df = stage(df).localCheckpoint(eager=False)
if df.filter(col("output").isNull()).count() == 0:
print(f"stage:: {i}")
df.show(100)
if manual_construction_col in input_df.columns and stage == backward_impute:
print("after backward_impute add the mc only data")
only_mc_data.show(100)
df = df.unionByName(only_mc_data, allowMissingColumns=True)
print("Merged mc only data")
df.show(100)

if df.filter(col("output").isNull()).count() == 0 and i==2:
break


i=i+1
return df.join(prior_period_df, [col("prior_period") < col("period")]).select(
[
col(k).alias(output_col_mapping[k])
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
identifier,date,group,question,other,manual_construction
30001,202001,100,8444,51,
30001,202002,100,7476,51,
30001,202003,100,2003,51,
30002,202001,100,9343,72,
30002,202002,100,7818,72,
30002,202003,100,4897,72,
30003,202001,100,7511,7,
30003,202002,100,1761,7,
30003,202003,100,6492,7,
30004,202001,100,,81,4321
30004,202002,100,2113,81,
30004,202003,100,,81,3189
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
identifier,date,group,growth_forward,growth_backward,forward,backward,construction,output,marker,count_forward,count_backward,count_construction
30001,202001,100,,1.129481006,,2.196577972,194.6,8444,R,,3,3
30001,202002,100,0.885362387,3.732401398,0.652198238,1.866715325,90.8436019,7476,R,3,3,4
30001,202003,100,0.267924024,,1.526946931,,103.0153846,2003,R,3,,3
30002,202001,100,,1.195062676,,2.196577972,194.6,9343,R,,3,3
30002,202002,100,0.836776196,1.596487645,0.652198238,1.866715325,90.8436,7818,R,3,3,4
30002,202003,100,0.626375032,,1.526947,,103.0153846,4897,R,3,,3
30003,202001,100,,4.265190233,,2.196577972,194.6,7511,R,,3,3
30003,202002,100,0.234456131,0.271256932,0.652198238,1.866715325,90.8436,1761,R,3,3,4
30003,202003,100,3.686541738,,1.526947,,103.0153846,6492,R,3,,3
30004,202001,100,,,,2.196577972,194.6,4321,MC,,3,3
30004,202002,100,,,0.652198238,1.866715325,90.8436,2113,R,3,3,4
30004,202003,100,,,1.526947,,103.0153846,3189,MC,3,,3
7 changes: 7 additions & 0 deletions tests/imputation/mean_of_ratios.toml
Original file line number Diff line number Diff line change
@@ -4,11 +4,15 @@ backward_growth_col = "growth_backward"
trimmed_forward_col = "trim_inclusion_forward"
trimmed_backward_col = "trim_inclusion_backward"

manual_construction_col = "manual_construction"

[field_types]
forward_growth_col = "decimal(15,6)"
backward_growth_col = "decimal(15,6)"
trimmed_forward_col = "boolean"
trimmed_backward_col = "boolean"
#TODO check
manual_construction_col = "decimal(15,6)"

[scenarios.05_R_R_FI_FI_FI_year_span]
starting_period = "202010"
@@ -195,3 +199,6 @@ weight_periodicity_multiplier = 12
[scenarios.70_C_FI_FI_65_weight]
weight = "0.65"
weight_periodicity_multiplier = 12

[scenarios.001_71_MC_R_MC_1]
manual_construction_col = "manual_construction"
15 changes: 13 additions & 2 deletions tests/imputation/test_scenarios.py
Original file line number Diff line number Diff line change
@@ -57,13 +57,17 @@ def test_calculations(fxt_load_test_csv, ratio_calculator, scenario_type, scenar

scenarios = test_config.pop("scenarios", {})
scenario_config = scenarios.get(scenario, {})
print("test1111 scenario_config")
print(scenario_config)
fields = default_config["field_names"]
fields.update(test_config.get("field_names", {}))
fields.update(scenario_config.pop("field_names", {}))
excluded_fields = default_config.get("excluded_fields", [])
excluded_fields.extend(test_config.get("excluded_fields", []))
excluded_fields.extend(scenario_config.pop("excluded_fields", []))
imputation_kwargs = fields.copy()
print("test1111 fields")
print(fields)
imputation_kwargs["forward_backward_ratio_calculator"] = getattr(
imputation, ratio_calculator
)
@@ -78,7 +82,11 @@ def test_calculations(fxt_load_test_csv, ratio_calculator, scenario_type, scenar
field_types = default_config["field_types"]
field_types.update(test_config.get("field_types", {}))
field_types.update(scenario_config.get("field_types", {}))
print("test1111 field_types")
print(field_types)
imputation_kwargs.update(scenario_config)
print("test1111 imputation_kwargs")
print(imputation_kwargs)
types = {fields[k]: v for k, v in field_types.items()}
scenario_file_type = scenario_type.replace("back_data_", "")
scenario_input = fxt_load_test_csv(
@@ -112,7 +120,9 @@ def test_calculations(fxt_load_test_csv, ratio_calculator, scenario_type, scenar
scenario_expected_output = scenario_expected_output.filter(
col(fields["period_col"]) >= starting_period
)

scenario_input.show(100)
print("test2222 imputation_kwargs")
print(imputation_kwargs)
scenario_actual_output = imputation.impute(
input_df=scenario_input, **imputation_kwargs
)
@@ -121,7 +131,8 @@ def test_calculations(fxt_load_test_csv, ratio_calculator, scenario_type, scenar
scenario_actual_output = scenario_actual_output.withColumn(
field_name, col(field_name).cast("decimal(15, 6)")
)

print("test33333 scenario_actual_output")
scenario_actual_output.show(100)
sort_cols = [
fields["reference_col"],
fields["period_col"],