@@ -38,8 +46,8 @@ after curation to ensure that data quality constraints are met.
## When to use DQX
-* Use DQX if you need pro-active monitoring (before data is written to a target table).
+* Use DQX if you need proactive monitoring (before data is written to a target table), especially in streaming pipelines.
* For monitoring data quality of already persisted data in Delta tables (post-factum monitoring), try
[Databricks Lakehouse Monitoring](https://docs.databricks.com/en/lakehouse-monitoring/index.html).
-* DQX can be integrated with DLT for data quality checking but your first choice for DLT pipelines should be [DLT Expectations](https://docs.databricks.com/en/delta-live-tables/expectations.html#what-are-delta-live-tables-expectations). DQX can be used to profile data and generate DLT expectation candidates.
+* DQX can be integrated with DLT to check data quality. Still, your first choice for DLT pipelines should be [DLT Expectations](https://docs.databricks.com/en/delta-live-tables/expectations.html#what-are-delta-live-tables-expectations). DQX can be used to profile data and generate DLT expectation candidates.
* DQX can be integrated with other data transformation frameworks that support PySpark, such as [dbt](https://docs.getdbt.com/docs/build/python-models). However, this integration is limited to dbt Python models and does not extend to dbt SQL models.
diff --git a/docs/dqx/docs/reference/engine.mdx b/docs/dqx/docs/reference/engine.mdx
index 95124689..829de3f1 100644
--- a/docs/dqx/docs/reference/engine.mdx
+++ b/docs/dqx/docs/reference/engine.mdx
@@ -1,34 +1,45 @@
+import Admonition from '@theme/Admonition';
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
# DQX Engine
-To perform data quality checking with DQX, you need to create `DQEngine` object.
+To perform data quality checking with DQX, you must create a `DQEngine` object.
The engine requires a Databricks [workspace client](https://docs.databricks.com/aws/en/dev-tools/sdk-python) for authentication and interaction with the Databricks workspace.
-When running the code on a Databricks workspace, the workspace client is automatically authenticated, whether DQX is used in a notebook, script, or as part of a job/workflow.
+When running the code on a Databricks workspace, the workspace client is automatically authenticated, whether DQX is used in a notebook, script, or job/workflow.
You only need the following code to create the workspace client if you run DQX on Databricks workspace:
-```python
-from databricks.sdk import WorkspaceClient
-from databricks.labs.dqx.engine import DQEngine
-ws = WorkspaceClient()
-dq_engine = DQEngine(ws)
-```
+
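+For reference, a minimal version of that setup looks like this:
+
+```python
+from databricks.sdk import WorkspaceClient
+from databricks.labs.dqx.engine import DQEngine
+
+# On a Databricks workspace the client picks up authentication automatically
+ws = WorkspaceClient()
+dq_engine = DQEngine(ws)
+```
+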
For external environments, such as CI servers or local machines, you can authenticate to Databricks using any method supported by the Databricks SDK. For detailed instructions, refer to the [default authentication flow](https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html#default-authentication-flow).
-If you're using Databricks [configuration profiles](https://docs.databricks.com/dev-tools/auth.html#configuration-profiles) or Databricks-specific [environment variables](https://docs.databricks.com/dev-tools/auth.html#environment-variables) for authentication, you can easily create the workspace client without needing to provide additional arguments:
+If you're using Databricks [configuration profiles](https://docs.databricks.com/dev-tools/auth.html#configuration-profiles) or Databricks-specific [environment variables](https://docs.databricks.com/dev-tools/auth.html#environment-variables) for authentication, you can create the workspace client without needing to provide additional arguments:
```python
ws = WorkspaceClient()
```
-Information on testing applications that use `DQEngine` can be found [here](/docs/reference/testing).
+Information on testing applications that use `DQEngine`, including local execution without a Databricks workspace, can be found [here](/docs/reference/testing).
## DQX engine methods
-The following table outlines the available methods of the DQEngine and their functionalities:
+The following table outlines the available methods of the `DQEngine` and their functionalities:
+
diff --git a/docs/dqx/docs/reference/quality_rules.mdx b/docs/dqx/docs/reference/quality_rules.mdx
index 25ca722d..d0420473 100644
--- a/docs/dqx/docs/reference/quality_rules.mdx
+++ b/docs/dqx/docs/reference/quality_rules.mdx
@@ -1,52 +1,491 @@
+import Admonition from '@theme/Admonition';
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
# Quality rules
-This page provides a reference for the quality rule functions (checks) available in DQX.
-
-## Quality rule functions (checks)
-
-The following quality rules / functions are currently available:
-
-| Check | Description | Arguments |
-| -------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| is_not_null | Check if input column is not null | col_name: column name to check |
-| is_not_empty | Check if input column is not empty | col_name: column name to check |
-| is_not_null_and_not_empty | Check if input column is not null or empty | col_name: column name to check; trim_strings: boolean flag to trim spaces from strings |
-| value_is_in_list | Check if the provided value is present in the input column. | col_name: column name to check; allowed: list of allowed values |
-| value_is_not_null_and_is_in_list | Check if provided value is present if the input column is not null | col_name: column name to check; allowed: list of allowed values |
-| is_not_null_and_not_empty_array | Check if input array column is not null or empty | col_name: column name to check |
-| is_in_range | Check if input column is in the provided range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit value; max_limit: max limit value; min_limit_col_expr: min limit column name or expr; max_limit_col_expr: max limit column name or expr |
-| is_not_in_range | Check if input column is not within defined range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit value; max_limit: max limit value; min_limit_col_expr: min limit column name or expr; max_limit_col_expr: max limit column name or expr |
-| not_less_than | Check if input column is not less than the provided limit | col_name: column name to check; limit: limit value |
-| not_greater_than | Check if input column is not greater than the provided limit | col_name: column name to check; limit: limit value |
-| is_valid_date | Check if input column is a valid date | col_name: column name to check; date_format: date format (e.g. 'yyyy-mm-dd') |
-| is_valid_timestamp | Check if input column is a valid timestamp | col_name: column name to check; timestamp_format: timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') |
-| not_in_future | Check if input column defined as date is not in the future (future defined as current_timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided current_timestamp() function is used |
-| not_in_near_future | Check if input column defined as date is not in the near future (near future defined as grater than current timestamp but less than current timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided current_timestamp() function is used |
-| is_older_than_n_days | Check if input column is older than n number of days | col_name: column name to check; days: number of days; curr_date: current date, if not provided current_date() function is used |
-| is_older_than_col2_for_n_days | Check if one column is not older than another column by n number of days | col_name1: first column name to check; col_name2: second column name to check; days: number of days |
-| regex_match | Check if input column matches a given regex | col_name: column name to check; regex: regex to check; negate: if the condition should be negated (true) or not |
-| sql_expression | Check if input column is matches the provided sql expression, eg. a = 'str1', a > b | expression: sql expression to check; msg: optional message to output; name: optional name of the resulting column; negate: if the condition should be negated |
-
-You can check implementation details of the rules [here](https://github.com/databrickslabs/dqx/blob/main/src/databricks/labs/dqx/col_functions.py).
-
-## Apply filters on checks
+This page provides a reference for the quality checks (rule functions) available in DQX.
+
+## Row-level quality checks
+
+The following row-level checks are currently available in DQX.
+These checks are applied to each row of a PySpark DataFrame and generate issue reports as additional columns.
+You can also define your own custom checks (see [Creating custom checks](#creating-custom-checks)).
+
+
+
+You can explore the implementation details of the rules [here](https://github.com/databrickslabs/dqx/blob/main/src/databricks/labs/dqx/col_functions.py).
+If you have a custom check that could be broadly useful, feel free to submit a PR to [DQX](https://github.com/databrickslabs/dqx) (see the [contribution guide](/docs/dev/contributing) for details).
+
+### Usage examples of row-level checks
+
+Below are fully specified examples of how to use each check in YAML format and with DQX classes. Both are equivalent and can be used interchangeably.
+
+The `criticality` field can be either "error" (data goes only into the 'bad' or 'quarantine' DataFrame) or "warn" (data goes into both DataFrames).
+For brevity, the `name` field in the examples is omitted, meaning it will be auto-generated in the results.
+
+
+
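+For instance, here is a minimal sketch of applying a single `is_not_null` check defined as YAML metadata (`input_df` stands in for your own DataFrame):
+
+```python
+import yaml
+from databricks.sdk import WorkspaceClient
+from databricks.labs.dqx.engine import DQEngine
+
+checks = yaml.safe_load("""
+- criticality: error
+  check:
+    function: is_not_null
+    arguments:
+      col_name: col1
+""")
+
+dq_engine = DQEngine(WorkspaceClient())
+# input_df: your input PySpark DataFrame; issue reports are added as additional result columns
+checked_df = dq_engine.apply_checks_by_metadata(input_df, checks)
+```
+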
+## Applying filters on checks
You can apply checks to a part of the DataFrame by using a `filter`.
-For example, to ensure that a column `a` is not null only when a column `b` is positive, you can define the check as follows:
+For example, to check that a column `col1` is not null only when a column `col2` is positive, you can define the check as follows:
```yaml
- criticality: error
- filter: b > 0
+ filter: col2 > 0
check:
function: is_not_null
arguments:
- col_name: a
+ col_name: col1
```
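+The same filtered check can be expressed as a plain Python dictionary, which is equivalent to the YAML form:
+
+```python
+# equivalent metadata definition of the filtered check above
+checks = [
+    {
+        "criticality": "error",
+        "filter": "col2 > 0",
+        "check": {"function": "is_not_null", "arguments": {"col_name": "col1"}},
+    }
+]
+```
+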
-## Creating your own checks
+## Creating custom checks
-### Use sql expression
+### Custom checks with SQL Expression
-If a check that you need does not exist in DQX, you can define them using sql expression rule (`sql_expression`),
+You can define custom checks using the SQL expression rule (`sql_expression`),
for example:
```yaml
- criticality: error
@@ -57,83 +496,96 @@ for example:
msg: col1 ends with 'foo'
```
-Sql expression is also useful if you want to make cross-column validation, for example:
+SQL expressions are also useful if you need to perform cross-column validation, for example:
```yaml
- criticality: error
check:
function: sql_expression
arguments:
- expression: a > b
- msg: a is greater than b
+ expression: col1 > col2
+ msg: col1 is greater than col2
```
-### Define custom check functions
+### Custom checks as a Python function
-If you need a reusable check or want to implement more complex logic, you can define your own custom check functions.
-A check function is a callable that returns a `pyspark.sql.Column`. For example:
+If you need a reusable check or want to implement logic that is too complex to express in SQL, you can define your own custom check functions.
+A check function is a callable that returns a `pyspark.sql.Column`.
-```python
-import pyspark.sql.functions as F
-from pyspark.sql import Column
-from databricks.labs.dqx.col_functions import make_condition
+#### Custom check example
-def ends_with_foo(col_name: str) -> Column:
- column = F.col(col_name)
- return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")
-```
+
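+A minimal sketch of such a function, flagging values that end with `'foo'`:
+
+```python
+import pyspark.sql.functions as F
+from pyspark.sql import Column
+from databricks.labs.dqx.col_functions import make_condition
+
+
+def ends_with_foo(col_name: str) -> Column:
+    column = F.col(col_name)
+    # make_condition builds the reporting column: a message when the condition holds, null otherwise
+    return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")
+```
+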
-checks = [
- DQRule(criticality="error", check=is_not_null("col1")),
- DQRule(criticality="error", check=ends_with_foo("col1")),
-]
+#### Execution of the custom check using DQX classes
-dq_engine = DQEngine(WorkspaceClient())
+
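+A sketch of running the custom check alongside a built-in check, assuming the class-based `apply_checks` method and a custom `ends_with_foo` function as defined above:
+
+```python
+from databricks.sdk import WorkspaceClient
+from databricks.labs.dqx.col_functions import is_not_null
+from databricks.labs.dqx.engine import DQEngine
+from databricks.labs.dqx.rule import DQRule
+
+checks = [
+    DQRule(criticality="error", check=is_not_null("col1")),
+    DQRule(criticality="error", check=ends_with_foo("col1")),  # custom function defined above
+]
+
+dq_engine = DQEngine(WorkspaceClient())
+# input_df: your input PySpark DataFrame; apply_checks is assumed here for class-based rules
+checked_df = dq_engine.apply_checks(input_df, checks)
+```
+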
-dq_engine = DQEngine(WorkspaceClient())
+#### Execution of the custom check using YAML definition
-custom_check_functions = {"ends_with_foo": ends_with_foo} # list of custom check functions
-#custom_check_functions=globals() # include all functions for simplicity
+
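+A sketch of the metadata-driven variant, assuming the mapping of custom functions is passed to `apply_checks_by_metadata`:
+
+```python
+import yaml
+from databricks.sdk import WorkspaceClient
+from databricks.labs.dqx.engine import DQEngine
+
+checks = yaml.safe_load("""
+- criticality: error
+  check:
+    function: ends_with_foo
+    arguments:
+      col_name: col1
+""")
+
+custom_check_functions = {"ends_with_foo": ends_with_foo}  # list of custom check functions
+# custom_check_functions = globals()  # include all functions for simplicity
+
+dq_engine = DQEngine(WorkspaceClient())
+# input_df: your input PySpark DataFrame
+checked_df = dq_engine.apply_checks_by_metadata(input_df, checks, custom_check_functions)
+```
+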
diff --git a/docs/dqx/docs/reference/testing.mdx b/docs/dqx/docs/reference/testing.mdx
index 79533d63..8f2f9345 100644
--- a/docs/dqx/docs/reference/testing.mdx
+++ b/docs/dqx/docs/reference/testing.mdx
@@ -1,3 +1,7 @@
+import Admonition from '@theme/Admonition';
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
# Testing Applications Using DQX
## Standard testing with DQEngine
@@ -5,42 +9,47 @@
Testing applications that use DQEngine requires proper initialization of the Databricks workspace client. Detailed guidance on authentication for the workspace client is available [here](https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html#default-authentication-flow).
For testing, we recommend:
-* [pytester fixtures](https://github.com/databrickslabs/pytester) to setup Databricks remote Spark session and workspace client. For pytester to be able to authenticate to a workspace you need to use [debug_env_name fixture](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture). We recommend using the `~/.databricks/debug-env.json` file to store different sets of environment variables (see more details below).
+* [pytester fixtures](https://github.com/databrickslabs/pytester) to set up a Databricks remote Spark session and workspace client. For pytester to be able to authenticate to a workspace, you need to use the [debug_env_name fixture](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture). We recommend using the `~/.databricks/debug-env.json` file to store different sets of environment variables (see more details below).
* [chispa](https://github.com/MrPowers/chispa) for asserting Spark DataFrames.
-These libraries are also used internally for testing DQX.
+These libraries are also used internally to test DQX.
-Example test:
-```python
-from chispa.dataframe_comparer import assert_df_equality
-from databricks.labs.dqx.col_functions import is_not_null_and_not_empty
-from databricks.labs.dqx.engine import DQEngine
-from databricks.labs.dqx.rule import DQRule
+Below is an example test.
+
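+A minimal sketch of such a test, assuming the `ws` and `spark` fixtures provided by pytester:
+
+```python
+from databricks.labs.dqx.col_functions import is_not_null_and_not_empty
+from databricks.labs.dqx.engine import DQEngine
+from databricks.labs.dqx.rule import DQRule
+
+
+def test_dq(ws, spark):  # 'ws' and 'spark' are pytester-provided fixtures
+    df = spark.createDataFrame([[1, None], [2, "ok"]], "a: int, b: string")
+
+    checks = [DQRule(criticality="warn", check=is_not_null_and_not_empty("b"))]
+    dq_engine = DQEngine(ws)
+
+    checked = dq_engine.apply_checks(df, checks)  # apply_checks assumed for class-based rules
+    # compare 'checked' against an expected DataFrame, e.g. with chispa's assert_df_equality
+    assert checked.count() == 2
+```
+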
-### Setting up Databricks workspace client authentication in a terminal
+### Setting up Databricks Workspace Client authentication in a terminal
If you want to run the tests from your local machine in the terminal, you need to set up the following environment variables:
```shell
@@ -58,7 +67,7 @@ export DATABRICKS_SERVERLESS_COMPUTE_ID=auto
We recommend using [OAuth access token](https://docs.databricks.com/en/dev-tools/auth/oauth-m2m.html) generated for a service principal to authenticate with Databricks as presented above.
Alternatively, you can authenticate using [PAT token](https://docs.databricks.com/en/dev-tools/auth/pat.html) by setting the `DATABRICKS_TOKEN` environment variable. However, we do not recommend this method, as it is less secure than OAuth.
-### Setting up Databricks workspace client authentication in an IDE
+### Setting up Databricks Workspace Client authentication in an IDE
If you want to run the tests from your IDE, you must setup `.env` or `~/.databricks/debug-env.json` file
(see [instructions](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture)).
@@ -78,7 +87,7 @@ Create the `~/.databricks/debug-env.json` with the following content, replacing
}
```
-You must provide an existing cluster which will be auto-started for you as part of the tests.
+You must provide an existing cluster. It will auto-start for you as part of the tests.
We recommend using [OAuth access token](https://docs.databricks.com/en/dev-tools/auth/oauth-m2m.html) generated for a service principal to authenticate with Databricks as presented above.
Alternatively, you can authenticate using [PAT token](https://docs.databricks.com/en/dev-tools/auth/pat.html) by providing the `DATABRICKS_TOKEN` field. However, we do not recommend this method, as it is less secure than OAuth.
@@ -97,44 +106,50 @@ To run the integration tests on serverless compute, add the `DATABRICKS_SERVERLE
}
}
```
-When `DATABRICKS_SERVERLESS_COMPUTE_ID` is set the `DATABRICKS_CLUSTER_ID` is ignored, and tests run on serverless compute.
+When `DATABRICKS_SERVERLESS_COMPUTE_ID` is set, the `DATABRICKS_CLUSTER_ID` is ignored, and tests run on serverless compute.
### Local testing with DQEngine
If workspace-level access is unavailable in your testing environment, you can perform local testing by installing the latest `pyspark` package and mocking the workspace client.
+Below is an example test.
-**Note: This approach should be treated as experimental!** It does not offer the same level of testing as the standard approach and it is only applicable to selected methods.
+
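+A minimal sketch, assuming the workspace client is replaced with a mock (note that this approach is experimental, does not offer the same level of testing as the standard approach, and is only applicable to selected methods):
+
+```python
+from unittest.mock import MagicMock
+
+from pyspark.sql import SparkSession
+from databricks.sdk import WorkspaceClient
+from databricks.labs.dqx.engine import DQEngine
+
+spark = SparkSession.builder.appName("dqx-local-test").getOrCreate()  # local pyspark session
+
+# the workspace client is mocked, so only methods that do not call the workspace are usable
+ws = MagicMock(spec=WorkspaceClient)
+dq_engine = DQEngine(ws)
+```
+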
diff --git a/docs/dqx/src/pages/index.tsx b/docs/dqx/src/pages/index.tsx
index f22582f7..8c74ecde 100644
--- a/docs/dqx/src/pages/index.tsx
+++ b/docs/dqx/src/pages/index.tsx
@@ -7,7 +7,7 @@ const CallToAction = () => {
return (
- Improve your data quality now 🚀
+ Improve your Data Quality now 🚀
Follow our comprehensive guide to get up and running with DQX in no time.
@@ -33,11 +33,11 @@ const Capabilities = () => {
},
{
title: 'Data Format Agnostic',
- description: 'Works seamlessly with Spark DataFrames.',
+ description: 'Works seamlessly with PySpark DataFrames.',
icon: FileText,
},
{
- title: 'Spark Batch & Streaming Support',
+ title: 'Spark Batch & Spark Structured Streaming Support',
description: 'Includes Delta Live Tables (DLT) integration.',
icon: Activity,
},
@@ -57,8 +57,8 @@ const Capabilities = () => {
icon: Grid,
},
{
- title: 'Profiling & Rule Generation',
- description: 'Automatically profile and generate data quality rule candidates.',
+ title: 'Profiling & Quality Rules Generation',
+ description: 'Automatically profile input data and generate data quality rule candidates.',
icon: BarChart2,
},
{
@@ -67,7 +67,7 @@ const Capabilities = () => {
icon: Code,
},
{
- title: 'Validation Summary & Dashboard',
+ title: 'Validation Summary & Quality Dashboard',
description: 'Track and identify data quality issues effectively.',
icon: PieChart,
},
@@ -116,7 +116,7 @@ const Hero = () => {
DQX is a data quality framework for Apache Spark that enables you to define, monitor, and
- react to data quality issues in your data pipelines.
+ address data quality issues in your Python-based data pipelines.
{/* Call to Action Buttons */}
diff --git a/src/databricks/labs/dqx/col_functions.py b/src/databricks/labs/dqx/col_functions.py
index 43f18c6f..f8dbfdfd 100644
--- a/src/databricks/labs/dqx/col_functions.py
+++ b/src/databricks/labs/dqx/col_functions.py
@@ -3,6 +3,7 @@
import pyspark.sql.functions as F
from pyspark.sql import Column
+from pyspark.sql.window import Window
def make_condition(condition: Column, message: Column | str, alias: str) -> Column:
@@ -22,13 +23,8 @@ def make_condition(condition: Column, message: Column | str, alias: str) -> Colu
return (F.when(condition, msg_col).otherwise(F.lit(None).cast("string"))).alias(_cleanup_alias_name(alias))
-def _cleanup_alias_name(col_name: str) -> str:
- # avoid issues with structs
- return col_name.replace(".", "_")
-
-
-def is_not_null_and_not_empty(col_name: str, trim_strings: bool = False) -> Column:
- """Creates a condition column to check if value is null or empty.
+def is_not_null_and_not_empty(col_name: str, trim_strings: bool | None = False) -> Column:
+ """Checks whether the values in the input column are not null and not empty.
:param col_name: column name to check
:param trim_strings: boolean flag to trim spaces from strings
@@ -42,7 +38,7 @@ def is_not_null_and_not_empty(col_name: str, trim_strings: bool = False) -> Colu
def is_not_empty(col_name: str) -> Column:
- """Creates a condition column to check if value is empty (but could be null).
+ """Checks whether the values in the input column are not empty (but may be null).
:param col_name: column name to check
:return: Column object for condition
@@ -53,7 +49,7 @@ def is_not_empty(col_name: str) -> Column:
def is_not_null(col_name: str) -> Column:
- """Creates a condition column to check if value is null.
+ """Checks whether the values in the input column are not null.
:param col_name: column name to check
:return: Column object for condition
@@ -62,13 +58,16 @@ def is_not_null(col_name: str) -> Column:
return make_condition(column.isNull(), f"Column {col_name} is null", f"{col_name}_is_null")
-def value_is_not_null_and_is_in_list(col_name: str, allowed: list) -> Column:
- """Creates a condition column to check if value is null or not in the list of allowed values.
+def is_not_null_and_is_in_list(col_name: str, allowed: list) -> Column:
+ """Checks whether the values in the input column are not null and present in the list of allowed values.
:param col_name: column name to check
:param allowed: list of allowed values (actual values or Column objects)
:return: Column object for condition
"""
+ if not allowed:
+ raise ValueError("allowed list is not provided.")
+
allowed_cols = [item if isinstance(item, Column) else F.lit(item) for item in allowed]
column = F.col(col_name)
condition = column.isNull() | ~column.isin(*allowed_cols)
@@ -78,21 +77,25 @@ def value_is_not_null_and_is_in_list(col_name: str, allowed: list) -> Column:
"",
F.lit("Value "),
F.when(column.isNull(), F.lit("null")).otherwise(column.cast("string")),
- F.lit(" is not in the allowed list: ["),
+ F.lit(" is null or not in the allowed list: ["),
F.concat_ws(", ", *allowed_cols),
F.lit("]"),
),
- f"{col_name}_value_is_not_in_the_list",
+ f"{col_name}_is_null_or_is_not_in_the_list",
)
-def value_is_in_list(col_name: str, allowed: list) -> Column:
- """Creates a condition column to check if value not in the list of allowed values (could be null).
+def is_in_list(col_name: str, allowed: list) -> Column:
+ """Checks whether the values in the input column are present in the list of allowed values
+ (null values are allowed).
:param col_name: column name to check
:param allowed: list of allowed values (actual values or Column objects)
:return: Column object for condition
"""
+ if not allowed:
+ raise ValueError("allowed list is not provided.")
+
allowed_cols = [item if isinstance(item, Column) else F.lit(item) for item in allowed]
column = F.col(col_name)
condition = ~column.isin(*allowed_cols)
@@ -106,7 +109,7 @@ def value_is_in_list(col_name: str, allowed: list) -> Column:
F.concat_ws(", ", *allowed_cols),
F.lit("]"),
),
- f"{col_name}_value_is_not_in_the_list",
+ f"{col_name}_is_not_in_the_list",
)
@@ -114,7 +117,7 @@ def value_is_in_list(col_name: str, allowed: list) -> Column:
def sql_expression(expression: str, msg: str | None = None, name: str | None = None, negate: bool = False) -> Column:
- """Creates a condition column from the SQL expression.
+ """Checks whether the condition provided as an SQL expression is met.
:param expression: SQL expression
:param msg: optional message of the `Column` type, automatically generated if None
@@ -137,8 +140,8 @@ def sql_expression(expression: str, msg: str | None = None, name: str | None = N
return make_condition(expr_col, F.concat_ws("", F.lit(f"Value matches expression: {expression_msg}")), name)
-def is_older_than_col2_for_n_days(col_name1: str, col_name2: str, days: int) -> Column:
- """Creates a condition column for case when one date or timestamp column is older than another column by N days.
+def is_older_than_col2_for_n_days(col_name1: str, col_name2: str, days: int = 0) -> Column:
+ """Checks whether the values in one input column are at least N days older than the values in another column.
:param col_name1: first column
:param col_name2: second column
@@ -159,13 +162,12 @@ def is_older_than_col2_for_n_days(col_name1: str, col_name2: str, days: int) ->
col2_date,
F.lit(f"' for more than {days} days"),
),
- f"is_col_{col_name1}_older_than_{col_name2}_for_N_days",
+ f"is_col_{col_name1}_older_than_{col_name2}_for_n_days",
)
def is_older_than_n_days(col_name: str, days: int, curr_date: Column | None = None) -> Column:
- """Creates a condition column for case when specified date or timestamp column is older (compared to current date)
- than N days.
+ """Checks whether the values in the input column are at least N days older than the current date.
:param col_name: name of the column to check
:param days: number of days
@@ -188,13 +190,13 @@ def is_older_than_n_days(col_name: str, days: int, curr_date: Column | None = No
curr_date,
F.lit(f"' for more than {days} days"),
),
- f"is_col_{col_name}_older_than_N_days",
+ f"is_col_{col_name}_older_than_n_days",
)
-def not_in_future(col_name: str, offset: int = 0, curr_timestamp: Column | None = None) -> Column:
- """Creates a condition column that checks if specified date or timestamp column is in the future.
- Future is considered as grater than current timestamp plus `offset` seconds.
+def is_not_in_future(col_name: str, offset: int = 0, curr_timestamp: Column | None = None) -> Column:
+ """Checks whether the values in the input column contain a timestamp that is not in the future,
+ where 'future' is defined as current_timestamp + offset (in seconds).
:param col_name: column name
:param offset: offset (in seconds) to add to the current timestamp at time of execution
@@ -216,9 +218,10 @@ def not_in_future(col_name: str, offset: int = 0, curr_timestamp: Column | None
)
-def not_in_near_future(col_name: str, offset: int = 0, curr_timestamp: Column | None = None) -> Column:
- """Creates a condition column that checks if specified date or timestamp column is in the near future.
- Near future is considered as grater than current timestamp but less than current timestamp plus `offset` seconds.
+def is_not_in_near_future(col_name: str, offset: int = 0, curr_timestamp: Column | None = None) -> Column:
+ """Checks whether the values in the input column contain a timestamp that is not in the near future,
+ where 'near future' is defined as greater than the current timestamp
+ but less than the current_timestamp + offset (in seconds).
:param col_name: column name
:param offset: offset (in seconds) to add to the current timestamp at time of execution
@@ -247,87 +250,59 @@ def not_in_near_future(col_name: str, offset: int = 0, curr_timestamp: Column |
)
-def not_less_than(col_name: str, limit: int | datetime.date | datetime.datetime) -> Column:
- """Creates a condition column that checks if a value is less than specified limit.
+def is_not_less_than(
+ col_name: str, limit: int | datetime.date | datetime.datetime | str | Column | None = None
+) -> Column:
+ """Checks whether the values in the input column are not less than the provided limit.
:param col_name: column name
- :param limit: limit to use in the condition
+ :param limit: limit to use in the condition as number, date, timestamp, column name or expression
:return: new Column
"""
- limit_expr = F.lit(limit)
+ limit_expr = _get_column_expr_limit(limit)
condition = F.col(col_name) < limit_expr
return make_condition(
condition,
- F.concat_ws(" ", F.lit("Value"), F.col(col_name), F.lit("is less than limit:"), F.lit(limit).cast("string")),
+ F.concat_ws(" ", F.lit("Value"), F.col(col_name), F.lit("is less than limit:"), limit_expr.cast("string")),
f"{col_name}_less_than_limit",
)
-def not_greater_than(col_name: str, limit: int | datetime.date | datetime.datetime) -> Column:
- """Creates a condition column that checks if a value is greater than specified limit.
+def is_not_greater_than(
+ col_name: str, limit: int | datetime.date | datetime.datetime | str | Column | None = None
+) -> Column:
+ """Checks whether the values in the input column are not greater than the provided limit.
:param col_name: column name
- :param limit: limit to use in the condition
+ :param limit: limit to use in the condition as number, date, timestamp, column name or expression
:return: new Column
"""
- limit_expr = F.lit(limit)
+ limit_expr = _get_column_expr_limit(limit)
condition = F.col(col_name) > limit_expr
return make_condition(
condition,
- F.concat_ws(" ", F.lit("Value"), F.col(col_name), F.lit("is greater than limit:"), F.lit(limit).cast("string")),
+ F.concat_ws(" ", F.lit("Value"), F.col(col_name), F.lit("is greater than limit:"), limit_expr.cast("string")),
f"{col_name}_greater_than_limit",
)
-def _get_min_max_column_expr(
- min_limit: int | datetime.date | datetime.datetime | str | None = None,
- max_limit: int | datetime.date | datetime.datetime | str | None = None,
- min_limit_col_expr: str | Column | None = None,
- max_limit_col_expr: str | Column | None = None,
-) -> tuple[Column, Column]:
- """Helper function to create a condition for the is_(not)_in_range functions.
-
- :param min_limit: min limit value
- :param max_limit: max limit value
- :param min_limit_col_expr: min limit column name or expr
- :param max_limit_col_expr: max limit column name or expr
- :return: tuple containing min_limit_expr and max_limit_expr
- :raises: ValueError when both min_limit/min_limit_col_expr or max_limit/max_limit_col_expr are null
- """
- if (min_limit is None and min_limit_col_expr is None) or (max_limit is None and max_limit_col_expr is None):
- raise ValueError('Either min_limit / min_limit_col_expr or max_limit / max_limit_col_expr is empty')
- if min_limit_col_expr is None:
- min_limit_expr = F.lit(min_limit)
- else:
- min_limit_expr = F.col(min_limit_col_expr) if isinstance(min_limit_col_expr, str) else min_limit_col_expr
- if max_limit_col_expr is None:
- max_limit_expr = F.lit(max_limit)
- else:
- max_limit_expr = F.col(max_limit_col_expr) if isinstance(max_limit_col_expr, str) else max_limit_col_expr
- return (min_limit_expr, max_limit_expr)
-
-
def is_in_range(
col_name: str,
- min_limit: int | datetime.date | datetime.datetime | str | None = None,
- max_limit: int | datetime.date | datetime.datetime | str | None = None,
- min_limit_col_expr: str | Column | None = None,
- max_limit_col_expr: str | Column | None = None,
+ min_limit: int | datetime.date | datetime.datetime | str | Column | None = None,
+ max_limit: int | datetime.date | datetime.datetime | str | Column | None = None,
) -> Column:
- """Creates a condition column that checks if a value is smaller than min limit or greater than max limit.
+ """Checks whether the values in the input column are in the provided limits (inclusive of both boundaries).
:param col_name: column name
- :param min_limit: min limit value
- :param max_limit: max limit value
- :param min_limit_col_expr: min limit column name or expr
- :param max_limit_col_expr: max limit column name or expr
+ :param min_limit: min limit to use in the condition as number, date, timestamp, column name or expression
+ :param max_limit: max limit to use in the condition as number, date, timestamp, column name or expression
:return: new Column
"""
- min_limit_expr, max_limit_expr = _get_min_max_column_expr(
- min_limit, max_limit, min_limit_col_expr, max_limit_col_expr
- )
+ min_limit_expr = _get_column_expr_limit(min_limit)
+ max_limit_expr = _get_column_expr_limit(max_limit)
+
condition = (F.col(col_name) < min_limit_expr) | (F.col(col_name) > max_limit_expr)
return make_condition(
@@ -348,24 +323,20 @@ def is_in_range(
def is_not_in_range(
col_name: str,
- min_limit: int | datetime.date | datetime.datetime | str | None = None,
- max_limit: int | datetime.date | datetime.datetime | str | None = None,
- min_limit_col_expr: str | Column | None = None,
- max_limit_col_expr: str | Column | None = None,
+ min_limit: int | datetime.date | datetime.datetime | str | Column | None = None,
+ max_limit: int | datetime.date | datetime.datetime | str | Column | None = None,
) -> Column:
- """Creates a condition column that checks if a value is within min and max limits.
+ """Checks whether the values in the input column are outside the provided limits (inclusive of both boundaries).
:param col_name: column name
- :param min_limit: min limit value
- :param max_limit: max limit value
- :param min_limit_col_expr: min limit column name or expr
- :param max_limit_col_expr: max limit column name or expr
+ :param min_limit: min limit to use in the condition as number, date, timestamp, column name or expression
+    :param max_limit: max limit to use in the condition as number, date, timestamp, column name or expression
:return: new Column
"""
- min_limit_expr, max_limit_expr = _get_min_max_column_expr(
- min_limit, max_limit, min_limit_col_expr, max_limit_col_expr
- )
- condition = (F.col(col_name) > min_limit_expr) & (F.col(col_name) < max_limit_expr)
+ min_limit_expr = _get_column_expr_limit(min_limit)
+ max_limit_expr = _get_column_expr_limit(max_limit)
+
+ condition = (F.col(col_name) >= min_limit_expr) & (F.col(col_name) <= max_limit_expr)
return make_condition(
condition,
@@ -384,7 +355,7 @@ def is_not_in_range(
def regex_match(col_name: str, regex: str, negate: bool = False) -> Column:
- """Creates a condition column to check if value not matches given regex.
+ """Checks whether the values in the input column matches a given regex.
:param col_name: column name to check
:param regex: regex to check
@@ -402,8 +373,8 @@ def regex_match(col_name: str, regex: str, negate: bool = False) -> Column:
def is_not_null_and_not_empty_array(col_name: str) -> Column:
- """
- Creates a condition column to check if an array is null and or empty.
+ """Checks whether the values in the array input column are not null and not empty.
+
:param col_name: column name to check
:return: Column object for condition
"""
@@ -413,8 +384,8 @@ def is_not_null_and_not_empty_array(col_name: str) -> Column:
def is_valid_date(col_name: str, date_format: str | None = None) -> Column:
- """
- Creates a condition column to check if a string is a valid date.
+ """Checks whether the values in the input column have valid date formats.
+
:param col_name: column name to check
:param date_format: date format (e.g. 'yyyy-mm-dd')
:return: Column object for condition
@@ -433,8 +404,8 @@ def is_valid_date(col_name: str, date_format: str | None = None) -> Column:
def is_valid_timestamp(col_name: str, timestamp_format: str | None = None) -> Column:
- """
- Creates a condition column to check if a string is a valid timestamp.
+ """Checks whether the values in the input column have valid timestamp formats.
+
:param col_name: column name to check
:param timestamp_format: timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss')
:return: Column object for condition
@@ -452,3 +423,52 @@ def is_valid_timestamp(col_name: str, timestamp_format: str | None = None) -> Co
F.concat_ws("", F.lit("Value '"), column, F.lit(condition_str)),
f"{col_name}_is_not_valid_timestamp",
)
+
+
+def is_unique(col_name: str, window_spec: str | Column | None = None) -> Column:
+ """Checks whether the values in the input column are unique
+ and reports an issue for each row that contains a duplicate value.
+ Null values are not considered duplicates, following the ANSI SQL standard.
+    It should be used carefully in a streaming context,
+    as the uniqueness check is only performed within individual micro-batches.
+
+ :param col_name: column name to check
+ :param window_spec: window specification for the partition by clause. Default value for NULL in the time column
+    of the window spec must be provided using coalesce() to prevent rows from being excluded!
+ e.g. "window(coalesce(b, '1970-01-01'), '2 hours')"
+ :return: Column object for condition
+ """
+ column = F.col(col_name)
+ if window_spec is None:
+ partition_by_spec = Window.partitionBy(column)
+ else:
+ if isinstance(window_spec, str):
+ window_spec = F.expr(window_spec)
+ partition_by_spec = Window.partitionBy(window_spec)
+
+ condition = F.when(column.isNotNull(), F.count(column).over(partition_by_spec) == 1)
+ return make_condition(~condition, f"Column {col_name} has duplicate values", f"{col_name}_is_not_unique")
+
+
+def _cleanup_alias_name(col_name: str) -> str:
+ # avoid issues with structs
+ return col_name.replace(".", "_")
+
+
+def _get_column_expr_limit(
+ limit: int | datetime.date | datetime.datetime | str | Column | None = None,
+) -> Column:
+ """Helper function to generate a column expression limit based on the provided limit value.
+
+ :param limit: limit to use in the condition (literal value or Column expression)
+ :return: column expression.
+ :raises ValueError: if limit is not provided.
+ """
+ if limit is None:
+ raise ValueError("Limit is not provided.")
+
+ if isinstance(limit, str):
+ return F.expr(limit)
+ if isinstance(limit, Column):
+ return limit
+ return F.lit(limit)
diff --git a/src/databricks/labs/dqx/profiler/generator.py b/src/databricks/labs/dqx/profiler/generator.py
index 2c51b518..2d1e6489 100644
--- a/src/databricks/labs/dqx/profiler/generator.py
+++ b/src/databricks/labs/dqx/profiler/generator.py
@@ -48,7 +48,7 @@ def dq_generate_is_in(col_name: str, level: str = "error", **params: dict):
:return: A dictionary representing the data quality rule.
"""
return {
- "check": {"function": "value_is_in_list", "arguments": {"col_name": col_name, "allowed": params["in"]}},
+ "check": {"function": "is_in_list", "arguments": {"col_name": col_name, "allowed": params["in"]}},
"name": f"{col_name}_other_value",
"criticality": level,
}
@@ -86,7 +86,7 @@ def dq_generate_min_max(col_name: str, level: str = "error", **params: dict):
if max_limit is not None:
return {
"check": {
- "function": "not_greater_than",
+ "function": "is_not_greater_than",
"arguments": {
"col_name": col_name,
"val": val_maybe_to_str(max_limit, include_sql_quotes=False),
@@ -99,7 +99,7 @@ def dq_generate_min_max(col_name: str, level: str = "error", **params: dict):
if min_limit is not None:
return {
"check": {
- "function": "not_less_than",
+ "function": "is_not_less_than",
"arguments": {
"col_name": col_name,
"val": val_maybe_to_str(min_limit, include_sql_quotes=False),
diff --git a/tests/conftest.py b/tests/conftest.py
index f8dcda26..1c79fa9a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -22,7 +22,7 @@ def checks_yml_content():
trim_strings: true
- criticality: warn
check:
- function: value_is_in_list
+ function: is_in_list
arguments:
col_name: col4
allowed:
@@ -67,7 +67,7 @@ def checks_json_content():
{
"criticality": "warn",
"check": {
- "function": "value_is_in_list",
+ "function": "is_in_list",
"arguments": {
"col_name": "col4",
"allowed": [1, 2]
@@ -130,7 +130,7 @@ def expected_checks():
},
{
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_name": "col4", "allowed": [1, 2]}},
+ "check": {"function": "is_in_list", "arguments": {"col_name": "col4", "allowed": [1, 2]}},
},
{
"criticality": "error",
diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py
index 9b3a65d3..48c194e4 100644
--- a/tests/integration/test_apply_checks.py
+++ b/tests/integration/test_apply_checks.py
@@ -1,3 +1,6 @@
+from datetime import datetime
+
+import yaml
import pyspark.sql.functions as F
import pytest
from pyspark.sql import Column
@@ -213,14 +216,14 @@ def test_apply_checks_and_split_by_metadata(ws, spark):
"check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "c"}},
},
{
- "name": "col_a_value_is_not_in_the_list",
+ "name": "col_a_is_not_in_the_list",
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_name": "a", "allowed": [1, 3, 4]}},
+ "check": {"function": "is_in_list", "arguments": {"col_name": "a", "allowed": [1, 3, 4]}},
},
{
- "name": "col_c_value_is_not_in_the_list",
+ "name": "col_c_is_not_in_the_list",
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_name": "c", "allowed": [1, 3, 4]}},
+ "check": {"function": "is_in_list", "arguments": {"col_name": "c", "allowed": [1, 3, 4]}},
},
]
@@ -236,7 +239,7 @@ def test_apply_checks_and_split_by_metadata(ws, spark):
None,
4,
{"col_b_is_null_or_empty": "Column b is null or empty"},
- {"col_a_value_is_not_in_the_list": "Value 2 is not in the allowed list: [1, 3, 4]"},
+ {"col_a_is_not_in_the_list": "Value 2 is not in the allowed list: [1, 3, 4]"},
],
[
None,
@@ -280,7 +283,7 @@ def test_apply_checks_and_split_by_metadata_with_autogenerated_col_names(ws, spa
},
{
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_names": ["a", "c"], "allowed": [1, 3, 4]}},
+ "check": {"function": "is_in_list", "arguments": {"col_names": ["a", "c"], "allowed": [1, 3, 4]}},
},
]
@@ -296,7 +299,7 @@ def test_apply_checks_and_split_by_metadata_with_autogenerated_col_names(ws, spa
None,
4,
{"col_b_is_null_or_empty": "Column b is null or empty"},
- {"col_a_value_is_not_in_the_list": "Value 2 is not in the allowed list: [1, 3, 4]"},
+ {"col_a_is_not_in_the_list": "Value 2 is not in the allowed list: [1, 3, 4]"},
],
[
None,
@@ -340,7 +343,7 @@ def test_apply_checks_by_metadata(ws, spark):
},
{
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_names": ["a", "c"], "allowed": [1, 3, 4]}},
+ "check": {"function": "is_in_list", "arguments": {"col_names": ["a", "c"], "allowed": [1, 3, 4]}},
},
]
@@ -354,7 +357,7 @@ def test_apply_checks_by_metadata(ws, spark):
None,
4,
{"col_b_is_null_or_empty": "Column b is null or empty"},
- {"col_a_value_is_not_in_the_list": "Value 2 is not in the allowed list: [1, 3, 4]"},
+ {"col_a_is_not_in_the_list": "Value 2 is not in the allowed list: [1, 3, 4]"},
],
[
None,
@@ -751,3 +754,343 @@ def test_apply_checks_with_sql_expression(ws, spark):
expected_schema,
)
assert_df_equality(checked, expected, ignore_nullable=True)
+
+
+def test_apply_checks_with_is_unique(ws, spark, set_utc_timezone):
+ schema = "col1: int, col2: timestamp"
+ test_df = spark.createDataFrame([[1, datetime(2025, 1, 1)], [1, datetime(2025, 1, 2)], [None, None]], schema)
+
+ checks = [
+ {
+ "criticality": "error",
+ "check": {"function": "is_unique", "arguments": {"col_name": "col1"}},
+ },
+ {
+ "criticality": "error",
+ "name": "col_col1_is_not_unique2",
+ "check": {
+ "function": "is_unique",
+ "arguments": {"col_name": "col1", "window_spec": "window(coalesce(col2, '1970-01-01'), '30 days')"},
+ },
+ },
+ ]
+
+ dq_engine = DQEngine(ws)
+ checked = dq_engine.apply_checks_by_metadata(test_df, checks)
+
+ expected_schema = schema + REPORTING_COLUMNS
+ expected = spark.createDataFrame(
+ [
+ [None, None, None, None],
+ [
+ 1,
+ datetime(2025, 1, 1),
+ {
+ "col_col1_is_not_unique": "Column col1 has duplicate values",
+ "col_col1_is_not_unique2": "Column col1 has duplicate values",
+ },
+ None,
+ ],
+ [
+ 1,
+ datetime(2025, 1, 2),
+ {
+ "col_col1_is_not_unique": "Column col1 has duplicate values",
+ "col_col1_is_not_unique2": "Column col1 has duplicate values",
+ },
+ None,
+ ],
+ ],
+ expected_schema,
+ )
+ assert_df_equality(checked, expected, ignore_nullable=True)
+
+
+def test_apply_checks_all_checks_as_yaml(ws, spark):
+ checks = yaml.safe_load(
+ """
+ # is_not_null check
+ - criticality: error
+ check:
+ function: is_not_null
+ arguments:
+ col_name: col1
+
+ # is_not_empty check
+ - criticality: error
+ check:
+ function: is_not_empty
+ arguments:
+ col_name: col1
+
+ # is_not_null_and_not_empty check
+ - criticality: error
+ check:
+ function: is_not_null_and_not_empty
+ arguments:
+ col_name: col1
+ trim_strings: true
+
+ # is_in_list check
+ - criticality: error
+ check:
+ function: is_in_list
+ arguments:
+ col_name: col2
+ allowed:
+ - 1
+ - 2
+ - 3
+
+ # is_not_null_and_is_in_list check
+ - criticality: error
+ check:
+ function: is_not_null_and_is_in_list
+ arguments:
+ col_name: col2
+ allowed:
+ - 1
+ - 2
+ - 3
+
+ # is_not_null_and_not_empty_array check
+ - criticality: error
+ check:
+ function: is_not_null_and_not_empty_array
+ arguments:
+ col_name: col4
+
+ # is_in_range check
+ - criticality: error
+ check:
+ function: is_in_range
+ arguments:
+ col_name: col2
+ min_limit: 1
+ max_limit: 10
+ - criticality: error
+ check:
+ function: is_in_range
+ arguments:
+ col_name: col5
+ min_limit: 2025-01-01
+ max_limit: 2025-02-24
+ - criticality: error
+ check:
+ function: is_in_range
+ arguments:
+ col_name: col6
+ min_limit: 2025-01-01 00:00:00
+ max_limit: 2025-02-24 01:00:00
+ - criticality: error
+ check:
+ function: is_in_range
+ arguments:
+ col_name: col3
+ min_limit: col2
+ max_limit: col2 * 2
+
+ # is_not_in_range check
+ - criticality: error
+ check:
+ function: is_not_in_range
+ arguments:
+ col_name: col2
+ min_limit: 11
+ max_limit: 20
+ - criticality: error
+ check:
+ function: is_not_in_range
+ arguments:
+ col_name: col5
+ min_limit: 2025-02-25
+ max_limit: 2025-02-26
+ - criticality: error
+ check:
+ function: is_not_in_range
+ arguments:
+ col_name: col6
+ min_limit: 2025-02-25 00:00:00
+ max_limit: 2025-02-26 01:00:00
+ - criticality: error
+ check:
+ function: is_not_in_range
+ arguments:
+ col_name: col3
+ min_limit: col2 + 10
+ max_limit: col2 * 10
+
+ # is_not_less_than check
+ - criticality: error
+ check:
+ function: is_not_less_than
+ arguments:
+ col_name: col2
+ limit: 0
+ - criticality: error
+ check:
+ function: is_not_less_than
+ arguments:
+ col_name: col5
+ limit: 2025-01-01
+ - criticality: error
+ check:
+ function: is_not_less_than
+ arguments:
+ col_name: col6
+ limit: 2025-01-01 01:00:00
+ - criticality: error
+ check:
+ function: is_not_less_than
+ arguments:
+ col_name: col3
+ limit: col2 - 10
+
+ # is_not_greater_than check
+ - criticality: error
+ check:
+ function: is_not_greater_than
+ arguments:
+ col_name: col2
+ limit: 10
+ - criticality: error
+ check:
+ function: is_not_greater_than
+ arguments:
+ col_name: col5
+ limit: 2025-03-01
+ - criticality: error
+ check:
+ function: is_not_greater_than
+ arguments:
+ col_name: col6
+ limit: 2025-03-24 01:00:00
+ - criticality: error
+ check:
+ function: is_not_greater_than
+ arguments:
+ col_name: col3
+ limit: col2 + 10
+
+ # is_valid_date check
+ - criticality: error
+ check:
+ function: is_valid_date
+ arguments:
+ col_name: col5
+ - criticality: error
+ name: col5_is_not_valid_date2
+ check:
+ function: is_valid_date
+ arguments:
+ col_name: col5
+ date_format: yyyy-MM-dd
+
+ # is_valid_timestamp check
+ - criticality: error
+ check:
+ function: is_valid_timestamp
+ arguments:
+ col_name: col6
+ timestamp_format: yyyy-MM-dd HH:mm:ss
+ - criticality: error
+ name: col6_is_not_valid_timestamp2
+ check:
+ function: is_valid_timestamp
+ arguments:
+ col_name: col6
+
+ # is_not_in_future check
+ - criticality: error
+ check:
+ function: is_not_in_future
+ arguments:
+ col_name: col6
+ offset: 86400
+
+ # is_not_in_near_future check
+ - criticality: error
+ check:
+ function: is_not_in_near_future
+ arguments:
+ col_name: col6
+ offset: 36400
+
+ # is_older_than_n_days check
+ - criticality: error
+ check:
+ function: is_older_than_n_days
+ arguments:
+ col_name: col5
+ days: 10000
+
+ # is_older_than_col2_for_n_days check
+ - criticality: error
+ check:
+ function: is_older_than_col2_for_n_days
+ arguments:
+ col_name1: col5
+ col_name2: col6
+ days: 2
+
+ # is_unique check
+ - criticality: error
+ check:
+ function: is_unique
+ arguments:
+ col_name: col1
+ - criticality: error
+ name: col1_is_not_unique2
+ check:
+ function: is_unique
+ arguments:
+ col_name: col1
+ window_spec: window(coalesce(col6, '1970-01-01'), '10 minutes')
+
+ # regex_match check
+ - criticality: error
+ check:
+ function: regex_match
+ arguments:
+ col_name: col2
+ regex: '[0-9]+'
+ negate: false
+
+ # sql_expression check
+ - criticality: error
+ check:
+ function: sql_expression
+ arguments:
+ expression: col3 > col2 and col3 < 10
+ msg: col3 is greater than col2 and col3 less than 10
+ name: custom_output_name
+ negate: false
+ """
+ )
+
+ dq_engine = DQEngine(ws)
+ status = dq_engine.validate_checks(checks)
+ assert not status.has_errors
+
+ schema = "col1: string, col2: int, col3: int, col4 array
, col5: date, col6: timestamp"
+ test_df = spark.createDataFrame(
+ [
+ ["val1", 1, 1, [1], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 1, 0, 0)],
+ ["val2", 2, 2, [2], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 2, 0, 0)],
+ ["val3", 3, 3, [3], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 3, 0, 0)],
+ ],
+ schema,
+ )
+
+ checked = dq_engine.apply_checks_by_metadata(test_df, checks)
+
+ expected_schema = schema + REPORTING_COLUMNS
+ expected = spark.createDataFrame(
+ [
+ ["val1", 1, 1, [1], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 1, 0, 0), None, None],
+ ["val2", 2, 2, [2], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 2, 0, 0), None, None],
+ ["val3", 3, 3, [3], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 3, 0, 0), None, None],
+ ],
+ expected_schema,
+ )
+ assert_df_equality(checked, expected, ignore_nullable=True)
diff --git a/tests/integration/test_col_functions.py b/tests/integration/test_col_functions.py
index f1e3e568..d5959f62 100644
--- a/tests/integration/test_col_functions.py
+++ b/tests/integration/test_col_functions.py
@@ -1,6 +1,9 @@
from datetime import datetime
+from decimal import Decimal
import pyspark.sql.functions as F
from chispa.dataframe_comparer import assert_df_equality # type: ignore
+
+
from databricks.labs.dqx.col_functions import (
is_in_range,
is_not_empty,
@@ -9,17 +12,18 @@
is_not_null_and_not_empty,
is_older_than_col2_for_n_days,
is_older_than_n_days,
- not_in_future,
- not_in_near_future,
- not_less_than,
- not_greater_than,
+ is_not_in_future,
+ is_not_in_near_future,
+ is_not_less_than,
+ is_not_greater_than,
regex_match,
sql_expression,
- value_is_in_list,
- value_is_not_null_and_is_in_list,
+ is_in_list,
+ is_not_null_and_is_in_list,
is_not_null_and_not_empty_array,
is_valid_date,
is_valid_timestamp,
+ is_unique,
)
SCHEMA = "a: string, b: int"
@@ -60,19 +64,20 @@ def test_col_is_not_null(spark):
assert_df_equality(actual, expected, ignore_nullable=True)
-def test_col_value_is_not_null_and_is_in_list(spark):
+def test_col_is_not_null_and_is_in_list(spark):
test_df = spark.createDataFrame([["str1", 1], ["str2", None], ["", 3]], SCHEMA)
- actual = test_df.select(
- value_is_not_null_and_is_in_list("a", ["str1"]), value_is_not_null_and_is_in_list("b", [F.lit(3)])
- )
+ actual = test_df.select(is_not_null_and_is_in_list("a", ["str1"]), is_not_null_and_is_in_list("b", [F.lit(3)]))
- checked_schema = "a_value_is_not_in_the_list: string, b_value_is_not_in_the_list: string"
+ checked_schema = "a_is_null_or_is_not_in_the_list: string, b_is_null_or_is_not_in_the_list: string"
expected = spark.createDataFrame(
[
- [None, "Value 1 is not in the allowed list: [3]"],
- ["Value str2 is not in the allowed list: [str1]", "Value null is not in the allowed list: [3]"],
- ["Value is not in the allowed list: [str1]", None],
+ [None, "Value 1 is null or not in the allowed list: [3]"],
+ [
+ "Value str2 is null or not in the allowed list: [str1]",
+ "Value null is null or not in the allowed list: [3]",
+ ],
+ ["Value is null or not in the allowed list: [str1]", None],
],
checked_schema,
)
@@ -80,12 +85,12 @@ def test_col_value_is_not_null_and_is_in_list(spark):
assert_df_equality(actual, expected, ignore_nullable=True)
-def test_col_value_is_not_in_list(spark):
+def test_col_is_not_in_list(spark):
test_df = spark.createDataFrame([["str1", 1], ["str2", None], ["", 3]], SCHEMA)
- actual = test_df.select(value_is_in_list("a", ["str1"]), value_is_in_list("b", [F.lit(3)]))
+ actual = test_df.select(is_in_list("a", ["str1"]), is_in_list("b", [F.lit(3)]))
- checked_schema = "a_value_is_not_in_the_list: string, b_value_is_not_in_the_list: string"
+ checked_schema = "a_is_not_in_the_list: string, b_is_not_in_the_list: string"
expected = spark.createDataFrame(
[
[None, "Value 1 is not in the allowed list: [3]"],
@@ -136,7 +141,7 @@ def test_is_col_older_than_col2_for_n_days(spark):
actual = test_df.select(is_older_than_col2_for_n_days("a", "b", 2))
- checked_schema = "is_col_a_older_than_b_for_N_days: string"
+ checked_schema = "is_col_a_older_than_b_for_n_days: string"
expected = spark.createDataFrame(
[
["Value of a: '2023-01-10' less than value of b: '2023-01-13' for more than 2 days"],
@@ -157,7 +162,7 @@ def test_is_col_older_than_n_days(spark):
actual = test_df.select(is_older_than_n_days("a", 2, F.lit("2023-01-13")))
- checked_schema = "is_col_a_older_than_N_days: string"
+ checked_schema = "is_col_a_older_than_n_days: string"
expected = spark.createDataFrame(
[["Value of a: '2023-01-10' less than current date: '2023-01-13' for more than 2 days"], [None], [None]],
checked_schema,
@@ -166,11 +171,11 @@ def test_is_col_older_than_n_days(spark):
assert_df_equality(actual, expected, ignore_nullable=True)
-def test_col_not_in_future(spark):
+def test_col_is_not_in_future(spark):
schema_dates = "a: string"
test_df = spark.createDataFrame([["2023-01-10 11:08:37"], ["2023-01-10 11:08:43"], [None]], schema_dates)
- actual = test_df.select(not_in_future("a", 2, F.lit("2023-01-10 11:08:40")))
+ actual = test_df.select(is_not_in_future("a", 2, F.lit("2023-01-10 11:08:40")))
checked_schema = "a_in_future: string"
expected = spark.createDataFrame(
@@ -180,13 +185,13 @@ def test_col_not_in_future(spark):
assert_df_equality(actual, expected, ignore_nullable=True)
-def test_col_not_in_near_future(spark):
+def test_col_is_not_in_near_future(spark):
schema_dates = "a: string"
test_df = spark.createDataFrame(
[["2023-01-10 11:08:40"], ["2023-01-10 11:08:41"], ["2023-01-10 11:08:42"], [None]], schema_dates
)
- actual = test_df.select(not_in_near_future("a", 2, F.lit("2023-01-10 11:08:40")))
+ actual = test_df.select(is_not_in_near_future("a", 2, F.lit("2023-01-10 11:08:40")))
checked_schema = "a_in_near_future: string"
expected = spark.createDataFrame(
@@ -210,7 +215,7 @@ def test_is_col_older_than_n_days_cur(spark):
actual = test_df.select(is_older_than_n_days("a", 2, None))
- checked_schema = "is_col_a_older_than_N_days: string"
+ checked_schema = "is_col_a_older_than_n_days: string"
expected = spark.createDataFrame(
[[f"Value of a: '2023-01-10' less than current date: '{cur_date}' for more than 2 days"], [None]],
@@ -220,33 +225,59 @@ def test_is_col_older_than_n_days_cur(spark):
assert_df_equality(actual, expected, ignore_nullable=True)
-def test_col_not_less_than(spark, set_utc_timezone):
- schema_num = "a: int, b: date, c: timestamp"
+def test_col_is_not_less_than(spark, set_utc_timezone):
+ schema_num = "a: int, b: int, c: date, d: timestamp, e: decimal(10,2)"
test_df = spark.createDataFrame(
[
- [1, datetime(2025, 1, 1).date(), datetime(2025, 1, 1)],
- [2, datetime(2025, 2, 1).date(), datetime(2025, 2, 1)],
- [None, None, None],
+ [1, 1, datetime(2025, 1, 1).date(), datetime(2025, 1, 1), Decimal("1.00")],
+ [2, 4, datetime(2025, 2, 1).date(), datetime(2025, 2, 1), Decimal("1.99")],
+ [4, 3, None, None, Decimal("2.01")],
+ [None, None, None, None, None],
],
schema_num,
)
actual = test_df.select(
- not_less_than("a", 2),
- not_less_than("b", datetime(2025, 2, 1).date()),
- not_less_than("c", datetime(2025, 2, 1)),
+ is_not_less_than("a", 2),
+ is_not_less_than("a", F.col("b") * 2),
+ is_not_less_than("b", "a"),
+ is_not_less_than("c", datetime(2025, 2, 1).date()),
+ is_not_less_than("d", datetime(2025, 2, 1)),
+ is_not_less_than("e", 2),
+ )
+
+ checked_schema = (
+ "a_less_than_limit: string, a_less_than_limit: string, b_less_than_limit: string, "
+ "c_less_than_limit: string, d_less_than_limit: string, e_less_than_limit: string"
)
- checked_schema = "a_less_than_limit: string, b_less_than_limit: string, c_less_than_limit: string"
expected = spark.createDataFrame(
[
[
"Value 1 is less than limit: 2",
+ None,
+ None,
"Value 2025-01-01 is less than limit: 2025-02-01",
"Value 2025-01-01 00:00:00 is less than limit: 2025-02-01 00:00:00",
+ "Value 1.00 is less than limit: 2",
+ ],
+ [
+ None,
+ "Value 2 is less than limit: 8",
+ None,
+ None,
+ None,
+ "Value 1.99 is less than limit: 2",
+ ],
+ [
+ None,
+ "Value 4 is less than limit: 6",
+ "Value 3 is less than limit: 4",
+ None,
+ None,
+ None,
],
- [None, None, None],
- [None, None, None],
+ [None, None, None, None, None, None],
],
checked_schema,
)
@@ -254,33 +285,44 @@ def test_col_not_less_than(spark, set_utc_timezone):
assert_df_equality(actual, expected, ignore_nullable=True)
-def test_col_not_greater_than(spark, set_utc_timezone):
- schema_num = "a: int, b: date, c: timestamp"
+def test_col_is_not_greater_than(spark, set_utc_timezone):
+ schema_num = "a: int, b: int, c: date, d: timestamp, e: decimal(10,2)"
test_df = spark.createDataFrame(
[
- [1, datetime(2025, 1, 1).date(), datetime(2025, 1, 1)],
- [2, datetime(2025, 2, 1).date(), datetime(2025, 2, 1)],
- [None, None, None],
+ [1, 1, datetime(2025, 1, 1).date(), datetime(2025, 1, 1), Decimal("1.00")],
+ [2, 4, datetime(2025, 2, 1).date(), datetime(2025, 2, 1), Decimal("1.01")],
+ [8, 3, None, None, Decimal("0.99")],
+ [None, None, None, None, None],
],
schema_num,
)
actual = test_df.select(
- not_greater_than("a", 1),
- not_greater_than("b", datetime(2025, 1, 1).date()),
- not_greater_than("c", datetime(2025, 1, 1)),
+ is_not_greater_than("a", 1),
+ is_not_greater_than("a", F.col("b") * 2),
+ is_not_greater_than("b", "a"),
+ is_not_greater_than("c", datetime(2025, 1, 1).date()),
+ is_not_greater_than("d", datetime(2025, 1, 1)),
+ is_not_greater_than("e", 1),
)
- checked_schema = "a_greater_than_limit: string, b_greater_than_limit: string, c_greater_than_limit: string"
+ checked_schema = (
+ "a_greater_than_limit: string, a_greater_than_limit: string, b_greater_than_limit: string, "
+ "c_greater_than_limit: string, d_greater_than_limit: string, e_greater_than_limit: string"
+ )
expected = spark.createDataFrame(
[
- [None, None, None],
+ [None, None, None, None, None, None],
[
"Value 2 is greater than limit: 1",
+ None,
+ "Value 4 is greater than limit: 2",
"Value 2025-02-01 is greater than limit: 2025-01-01",
"Value 2025-02-01 00:00:00 is greater than limit: 2025-01-01 00:00:00",
+ "Value 1.01 is greater than limit: 1",
],
- [None, None, None],
+ ["Value 8 is greater than limit: 1", "Value 8 is greater than limit: 6", None, None, None, None],
+ [None, None, None, None, None, None],
],
checked_schema,
)
@@ -289,15 +331,15 @@ def test_col_not_greater_than(spark, set_utc_timezone):
def test_col_is_in_range(spark, set_utc_timezone):
- schema_num = "a: int, b: date, c: timestamp, d: int, e: int, f: int"
+ schema_num = "a: int, b: date, c: timestamp, d: int, e: int, f: int, g: decimal(10,2)"
test_df = spark.createDataFrame(
[
- [0, datetime(2024, 12, 1).date(), datetime(2024, 12, 1), -1, 5, 6],
- [1, datetime(2025, 1, 1).date(), datetime(2025, 1, 1), 2, 6, 3],
- [2, datetime(2025, 2, 1).date(), datetime(2025, 2, 1), 2, 7, 3],
- [3, datetime(2025, 3, 1).date(), datetime(2025, 3, 1), 3, 8, 3],
- [4, datetime(2025, 4, 1).date(), datetime(2025, 4, 1), 2, 9, 3],
- [None, None, None, None, None, None],
+ [0, datetime(2024, 12, 1).date(), datetime(2024, 12, 1), -1, 5, 6, Decimal("2.00")],
+ [1, datetime(2025, 1, 1).date(), datetime(2025, 1, 1), 2, 6, 3, Decimal("1.00")],
+ [2, datetime(2025, 2, 1).date(), datetime(2025, 2, 1), 2, 7, 3, Decimal("3.00")],
+ [3, datetime(2025, 3, 1).date(), datetime(2025, 3, 1), 3, 8, 3, Decimal("1.01")],
+ [4, datetime(2025, 4, 1).date(), datetime(2025, 4, 1), 2, 9, 3, Decimal("3.01")],
+ [None, None, None, None, None, None, None],
],
schema_num,
)
@@ -308,10 +350,15 @@ def test_col_is_in_range(spark, set_utc_timezone):
is_in_range("a", 1, 3),
is_in_range("b", start_date.date(), end_date.date()),
is_in_range("c", start_date, end_date),
- is_in_range("d", min_limit_col_expr=F.col("a"), max_limit_col_expr=F.expr("e - 1")),
- is_in_range("f", min_limit_col_expr="a", max_limit=5),
+ is_in_range("d", F.col("a"), F.expr("e - 1")),
+ is_in_range("f", "a", 5),
+ is_in_range("g", 1, 3),
+ )
+
+ checked_schema = (
+ "a_not_in_range: string, b_not_in_range: string, c_not_in_range: string, "
+ "d_not_in_range: string, f_not_in_range: string, g_not_in_range: string"
)
- checked_schema = "a_not_in_range: string, b_not_in_range: string, c_not_in_range: string, d_not_in_range: string, f_not_in_range: string"
expected = spark.createDataFrame(
[
[
@@ -320,18 +367,20 @@ def test_col_is_in_range(spark, set_utc_timezone):
"Value 2024-12-01 00:00:00 not in range: [ 2025-01-01 00:00:00 , 2025-03-01 00:00:00 ]",
"Value -1 not in range: [ 0 , 4 ]",
"Value 6 not in range: [ 0 , 5 ]",
+ None,
],
- [None, None, None, None, None],
- [None, None, None, None, None],
- [None, None, None, None, None],
+ [None, None, None, None, None, None],
+ [None, None, None, None, None, None],
+ [None, None, None, None, None, None],
[
"Value 4 not in range: [ 1 , 3 ]",
"Value 2025-04-01 not in range: [ 2025-01-01 , 2025-03-01 ]",
"Value 2025-04-01 00:00:00 not in range: [ 2025-01-01 00:00:00 , 2025-03-01 00:00:00 ]",
"Value 2 not in range: [ 4 , 8 ]",
"Value 3 not in range: [ 4 , 5 ]",
+ "Value 3.01 not in range: [ 1 , 3 ]",
],
- [None, None, None, None, None],
+ [None, None, None, None, None, None],
],
checked_schema,
)
@@ -340,40 +389,48 @@ def test_col_is_in_range(spark, set_utc_timezone):
def test_col_is_not_in_range(spark, set_utc_timezone):
- schema_num = "a: int, b: date, c: timestamp, d: timestamp"
+ schema_num = "a: int, b: date, c: timestamp, d: timestamp, e: decimal(10,2)"
test_df = spark.createDataFrame(
[
- [1, datetime(2025, 1, 1).date(), datetime(2024, 1, 1), datetime(2024, 1, 1)],
- [2, datetime(2025, 2, 1).date(), datetime(2025, 2, 1), datetime(2025, 2, 2)],
- [3, datetime(2025, 3, 1).date(), datetime(2025, 3, 1), datetime(2025, 3, 1)],
- [None, None, None, None],
+ [0, datetime(2024, 12, 31).date(), datetime(2025, 1, 4), datetime(2025, 1, 7), Decimal("0.99")],
+ [1, datetime(2025, 1, 1).date(), datetime(2025, 1, 3), datetime(2025, 1, 1), Decimal("1.00")],
+ [3, datetime(2025, 2, 1).date(), datetime(2025, 2, 1), datetime(2025, 2, 3), Decimal("3.00")],
+ [None, None, None, None, None],
],
schema_num,
)
start_date = datetime(2025, 1, 1)
- end_date = datetime(2025, 3, 1)
+ end_date = datetime(2025, 1, 3)
actual = test_df.select(
is_not_in_range("a", 1, 3),
is_not_in_range("b", start_date.date(), end_date.date()),
is_not_in_range("c", start_date, end_date),
- is_not_in_range(
- "d", min_limit_col_expr="c", max_limit_col_expr=F.expr("cast(b as timestamp) + INTERVAL 2 DAY")
- ),
+ is_not_in_range("d", "c", F.expr("cast(b as timestamp) + INTERVAL 2 DAY")),
+ is_not_in_range("e", 1, 3),
)
- checked_schema = "a_in_range: string, b_in_range: string, c_in_range: string, d_in_range: string"
+ checked_schema = (
+ "a_in_range: string, b_in_range: string, c_in_range: string, d_in_range: string, e_in_range: string"
+ )
expected = spark.createDataFrame(
[
- [None, None, None, None],
+ [None, None, None, None, None],
[
- "Value 2 in range: [ 1 , 3 ]",
- "Value 2025-02-01 in range: [ 2025-01-01 , 2025-03-01 ]",
- "Value 2025-02-01 00:00:00 in range: [ 2025-01-01 00:00:00 , 2025-03-01 00:00:00 ]",
- "Value 2025-02-02 00:00:00 in range: [ 2025-02-01 00:00:00 , 2025-02-03 00:00:00 ]",
+ "Value 1 in range: [ 1 , 3 ]",
+ "Value 2025-01-01 in range: [ 2025-01-01 , 2025-01-03 ]",
+ "Value 2025-01-03 00:00:00 in range: [ 2025-01-01 00:00:00 , 2025-01-03 00:00:00 ]",
+ None,
+ "Value 1.00 in range: [ 1 , 3 ]",
],
- [None, None, None, None],
- [None, None, None, None],
+ [
+ "Value 3 in range: [ 1 , 3 ]",
+ None,
+ None,
+ "Value 2025-02-03 00:00:00 in range: [ 2025-02-01 00:00:00 , 2025-02-03 00:00:00 ]",
+ "Value 3.00 in range: [ 1 , 3 ]",
+ ],
+ [None, None, None, None, None],
],
checked_schema,
)
@@ -409,12 +466,12 @@ def test_col_struct(spark):
assert_df_equality(actual, expected, ignore_nullable=True)
-def test_col_not_in_future_cur(spark):
+def test_col_is_not_in_future_cur(spark):
schema_dates = "a: string"
test_df = spark.createDataFrame([["9999-12-31 23:59:59"]], schema_dates)
- actual = test_df.select(not_in_future("a", 0, None))
+ actual = test_df.select(is_not_in_future("a", 0, None))
checked_schema = "a_in_future: string"
@@ -423,12 +480,12 @@ def test_col_not_in_future_cur(spark):
assert actual.select("a_in_future") != expected.select("a_in_future")
-def test_col_not_in_near_future_cur(spark):
+def test_col_is_not_in_near_future_cur(spark):
schema_dates = "a: string"
test_df = spark.createDataFrame([["1900-01-01 23:59:59"], ["9999-12-31 23:59:59"], [None]], schema_dates)
- actual = test_df.select(not_in_near_future("a", 2, None))
+ actual = test_df.select(is_not_in_near_future("a", 2, None))
checked_schema = "a_in_near_future: string"
expected = spark.createDataFrame(
@@ -586,3 +643,147 @@ def test_col_is_valid_timestamp(spark, set_utc_timezone):
expected = spark.createDataFrame(checked_data, checked_schema)
assert_df_equality(actual, expected, ignore_nullable=True)
+
+
+def test_col_is_unique(spark):
+ test_df = spark.createDataFrame([["str1", 1], ["str2", 1], ["str2", 2], ["str3", 3]], SCHEMA)
+
+ actual = test_df.select(is_unique("a"), is_unique("b"))
+
+ checked_schema = "a_is_not_unique: string, b_is_not_unique: string"
+ expected = spark.createDataFrame(
+ [
+ [None, "Column b has duplicate values"],
+ ["Column a has duplicate values", "Column b has duplicate values"],
+ ["Column a has duplicate values", None],
+ [None, None],
+ ],
+ checked_schema,
+ )
+
+ assert_df_equality(actual, expected, ignore_nullable=True)
+
+
+def test_col_is_unique_handle_nulls(spark):
+ test_df = spark.createDataFrame([["", None], ["", None], ["str1", 1], [None, None]], SCHEMA)
+
+ actual = test_df.select(is_unique("a"), is_unique("b"))
+
+ checked_schema = "a_is_not_unique: string, b_is_not_unique: string"
+ expected = spark.createDataFrame(
+ [
+ ["Column a has duplicate values", None], # Null values are not considered duplicates as they are unknown
+ ["Column a has duplicate values", None],
+ [None, None],
+ [None, None],
+ ],
+ checked_schema,
+ )
+
+ assert_df_equality(actual, expected, ignore_nullable=True, ignore_row_order=True)
+
+
+def test_col_is_unique_custom_window_spec(spark):
+ schema_num = "a: int, b: timestamp"
+ test_df = spark.createDataFrame(
+ [
+ [0, datetime(2025, 1, 1)],
+ [0, datetime(2025, 1, 2)],
+ [0, datetime(2025, 1, 3)], # duplicate but not within the first window
+ [1, None], # considered duplicate with "b" as "1970-01-01"
+ [1, None], # considered duplicate with "b" as "1970-01-01"
+ [None, datetime(2025, 1, 6)],
+ [None, None],
+ ],
+ schema_num,
+ )
+
+ actual = test_df.select(
+ # must use coalesce to handle nulls, otherwise records with null for the time column b will be dropped
+ is_unique("a", window_spec=F.window(F.coalesce(F.col("b"), F.lit(datetime(1970, 1, 1))), "2 days"))
+ )
+
+ checked_schema = "a_is_not_unique: string"
+ expected = spark.createDataFrame(
+ [
+ ["Column a has duplicate values"],
+ ["Column a has duplicate values"],
+ ["Column a has duplicate values"],
+ ["Column a has duplicate values"],
+ [None],
+ [None],
+ [None],
+ ],
+ checked_schema,
+ )
+
+ assert_df_equality(actual, expected, ignore_nullable=True, ignore_row_order=True)
+
+
+def test_col_is_unique_custom_window_spec_without_handling_nulls(spark):
+ schema_num = "a: int, b: timestamp"
+ test_df = spark.createDataFrame(
+ [
+ [0, datetime(2025, 1, 1)],
+ [0, datetime(2025, 1, 2)],
+ [0, datetime(2025, 1, 3)], # duplicate but not within the first window
+ [1, None], # dropped: "b" is null and this window_spec does not coalesce nulls
+ [1, None], # dropped: "b" is null and this window_spec does not coalesce nulls
+ [None, datetime(2025, 1, 6)],
+ [None, None],
+ ],
+ schema_num,
+ )
+
+ actual = test_df.select(
+ # window functions do not handle nulls by default:
+ # a window_spec that does not coalesce null timestamps will silently drop those rows
+ is_unique("a", window_spec=F.window(F.col("b"), "2 days"))
+ )
+
+ checked_schema = "a_is_not_unique: string"
+ expected = spark.createDataFrame(
+ [
+ ["Column a has duplicate values"],
+ ["Column a has duplicate values"],
+ [None],
+ [None],
+ ],
+ checked_schema,
+ )
+
+ assert_df_equality(actual, expected, ignore_nullable=True, ignore_row_order=True)
+
+
+def test_col_is_unique_custom_window_as_string(spark):
+ schema_num = "a: int, b: timestamp"
+ test_df = spark.createDataFrame(
+ [
+ [0, datetime(2025, 1, 1)],
+ [0, datetime(2025, 1, 2)],
+ [0, datetime(2025, 1, 3)], # duplicate but not within the first window
+ [1, None], # considered duplicate with "b" as "1970-01-01"
+ [1, None], # considered duplicate with "b" as "1970-01-01"
+ [None, datetime(2025, 1, 6)],
+ [None, None],
+ ],
+ schema_num,
+ )
+
+ actual = test_df.select(is_unique("a", window_spec="window(coalesce(b, '1970-01-01'), '2 days')"))
+
+ checked_schema = "a_is_not_unique: string"
+ expected = spark.createDataFrame(
+ [
+ ["Column a has duplicate values"],
+ ["Column a has duplicate values"],
+ ["Column a has duplicate values"],
+ ["Column a has duplicate values"],
+ [None],
+ [None],
+ [None],
+ ],
+ checked_schema,
+ )
+
+ assert_df_equality(actual, expected, ignore_nullable=True, ignore_row_order=True)
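
The window-based uniqueness tests above all reduce to one usage pattern. Below is a minimal sketch, assuming an existing `spark` session and `is_unique` imported from `databricks.labs.dqx.col_functions` as in these tests: coalesce the event-time column inside the window spec so rows with a null timestamp fall into a sentinel window instead of being dropped.

```python
from datetime import datetime

import pyspark.sql.functions as F
from databricks.labs.dqx.col_functions import is_unique

# Example data (assumed): two rows share a == 1 and have no event time at all.
df = spark.createDataFrame(
    [[0, datetime(2025, 1, 1)], [1, None], [1, None]],
    "a: int, b: timestamp",
)

# Coalescing "b" keeps the null-timestamp rows; without it the window grouping drops them.
flagged = df.select(
    is_unique("a", window_spec=F.window(F.coalesce(F.col("b"), F.lit(datetime(1970, 1, 1))), "2 days"))
)
```

As the last test shows, the same spec can also be passed as a SQL string, e.g. `window(coalesce(b, '1970-01-01'), '2 days')`.
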
diff --git a/tests/integration/test_rules_generator.py b/tests/integration/test_rules_generator.py
index 8b57b1ec..89bcd9fa 100644
--- a/tests/integration/test_rules_generator.py
+++ b/tests/integration/test_rules_generator.py
@@ -49,7 +49,7 @@ def test_generate_dq_rules(ws):
},
{
"check": {
- "function": "value_is_in_list",
+ "function": "is_in_list",
"arguments": {"col_name": "vendor_id", "allowed": ["1", "4", "2"]},
},
"name": "vendor_id_other_value",
@@ -86,7 +86,7 @@ def test_generate_dq_rules_warn(ws):
},
{
"check": {
- "function": "value_is_in_list",
+ "function": "is_in_list",
"arguments": {"col_name": "vendor_id", "allowed": ["1", "4", "2"]},
},
"name": "vendor_id_other_value",
diff --git a/tests/unit/test_build_rules.py b/tests/unit/test_build_rules.py
index d7a40a9d..3e473b8b 100644
--- a/tests/unit/test_build_rules.py
+++ b/tests/unit/test_build_rules.py
@@ -5,7 +5,7 @@
from databricks.labs.dqx.col_functions import (
is_not_null_and_not_empty,
sql_expression,
- value_is_in_list,
+ is_in_list,
is_not_null_and_not_empty_array,
)
from databricks.labs.dqx.engine import (
@@ -31,11 +31,11 @@ def test_get_rules():
DQRuleColSet(columns=["a", "b"], check_func=is_not_null_and_not_empty).get_rules()
# with check function params provided as positional arguments
+ DQRuleColSet(
- columns=["c", "d"], criticality="error", check_func=value_is_in_list, check_func_args=[[1, 2]]
+ columns=["c", "d"], criticality="error", check_func=is_in_list, check_func_args=[[1, 2]]
).get_rules()
# with check function params provided as named arguments
+ DQRuleColSet(
- columns=["e"], criticality="warn", check_func=value_is_in_list, check_func_kwargs={"allowed": [3]}
+ columns=["e"], criticality="warn", check_func=is_in_list, check_func_kwargs={"allowed": [3]}
).get_rules()
# should be skipped
+ DQRuleColSet(columns=[], criticality="error", check_func=is_not_null_and_not_empty).get_rules()
@@ -46,9 +46,9 @@ def test_get_rules():
expected_rules = [
DQRule(name="col_a_is_null_or_empty", criticality="error", check=is_not_null_and_not_empty("a")),
DQRule(name="col_b_is_null_or_empty", criticality="error", check=is_not_null_and_not_empty("b")),
- DQRule(name="col_c_value_is_not_in_the_list", criticality="error", check=value_is_in_list("c", allowed=[1, 2])),
- DQRule(name="col_d_value_is_not_in_the_list", criticality="error", check=value_is_in_list("d", allowed=[1, 2])),
- DQRule(name="col_e_value_is_not_in_the_list", criticality="warn", check=value_is_in_list("e", allowed=[3])),
+ DQRule(name="col_c_is_not_in_the_list", criticality="error", check=is_in_list("c", allowed=[1, 2])),
+ DQRule(name="col_d_is_not_in_the_list", criticality="error", check=is_in_list("d", allowed=[1, 2])),
+ DQRule(name="col_e_is_not_in_the_list", criticality="warn", check=is_in_list("e", allowed=[3])),
DQRule(name="col_a_is_null_or_empty_array", criticality="error", check=is_not_null_and_not_empty_array("a")),
DQRule(name="col_b_is_null_or_empty_array", criticality="error", check=is_not_null_and_not_empty_array("b")),
]
@@ -62,11 +62,9 @@ def test_build_rules():
DQRuleColSet(columns=["a", "b"], criticality="error", filter="c>0", check_func=is_not_null_and_not_empty),
DQRuleColSet(columns=["c"], criticality="warn", check_func=is_not_null_and_not_empty),
# with check function params provided as positional arguments
- DQRuleColSet(columns=["d", "e"], criticality="error", check_func=value_is_in_list, check_func_args=[[1, 2]]),
+ DQRuleColSet(columns=["d", "e"], criticality="error", check_func=is_in_list, check_func_args=[[1, 2]]),
# with check function params provided as named arguments
- DQRuleColSet(
- columns=["f"], criticality="warn", check_func=value_is_in_list, check_func_kwargs={"allowed": [3]}
- ),
+ DQRuleColSet(columns=["f"], criticality="warn", check_func=is_in_list, check_func_kwargs={"allowed": [3]}),
# should be skipped
DQRuleColSet(columns=[], criticality="error", check_func=is_not_null_and_not_empty),
# set of columns for the same check
@@ -74,21 +72,21 @@ def test_build_rules():
DQRuleColSet(columns=["c"], criticality="warn", check_func=is_not_null_and_not_empty_array),
) + [
DQRule(name="col_g_is_null_or_empty", criticality="warn", filter="a=0", check=is_not_null_and_not_empty("g")),
- DQRule(criticality="warn", check=value_is_in_list("h", allowed=[1, 2])),
+ DQRule(criticality="warn", check=is_in_list("h", allowed=[1, 2])),
]
expected_rules = [
DQRule(name="col_a_is_null_or_empty", criticality="error", filter="c>0", check=is_not_null_and_not_empty("a")),
DQRule(name="col_b_is_null_or_empty", criticality="error", filter="c>0", check=is_not_null_and_not_empty("b")),
DQRule(name="col_c_is_null_or_empty", criticality="warn", check=is_not_null_and_not_empty("c")),
- DQRule(name="col_d_value_is_not_in_the_list", criticality="error", check=value_is_in_list("d", allowed=[1, 2])),
- DQRule(name="col_e_value_is_not_in_the_list", criticality="error", check=value_is_in_list("e", allowed=[1, 2])),
- DQRule(name="col_f_value_is_not_in_the_list", criticality="warn", check=value_is_in_list("f", allowed=[3])),
+ DQRule(name="col_d_is_not_in_the_list", criticality="error", check=is_in_list("d", allowed=[1, 2])),
+ DQRule(name="col_e_is_not_in_the_list", criticality="error", check=is_in_list("e", allowed=[1, 2])),
+ DQRule(name="col_f_is_not_in_the_list", criticality="warn", check=is_in_list("f", allowed=[3])),
DQRule(name="col_a_is_null_or_empty_array", criticality="error", check=is_not_null_and_not_empty_array("a")),
DQRule(name="col_b_is_null_or_empty_array", criticality="error", check=is_not_null_and_not_empty_array("b")),
DQRule(name="col_c_is_null_or_empty_array", criticality="warn", check=is_not_null_and_not_empty_array("c")),
DQRule(name="col_g_is_null_or_empty", criticality="warn", filter="a=0", check=is_not_null_and_not_empty("g")),
- DQRule(name="col_h_value_is_not_in_the_list", criticality="warn", check=value_is_in_list("h", allowed=[1, 2])),
+ DQRule(name="col_h_is_not_in_the_list", criticality="warn", check=is_in_list("h", allowed=[1, 2])),
]
assert pprint.pformat(actual_rules) == pprint.pformat(expected_rules)
@@ -107,11 +105,11 @@ def test_build_rules_by_metadata():
{
"criticality": "error",
"filter": "c=0",
- "check": {"function": "value_is_in_list", "arguments": {"col_names": ["d", "e"], "allowed": [1, 2]}},
+ "check": {"function": "is_in_list", "arguments": {"col_names": ["d", "e"], "allowed": [1, 2]}},
},
{
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_names": ["f"], "allowed": [3]}},
+ "check": {"function": "is_in_list", "arguments": {"col_names": ["f"], "allowed": [3]}},
},
{
"name": "col_g_is_null_or_empty",
@@ -120,7 +118,7 @@ def test_build_rules_by_metadata():
},
{
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_name": "h", "allowed": [1, 2]}},
+ "check": {"function": "is_in_list", "arguments": {"col_name": "h", "allowed": [1, 2]}},
},
{
"name": "d_not_in_a",
@@ -146,20 +144,20 @@ def test_build_rules_by_metadata():
DQRule(name="col_b_is_null_or_empty", criticality="error", check=is_not_null_and_not_empty("b")),
DQRule(name="col_c_is_null_or_empty", criticality="warn", filter="a>0", check=is_not_null_and_not_empty("c")),
DQRule(
- name="col_d_value_is_not_in_the_list",
+ name="col_d_is_not_in_the_list",
criticality="error",
filter="c=0",
- check=value_is_in_list("d", allowed=[1, 2]),
+ check=is_in_list("d", allowed=[1, 2]),
),
DQRule(
- name="col_e_value_is_not_in_the_list",
+ name="col_e_is_not_in_the_list",
criticality="error",
filter="c=0",
- check=value_is_in_list("e", allowed=[1, 2]),
+ check=is_in_list("e", allowed=[1, 2]),
),
- DQRule(name="col_f_value_is_not_in_the_list", criticality="warn", check=value_is_in_list("f", allowed=[3])),
+ DQRule(name="col_f_is_not_in_the_list", criticality="warn", check=is_in_list("f", allowed=[3])),
DQRule(name="col_g_is_null_or_empty", criticality="warn", check=is_not_null_and_not_empty("g")),
- DQRule(name="col_h_value_is_not_in_the_list", criticality="warn", check=value_is_in_list("h", allowed=[1, 2])),
+ DQRule(name="col_h_is_not_in_the_list", criticality="warn", check=is_in_list("h", allowed=[1, 2])),
DQRule(
name="d_not_in_a",
criticality="error",
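
For quick reference after the rename, the rule-building pattern exercised in these tests looks roughly as follows. This is a sketch only; it assumes `DQRule` and `DQRuleColSet` are exported by `databricks.labs.dqx.engine`, which is how the unit tests above import them.

```python
from databricks.labs.dqx.col_functions import is_in_list, is_not_null_and_not_empty
from databricks.labs.dqx.engine import DQRule, DQRuleColSet  # import location assumed from the tests above

# A column set expands into one rule per column; names default to col_<name>_is_not_in_the_list.
rules = DQRuleColSet(
    columns=["d", "e"], criticality="error", check_func=is_in_list, check_func_args=[[1, 2]]
).get_rules()

# Single rules can still be declared explicitly alongside the generated ones.
rules.append(DQRule(criticality="warn", check=is_in_list("h", allowed=[1, 2])))
rules.append(DQRule(name="col_g_is_null_or_empty", criticality="warn", check=is_not_null_and_not_empty("g")))
```
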
diff --git a/tests/unit/test_checks_validation.py b/tests/unit/test_checks_validation.py
index 17817524..78f605ef 100644
--- a/tests/unit/test_checks_validation.py
+++ b/tests/unit/test_checks_validation.py
@@ -30,9 +30,9 @@ def test_valid_multiple_checks():
"check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}},
},
{
- "name": "col_a_value_is_not_in_the_list",
+ "name": "col_a_is_not_in_the_list",
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_name": "a", "allowed": [1, 3, 4]}},
+ "check": {"function": "is_in_list", "arguments": {"col_name": "a", "allowed": [1, 3, 4]}},
},
{
"name": "col_a_is_null_or_empty_array",
@@ -57,9 +57,9 @@ def test_invalid_multiple_checks():
"check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}},
},
{
- "name": "col_a_value_is_not_in_the_list",
+ "name": "col_a_is_not_in_the_list",
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_name": "a", "allowed": 2}},
+ "check": {"function": "is_in_list", "arguments": {"col_name": "a", "allowed": 2}},
},
{
"name": "col_b_is_null_or_empty",
@@ -77,7 +77,7 @@ def test_invalid_multiple_checks():
expected_errors = [
"No arguments provided for function 'is_not_null_and_not_empty' in the 'arguments' block",
"Invalid value for 'criticality' field",
- "Argument 'allowed' should be of type 'list' for function 'value_is_in_list' in the 'arguments' block",
+ "Argument 'allowed' should be of type 'list' for function 'is_in_list' in the 'arguments' block",
"'check' field is missing",
]
assert len(status.errors) == len(expected_errors)
@@ -176,7 +176,7 @@ def test_col_names_argument_type_list():
checks = [
{
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_names": ["a", "b"], "allowed": [1, 3, 4]}},
+ "check": {"function": "is_in_list", "arguments": {"col_names": ["a", "b"], "allowed": [1, 3, 4]}},
}
]
status = DQEngine.validate_checks(checks)
@@ -187,13 +187,12 @@ def test_col_functions_argument_mismtach_type():
checks = [
{
"criticality": "warn",
- "check": {"function": "value_is_in_list", "arguments": {"col_name": "a", "allowed": 2}},
+ "check": {"function": "is_in_list", "arguments": {"col_name": "a", "allowed": 2}},
}
]
status = DQEngine.validate_checks(checks)
- assert (
- "Argument 'allowed' should be of type 'list' for function 'value_is_in_list' in the 'arguments' block"
- in str(status)
+ assert "Argument 'allowed' should be of type 'list' for function 'is_in_list' in the 'arguments' block" in str(
+ status
)
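
The metadata form uses the new function name as well. A minimal validation sketch based on the tests above; `DQEngine.validate_checks` and the `status.errors` attribute are used exactly as shown there.

```python
from databricks.labs.dqx.engine import DQEngine

checks = [
    {
        "criticality": "warn",
        "check": {"function": "is_in_list", "arguments": {"col_names": ["a", "b"], "allowed": [1, 3, 4]}},
    }
]

status = DQEngine.validate_checks(checks)
assert not status.errors  # a non-list "allowed" value would be reported here instead
```
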
diff --git a/tests/unit/test_col_functions.py b/tests/unit/test_col_functions.py
new file mode 100644
index 00000000..5a315d67
--- /dev/null
+++ b/tests/unit/test_col_functions.py
@@ -0,0 +1,43 @@
+import pytest
+from databricks.labs.dqx.col_functions import (
+ is_in_range,
+ is_not_in_range,
+ is_not_greater_than,
+ is_not_less_than,
+ is_in_list,
+ is_not_null_and_is_in_list,
+)
+
+LIMIT_VALUE_ERROR = "Limit is not provided"
+
+
+@pytest.mark.parametrize("min_limit, max_limit", [(None, 1), (1, None)])
+def test_col_is_in_range_missing_limits(min_limit, max_limit):
+ with pytest.raises(ValueError, match=LIMIT_VALUE_ERROR):
+ is_in_range("a", min_limit, max_limit)
+
+
+@pytest.mark.parametrize("min_limit, max_limit", [(None, 1), (1, None)])
+def test_col_is_not_in_range_missing_limits(min_limit, max_limit):
+ with pytest.raises(ValueError, match=LIMIT_VALUE_ERROR):
+ is_not_in_range("a", min_limit, max_limit)
+
+
+def test_col_not_greater_than_missing_limit():
+ with pytest.raises(ValueError, match=LIMIT_VALUE_ERROR):
+ is_not_greater_than("a", limit=None)
+
+
+def test_col_not_less_than_missing_limit():
+ with pytest.raises(ValueError, match=LIMIT_VALUE_ERROR):
+ is_not_less_than("a", limit=None)
+
+
+def test_col_is_not_null_and_is_in_list_missing_allowed_list():
+ with pytest.raises(ValueError, match="allowed list is not provided"):
+ is_not_null_and_is_in_list("a", allowed=[])
+
+
+def test_col_is_in_list_missing_allowed_list():
+ with pytest.raises(ValueError, match="allowed list is not provided"):
+ is_in_list("a", allowed=[])
diff --git a/tests/unit/resolve_check_function.py b/tests/unit/test_resolve_check_function.py
similarity index 100%
rename from tests/unit/resolve_check_function.py
rename to tests/unit/test_resolve_check_function.py