
Commit f0a1475

Merge branch 'main' into integral_type_conversion

2 parents e1cf9a5 + d7cc651

File tree: 7 files changed, +106 -74 lines

demos/dqx_demo_library.py (+35 -6)
@@ -75,7 +75,7 @@
 from databricks.labs.dqx.engine import DQEngine
 from databricks.sdk import WorkspaceClient
 
-input_df = spark.createDataFrame([[1, 3, 3, 2], [2, 3, None, 1]], schema)
+input_df = spark.createDataFrame([[1, 3, 3, 2], [3, 3, None, 1]], schema)
 
 # load checks
 dq_engine = DQEngine(WorkspaceClient())
@@ -263,7 +263,7 @@
 - check:
     function: sql_expression
     arguments:
-      expression: pickup_datetime > dropoff_datetime
+      expression: pickup_datetime <= dropoff_datetime
       msg: pickup time must not be greater than dropff time
       name: pickup_datetime_greater_than_dropoff_datetime
   criticality: error
@@ -321,7 +321,33 @@ def ends_with_foo(col_name: str) -> Column:
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ### Applying custom check function
+# MAGIC ### Applying custom check function using DQX classes
+
+# COMMAND ----------
+
+from databricks.labs.dqx.engine import DQEngine
+from databricks.sdk import WorkspaceClient
+from databricks.labs.dqx.col_functions import *
+
+# use built-in, custom and sql expression checks
+checks = [
+    DQRule(criticality="error", check=is_not_null_and_not_empty("col1")),
+    DQRule(criticality="warn", check=ends_with_foo("col1")),
+    DQRule(criticality="warn", check=sql_expression("col1 not like 'str%'", msg="col1 starts with 'str'")),
+]
+
+schema = "col1: string, col2: string"
+input_df = spark.createDataFrame([[None, "foo"], ["foo", None], [None, None]], schema)
+
+dq_engine = DQEngine(WorkspaceClient())
+
+valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
+display(valid_and_quarantined_df)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ### Applying custom check function using YAML definition
 
 # COMMAND ----------
 
@@ -347,7 +373,7 @@ def ends_with_foo(col_name: str) -> Column:
   check:
     function: sql_expression
     arguments:
-      expression: col1 LIKE 'str%'
+      expression: col1 not like 'str%'
       msg: col1 starts with 'str'
 """
 )
@@ -358,8 +384,11 @@ def ends_with_foo(col_name: str) -> Column:
 dq_engine = DQEngine(WorkspaceClient())
 
 custom_check_functions = {"ends_with_foo": ends_with_foo}
-# or include all functions with globals() for simplicity
-#custom_check_functions=globals()
+# alternatively, you can also use globals to include all available functions
+#custom_check_functions = globals()
+
+status = dq_engine.validate_checks(checks, custom_check_functions)
+assert not status.has_errors
 
 valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, custom_check_functions)
 display(valid_and_quarantined_df)
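
The demo code above references the custom check `ends_with_foo`, defined earlier in the notebook; only its signature, `def ends_with_foo(col_name: str) -> Column:`, is visible in the hunk headers. A minimal sketch of such a custom check, built on the `make_condition` helper from `databricks.labs.dqx.col_functions` (the message and alias strings are illustrative assumptions, not necessarily the notebook's exact definition):

import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.col_functions import make_condition

def ends_with_foo(col_name: str) -> Column:
    # report a finding for rows where the column value ends with "foo"
    column = F.col(col_name)
    return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")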

demos/dqx_demo_tool.py (+1 -1)
@@ -143,7 +143,7 @@
 - check:
     function: sql_expression
     arguments:
-      expression: pickup_datetime > dropoff_datetime
+      expression: pickup_datetime <= dropoff_datetime
       msg: pickup time must not be greater than dropff time
       name: pickup_datetime_greater_than_dropoff_datetime
   criticality: error

docs/dqx/docs/reference/quality_rules.mdx (+26 -26)

Large diffs are not rendered by default.

src/databricks/labs/dqx/col_functions.py (+12 -13)
@@ -130,14 +130,15 @@ def sql_expression(expression: str, msg: str | None = None, name: str | None = N
     expression_msg = expression
 
     if negate:
-        expr_col = ~expr_col
         expression_msg = "~(" + expression + ")"
+        message = F.concat_ws("", F.lit(f"Value is matching expression: {expression_msg}"))
+    else:
+        expr_col = ~expr_col
+        message = F.concat_ws("", F.lit(f"Value is not matching expression: {expression_msg}"))
 
     name = name if name else re.sub(normalize_regex, "_", expression)
 
-    if msg:
-        return make_condition(expr_col, msg, name)
-    return make_condition(expr_col, F.concat_ws("", F.lit(f"Value matches expression: {expression_msg}")), name)
+    return make_condition(expr_col, msg or message, name)
 
 
 def is_older_than_col2_for_n_days(col_name1: str, col_name2: str, days: int = 0) -> Column:
@@ -256,7 +257,7 @@ def is_not_less_than(
     """Checks whether the values in the input column are not less than the provided limit.
 
     :param col_name: column name
-    :param limit: limit to use in the condition as number, date, timestamp, column name or expression
+    :param limit: limit to use in the condition as number, date, timestamp, column name or sql expression
     :return: new Column
     """
     limit_expr = _get_column_expr_limit(limit)
@@ -275,7 +276,7 @@ def is_not_greater_than(
     """Checks whether the values in the input column are not greater than the provided limit.
 
     :param col_name: column name
-    :param limit: limit to use in the condition as number, date, timestamp, column name or expression
+    :param limit: limit to use in the condition as number, date, timestamp, column name or sql expression
     :return: new Column
     """
     limit_expr = _get_column_expr_limit(limit)
@@ -296,8 +297,8 @@ def is_in_range(
     """Checks whether the values in the input column are in the provided limits (inclusive of both boundaries).
 
     :param col_name: column name
-    :param min_limit: min limit to use in the condition as number, date, timestamp, column name or expression
-    :param max_limit: max limit to use in the condition as number, date, timestamp, column name or expression
+    :param min_limit: min limit to use in the condition as number, date, timestamp, column name or sql expression
+    :param max_limit: max limit to use in the condition as number, date, timestamp, column name or sql expression
     :return: new Column
     """
     min_limit_expr = _get_column_expr_limit(min_limit)
@@ -329,8 +330,8 @@ def is_not_in_range(
     """Checks whether the values in the input column are outside the provided limits (inclusive of both boundaries).
 
     :param col_name: column name
-    :param min_limit: min limit to use in the condition as number, date, timestamp, column name or expression
-    :param max_limit: min limit to use in the condition as number, date, timestamp, column name or expression
+    :param min_limit: min limit to use in the condition as number, date, timestamp, column name or sql expression
+    :param max_limit: min limit to use in the condition as number, date, timestamp, column name or sql expression
     :return: new Column
     """
     min_limit_expr = _get_column_expr_limit(min_limit)
@@ -364,11 +365,9 @@ def regex_match(col_name: str, regex: str, negate: bool = False) -> Column:
     """
     if negate:
         condition = F.col(col_name).rlike(regex)
-
         return make_condition(condition, f"Column {col_name} is matching regex", f"{col_name}_matching_regex")
 
     condition = ~F.col(col_name).rlike(regex)
-
     return make_condition(condition, f"Column {col_name} is not matching regex", f"{col_name}_not_matching_regex")
 
 
@@ -460,7 +459,7 @@ def _get_column_expr_limit(
 ) -> Column:
     """Helper function to generate a column expression limit based on the provided limit value.
 
-    :param limit: limit to use in the condition (literal value or Column expression)
+    :param limit: limit to use in the condition (literal value or column expression)
     :return: column expression.
     :raises ValueError: if limit is not provided.
     """

tests/conftest.py (+6 -6)
@@ -32,12 +32,12 @@ def checks_yml_content():
      check:
        function: sql_expression
        arguments:
-          expression: col1 like "Team %"
+          expression: col1 not like "Team %"
    - criticality: error
      check:
        function: sql_expression
        arguments:
-          expression: col2 like 'Team %'
+          expression: col2 not like 'Team %'
    """
 
 
@@ -76,11 +76,11 @@ def checks_json_content():
        },
        {
            "criticality": "error",
-            "check": {"function": "sql_expression", "arguments": {"expression": "col1 like \\"Team %\\""}}
+            "check": {"function": "sql_expression", "arguments": {"expression": "col1 not like \\"Team %\\""}}
        },
        {
            "criticality": "error",
-            "check": {"function": "sql_expression", "arguments": {"expression": "col2 like 'Team %'"}}
+            "check": {"function": "sql_expression", "arguments": {"expression": "col2 not like 'Team %'"}}
        }
    ]
    """
@@ -134,11 +134,11 @@ def expected_checks():
        },
        {
            "criticality": "error",
-            "check": {"function": "sql_expression", "arguments": {"expression": "col1 like \"Team %\""}},
+            "check": {"function": "sql_expression", "arguments": {"expression": "col1 not like \"Team %\""}},
        },
        {
            "criticality": "error",
-            "check": {"function": "sql_expression", "arguments": {"expression": "col2 like 'Team %'"}},
+            "check": {"function": "sql_expression", "arguments": {"expression": "col2 not like 'Team %'"}},
        },
    ]
 
tests/integration/test_apply_checks.py (+8 -9)
@@ -1,5 +1,4 @@
 from datetime import datetime
-
 import yaml
 import pyspark.sql.functions as F
 import pytest
@@ -747,11 +746,11 @@ def test_apply_checks_with_sql_expression(ws, spark):
     checks = [
         {
             "criticality": "error",
-            "check": {"function": "sql_expression", "arguments": {"expression": "col1 like \"val%\""}},
+            "check": {"function": "sql_expression", "arguments": {"expression": "col1 not like \"val%\""}},
         },
         {
             "criticality": "error",
-            "check": {"function": "sql_expression", "arguments": {"expression": "col2 like 'val%'"}},
+            "check": {"function": "sql_expression", "arguments": {"expression": "col2 not like 'val%'"}},
         },
     ]
 
@@ -765,8 +764,8 @@ def test_apply_checks_with_sql_expression(ws, spark):
             "val1",
             "val2",
             {
-                "col_col1_like_val_": "Value matches expression: col1 like \"val%\"",
-                "col_col2_like_val_": "Value matches expression: col2 like 'val%'",
+                "col_col1_not_like_val_": "Value is not matching expression: col1 not like \"val%\"",
+                "col_col2_not_like_val_": "Value is not matching expression: col2 not like 'val%'",
             },
             None,
         ],
@@ -1081,8 +1080,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark):
      check:
        function: sql_expression
        arguments:
-          expression: col3 > col2 and col3 < 10
-          msg: col3 is greater than col2 and col3 less than 10
+          expression: col3 >= col2 and col3 <= 10
+          msg: ol3 is less than col2 and col3 is greater than 10
          name: custom_output_name
          negate: false
    """
@@ -1186,8 +1185,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark):
         DQRule(
             criticality="error",
             check=sql_expression(
-                expression="col3 > col2 and col3 < 10",
-                msg="col3 is greater than col2 and col3 less than 10",
+                expression="col3 >= col2 and col3 <= 10",
+                msg="col3 is less than col2 and col3 is greater than 10",
                 name="custom_output_name",
                 negate=False,
             ),

tests/integration/test_col_functions.py (+18 -13)
@@ -2,8 +2,6 @@
 from decimal import Decimal
 import pyspark.sql.functions as F
 from chispa.dataframe_comparer import assert_df_equality  # type: ignore
-
-
 from databricks.labs.dqx.col_functions import (
     is_in_range,
     is_not_empty,
@@ -104,21 +102,21 @@ def test_col_is_not_in_list(spark):
 
 
 def test_col_sql_expression(spark):
-    test_df = spark.createDataFrame([["str1", 1, 1], ["str2", None, None], ["", 3, 2]], SCHEMA + ", c: string")
+    test_df = spark.createDataFrame([["str1", 1, 1], ["str2", None, None], ["", 2, 3]], SCHEMA + ", c: string")
 
     actual = test_df.select(
-        sql_expression("a = 'str1'"),
-        sql_expression("b is not null", name="test", negate=True),
-        sql_expression("c is not null", msg="failed validation", negate=True),
-        sql_expression("b > c", msg="b is not greater than c", negate=True),
+        sql_expression("a = 'str2'"),
+        sql_expression("b is null", name="test", negate=True),
+        sql_expression("c is null", msg="failed validation", negate=True),
+        sql_expression("b < c", msg="b is greater or equal c", negate=False),
     )
 
-    checked_schema = "a_str1_: string, test: string, c_is_not_null: string, b_c: string"
+    checked_schema = "a_str2_: string, test: string, c_is_null: string, b_c: string"
     expected = spark.createDataFrame(
         [
-            ["Value matches expression: a = 'str1'", None, None, "b is not greater than c"],
-            [None, "Value matches expression: ~(b is not null)", "failed validation", None],
-            [None, None, None, None],
+            ["Value is not matching expression: a = 'str2'", None, None, "b is greater or equal c"],
+            [None, "Value is matching expression: ~(b is null)", "failed validation", None],
+            ["Value is not matching expression: a = 'str2'", None, None, None],
         ],
         checked_schema,
     )
@@ -497,7 +495,10 @@ def test_col_is_not_in_near_future_cur(spark):
 
 
 def test_col_is_not_null_and_not_empty_array(spark):
-    schema_array = "str_col: array<string>, int_col: array<int> , timestamp_col: array<timestamp>, date_col: array<string>, struct_col: array<struct<a: string, b: int>>"
+    schema_array = (
+        "str_col: array<string>, int_col: array<int> , timestamp_col: array<timestamp>, "
+        "date_col: array<string>, struct_col: array<struct<a: string, b: int>>"
+    )
     data = [
         (
             ["a", "b", None],
@@ -527,7 +528,11 @@ def test_col_is_not_null_and_not_empty_array(spark):
         is_not_null_and_not_empty_array("struct_col"),
     )
 
-    checked_schema = "str_col_is_null_or_empty_array: string, int_col_is_null_or_empty_array: string, timestamp_col_is_null_or_empty_array: string, date_col_is_null_or_empty_array: string, struct_col_is_null_or_empty_array: string"
+    checked_schema = (
+        "str_col_is_null_or_empty_array: string, int_col_is_null_or_empty_array: string, "
+        "timestamp_col_is_null_or_empty_array: string, date_col_is_null_or_empty_array: string, "
+        "struct_col_is_null_or_empty_array: string"
+    )
     # Create the data
     checked_data = [
         (None, None, None, None, None),
