@@ -1,15 +1,16 @@
 import functools as ft
 import itertools
 import re
+from collections.abc import Callable
 from dataclasses import dataclass, field
 from enum import Enum
-from typing import Any, Callable, Dict, List, Tuple
+from typing import Any

 import pyspark.sql.functions as F
 from pyspark.sql import Column, DataFrame

 from databricks.labs.dqx import dq_functions
-from databricks.labs.dqx.dq_functions import *  # noqa: F403
+from databricks.labs.dqx.dq_functions import *  # noqa: F403 # pylint: disable=wildcard-import,unused-wildcard-import
 from databricks.labs.dqx.utils import get_column_name

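For context: the import churn above is PEP 585/604 modernization. Builtin generics (list[str], dict[str, Any], tuple[...]) replace typing.List, typing.Dict and typing.Tuple, Callable moves to collections.abc, and X | None replaces an untyped default of None. A minimal before/after sketch (illustrative only, not part of the diff):

    # Before: typing-module generics (needed on Python < 3.9)
    from typing import Any, Callable, Dict, List, Optional

    def old_style(cols: List[str], opts: Dict[str, Any], cb: Optional[Callable] = None) -> List[str]:
        return cols

    # After: builtin generics (PEP 585, 3.9+) and | unions (PEP 604, 3.10+)
    from collections.abc import Callable
    from typing import Any

    def new_style(cols: list[str], opts: dict[str, Any], cb: Callable | None = None) -> list[str]:
        return cols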
@@ -47,7 +48,7 @@ def rule_criticality(self) -> str:
         :return: string describing criticality - `warn` or `error`. Raises exception if it's something else
         """
         criticality = self.criticality
-        if criticality != Criticality.WARN.value and criticality != Criticality.ERROR.value:
+        if criticality not in (Criticality.WARN.value, Criticality.ERROR.value):
             criticality = Criticality.ERROR.value

         return criticality
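The rewritten condition is behavior-preserving; a quick standalone sanity check (hypothetical, with plain strings standing in for the Criticality enum values):

    WARN, ERROR = "warn", "error"

    for value in ("warn", "error", "fatal"):
        old = value != WARN and value != ERROR
        new = value not in (WARN, ERROR)
        assert old == new  # the membership test matches the chained comparison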
@@ -79,13 +80,13 @@ class DQRuleColSet:
     * `check_func_kwargs` - keyword/named arguments for the check function after the col_name
     """

-    columns: List[str]
+    columns: list[str]
     check_func: Callable
     criticality: str = "error"
-    check_func_args: List[Any] = field(default_factory=list)
-    check_func_kwargs: Dict[str, Any] = field(default_factory=dict)
+    check_func_args: list[Any] = field(default_factory=list)
+    check_func_kwargs: dict[str, Any] = field(default_factory=dict)

-    def get_rules(self) -> List[DQRule]:
+    def get_rules(self) -> list[DQRule]:
         """Build a list of rules for a set of columns.

         :return: list of dq rules
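Judging by the fields above, a DQRuleColSet fans a single check function out over several columns, and get_rules() yields one DQRule per column. A hypothetical usage sketch (assuming is_not_null is one of the check functions wildcard-imported from dq_functions):

    rule_set = DQRuleColSet(
        columns=["id", "email"],
        check_func=is_not_null,  # assumed check function from dq_functions
        criticality="warn",
    )
    rules = rule_set.get_rules()  # -> list[DQRule], one rule per column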
@@ -99,7 +100,7 @@ def get_rules(self) -> List[DQRule]:
         ]


-def _perform_checks(df: DataFrame, checks: List[DQRule]) -> DataFrame:
+def _perform_checks(df: DataFrame, checks: list[DQRule]) -> DataFrame:
     """Applies a list of checks to a given dataframe and append results at the end of the dataframe.

     :param df: dataframe to check
@@ -110,7 +111,7 @@ def _perform_checks(df: DataFrame, checks: List[DQRule]) -> DataFrame:
     return df.select("*", *checks_cols)


-def _make_null_filter(cols: List[str]) -> Column:
+def _make_null_filter(cols: list[str]) -> Column:
     """Creates a filter condition that check if all specified columns are null.

     :param cols: names of the columns to check
@@ -122,14 +123,14 @@ def _make_null_filter(cols: List[str]) -> Column:
     def update_nullability_func(func, col):
         return func & F.col(col).isNull()

-    f1 = F.col(cols[0]).isNull()
-    return ft.reduce(update_nullability_func, cols[1:], f1)
+    initial = F.col(cols[0]).isNull()
+    return ft.reduce(update_nullability_func, cols[1:], initial)


 remove_criticality_re = re.compile("^(.*)_(error|warn)$")


-def _with_checks_as_map(df: DataFrame, dest_col: str, cols: List[str]) -> DataFrame:
+def _with_checks_as_map(df: DataFrame, dest_col: str, cols: list[str]) -> DataFrame:
     """Collect individual check columns into corresponding map<string, string> errors or warnings columns.

     :param df: dataframe with added check columns of type map<string, string>
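The f1 -> initial rename makes the fold in _make_null_filter easier to read: the first column's isNull() seeds ft.reduce, and each remaining column is AND-ed on. The same pattern as a standalone sketch (builds a pyspark Column expression, so it needs an active Spark session to evaluate):

    import functools as ft
    import pyspark.sql.functions as F

    cols = ["a", "b", "c"]
    initial = F.col(cols[0]).isNull()
    # Folds to: col("a").isNull() & col("b").isNull() & col("c").isNull()
    all_null = ft.reduce(lambda acc, c: acc & F.col(c).isNull(), cols[1:], initial)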
@@ -168,7 +169,7 @@ def _with_checks_as_map(df: DataFrame, dest_col: str, cols: List[str]) -> DataFrame:
     return ndf


-def _get_check_columns(checks: List[DQRule], criticality: str) -> List[str]:
+def _get_check_columns(checks: list[DQRule], criticality: str) -> list[str]:
     """Get check columns based on criticality.

     :param checks: list of checks to apply to the dataframe
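remove_criticality_re (defined in the previous hunk) strips the _error/_warn suffix from a check column name, which is presumably how check columns are grouped by criticality here. For instance:

    import re

    remove_criticality_re = re.compile("^(.*)_(error|warn)$")

    m = remove_criticality_re.match("col_is_not_null_error")
    assert m is not None
    assert m.group(1) == "col_is_not_null"  # the greedy group keeps the full base name
    assert m.group(2) == "error"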
@@ -191,7 +192,7 @@ def _append_empty_checks(df: DataFrame) -> DataFrame:
     )


-def apply_checks(df: DataFrame, checks: List[DQRule]) -> DataFrame:
+def apply_checks(df: DataFrame, checks: list[DQRule]) -> DataFrame:
     """Applies data quality checks to a given dataframe.

     :param df: dataframe to check
@@ -214,7 +215,7 @@ def apply_checks(df: DataFrame, checks: List[DQRule]) -> DataFrame:
     return checked_df_map


-def apply_checks_and_split(df: DataFrame, checks: List[DQRule]) -> Tuple[DataFrame, DataFrame]:
+def apply_checks_and_split(df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]:
     """Applies data quality checks to a given dataframe and split it into two ("good" and "bad"),
     according to the data quality checks.

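Taken together: apply_checks returns the input rows plus the map-typed result columns, while apply_checks_and_split also partitions the rows. A hypothetical call site (assuming an active spark session and a rules list built as sketched above):

    df = spark.createDataFrame([(1, None), (2, "a@b.com")], "id int, email string")

    checked = apply_checks(df, rules)                    # original columns + errors/warnings maps
    good_df, bad_df = apply_checks_and_split(df, rules)  # same checks, rows split by outcome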
@@ -234,7 +235,7 @@ def apply_checks_and_split(df: DataFrame, checks: List[DQRule]) -> Tuple[DataFrame, DataFrame]:
     return good_df, bad_df


-def build_checks_by_metadata(checks: List[dict], glbs=None) -> List[DQRule]:
+def build_checks_by_metadata(checks: list[dict], glbs: dict[str, Any] | None = None) -> list[DQRule]:
     """Build checks based on check specification, i.e. function name plus arguments.

     :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields:
@@ -248,25 +249,25 @@ def build_checks_by_metadata(checks: List[dict], glbs=None) -> List[DQRule]:
     :return: list of data quality check rules
     """
     dq_rule_checks = []
-    for ch in checks:
-        check = ch.get("check")
+    for check_def in checks:
+        check = check_def.get("check")
         if not check:
-            raise Exception(f"'check' block should be provided in the check: {ch}")
+            raise ValueError(f"'check' block should be provided in the check: {check}")

         func_name = check.get("function")
         if not func_name:
-            raise Exception(f"'function' argument should be provided in the check: {ch}")
+            raise ValueError(f"'function' argument should be provided in the check: {check}")

         if glbs:
             func = glbs.get(func_name)
         else:
             func = getattr(dq_functions, func_name)

         if not func or not callable(func):
-            raise Exception(f"function {func_name} is not defined")
+            raise ValueError(f"function {func_name} is not defined")

         func_args = check.get("arguments", {})
-        criticality = ch.get("criticality", "error")
+        criticality = check_def.get("criticality", "error")

         if "col_names" in func_args:
             dq_rule_checks += DQRuleColSet(
@@ -277,14 +278,16 @@ def build_checks_by_metadata(checks: List[dict], glbs=None) -> List[DQRule]:
                 check_func_kwargs={k: func_args[k] for k in func_args.keys() - {"col_names"}},
             ).get_rules()
         else:
-            name = ch.get("name", None)
+            name = check_def.get("name", None)
             check_func = func(**func_args)
             dq_rule_checks.append(DQRule(check=check_func, name=name, criticality=criticality))

     return dq_rule_checks


-def apply_checks_by_metadata_and_split(df: DataFrame, checks: List[dict], glbs=None) -> Tuple[DataFrame, DataFrame]:
+def apply_checks_by_metadata_and_split(
+    df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None
+) -> tuple[DataFrame, DataFrame]:
     """Wrapper around `apply_checks_and_split` for use in the metadata-driven pipelines. The main difference
     is how the checks are specified - instead of using functions directly, they are described as function name plus
     arguments.
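The loop above fixes the expected shape of a metadata check: a check block with function and arguments, optional criticality and name, and an arguments.col_names list that fans out through DQRuleColSet. A sketch of a conforming spec (the function name is assumed to exist in dq_functions):

    checks = [
        {
            "criticality": "error",
            "check": {"function": "is_not_null", "arguments": {"col_names": ["id", "email"]}},
        },
        {
            "name": "email_not_null",
            "criticality": "warn",
            # arguments are passed straight to the check function as kwargs;
            # "col_name" is an assumed parameter name here
            "check": {"function": "is_not_null", "arguments": {"col_name": "email"}},
        },
    ]
    rules = build_checks_by_metadata(checks)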
@@ -308,7 +311,7 @@ def apply_checks_by_metadata_and_split(df: DataFrame, checks: List[dict], glbs=None) -> Tuple[DataFrame, DataFrame]:
     return good_df, bad_df


-def apply_checks_by_metadata(df: DataFrame, checks: List[dict], glbs=None) -> DataFrame:
+def apply_checks_by_metadata(df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None) -> DataFrame:
     """Wrapper around `apply_checks` for use in the metadata-driven pipelines. The main difference
     is how the checks are specified - instead of using functions directly, they are described as function name plus
     arguments.
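Both metadata wrappers accept that same spec directly. Hypothetical call sites, reusing the df and checks from the sketches above:

    checked = apply_checks_by_metadata(df, checks)
    good_df, bad_df = apply_checks_by_metadata_and_split(df, checks)

    # When the spec names check functions defined in the caller's module,
    # pass a namespace so build_checks_by_metadata can resolve them:
    checked = apply_checks_by_metadata(df, checks, glbs=globals())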
@@ -329,7 +332,7 @@ def apply_checks_by_metadata(df: DataFrame, checks: List[dict], glbs=None) -> DataFrame:
     return apply_checks(df, dq_rule_checks)


-def build_checks(*rules_col_set: DQRuleColSet) -> List[DQRule]:
+def build_checks(*rules_col_set: DQRuleColSet) -> list[DQRule]:
     """
     Build rules from dq rules and rule sets.

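build_checks flattens one or more DQRuleColSet definitions into a single rule list; a hypothetical sketch (again assuming is_not_null from dq_functions):

    rules = build_checks(
        DQRuleColSet(columns=["id"], check_func=is_not_null),  # criticality defaults to "error"
        DQRuleColSet(columns=["email"], check_func=is_not_null, criticality="warn"),
    )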