FEAT: Generate ODS Fact Tables #3

Open · wants to merge 1 commit into base: main

Conversation

@rymarczy (Collaborator) commented on Mar 7, 2025

This change contains the initial business logic for generating ODS Fact tables for ODS Qlik History datasets.

This business logic allows for incremental loading of ODS Fact tables from History dataset sources. Progress is tracked with the header__change_seq column that is available in Qlik CDC records. This process also adds an odin_index field to all Fact tables so that records can be referenced by a single index column.

This change required the addition of a couple of new Parquet utility functions:

  • fast_last_mod_ds_max
    • A faster way to get the maximum value of a dataset column, when you know the maximum will be in the most recently modified file of a Parquet dataset (sketched below)
  • ds_limit_k_sorted
    • Produces a limited number of sorted results from a very large Parquet dataset in a memory-constrained manner
  • ds_batched_join
    • Joins dataframe records to a very large Parquet dataset in a memory-constrained manner
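
For illustration, a minimal sketch of what fast_last_mod_ds_max might look like. The signature, the pyarrow calls, and the use of file modification times via the dataset's filesystem are assumptions here, not the PR's actual implementation:

import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet as pq


def fast_last_mod_ds_max(dataset: ds.FileSystemDataset, column: str):
    """Return max(column), reading only the most recently modified file.

    Only valid when the dataset guarantees the column max lands in the
    newest file (e.g. a monotonically increasing CDC sequence number).
    """
    fs = dataset.filesystem
    # Pick the fragment file with the newest modification time.
    last_file = max(dataset.files, key=lambda f: fs.get_file_info(f).mtime)
    # Read just that one column from that one file, then take its max.
    table = pq.read_table(last_file, columns=[column], filesystem=fs)
    return pc.max(table[column]).as_py()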

There are some areas where the Fact table business logic can be streamlined; however, this initial version of the logic is functional, if not pretty.

@rymarczy requested review from grejdi-mbta and huangh on March 7, 2025 at 18:56
Comment on lines -95 to -101
def reset_tmpdir(self, make_new: bool = True) -> None:
    """Reset TemporaryDirectory folder."""
    if hasattr(self, "_tdir"):
        self._tdir.cleanup()  # type: ignore
    if make_new:
        self._tdir = tempfile.TemporaryDirectory()
        self.tmpdir = self._tdir.name

rymarczy (Collaborator Author):
Moved this to OdinJob to make the tempdir functionality available to all OdinJob objects by default.
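
For illustration, roughly how a subclass would pick this up; the subclass and its run method are hypothetical:

class FactTableJob(OdinJob):
    """Hypothetical OdinJob subclass; tempdir handling is inherited."""

    def run(self) -> None:
        self.reset_tmpdir()     # start each run with a fresh scratch directory
        work_dir = self.tmpdir  # provided by the OdinJob base class
        ...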


history_ds_rows = self.history_ds.count_rows()
history_ds_groups = 0
frag: pd.ParquetFileFragment

Reviewer:
Does this declaration do anything? It looks like frag is only used in this loop, so it's an extra line that doesn't provide much additional info. For lint?


rymarczy (Collaborator Author):
Yup, just for linting. It's one of the ways to type-hint for-loop iteration variables.

for frag in self.history_ds.get_fragments():
    history_ds_groups += frag.num_row_groups
self.batch_size = int(history_ds_rows / (20 * history_ds_groups))

Reviewer:
I'm used to extracting "magic numbers" in my code. Would it be possible/make sense to bubble these values up into a config struct, defined at the top of the file maybe?


rymarczy (Collaborator Author):
Yeah, I totally agree. I want to make a custom dataset class that offers a bunch of methods and properties like this automatically.
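
As an interim step before that class exists, a minimal sketch of lifting the tunable to module level; the constant name is invented here:

# Invented name; the value 20 comes from the code under review.
TARGET_BATCHES_PER_ROW_GROUP = 20

self.batch_size = int(history_ds_rows / (TARGET_BATCHES_PER_ROW_GROUP * history_ds_groups))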

sync_file = found_objs[-1].path.replace("s3://", "")
destination = os.path.join(self.tmpdir, sync_file.replace("/year_", "/temp_"))
download_object(found_objs[-1].path, destination)
sync_paths.append(destination)

Reviewer:
This if/else looks to be used to populate search_path. Can we generate the search paths in a list (or just the default path if part_column is None), then use a list comprehension to generate sync_paths? Wrap found_objs...download in a method download_found_objects(...) to be the lambda in the list comprehension.

This deduplicates code and minimizes the amount of code in the control flow, which is clearer imo.
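
A rough sketch of the suggested shape; download_found_objects comes from the comment, while list_objects, part_values, and default_path stand in for whatever the surrounding code actually uses:

def download_found_objects(self, search_path: str) -> str:
    """Download the newest object found under search_path; return its local path."""
    found_objs = list_objects(search_path)  # stand-in for the existing lookup
    sync_file = found_objs[-1].path.replace("s3://", "")
    destination = os.path.join(self.tmpdir, sync_file.replace("/year_", "/temp_"))
    download_object(found_objs[-1].path, destination)
    return destination

# Build every search path first, then one comprehension replaces the if/else.
search_paths = part_values if part_column is not None else [default_path]
sync_paths = [self.download_found_objects(p) for p in search_paths]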

batch = batch.drop_columns(self.history_drop_columns)
writer.write_batch(batch)
odin_index += batch.num_rows
if os.path.getsize(write_path) * 3 > free_disk_bytes():

Reviewer:
bubble up this tunable parameter?

fact_ds = ds_from_path(f"s3://{self.s3_export}/")
_, max_fact_seq = ds_column_min_max(fact_ds, "header__change_seq")
cdc_filter = (pc.field("header__change_seq") > max_fact_seq) & (
pc.field("header__change_oper") != "B"

Reviewer:
Can we explain this "B" in a comment?
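
For reference, the requested comment might read like this; the gloss assumes Qlik Replicate's convention that "B" marks the before-image row of an UPDATE:

cdc_filter = (pc.field("header__change_seq") > max_fact_seq) & (
    # Skip "B" records: Qlik CDC before-image rows capturing a row's state
    # prior to an UPDATE, which should not be loaded as facts (assumed).
    pc.field("header__change_oper") != "B"
)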

move_path = new_path.replace(f"{self.tmpdir}/", "")
upload_file(new_path, move_path)

if cdc_iter > 1:

Reviewer:
Not sure what this means


Reviewer:
I see. Can we set variables named something like rerun_quick = 60*5 and rerun_next_hour = 60*60, and return those instead?
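
A minimal sketch of that suggestion; the constant names come from the comment, and the surrounding return logic is assumed:

RERUN_QUICK_SECS = 60 * 5       # more CDC batches pending, re-run soon
RERUN_NEXT_HOUR_SECS = 60 * 60  # caught up, check again in an hour

if cdc_iter > 1:
    return RERUN_QUICK_SECS
return RERUN_NEXT_HOUR_SECS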

@@ -46,6 +52,8 @@ def start(self, schedule: sched.scheduler) -> None:
        )

    except Exception as exception:
        # Don't run again for 12 hours on failure
        run_delay_secs = 60 * 60 * 12

Reviewer:
👌

"""
Find max value of column from the file of parquet partition that was most recently modified.

This is usefull for very large datasets where it is guaranteed that the max value of a column

Reviewer:
usefull->useful

will be in the most recently modified file of the partition.

TODO: make this work locally as well??

Reviewer:
yes please 😄
