
Commit 7f46d4f

Merge remote-tracking branch 'origin/code_cov_new' into code_cov_new
2 parents 2bb872c + 5a9a026

18 files changed: +530 -146 lines changed

docs/dqx/docs/guide.mdx

+13
@@ -60,6 +60,10 @@ print(dlt_expectations)
 You can optionally install DQX in the workspace, see the [Installation Guide](/docs/installation#dqx-installation-in-a-databricks-workspace).
 As part of the installation, a config, dashboards and profiler workflow is installed. The workflow can be run manually in the workspace UI or using the CLI as below.
 
+DQX currently operates exclusively at the PySpark DataFrame level and does not interact directly with databases or storage systems.
+DQX does not persist data after performing quality checks, so users must handle data storage themselves.
+Since DQX does not manage the input location, output table, or quarantine table, it is the user's responsibility to store or persist the processed data as needed.
+
 Open the config to check available run configs and adjust the settings if needed:
 ```commandline
 databricks labs dqx open-remote-config
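
Since the paragraphs added above leave persistence entirely to the caller, here is a minimal sketch (not part of this commit) of writing the split outputs out yourself. The table names, the ambient Databricks `spark` session, and the inline check are placeholders; the `DQEngine` methods are the ones documented in this commit.

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

# Illustrative metadata check; any valid DQX check list works here.
checks = [{"criticality": "error", "check": {"function": "is_not_null", "arguments": {"col_name": "order_id"}}}]

# DQX only flags or splits the data; reading the input and persisting the results is the caller's job.
input_df = spark.read.table("main.raw.orders")  # placeholder input location
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

valid_df.write.mode("append").saveAsTable("main.curated.orders")            # placeholder output table
quarantined_df.write.mode("append").saveAsTable("main.quarantine.orders")   # placeholder quarantine table
```
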
@@ -164,6 +168,11 @@ Fields:
 
 ### Loading and execution methods
 
+Checks can be loaded from a file in the installation folder, the workspace, or the local file system. If the checks file contains invalid JSON or YAML syntax, the engine raises an error.
+Checks can be applied using the `apply_checks_by_metadata_and_split` or `apply_checks_by_metadata` methods; they are validated automatically as part of these methods.
+If you want to split the checked data into valid and invalid (quarantined) dataframes, use `apply_checks_by_metadata_and_split`.
+If you want to report issues as additional columns, use `apply_checks_by_metadata`.
+
 #### Method 1: Loading checks from a workspace file in the installation folder
 
 If DQX is installed in the workspace, you can load checks based on the run configuration:
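
As a companion to the added guidance, a short sketch of the metadata-driven flow described above; the YAML check and `input_df` are illustrative, while the method names come straight from this diff.

```python
import yaml
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

# Illustrative check in metadata form; invalid YAML or JSON would make loading fail.
checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    arguments:
      col_name: col1
""")

dq_engine = DQEngine(WorkspaceClient())
input_df = spark.read.table("main.raw.orders")  # placeholder input dataframe

# Split into valid and invalid (quarantined) dataframes ...
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

# ... or keep a single dataframe with issues reported as additional columns.
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```
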
@@ -229,6 +238,10 @@ valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
 
 ### Quality rules defined as code
 
+Checks can be defined in code and applied using the `apply_checks_and_split` or `apply_checks` methods.
+If you want to split the checked data into valid and invalid (quarantined) dataframes, use `apply_checks_and_split`.
+If you want to report issues as additional columns, use `apply_checks`.
+
 #### Method 1: Using DQX classes
 
 ```python
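
For the code-defined variant introduced above, a hedged sketch: the `DQRule` import path and the `is_not_null_and_not_empty` check function are assumptions based on the DQX docs, and `input_df` is a placeholder dataframe.

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRule  # assumed module path
from databricks.labs.dqx.col_functions import is_not_null_and_not_empty  # assumed built-in check function

checks = [
    DQRule(name="col1_is_null_or_empty", criticality="error", check=is_not_null_and_not_empty("col1")),
]

dq_engine = DQEngine(WorkspaceClient())
input_df = spark.read.table("main.raw.orders")  # placeholder input dataframe

# Split into valid and quarantined dataframes ...
valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)

# ... or report issues as additional columns on a single dataframe.
valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
```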

pyproject.toml

+1 -1
@@ -243,7 +243,7 @@ argument-naming-style = "snake_case"
 
 # Regular expression matching correct argument names. Overrides argument-naming-
 # style. If left empty, argument names will be checked with the set naming style.
-argument-rgx = "[a-z_][a-z0-9_]{2,30}$"
+argument-rgx = "[a-z_][a-z0-9_]{2,40}$"
 
 # Naming style matching correct attribute names.
 attr-naming-style = "snake_case"

src/databricks/labs/dqx/base.py

+4 -4
@@ -131,22 +131,22 @@ def get_valid(self, df: DataFrame) -> DataFrame:
 
     @staticmethod
     @abc.abstractmethod
-    def load_checks_from_local_file(path: str) -> list[dict]:
+    def load_checks_from_local_file(filepath: str) -> list[dict]:
         """
         Load checks (dq rules) from a file (json or yml) in the local file system.
         This does not require installation of DQX in the workspace.
         The returning checks can be used as input for `apply_checks_by_metadata` function.
 
-        :param path: path to a file containing the checks.
+        :param filepath: path to a file containing the checks.
         :return: list of dq rules
         """
 
     @staticmethod
     @abc.abstractmethod
-    def save_checks_in_local_file(checks: list[dict], path: str):
+    def save_checks_in_local_file(checks: list[dict], filepath: str):
         """
         Save checks (dq rules) to yml file in the local file system.
 
         :param checks: list of dq rules to save
-        :param path: path to a file containing the checks.
+        :param filepath: path to a file containing the checks.
         """

src/databricks/labs/dqx/engine.py

+98 -16
@@ -116,27 +116,27 @@ def get_valid(self, df: DataFrame) -> DataFrame:
         )
 
     @staticmethod
-    def load_checks_from_local_file(path: str) -> list[dict]:
-        if not path:
-            raise ValueError("filename must be provided")
+    def load_checks_from_local_file(filepath: str) -> list[dict]:
+        if not filepath:
+            raise ValueError("filepath must be provided")
 
         try:
-            checks = Installation.load_local(list[dict[str, str]], Path(path))
+            checks = Installation.load_local(list[dict[str, str]], Path(filepath))
             return deserialize_dicts(checks)
         except FileNotFoundError:
-            msg = f"Checks file {path} missing"
+            msg = f"Checks file {filepath} missing"
             raise FileNotFoundError(msg) from None
 
     @staticmethod
-    def save_checks_in_local_file(checks: list[dict], path: str):
-        if not path:
-            raise ValueError("filename must be provided")
+    def save_checks_in_local_file(checks: list[dict], filepath: str):
+        if not filepath:
+            raise ValueError("filepath must be provided")
 
         try:
-            with open(path, 'w', encoding="utf-8") as file:
+            with open(filepath, 'w', encoding="utf-8") as file:
                 yaml.safe_dump(checks, file)
         except FileNotFoundError:
-            msg = f"Checks file {path} missing"
+            msg = f"Checks file {filepath} missing"
             raise FileNotFoundError(msg) from None
 
     @staticmethod
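
A small sketch of the renamed local-file helpers above; the check payload and file name are illustrative. `save_checks_in_local_file` writes YAML via `yaml.safe_dump`, and an empty `filepath` or missing file raises the errors shown in the hunk.

```python
from databricks.labs.dqx.engine import DQEngine

checks = [
    {"criticality": "error", "check": {"function": "is_not_null", "arguments": {"col_name": "col1"}}},
]

# Persist the checks as YAML; an empty filepath raises ValueError("filepath must be provided").
DQEngine.save_checks_in_local_file(checks, "checks.yml")

# Load them back; a missing file raises FileNotFoundError, and nested dicts are deserialized.
loaded_checks = DQEngine.load_checks_from_local_file("checks.yml")
```
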
@@ -391,49 +391,128 @@ def __init__(
         self._engine = engine or DQEngineCore(workspace_client, extra_params)
 
     def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame:
+        """Applies data quality checks to a given dataframe.
+
+        :param df: dataframe to check
+        :param checks: list of checks to apply to the dataframe. Each check is an instance of the DQRule class.
+        :return: dataframe with errors and warning reporting columns
+        """
         return self._engine.apply_checks(df, checks)
 
     def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]:
+        """Applies data quality checks to a given dataframe and splits it into two ("good" and "bad"),
+        according to the data quality checks.
+
+        :param df: dataframe to check
+        :param checks: list of checks to apply to the dataframe. Each check is an instance of the DQRule class.
+        :return: two dataframes - "good", which includes warning rows but no reporting columns, and "bad", having
+            error and warning rows and the corresponding reporting columns
+        """
         return self._engine.apply_checks_and_split(df, checks)
 
     def apply_checks_by_metadata_and_split(
         self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None
     ) -> tuple[DataFrame, DataFrame]:
+        """Wrapper around `apply_checks_and_split` for use in metadata-driven pipelines. The main difference
+        is how the checks are specified - instead of using functions directly, they are described as a function name
+        plus arguments.
+
+        :param df: dataframe to check
+        :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of the following fields:
+            * `check` - Column expression to evaluate. This expression should return a string value if it evaluates
+              to true (it will be used as the error/warning message), or `null` if it evaluates to `false`
+            * `name` - name that will be given to the resulting column. Autogenerated if not provided
+            * `criticality` (optional) - possible values are `error` (data goes only into the "bad" dataframe)
+              and `warn` (data goes into both dataframes)
+        :param glbs: dictionary mapping names to check functions (e.g. ``globals()`` of the calling module).
+            If not specified, only built-in functions are used for the checks.
+        :return: two dataframes - "good", which includes warning rows but no reporting columns, and "bad", having
+            error and warning rows and the corresponding reporting columns
+        """
         return self._engine.apply_checks_by_metadata_and_split(df, checks, glbs)
 
     def apply_checks_by_metadata(
         self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None
     ) -> DataFrame:
+        """Wrapper around `apply_checks` for use in metadata-driven pipelines. The main difference
+        is how the checks are specified - instead of using functions directly, they are described as a function name
+        plus arguments.
+
+        :param df: dataframe to check
+        :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of the following fields:
+            * `check` - Column expression to evaluate. This expression should return a string value if it evaluates
+              to true (it will be used as the error/warning message), or `null` if it evaluates to `false`
+            * `name` - name that will be given to the resulting column. Autogenerated if not provided
+            * `criticality` (optional) - possible values are `error` (data goes only into the "bad" dataframe)
+              and `warn` (data goes into both dataframes)
+        :param glbs: dictionary mapping names to check functions (e.g. ``globals()`` of the calling module).
+            If not specified, only built-in functions are used for the checks.
+        :return: dataframe with errors and warning reporting columns
+        """
         return self._engine.apply_checks_by_metadata(df, checks, glbs)
 
     @staticmethod
     def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus:
+        """
+        Validate the input dicts to ensure they conform to the expected structure and types.
+
+        Each check should be a dictionary. The function validates
+        the presence of required keys, the existence and callability of functions, and the types
+        of arguments passed to these functions.
+
+        :param checks: List of checks to apply to the dataframe. Each check should be a dictionary.
+        :param glbs: Optional dictionary of global functions that can be used in checks.
+
+        :return ChecksValidationStatus: The validation status.
+        """
         return DQEngineCore.validate_checks(checks, glbs)
 
     def get_invalid(self, df: DataFrame) -> DataFrame:
+        """
+        Get records that violate data quality checks (records with warnings and errors).
+
+        :param df: input DataFrame.
+        :return: dataframe with error and warning rows and the corresponding reporting columns.
+        """
         return self._engine.get_invalid(df)
 
     def get_valid(self, df: DataFrame) -> DataFrame:
+        """
+        Get records that don't violate data quality checks (records with warnings but no errors).
+
+        :param df: input DataFrame.
+        :return: dataframe with warning rows but no reporting columns.
+        """
         return self._engine.get_valid(df)
 
     @staticmethod
-    def load_checks_from_local_file(path: str) -> list[dict]:
-        return DQEngineCore.load_checks_from_local_file(path)
+    def load_checks_from_local_file(filepath: str) -> list[dict]:
+        """
+        Load checks (dq rules) from a file (json or yml) in the local filesystem.
+
+        :param filepath: path to the file containing the checks.
+        :return: list of dq rules; raises an error if the checks file is missing or invalid.
+        """
+        parsed_checks = DQEngineCore.load_checks_from_local_file(filepath)
+        if not parsed_checks:
+            raise ValueError(f"Invalid or no checks in file: {filepath}")
+        return parsed_checks
 
     def load_checks_from_workspace_file(self, workspace_path: str) -> list[dict]:
         """Load checks (dq rules) from a file (json or yml) in the workspace.
         This does not require installation of DQX in the workspace.
         The returning checks can be used as input for `apply_checks_by_metadata` function.
 
         :param workspace_path: path to the file in the workspace.
-        :return: list of dq rules.
+        :return: list of dq rules; raises an error if the checks file is missing or invalid.
         """
         workspace_dir = os.path.dirname(workspace_path)
         filename = os.path.basename(workspace_path)
         installation = Installation(self.ws, "dqx", install_folder=workspace_dir)
 
         logger.info(f"Loading quality rules (checks) from {workspace_path} in the workspace.")
-        return self._load_checks_from_file(installation, filename)
+        parsed_checks = self._load_checks_from_file(installation, filename)
+        if not parsed_checks:
+            raise ValueError(f"Invalid or no checks in workspace file: {workspace_path}")
+        return parsed_checks
 
     def load_checks_from_installation(
         self, run_config_name: str | None = "default", product_name: str = "dqx", assume_user: bool = True
@@ -445,14 +524,17 @@ def load_checks_from_installation(
         :param run_config_name: name of the run (config) to use
         :param product_name: name of the product/installation directory
         :param assume_user: if True, assume user installation
-        :return: list of dq rules
+        :return: list of dq rules; raises an error if the checks file is missing or invalid.
         """
         installation = self._get_installation(assume_user, product_name)
         run_config = self._load_run_config(installation, run_config_name)
         filename = run_config.checks_file or "checks.yml"
 
         logger.info(f"Loading quality rules (checks) from {installation.install_folder()}/{filename} in the workspace.")
-        return self._load_checks_from_file(installation, filename)
+        parsed_checks = self._load_checks_from_file(installation, filename)
+        if not parsed_checks:
+            raise ValueError(f"Invalid or no checks in workspace file: {installation.install_folder()}/{filename}")
+        return parsed_checks
 
     @staticmethod
     def save_checks_in_local_file(checks: list[dict], path: str):
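
To illustrate the fail-fast behaviour added to the loaders above, a sketch; the workspace path is hypothetical, and the `has_errors`/`to_string` members of `ChecksValidationStatus` are assumptions based on the DQX docs.

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

# The loaders now raise ValueError instead of returning an empty list
# when the checks file exists but contains no usable checks.
checks = dq_engine.load_checks_from_workspace_file("/Workspace/Users/me@example.com/dqx/checks.yml")

# Optional explicit validation of the metadata before applying it.
status = DQEngine.validate_checks(checks)
if status.has_errors:  # assumed attribute
    raise ValueError(status.to_string())  # assumed method
```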

src/databricks/labs/dqx/utils.py

+16 -7
@@ -1,5 +1,5 @@
 import re
-import yaml
+import ast
 from pyspark.sql import Column
 from pyspark.sql import SparkSession
 
@@ -49,12 +49,21 @@ def read_input_data(spark: SparkSession, input_location: str | None, input_forma
 
 def deserialize_dicts(checks: list[dict[str, str]]) -> list[dict]:
     """
-    deserialize string fields instances containing dictionaries
+    Deserialize string fields containing dictionaries.
+    This is needed as nested dictionaries from installation files are loaded as strings.
     @param checks: list of checks
     @return:
     """
-    for item in checks:
-        for key, value in item.items():
-            if value.startswith("{") and value.endswith("}"):
-                item[key] = yaml.safe_load(value.replace("'", '"'))
-    return checks
+
+    def parse_nested_fields(obj):
+        """Recursively parse all string representations of dictionaries."""
+        if isinstance(obj, str):
+            if obj.startswith("{") and obj.endswith("}"):
+                parsed_obj = ast.literal_eval(obj)
+                return parse_nested_fields(parsed_obj)
+            return obj
+        if isinstance(obj, dict):
+            return {k: parse_nested_fields(v) for k, v in obj.items()}
+        return obj
+
+    return [parse_nested_fields(check) for check in checks]
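
A quick sketch of what the reworked helper now does; the check content is illustrative. Nested dictionaries that come back from installation files as strings are parsed recursively with `ast.literal_eval`, replacing the previous `yaml.safe_load`-with-quote-replacement approach.

```python
from databricks.labs.dqx.utils import deserialize_dicts

# Nested check arguments come back from the installation file as a string.
checks = [
    {"criticality": "error", "check": "{'function': 'is_not_null', 'arguments': {'col_name': 'col1'}}"},
]

parsed = deserialize_dicts(checks)
# parsed[0]["check"] is now a real nested dict:
# {'function': 'is_not_null', 'arguments': {'col_name': 'col1'}}
```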

tests/__init__.py

Whitespace-only changes.
