@@ -116,27 +116,27 @@ def get_valid(self, df: DataFrame) -> DataFrame:
116
116
)
117
117
118
118
@staticmethod
119
- def load_checks_from_local_file (path : str ) -> list [dict ]:
120
- if not path :
121
- raise ValueError ("filename must be provided" )
119
+ def load_checks_from_local_file (filepath : str ) -> list [dict ]:
120
+ if not filepath :
121
+ raise ValueError ("filepath must be provided" )
122
122
123
123
try :
124
- checks = Installation .load_local (list [dict [str , str ]], Path (path ))
124
+ checks = Installation .load_local (list [dict [str , str ]], Path (filepath ))
125
125
return deserialize_dicts (checks )
126
126
except FileNotFoundError :
127
- msg = f"Checks file { path } missing"
127
+ msg = f"Checks file { filepath } missing"
128
128
raise FileNotFoundError (msg ) from None
129
129
130
130
@staticmethod
131
- def save_checks_in_local_file (checks : list [dict ], path : str ):
132
- if not path :
133
- raise ValueError ("filename must be provided" )
131
+ def save_checks_in_local_file (checks : list [dict ], filepath : str ):
132
+ if not filepath :
133
+ raise ValueError ("filepath must be provided" )
134
134
135
135
try :
136
- with open (path , 'w' , encoding = "utf-8" ) as file :
136
+ with open (filepath , 'w' , encoding = "utf-8" ) as file :
137
137
yaml .safe_dump (checks , file )
138
138
except FileNotFoundError :
139
- msg = f"Checks file { path } missing"
139
+ msg = f"Checks file { filepath } missing"
140
140
raise FileNotFoundError (msg ) from None
141
141
142
142
@staticmethod
@@ -391,49 +391,128 @@ def __init__(
391
391
self ._engine = engine or DQEngineCore (workspace_client , extra_params )
392
392
393
393
def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame:
    """Run the given data quality checks against a dataframe.

    :param df: dataframe to check
    :param checks: list of checks to apply to the dataframe; each one is a DQRule instance.
    :return: dataframe extended with error and warning reporting columns
    """
    checked_df = self._engine.apply_checks(df, checks)
    return checked_df
def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]:
    """Run data quality checks on a dataframe and split it in two according to the results.

    :param df: dataframe to check
    :param checks: list of checks to apply to the dataframe; each one is a DQRule instance.
    :return: two dataframes - "good" which includes warning rows but no reporting columns, and "bad" having
        error and warning rows and corresponding reporting columns
    """
    split_result = self._engine.apply_checks_and_split(df, checks)
    return split_result
def apply_checks_by_metadata_and_split(
    self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None
) -> tuple[DataFrame, DataFrame]:
    """Metadata-driven variant of `apply_checks_and_split`: instead of passing check
    functions directly, each check is described as a function name plus arguments.

    :param df: dataframe to check
    :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields:
        * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true -
        it will be used as an error/warning message, or `null` if it's evaluated to `false`
        * `name` - name that will be given to a resulting column. Autogenerated if not provided
        * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe),
        and `warn` (data is going into both dataframes)
    :param glbs: dictionary with functions mapping (eg. ``globals()`` of the calling module).
        If not specified, then only built-in functions are used for the checks.
    :return: two dataframes - "good" which includes warning rows but no reporting columns, and "bad" having
        error and warning rows and corresponding reporting columns
    """
    split_result = self._engine.apply_checks_by_metadata_and_split(df, checks, glbs)
    return split_result
def apply_checks_by_metadata(
    self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None
) -> DataFrame:
    """Metadata-driven variant of `apply_checks`: instead of passing check functions
    directly, each check is described as a function name plus arguments.

    :param df: dataframe to check
    :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields:
        * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true -
        it will be used as an error/warning message, or `null` if it's evaluated to `false`
        * `name` - name that will be given to a resulting column. Autogenerated if not provided
        * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe),
        and `warn` (data is going into both dataframes)
    :param glbs: dictionary with functions mapping (eg. ``globals()`` of calling module).
        If not specified, then only built-in functions are used for the checks.
    :return: dataframe with errors and warning reporting columns
    """
    checked_df = self._engine.apply_checks_by_metadata(df, checks, glbs)
    return checked_df
