feat: add context info to output #206

Open · wants to merge 22 commits into base: main
Changes from 16 commits
2,453 changes: 2,453 additions & 0 deletions coverage-unit.xml

Large diffs are not rendered by default.

45 changes: 30 additions & 15 deletions demos/dqx_demo_library.py
@@ -10,10 +10,7 @@
# COMMAND ----------

# MAGIC %pip install databricks-labs-dqx

# COMMAND ----------

dbutils.library.restartPython()
# MAGIC %restart_python

# COMMAND ----------

@@ -92,6 +89,19 @@

# COMMAND ----------

# MAGIC %md
# MAGIC ## Analyze Quality Checks Output

# COMMAND ----------

import pyspark.sql.functions as F

# explore errors
results_df = quarantined_df.select(F.explode(F.col("_errors")).alias("dq")).select(F.expr("dq.*"))
display(results_df)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Validating quality checks definition
# MAGIC This is typically run as part of the CI/CD process to ensure checks are ready to use.
@@ -194,15 +204,19 @@
DQRule( # define rule for a single column
name="col3_is_null_or_empty",
criticality="warn",
check=is_not_null_and_not_empty("col3")),
check_func=is_not_null_and_not_empty,
col_name="col3"),
DQRule( # define rule with a filter
name="col_4_is_null_or_empty",
criticality="warn",
filter="col1 < 3",
check=is_not_null_and_not_empty("col4")),
check_func=is_not_null_and_not_empty,
col_name="col4"),
DQRule( # name for the check auto-generated if not provided
criticality="error",
check=is_in_list("col1", ["1", "2"]))
check_func=is_in_list,
col_name="col1",
check_func_args=[["1", "2"]])
] + DQRuleColSet( # define rule for multiple columns at once
columns=["col1", "col2"],
criticality="error",
@@ -331,9 +345,10 @@ def ends_with_foo(col_name: str) -> Column:

# use built-in, custom and sql expression checks
checks = [
DQRule(criticality="error", check=is_not_null_and_not_empty("col1")),
DQRule(criticality="warn", check=ends_with_foo("col1")),
DQRule(criticality="warn", check=sql_expression("col1 not like 'str%'", msg="col1 starts with 'str'")),
DQRule(criticality="error", check_func=is_not_null_and_not_empty, col_name="col1"),
DQRule(criticality="warn", check_func=ends_with_foo, col_name="col1"),
DQRule(criticality="warn", check_func=sql_expression, check_func_kwargs={
"expression": "col1 like 'str%'", "msg": "col1 not starting with 'str'"}),
]

schema = "col1: string, col2: string"
@@ -369,12 +384,12 @@ def ends_with_foo(col_name: str) -> Column:
function: ends_with_foo
arguments:
col_name: col1
- criticality: error
- criticality: warn
check:
function: sql_expression
arguments:
expression: col1 not like 'str%'
msg: col1 starts with 'str'
expression: col1 like 'str%'
msg: col1 not starting with 'str'
"""
)

@@ -419,8 +434,8 @@ def ends_with_foo(col_name: str) -> Column:
input_df = spark.createDataFrame([[None, "foo"], ["foo", None], [None, None]], schema)

checks = [
DQRule(criticality="error", check=is_not_null_and_not_empty("col1")),
DQRule(criticality="warn", check=is_not_null_and_not_empty("col2")),
DQRule(criticality="error", check_func=is_not_null_and_not_empty, col_name="col1"),
DQRule(criticality="warn", check_func=is_not_null_and_not_empty, col_name="col2"),
]

valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
5 changes: 1 addition & 4 deletions demos/dqx_demo_tool.py
@@ -49,10 +49,7 @@
dqx_wheel_files = glob.glob(f"/Workspace/Users/{user_name}/.dqx/wheels/databricks_labs_dqx-*.whl")
dqx_latest_wheel = max(dqx_wheel_files, key=os.path.getctime)
%pip install {dqx_latest_wheel}

# COMMAND ----------

dbutils.library.restartPython()
%restart_python

# COMMAND ----------

5 changes: 1 addition & 4 deletions demos/dqx_dlt_demo.py
@@ -1,10 +1,7 @@
# Databricks notebook source

%pip install databricks-labs-dqx

# COMMAND ----------

dbutils.library.restartPython()
%restart_python

# COMMAND ----------

65 changes: 61 additions & 4 deletions docs/dqx/docs/guide.mdx
@@ -252,15 +252,19 @@ Example:
DQRule( # define rule for a single column
name="col3_is_null_or_empty",
criticality="error",
check=is_not_null_and_not_empty("col3")),
check_func=is_not_null_and_not_empty,
col_name="col3"),
DQRule( # define rule with a filter
name="col_4_is_null_or_empty",
criticality="error",
criticality="error",
filter="col1 < 3",
check=is_not_null_and_not_empty("col4")),
check_func=is_not_null_and_not_empty,
col_name="col4"),
DQRule( # name for the check auto-generated if not provided
criticality="warn",
check=is_in_list("col4", ["1", "2"]))
check_func=is_in_list,
col_name="col4",
check_func_args=[["1", "2"]])
] + DQRuleColSet( # define rule for multiple columns at once
columns=["col1", "col2"],
criticality="error",
@@ -440,6 +444,59 @@ The validation cannot be used for checks defined using [DQX classes](#method-1-u
Validating quality rules is typically done as part of the CI/CD process to ensure checks are ready to use in the application.
</Admonition>

## Quality Checks Output (Reporting Columns)

DQX quality checks output can be used to monitor and track data quality issues.
The quality issues are reported as additional columns in the output DataFrame.
The reporting columns are named `_errors` and `_warnings` by default, but you can customize them as described in the [Additional Configuration](#additional-configuration) section.
The reporting columns can be used for further processing, such as building a dashboard or feeding other downstream applications.

Below is a sample output of a check as stored in a reporting column:
```python
[
{
"name": "col_city_is_null",
"message": "Column city is null",
"col_name": "city",
"filter": "country = 'Poland'",
"function": "is_not_null",
"run_time": "2025-01-01 14:31:21",
},
]
```

The reporting column is an array of structs with the following fields
(see the exact structure [here](https://github.com/databrickslabs/dqx/blob/main/src/databricks/labs/dqx/schema/dq_result_schema.py)):
- `name`: name of the check (string type).
- `message`: message describing the issue (string type).
- `col_name`: name of the column where the issue was found (string type).
- `filter`: filter applied to the column, if any (string type).
- `function`: function used to check the column (string type).
- `run_time`: timestamp when the check was executed (timestamp type).
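
Because the reporting column is an array of structs, rows can be filtered by the checks that fired using Spark's higher-order functions. A minimal sketch, assuming the default `_errors` column name and a hypothetical `checked_df` DataFrame that already contains the reporting columns:
```python
import pyspark.sql.functions as F

# keep only rows flagged by a specific check (default "_errors" column assumed)
city_null_rows = checked_df.where(
    F.exists(F.col("_errors"), lambda e: e["name"] == "col_city_is_null")
)

# add a column with the number of error entries per row
with_error_count = checked_df.withColumn("error_count", F.size(F.col("_errors")))
```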

The example below demonstrates how to extract the results from a reporting column:
<Tabs>
<TabItem value="Python" label="Python" default>
```python
import pyspark.sql.functions as F

valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)

results_df = quarantined_df.select(
F.explode(F.col("_errors")).alias("result"),
).select(F.expr("result.*"))

# The results_df will contain the following columns:
# +------------------+---------------------+----------+--------------------+-------------+---------------------+
# | name | message | col_name | filter | function | run_time |
# +------------------+---------------------+----------+--------------------+-------------+---------------------+
# | col_city_is_null | Column city is null | city | country = 'Poland' | is_not_null | 2025-01-01 14:31:21 |
# | ... | ... | ... | ... | ... | ... |
# +------------------+---------------------+----------+--------------------+-------------+---------------------+
```
</TabItem>
</Tabs>
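
For monitoring or a dashboard, the extracted results can be aggregated further, for example by check name and function. A short sketch continuing from the `results_df` produced above:
```python
import pyspark.sql.functions as F

# summarize how often each check fired, per check name and function
summary_df = (
    results_df
    .groupBy("name", "function")
    .count()
    .orderBy(F.desc("count"))
)
summary_df.show()
```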

## Data Quality Dashboard

The data quality dashboard is automatically installed in the `dashboards` folder of the workspace installation directory when you install DQX in the Databricks workspace. For more details on the installation process, see the [Installation Guide](/docs/installation).