Add site to powerset during merge (#75)
* Add site to powerset during merge

* restoring old sample data

* Less verbose Series construction

* naming things! its a mugs game

* Unit test fixes

* lint cleanup
dogversioning authored Apr 20, 2023
1 parent 014d334 commit 10553c1
Showing 7 changed files with 363 additions and 47 deletions.
6 changes: 3 additions & 3 deletions scripts/cumulus_upload_data.py
@@ -109,11 +109,11 @@ def upload_file(cli_args):
args_dict["user"] = os.environ.get("CUMULUS_TEST_UPLOAD_USER", "general")
args_dict["file"] = (
f"{str(Path(__file__).resolve().parents[1])}"
f"/tests/test_data/cube_simple_example.parquet"
f"/tests/test_data/count_synthea_patient.parquet"
)
args_dict["auth"] = os.environ.get("CUMULUS_TEST_UPLOAD_AUTH", "secretval")
args_dict["study"] = "covid"
args_dict["data_package"] = "encounter"
args_dict["study"] = "core"
args_dict["data_package"] = "patient"

for key in args.keys():
if args[key] is not None:
59 changes: 47 additions & 12 deletions src/handlers/site_upload/powerset_merge.py
@@ -10,6 +10,7 @@
import pandas

from numpy import nan
from pandas.core.indexes.range import RangeIndex

from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.enums import BucketPath
@@ -24,22 +25,55 @@
)


def concat_sets(df: pandas.DataFrame, file_path: str) -> pandas.DataFrame:
"""concats a count dataset in a specified S3 location with in memory dataframe"""
def get_static_string_series(static_str: str, index: RangeIndex):
"""Helper for the verbose way of defining a pandas string series"""
return pandas.Series([static_str] * len(index)).astype("string")


def expand_and_concat_sets(
df: pandas.DataFrame, file_path: str, site_name: str
) -> pandas.DataFrame:
"""Processes and joins dataframes containing powersets.
:param df: A dataframe to merge with
:param file_path: An S3 location of an uploaded dataframe
:param site_name: The site name used by the aggregator, for convenience
:return: expanded and merged dataframe
This function has two steps in terms of business logic:
- For a powerset we load from S3, we need to add a new column for site name,
and then duplicate that data and populate the site into the cloned copy,
so that we will still have a valid powerset
- We need to take that new powerset and merge it via unique hash of non-count
columns with the provided in-memory dataframe. We need to preserve N/A
values since the powerset, by definition, contains lots of them.
"""
#
site_df = awswrangler.s3.read_parquet(file_path)
df_copy = site_df.copy()
site_df["site"] = get_static_string_series(None, site_df.index)
df_copy["site"] = get_static_string_series(site_name, df_copy.index)
# concatenating in this way adds a new column we want to explicitly drop
# from the final set
site_df = pandas.concat([site_df, df_copy]).reset_index().drop("index", axis=1)
data_cols = list(site_df.columns) # type: ignore[union-attr]
# There is a baked in assumption with the following line related to the powerset
# structures, which we will need to handle differently in the future:
# Specifically, we are assuming the bucket sizes are in a column labeled "cnt",
# but at some point, we may have different kinds of counts, like "cnt_encounter".
# We'll need to modify this once we know a bit more about the final design.
data_cols.remove("cnt")
return (
agg_df = (
pandas.concat([df, site_df])
.groupby(data_cols, dropna=False)
.sum()
.sort_values(by=["cnt", "site"], ascending=False, na_position="first")
.reset_index()
# this last line makes "cnt" the first column in the set, matching the
# library style
.filter(["cnt"] + data_cols)
)
return agg_df


def merge_powersets(
@@ -58,14 +92,14 @@ def merge_powersets(
)
for s3_path in last_valid_file_list:
site_specific_name = get_s3_site_filename_suffix(s3_path)
site = site_specific_name.split("/", maxsplit=1)[0]
last_valid_site = site_specific_name.split("/", maxsplit=1)[0]

# If the latest uploads don't include this site, we'll use the last-valid
# one instead
if not any(x.endswith(site_specific_name) for x in latest_file_list):
df = concat_sets(df, s3_path)
df = expand_and_concat_sets(df, s3_path, last_valid_site)
metadata = update_metadata(
metadata, site, study, data_package, "last_uploaded_date"
metadata, last_valid_site, study, data_package, "last_uploaded_date"
)
for s3_path in latest_file_list:
site_specific_name = get_s3_site_filename_suffix(s3_path)
@@ -85,19 +119,19 @@ def merge_powersets(
Bucket=s3_bucket_name,
Key=f"{BucketPath.ARCHIVE.value}/{timestamped_path}",
)
df = concat_sets(df, s3_path)
df = expand_and_concat_sets(df, s3_path, site)
move_s3_file(
s3_client,
s3_bucket_name,
f"{BucketPath.LATEST.value}/{subbucket_path}",
f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
)
site = site_specific_name.split("/", maxsplit=1)[0]
latest_site = site_specific_name.split("/", maxsplit=1)[0]
metadata = update_metadata(
metadata, site, study, data_package, "last_data_update"
metadata, latest_site, study, data_package, "last_data_update"
)
metadata = update_metadata(
metadata, site, study, data_package, "last_aggregation"
metadata, latest_site, study, data_package, "last_aggregation"
)
except Exception as e: # pylint: disable=broad-except
logging.error("File %s failed to aggregate: %s", s3_path, str(e))
@@ -113,10 +147,11 @@ def merge_powersets(
# if a new file fails, we want to replace it with the last valid
# for purposes of aggregation
if any(x.endswith(site_specific_name) for x in last_valid_file_list):
df = concat_sets(
df = expand_and_concat_sets(
df,
f"s3://{s3_bucket_name}/{BucketPath.LAST_VALID.value}"
f"/{subbucket_path}",
site,
)
metadata = update_metadata(
metadata, site, study, data_package, "last_aggregation"
@@ -146,7 +181,7 @@ def powerset_merge_handler(event, context):
s3_client = boto3.client("s3")
s3_key = event["Records"][0]["Sns"]["Message"]
s3_key_array = s3_key.split("/")
site = s3_key_array[0]
site = s3_key_array[3]
study = s3_key_array[1]
data_package = s3_key_array[2]
merge_powersets(s3_client, s3_bucket, site, study, data_package)
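As an illustration of the two-step logic described in the new expand_and_concat_sets docstring, here is a minimal standalone pandas sketch. It is not the handler's actual code path: the site names, column names, and counts are made up, and it assumes "cnt" is the only count column, per the comment in the diff.

```python
import pandas


def expand(upload: pandas.DataFrame, site_name: str) -> pandas.DataFrame:
    """Clone a site's powerset: the clone carries the site name, the original
    keeps a null site, so the union of the two is still a valid powerset."""
    untagged = upload.copy()
    tagged = upload.copy()
    untagged["site"] = pandas.Series([None] * len(untagged.index)).astype("string")
    tagged["site"] = pandas.Series([site_name] * len(tagged.index)).astype("string")
    return pandas.concat([untagged, tagged]).reset_index(drop=True)


# Hypothetical toy uploads: counts bucketed by gender, where NA marks the
# "all values" stratum of each powerset.
hospital_a = pandas.DataFrame({"cnt": [30, 20, 10], "gender": [None, "female", "male"]})
hospital_b = pandas.DataFrame({"cnt": [12, 7, 5], "gender": [None, "female", "male"]})

combined = pandas.concat(
    [expand(hospital_a, "hospital_a"), expand(hospital_b, "hospital_b")]
)

# Merge by grouping on every non-count column, keeping NA keys (dropna=False)
# because a powerset contains many of them; "cnt" is assumed to be the only
# count column, mirroring the assumption noted above.
data_cols = [col for col in combined.columns if col != "cnt"]
merged = (
    combined.groupby(data_cols, dropna=False)
    .sum()
    .sort_values(by=["cnt", "site"], ascending=False, na_position="first")
    .reset_index()
    .filter(["cnt"] + data_cols)
)
print(merged)
```

In this toy output, the null-site rows carry the counts summed across both sites, while the tagged rows preserve each site's own contribution, which is the point of the expansion step.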
19 changes: 9 additions & 10 deletions tests/conftest.py
@@ -15,18 +15,18 @@
from tests.utils import get_mock_metadata, get_mock_study_metadata, ITEM_COUNT, MOCK_ENV


def _init_mock_data(s3_client, bucket_name, site, study, data_package):
def _init_mock_data(s3_client, bucket_name, study, data_package):
s3_client.upload_file(
"./tests/test_data/cube_simple_example.parquet",
"./tests/test_data/count_synthea_patient.parquet",
bucket_name,
f"{BucketPath.AGGREGATE.value}/{site}/{study}/"
f"{site}__{data_package}/{site}__{data_package}__aggregate.parquet",
f"{BucketPath.AGGREGATE.value}/{study}/"
f"{study}__{data_package}/{study}__{data_package}__aggregate.parquet",
)
s3_client.upload_file(
"./tests/test_data/cube_simple_example.csv",
"./tests/test_data/count_synthea_patient.csv",
bucket_name,
f"{BucketPath.CSVAGGREGATE.value}/{site}/{study}/"
f"{site}__{data_package}/{site}__{data_package}__aggregate.csv",
f"{BucketPath.CSVAGGREGATE.value}/{study}/"
f"{study}__{data_package}/{study}__{data_package}__aggregate.csv",
)
create_auth(s3_client, bucket_name, "general_1", "test_1", "general")
create_meta(s3_client, bucket_name, "general", "general_hospital")
@@ -52,9 +52,8 @@ def mock_bucket():
bucket = os.environ["BUCKET_NAME"]
s3_client.create_bucket(Bucket=bucket)
aggregate_params = [
["general_hospital", "covid", "encounter"],
["general_hospital", "lyme", "encounter"],
["st_elsewhere", "covid", "encounter"],
["covid", "encounter"],
["lyme", "encounter"],
]
for param_list in aggregate_params:
_init_mock_data(s3_client, bucket, *param_list)
70 changes: 49 additions & 21 deletions tests/site_upload/test_powerset_merge.py
@@ -1,7 +1,9 @@
import boto3
import os

import awswrangler
import pytest

from datetime import datetime, timezone
from freezegun import freeze_time

@@ -12,42 +14,47 @@
from tests.utils import get_mock_metadata, TEST_BUCKET, ITEM_COUNT


SITE_NAME = "general_hospital"
STUDY_NAME = "study"
DATA_P_NAME = "encounter"


@freeze_time("2020-01-01")
@pytest.mark.parametrize(
"site,upload_file,upload_path,event_key,archives,status,expected_contents",
[
( # Adding a new data package to a site with uploads
"general_hospital",
"./tests/test_data/cube_simple_example.parquet",
"/covid/encounter/general_hospital/document.parquet",
"/covid/encounter/general_hospital/document.parquet",
f"{SITE_NAME}",
"./tests/test_data/count_synthea_patient.parquet",
f"/{STUDY_NAME}/{DATA_P_NAME}/{SITE_NAME}/encounter.parquet",
f"/{STUDY_NAME}/{DATA_P_NAME}/{SITE_NAME}/encounter.parquet",
False,
200,
ITEM_COUNT + 3,
),
( # Adding a new data package to a site without uploads
"chicago_hope",
"./tests/test_data/cube_simple_example.parquet",
"/covid/encounter/chicago_hope/document.parquet",
"/covid/encounter/chicago_hope/document.parquet",
"./tests/test_data/count_synthea_patient.parquet",
f"/{STUDY_NAME}/{DATA_P_NAME}/chicago_hope/encounter.parquet",
f"/{STUDY_NAME}/{DATA_P_NAME}/chicago_hope/encounter.parquet",
False,
200,
ITEM_COUNT + 3,
),
( # Updating an existing data package
"general_hospital",
"./tests/test_data/cube_simple_example.parquet",
"/covid/encounter/general_hospital/encounter.parquet",
"/covid/encounter/general_hospital/encounter.parquet",
f"{SITE_NAME}",
"./tests/test_data/count_synthea_patient.parquet",
f"/covid/{DATA_P_NAME}/{SITE_NAME}/encounter.parquet",
f"/covid/{DATA_P_NAME}/{SITE_NAME}/encounter.parquet",
True,
200,
ITEM_COUNT + 4,
ITEM_COUNT + 2,
),
( # Invalid parquet file
"general_hospital",
f"{SITE_NAME}",
"./tests/site_upload/test_powerset_merge.py",
"/covid/encounter/general_hospital/document.parquet",
"/covid/encounter/general_hospital/document.parquet",
f"/{STUDY_NAME}/{DATA_P_NAME}/{SITE_NAME}/patient.parquet",
f"/{STUDY_NAME}/{DATA_P_NAME}/{SITE_NAME}/patient.parquet",
False,
500,
ITEM_COUNT + 1,
@@ -77,36 +84,57 @@ def test_powerset_merge(
TEST_BUCKET,
f"{BucketPath.LAST_VALID.value}{upload_path}",
)
event = {"Records": [{"Sns": {"Message": f"{BucketPath.LATEST.value}{event_key}"}}]}

event = {"Records": [{"Sns": {"Message": f"{BucketPath.LATEST.value}{event_key}"}}]}
# This array looks like:
# ['', 'study', 'package', 'site', 'file']
event_list = event_key.split("/")
expected_study = event_list[1]
expected_package = event_list[2]
expected_site = event_list[3]
res = powerset_merge_handler(event, {})
assert res["statusCode"] == status
s3_res = s3_client.list_objects_v2(Bucket=TEST_BUCKET)
assert len(s3_res["Contents"]) == expected_contents
for item in s3_res["Contents"]:
if item["Key"].endswith("aggregate.parquet"):
assert item["Key"].startswith(BucketPath.AGGREGATE.value)
# This finds the aggregate that was created/updated - i.e. it skips mocks
if (
expected_study in item["Key"]
and expected_package in item["Key"]
and status == 200
):
agg_df = awswrangler.s3.read_parquet(
f"s3://{TEST_BUCKET}/{item['Key']}"
)
assert (agg_df["site"].eq(expected_site)).any()
elif item["Key"].endswith("aggregate.csv"):
assert item["Key"].startswith(BucketPath.CSVAGGREGATE.value)
elif item["Key"].endswith("transactions.json"):
assert item["Key"].startswith(BucketPath.META.value)
metadata = read_metadata(s3_client, TEST_BUCKET)
print(metadata)
if res["statusCode"] == 200:
study = event_key.split("/")[1]
assert (
metadata[site]["covid"]["encounter"]["last_aggregation"]
metadata[site][study][DATA_P_NAME]["last_aggregation"]
== datetime.now(timezone.utc).isoformat()
)
else:
assert (
metadata[site]["covid"]["encounter"]["last_aggregation"]
== get_mock_metadata()[site]["covid"]["encounter"][
metadata["general_hospital"]["covid"]["encounter"][
"last_aggregation"
]
== get_mock_metadata()["general_hospital"]["covid"]["encounter"][
"last_aggregation"
]
)
elif item["Key"].startswith(BucketPath.LAST_VALID.value):
assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
else:
assert (
item["Key"].startswith(BucketPath.LAST_VALID.value)
or item["Key"].startswith(BucketPath.ARCHIVE.value)
item["Key"].startswith(BucketPath.ARCHIVE.value)
or item["Key"].startswith(BucketPath.ERROR.value)
or item["Key"].startswith(BucketPath.ADMIN.value)
or item["Key"].endswith("study_periods.json")
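For reference, both the handler's new indexing (site = s3_key_array[3]) and the test comment above assume an SNS message key laid out as latest-prefix/study/data_package/site/file. A quick sketch with a hypothetical key (the "latest" prefix value here stands in for BucketPath.LATEST.value):

```python
# Hypothetical SNS message key following the layout assumed above:
# <latest prefix>/<study>/<data_package>/<site>/<file>
s3_key = "latest/covid/encounter/general_hospital/encounter.parquet"
s3_key_array = s3_key.split("/")

study = s3_key_array[1]         # "covid"
data_package = s3_key_array[2]  # "encounter"
site = s3_key_array[3]          # "general_hospital"
```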