1
1
import json
2
2
import re
3
- from typing import Union
4
3
5
4
from databricks .labs .dqx .profiler .common import val_to_str
6
5
from databricks .labs .dqx .profiler .profiler import DQRule
7
6
8
7
__name_sanitize_re__ = re .compile (r"[^a-zA-Z0-9]+" )
9
8
10
9
11
def dlt_generate_is_in(col_name, **params: dict):
    """Build a DLT expectation asserting that a column is one of a set of values.

    Args:
        col_name: Column name (or expression) placed on the left of ``in``.
        **params: Rule parameters; the ``in`` entry is iterated and every
            item is rendered with ``val_to_str``.

    Returns:
        A string of the form ``<col_name> in (<v1>, <v2>, ...)``.

    Raises:
        KeyError: If ``params`` has no ``in`` entry.
    """
    rendered_values = (val_to_str(item) for item in params["in"])
    allowed = ", ".join(rendered_values)
    return f"{col_name} in ({allowed})"
14
13
15
14
16
def dlt_generate_min_max(col_name, **params: dict):
    """Build a DLT range expectation from optional ``min`` / ``max`` parameters.

    Args:
        col_name: Column name (or expression) to constrain.
        **params: Rule parameters; ``min`` and ``max`` are read, and either
            may be missing or ``None``. Present limits are rendered with
            ``val_to_str``.

    Returns:
        The lower-bound and/or upper-bound comparison (lower bound first)
        joined with `` and ``, or an empty string when neither limit is set.
    """
    # Separate `>=` / `<=` comparisons are emitted rather than
    # `col between(min, max)`: they are easier to modify if one of the
    # bounds needs to be removed later.
    bounds = []

    lower = params.get("min")
    if lower is not None:
        bounds.append(f"{col_name} >= {val_to_str(lower)}")

    upper = params.get("max")
    if upper is not None:
        bounds.append(f"{col_name} <= {val_to_str(upper)}")

    # Joining zero parts yields "", which callers treat as "no expression".
    return " and ".join(bounds)
28
29
29
30
30
def dlt_generate_is_not_null_or_empty(col_name, **params: dict):
    """Build a DLT expectation asserting a column is neither null nor empty.

    Args:
        col_name: Column name (or expression) to check.
        **params: Rule parameters; ``trim_strings`` (default ``True``)
            controls whether the value is wrapped in ``trim(...)`` before
            being compared with the empty string.

    Returns:
        ``<col> is not null and trim(<col>) <> ''`` when trimming is enabled,
        otherwise ``<col> is not null and <col> <> ''``.
    """
    operand = col_name
    if params.get("trim_strings", True):
        operand = "trim(" + operand + ")"
    return f"{col_name} is not null and " + operand + " <> ''"
40
41
41
42
42
43
dlt_mapping = {
43
- "is_not_null" : lambda cl , ** params : f"{ cl } is not null" ,
44
+ "is_not_null" : lambda col_name , ** params : f"{ col_name } is not null" ,
44
45
"is_in" : dlt_generate_is_in ,
45
46
"min_max" : dlt_generate_min_max ,
46
47
"is_not_null_or_empty" : dlt_generate_is_not_null_or_empty ,
@@ -53,34 +54,34 @@ def generate_dlt_rules_python(rules: list[DQRule], action: str | None = None) ->
53
54
54
55
expectations = {}
55
56
for rule in rules :
56
- nm = rule .name
57
- cl = rule .column
57
+ rule_name = rule .name
58
+ col_name = rule .column
58
59
params = rule .parameters or {}
59
- if nm not in dlt_mapping :
60
- print (f"No rule '{ nm } ' for column '{ cl } '. skipping..." )
60
+ if rule_name not in dlt_mapping :
61
+ print (f"No rule '{ rule_name } ' for column '{ col_name } '. skipping..." )
61
62
continue
62
- expr = dlt_mapping [nm ]( cl , ** params )
63
+ expr = dlt_mapping [rule_name ]( col_name , ** params )
63
64
if expr == "" :
64
65
print ("Empty expression was generated for rule '{nm}' for column '{cl}'" )
65
66
continue
66
- exp_name = re .sub (__name_sanitize_re__ , "_" , f"{ cl } _{ nm } " )
67
+ exp_name = re .sub (__name_sanitize_re__ , "_" , f"{ col_name } _{ rule_name } " )
67
68
expectations [exp_name ] = expr
68
69
69
70
if len (expectations ) == 0 :
70
71
return ""
71
72
72
- t = json .dumps (expectations )
73
+ json_expectations = json .dumps (expectations )
73
74
if action == "drop" :
74
75
exp_str = f"""@dlt.expect_all_or_drop(
75
- { t }
76
+ { json_expectations }
76
77
)"""
77
78
elif action == "fail" :
78
79
exp_str = f"""@dlt.expect_all_or_fail(
79
- { t }
80
+ { json_expectations }
80
81
)"""
81
82
else :
82
83
exp_str = f"""@dlt.expect_all(
83
- { t }
84
+ { json_expectations }
84
85
)"""
85
86
return exp_str
86
87
@@ -96,28 +97,30 @@ def generate_dlt_rules_sql(rules: list[DQRule], action: str | None = None) -> li
96
97
elif action == "fail" :
97
98
act_str = " ON VIOLATION FAIL UPDATE"
98
99
for rule in rules :
99
- nm = rule .name
100
- cl = rule .column
100
+ rule_name = rule .name
101
+ col_name = rule .column
101
102
params = rule .parameters or {}
102
- if nm not in dlt_mapping :
103
- print (f"No rule '{ nm } ' for column '{ cl } '. skipping..." )
103
+ if rule_name not in dlt_mapping :
104
+ print (f"No rule '{ rule_name } ' for column '{ col_name } '. skipping..." )
104
105
continue
105
- expr = dlt_mapping [nm ]( cl , ** params )
106
+ expr = dlt_mapping [rule_name ]( col_name , ** params )
106
107
if expr == "" :
107
108
print ("Empty expression was generated for rule '{nm}' for column '{cl}'" )
108
109
continue
109
110
# TODO: generate constraint name in lower_case, etc.
110
- dlt_rule = f"CONSTRAINT { cl } _{ nm } EXPECT ({ expr } ){ act_str } "
111
+ dlt_rule = f"CONSTRAINT { col_name } _{ rule_name } EXPECT ({ expr } ){ act_str } "
111
112
dlt_rules .append (dlt_rule )
112
113
113
114
return dlt_rules
114
115
115
116
116
def generate_dlt_rules(rules: list[DQRule], action: str | None = None, language: str = "SQL") -> list[str] | str:
    """Generate DLT expectations for the given rules in the requested language.

    Args:
        rules: Data quality rules to convert.
        action: Violation handling mode, forwarded unchanged to the
            language-specific generator.
        language: Target language, matched case-insensitively; ``"SQL"``
            and ``"Python"`` are supported.

    Returns:
        Whatever the selected generator returns: ``generate_dlt_rules_sql``
        for SQL, ``generate_dlt_rules_python`` for Python.

    Raises:
        ValueError: If ``language`` is neither SQL nor Python.
    """
    normalized = language.lower()

    if normalized == "python":
        return generate_dlt_rules_python(rules, action)

    if normalized == "sql":
        return generate_dlt_rules_sql(rules, action)

    raise ValueError(f"Unsupported language '{language}'")
0 commit comments