FEAT: Generate ODS Fact Tables #3
base: main
Conversation
```python
def reset_tmpdir(self, make_new: bool = True) -> None:
    """Reset TemporaryDirectory folder."""
    if hasattr(self, "_tdir"):
        self._tdir.cleanup()  # type: ignore
    if make_new:
        self._tdir = tempfile.TemporaryDirectory()
        self.tmpdir = self._tdir.name
```
Moved this to OdinJob to make the tempdir functionality available to all OdinJob objects by default.
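For illustration only, a job might use the inherited helpers like this sketch (the subclass name and file names are hypothetical, not from this PR):

```python
import os

job = HistoryFactJob()            # hypothetical OdinJob subclass
job.reset_tmpdir()                # fresh scratch directory for this run
write_path = os.path.join(job.tmpdir, "fact_batch.parquet")
# ... write intermediate files under job.tmpdir ...
job.reset_tmpdir(make_new=False)  # clean up without recreating
```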
```python
history_ds_rows = self.history_ds.count_rows()
history_ds_groups = 0
frag: pd.ParquetFileFragment
```
Does this declaration do anything? Looks like frag is just used in this loop, so it's an extra line that doesn't provide much additional info. For lint?
Yup, just for linting. It's one of the ways to type hint for-loop iteration variables.
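For readers unfamiliar with the pattern: a bare annotation before the loop (legal since PEP 526) declares the iteration variable's type without assigning it; type checkers then apply that type each iteration. A generic sketch:

```python
from typing import Iterator

def fragments() -> Iterator[str]:  # stand-in for get_fragments()
    yield from ("a", "b", "c")

# Annotate the name without binding it; the for statement binds it later.
frag: str
for frag in fragments():
    print(frag.upper())
```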
```python
frag: pd.ParquetFileFragment
for frag in self.history_ds.get_fragments():
    history_ds_groups += frag.num_row_groups
self.batch_size = int(history_ds_rows / (20 * history_ds_groups))
```
I'm used to extracting out "magic numbers" in my code - would it be possible/make sense to bubble these values up into a config struct, defined at the top of the file maybe?
Yeah, I totally agree. I want to make a custom dataset class that offers a bunch of methods and properties like this automatically.
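A minimal sketch of the suggested config struct, with hypothetical names (neither `FactTableConfig` nor its field is from the PR):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class FactTableConfig:
    """Collects tunables currently inlined as magic numbers."""
    batches_per_row_group: int = 20  # divisor used when sizing batches

CONFIG = FactTableConfig()

# The batch sizing line would then read:
# self.batch_size = int(history_ds_rows / (CONFIG.batches_per_row_group * history_ds_groups))
```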
```python
sync_file = found_objs[-1].path.replace("s3://", "")
destination = os.path.join(self.tmpdir, sync_file.replace("/year_", "/temp_"))
download_object(found_objs[-1].path, destination)
sync_paths.append(destination)
```
This if/else looks to be used to populate search_path. Can we generate search_paths in a list (or just the default path if part_column = None), then use a list comprehension to generate sync_paths? Wrap found_objs...download in a method download_found_objects(...) to serve as the mapped function in the list comprehension; a sketch follows below.
This deduplicates code and minimizes the amount of code in the control flow, which is clearer, IMO.
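Roughly like this sketch, where download_found_objects, list_objects, default_path, and part_values are hypothetical stand-ins for the suggestion, not code from the PR:

```python
def download_found_objects(self, search_path: str) -> str:
    """Download the newest object under search_path; return its local path."""
    found_objs = list_objects(search_path)  # assumed listing helper
    sync_file = found_objs[-1].path.replace("s3://", "")
    destination = os.path.join(self.tmpdir, sync_file.replace("/year_", "/temp_"))
    download_object(found_objs[-1].path, destination)
    return destination

# Build every search path up front, then one comprehension does the work:
search_paths = (
    [default_path]
    if self.part_column is None
    else [f"{default_path}/{self.part_column}={value}" for value in part_values]
)
sync_paths = [self.download_found_objects(path) for path in search_paths]
```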
```python
batch = batch.drop_columns(self.history_drop_columns)
writer.write_batch(batch)
odin_index += batch.num_rows
if os.path.getsize(write_path) * 3 > free_disk_bytes():
```
bubble up this tunable parameter?
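Concretely, that could look like this sketch (the constant name is hypothetical):

```python
# Headroom factor for the disk-space guard; named instead of a bare "3".
DISK_HEADROOM_FACTOR = 3

if os.path.getsize(write_path) * DISK_HEADROOM_FACTOR > free_disk_bytes():
    ...  # rotate/flush as the surrounding code already does
```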
```python
fact_ds = ds_from_path(f"s3://{self.s3_export}/")
_, max_fact_seq = ds_column_min_max(fact_ds, "header__change_seq")
cdc_filter = (pc.field("header__change_seq") > max_fact_seq) & (
    pc.field("header__change_oper") != "B"
)
```
can we explain this "B" in a comment?
```python
move_path = new_path.replace(f"{self.tmpdir}/", "")
upload_file(new_path, move_path)

if cdc_iter > 1:
```
Not sure what this means
I see. Can we set variables named something like rerun_quick = 60*5 and rerun_next_hour = 60*60, and return those instead?
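That is, roughly (constant names taken from the comment above; the surrounding control flow is assumed):

```python
RERUN_QUICK_SECS = 60 * 5       # retry soon when more CDC batches remain
RERUN_NEXT_HOUR_SECS = 60 * 60  # otherwise wait for the next hourly run

if cdc_iter > 1:
    return RERUN_QUICK_SECS
return RERUN_NEXT_HOUR_SECS
```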
```
@@ -46,6 +52,8 @@ def start(self, schedule: sched.scheduler) -> None:
        )

    except Exception as exception:
        # Don't run again for 12 hours on failure
        run_delay_secs = 60 * 60 * 12
```
👌
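For context, the run-then-reschedule pattern the diff suggests fits Python's stdlib sched module roughly like this (a sketch, not the PR's exact code; self.run returning the next delay is an assumption):

```python
import sched

def start(self, schedule: sched.scheduler) -> None:
    """Run the job, then re-enter the scheduler with a computed delay."""
    try:
        run_delay_secs = self.run()  # assumed to return the next delay
    except Exception as exception:
        # Don't run again for 12 hours on failure
        run_delay_secs = 60 * 60 * 12
    schedule.enter(run_delay_secs, 1, self.start, (schedule,))
```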
""" | ||
Find max value of column from the file of parquet partition that was most recently modified. | ||
|
||
This is usefull for very large datasets where it is guaranteed that the max value of a column |
usefull->useful
```python
This is usefull for very large datasets where it is guaranteed that the max value of a column
will be in the most recently modified file of the partition.

TODO: make this work locally as well??
```
yes please 😄
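For the local case, the same idea could lean on Parquet row-group statistics via pyarrow, as in this sketch (the function name is hypothetical and flat schemas are assumed):

```python
import glob
import os

import pyarrow.parquet as pq

def local_partition_column_max(partition_dir: str, column: str):
    """Max of `column` from the most recently modified Parquet file,
    read from row-group statistics without scanning the data."""
    newest = max(
        glob.glob(os.path.join(partition_dir, "*.parquet")),
        key=os.path.getmtime,
    )
    pf = pq.ParquetFile(newest)
    col_idx = pf.schema_arrow.names.index(column)  # flat schemas only
    maxes = []
    for rg in range(pf.metadata.num_row_groups):
        stats = pf.metadata.row_group(rg).column(col_idx).statistics
        if stats is not None and stats.has_min_max:
            maxes.append(stats.max)
    return max(maxes) if maxes else None
```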
This change contains the initial business logic for generating ODS Fact tables from ODS Qlik History datasets.
This business logic allows for incremental loading of ODS Fact tables from History dataset sources. Progress is tracked by the `header__change_seq` column that is available from Qlik CDC records. This process adds an `odin_index` field to all Fact tables so that records can be referenced by a single index column. This change required the addition of a couple of new Parquet utility functions.
There are some areas where the Fact table business logic can be streamlined; however, this initial version of the logic is functional, if not pretty.