Include predefined check functions by default when applying custom checks by metadata #203

Merged · 7 commits · Mar 3, 2025
demos/dqx_demo_library.py (4 additions, 1 deletion)
@@ -356,7 +356,10 @@ def ends_with_foo(col_name: str) -> Column:

dq_engine = DQEngine(WorkspaceClient())

valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals())
custom_check_functions = {"ends_with_foo": ends_with_foo}
# custom_check_functions = globals()  # include all functions for simplicity

valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, custom_check_functions)
display(valid_and_quarantined_df)

# COMMAND ----------
docs/dqx/docs/guide.mdx (19 additions, 12 deletions)
@@ -123,7 +123,7 @@ Validating quality rules can be added to the CI/CD process to ensure checks are

Note that checks are validated automatically when applied as part of the
`apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods
(see [Quality rules defined as config](#quality-rules-defined-as-config)).
(see [Quality rules defined in files](#quality-rules-defined-in-files)).
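
For explicit validation (for example, as a CI/CD step), a minimal sketch, assuming `checks` and the custom `ends_with_foo` function are already defined and that the returned status object exposes `has_errors` and `errors`:

```python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# validate check definitions before deploying them;
# custom check functions are passed explicitly so they can be resolved
status = dq_engine.validate_checks(checks, {"ends_with_foo": ends_with_foo})

# fail fast if any check definition is invalid
assert not status.has_errors, status.errors
```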

### Using CLI

@@ -137,7 +137,7 @@ The following DQX configuration from 'config.yml' will be used by default:

## Adding quality checks to the application

### Quality rules defined as config
### Quality rules defined in files

Quality rules can be stored in a `yaml` or `json` file. Below is an example `yaml` file defining checks ('checks.yml'):
```yaml
@@ -171,9 +171,12 @@ Fields:
### Loading and execution methods

Checks can be loaded from a file in the installation folder, workspace, or local file system. If the checks file contains invalid json or yaml syntax, the engine will raise an error.
The checks can be applied using `apply_checks_by_metadata_and_split` or `apply_checks_by_metadata` methods. The checks are validated automatically as part of these methods.
If you want to split the checked data into valid and invalid (quarantined) dataframes, use `apply_checks_by_metadata_and_split`.
If you want to report issues as additional columns, use `apply_checks_by_metadata`.

Checks loaded from a file can be applied using one of the following methods:
* `apply_checks_by_metadata_and_split`: splits the input data into valid and invalid (quarantined) dataframes.
* `apply_checks_by_metadata`: reports issues as additional columns.

The syntax of the loaded checks is validated automatically as part of these methods.
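
As a minimal end-to-end sketch (the file path and `input_df` are illustrative; the per-method examples below show each loading option in detail):

```python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# load checks from a local yaml/json file (path is illustrative)
checks = dq_engine.load_checks_from_local_file("checks.yml")

# Option 1: split the data into valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

# Option 2: report issues as additional columns on a single dataframe
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```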

#### Method 1: Loading checks from a workspace file in the installation folder

@@ -216,8 +219,6 @@ valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```

Checks are validated automatically as part of the `apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods.

#### Method 3: Loading checks from a local file

Checks can also be loaded from a file in the local file system:
@@ -240,12 +241,13 @@ valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)

### Quality rules defined as code

Checks can be defined in code and applied using the `apply_checks_and_split` or `apply_checks` methods.
If you want to split the checked data into valid and invalid (quarantined) dataframes, use `apply_checks_and_split`.
If you want to report issues as additional columns, use `apply_checks`.

#### Method 1: Using DQX classes

Checks defined using DQX classes can be applied using one of the following methods:
* `apply_checks_and_split`: if you want to split the checked data into valid and invalid (quarantined) dataframes.
* `apply_checks`: if you want to report issues as additional columns.

Example:
```python
from databricks.labs.dqx.col_functions import is_not_null, is_not_null_and_not_empty, value_is_in_list
from databricks.labs.dqx.engine import DQEngine
@@ -284,8 +286,13 @@ valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)

See details of the check functions [here](/docs/reference/quality_rules).

#### Method 2: Using yaml config
#### Method 2: Using metadata (yaml/json)

Checks defined as metadata in `yaml` or `json` can be applied using one of the following methods:
* `apply_checks_by_metadata_and_split`: if you want to split the checked data into valid and invalid (quarantined) dataframes.
* `apply_checks_by_metadata`: if you want to report issues as additional columns.

Example:
```python
import yaml
from databricks.labs.dqx.engine import DQEngine
docs/dqx/docs/reference/engine.mdx (16 additions, 16 deletions)
@@ -25,19 +25,19 @@ Information on testing applications that use `DQEngine` can be found [here](/doc

The following table outlines the available methods of the DQEngine and their functionalities:

| Check | Description | Arguments |
| ---------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| apply_checks | Applies quality checks to the DataFrame and returns a DataFrame with reporting columns. | df: DataFrame to check; checks: List of checks to the DataFrame. Each check is an instance of DQRule class. |
| apply_checks_and_split | Applies quality checks to the DataFrame and returns valid and invalid (quarantine) DataFrames with reporting columns. | df: DataFrame to check; checks: List of checks to apply to the DataFrame. Each check is an instance of DQRule class. |
| apply_checks_by_metadata | Applies quality checks defined as a dictionary to the DataFrame and returns a DataFrame with reporting columns. | df: DataFrame to check. checks: List of dictionaries describing checks. glbs: Optional dictionary with functions mapping (e.g., globals() of the calling module). |
| apply_checks_by_metadata_and_split | Applies quality checks defined as a dictionary and returns valid and invalid (quarantine) DataFrames. | df: DataFrame to check; checks: List of dictionaries describing checks. glbs: Optional dictionary with functions mapping (e.g., globals() of the calling module). |
| validate_checks | Validates the provided quality checks to ensure they conform to the expected structure and types. | checks: List of checks to validate; glbs: Optional dictionary of global functions that can be used. |
| get_invalid | Retrieves records from the DataFrame that violate data quality checks (records with warnings and errors). | df: Input DataFrame. |
| get_valid | Retrieves records from the DataFrame that pass all data quality checks. | df: Input DataFrame. |
| load_checks_from_local_file | Loads quality rules from a local file (supports YAML and JSON). | path: Path to a file containing the checks. |
| save_checks_in_local_file | Saves quality rules to a local file in YAML format. | checks: List of checks to save; path: Path to a file containing the checks. |
| load_checks_from_workspace_file | Loads checks from a file (JSON or YAML) stored in the Databricks workspace. | workspace_path: Path to the file in the workspace. |
| load_checks_from_installation | Loads checks from the workspace installation configuration file (`checks_file` field). | run_config_name: Name of the run config to use; product_name: Name of the product/installation directory; assume_user: If True, assume user installation. |
| save_checks_in_workspace_file | Saves checks to a file (YAML) in the Databricks workspace. | checks: List of checks to save; workspace_path: Destination path for the checks file in the workspace. |
| save_checks_in_installation | Saves checks to the installation folder as a YAML file. | checks: List of checks to save; run_config_name: Name of the run config to use; assume_user: If True, assume user installation. |
| load_run_config | Loads run configuration from the installation folder. | run_config_name: Name of the run config to use; assume_user: If True, assume user installation. |
| Check | Description | Arguments |
| ---------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| apply_checks                       | Applies quality checks to the DataFrame and returns a DataFrame with reporting columns.                                      | df: DataFrame to check; checks: List of checks to apply to the DataFrame. Each check is an instance of the DQRule class.                                                                  |
| apply_checks_and_split | Applies quality checks to the DataFrame and returns valid and invalid (quarantine) DataFrames with reporting columns. | df: DataFrame to check; checks: List of checks to apply to the DataFrame. Each check is an instance of DQRule class. |
| apply_checks_by_metadata           | Applies quality checks defined as a dictionary to the DataFrame and returns a DataFrame with reporting columns.              | df: DataFrame to check; checks: List of dictionaries describing checks; custom_check_functions: Optional dictionary with custom check functions (e.g., globals() of the calling module).  |
| apply_checks_by_metadata_and_split | Applies quality checks defined as a dictionary and returns valid and invalid (quarantine) DataFrames. | df: DataFrame to check; checks: List of dictionaries describing checks; custom_check_functions: Optional dictionary with custom check functions (e.g., globals() of the calling module). |
| validate_checks | Validates the provided quality checks to ensure they conform to the expected structure and types. | checks: List of checks to validate; custom_check_functions: Optional dictionary of custom check functions that can be used. |
| get_invalid | Retrieves records from the DataFrame that violate data quality checks (records with warnings and errors). | df: Input DataFrame. |
| get_valid | Retrieves records from the DataFrame that pass all data quality checks. | df: Input DataFrame. |
| load_checks_from_local_file | Loads quality rules from a local file (supports YAML and JSON). | path: Path to a file containing the checks. |
| save_checks_in_local_file | Saves quality rules to a local file in YAML format. | checks: List of checks to save; path: Path to a file containing the checks. |
| load_checks_from_workspace_file | Loads checks from a file (JSON or YAML) stored in the Databricks workspace. | workspace_path: Path to the file in the workspace. |
| load_checks_from_installation | Loads checks from the workspace installation configuration file (`checks_file` field). | run_config_name: Name of the run config to use; product_name: Name of the product/installation directory; assume_user: If True, assume user installation. |
| save_checks_in_workspace_file | Saves checks to a file (YAML) in the Databricks workspace. | checks: List of checks to save; workspace_path: Destination path for the checks file in the workspace. |
| save_checks_in_installation | Saves checks to the installation folder as a YAML file. | checks: List of checks to save; run_config_name: Name of the run config to use; assume_user: If True, assume user installation. |
| load_run_config | Loads run configuration from the installation folder. | run_config_name: Name of the run config to use; assume_user: If True, assume user installation. |
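
For example, a minimal sketch of combining `apply_checks_by_metadata` with `get_valid` and `get_invalid` (assuming `input_df` and `checks` are already defined):

```python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# apply checks defined as metadata; reporting columns are added to the output
checked_df = dq_engine.apply_checks_by_metadata(input_df, checks)

good_df = dq_engine.get_valid(checked_df)   # records that passed all checks
bad_df = dq_engine.get_invalid(checked_df)  # records with warnings or errors
```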
docs/dqx/docs/reference/quality_rules.mdx (36 additions, 8 deletions)
@@ -69,8 +69,8 @@ Sql expression is also useful if you want to make cross-column validation, for e

### Define custom check functions

If you need a reusable check or need to implement a more complicated logic
you can define your own check functions. A check is a function available from 'globals' that returns `pyspark.sql.Column`, for example:
If you need a reusable check or want to implement more complex logic, you can define your own custom check functions.
A check function is a callable that returns a `pyspark.sql.Column`. For example:

```python
import pyspark.sql.functions as F
@@ -82,28 +82,56 @@ def ends_with_foo(col_name: str) -> Column:
    return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")
```

and use the function as a check:
You can use custom functions directly when defining checks using DQX classes:
```python
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRule  # import path assumed; DQRule must also be imported
from databricks.labs.dqx.col_functions import is_not_null
from databricks.sdk import WorkspaceClient

checks = [
    DQRule(criticality="error", check=is_not_null("col1")),
    DQRule(criticality="error", check=ends_with_foo("col1")),
]

dq_engine = DQEngine(WorkspaceClient())

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)

# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
```

You can use custom functions as follows when defining checks using metadata (yaml):
```python
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.col_functions import *

checks = yaml.safe_load("""
- criticality: error
check:
function: ends_with_foo
arguments:
col_name: col1
- criticality: error
check:
function: is_not_null
arguments:
col_name: col1
""")

dq_engine = DQEngine(WorkspaceClient())

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks, globals())
custom_check_functions = {"ends_with_foo": ends_with_foo}  # dictionary of custom check functions
# custom_check_functions = globals()  # include all functions for simplicity

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks, custom_check_functions)

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals())
# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, custom_check_functions)
```

You can see all existing DQX checks [here](https://github.com/databrickslabs/dqx/blob/main/src/databricks/labs/dqx/col_functions.py).