@staticmethod
def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus:
    """
    Validate that the given checks conform to the expected structure and types.

    For each check (a dictionary) this verifies the presence of required keys,
    that the referenced functions exist and are callable, and that the argument
    types passed to those functions are correct.

    :param checks: list of checks to apply to the dataframe; each check should be a dictionary.
    :param glbs: optional dictionary of global functions that can be used in checks.
    :return: the validation status.
    """
    status = DQEngineCore.validate_checks(checks, glbs)
    return status
def get_invalid(self, df: DataFrame) -> DataFrame:
    """
    Get records that violate data quality checks (records with warnings and errors).

    :param df: input DataFrame.
    :return: dataframe with error and warning rows and corresponding reporting columns.
    """
    # Docstring switched from Javadoc-style @param/@return to the reST
    # :param:/:return: convention used by the rest of this module.
    return self._engine.get_invalid(df)
def get_valid(self, df: DataFrame) -> DataFrame:
    """
    Get records that don't violate data quality checks (records with warnings but no errors).

    :param df: input DataFrame.
    :return: dataframe with warning rows but no reporting columns.
    """
    # Docstring switched from Javadoc-style @param/@return to the reST
    # :param:/:return: convention used by the rest of this module.
    return self._engine.get_valid(df)
@staticmethod
def load_checks_from_local_file(filepath: str) -> list[dict]:
    """Read checks (dq rules) from a json or yml file in the local filesystem.

    :param filepath: path to the file containing the checks.
    :return: non-empty list of dq rules; raises an error when the checks file is missing or invalid.
    """
    checks = DQEngineCore.load_checks_from_local_file(filepath)
    if not checks:
        raise ValueError(f"Invalid or no checks in file: {filepath}")
    return checks
def load_checks_from_workspace_file(self, workspace_path: str) -> list[dict]:
    """Read checks (dq rules) from a json or yml file stored in the workspace.
    DQX does not have to be installed in the workspace for this to work.
    The checks returned here can be used as input for `apply_checks_by_metadata`.

    :param workspace_path: path to the file in the workspace.
    :return: non-empty list of dq rules; raises an error when the checks file is missing or invalid.
    """
    folder = os.path.dirname(workspace_path)
    filename = os.path.basename(workspace_path)
    installation = Installation(self.ws, "dqx", install_folder=folder)

    logger.info(f"Loading quality rules (checks) from {workspace_path} in the workspace.")
    checks = self._load_checks_from_file(installation, filename)
    if not checks:
        raise ValueError(f"Invalid or no checks in workspace file: {workspace_path}")
    return checks
def load_checks_from_installation (
439
518
self , run_config_name : str | None = "default" , product_name : str = "dqx" , assume_user : bool = True
@@ -445,14 +524,17 @@ def load_checks_from_installation(
445
524
:param run_config_name: name of the run (config) to use
446
525
:param product_name: name of the product/installation directory
447
526
:param assume_user: if True, assume user installation
448
- :return: list of dq rules
527
+ :return: list of dq rules or raise an error if checks file is missing or is invalid.
449
528
"""
450
529
installation = self ._get_installation (assume_user , product_name )
451
530
run_config = self ._load_run_config (installation , run_config_name )
452
531
filename = run_config .checks_file or "checks.yml"
453
532
454
533
logger .info (f"Loading quality rules (checks) from { installation .install_folder ()} /{ filename } in the workspace." )
455
- return self ._load_checks_from_file (installation , filename )
534
+ parsed_checks = self ._load_checks_from_file (installation , filename )
535
+ if not parsed_checks :
536
+ raise ValueError (f"Invalid or no checks in workspace file: { installation .install_folder ()} /{ filename } " )
537
+ return parsed_checks
456
538
457
539
@staticmethod
458
540
def save_checks_in_local_file (checks : list [dict ], path : str ):
0 commit comments