|
5 | 5 | import pytest
|
6 | 6 | from pyspark.sql import Column
|
7 | 7 | from chispa.dataframe_comparer import assert_df_equality # type: ignore
|
8 |
| -from databricks.labs.dqx.col_functions import is_not_null_and_not_empty, make_condition |
| 8 | +from databricks.labs.dqx.col_functions import ( |
| 9 | + is_not_null_and_not_empty, |
| 10 | + make_condition, |
| 11 | + sql_expression, |
| 12 | + regex_match, |
| 13 | + is_unique, |
| 14 | + is_older_than_col2_for_n_days, |
| 15 | + is_older_than_n_days, |
| 16 | + is_not_in_near_future, |
| 17 | + is_not_in_future, |
| 18 | + is_valid_timestamp, |
| 19 | + is_valid_date, |
| 20 | + is_not_greater_than, |
| 21 | + is_not_less_than, |
| 22 | + is_not_in_range, |
| 23 | + is_in_range, |
| 24 | + is_not_null_and_not_empty_array, |
| 25 | + is_not_null_and_is_in_list, |
| 26 | + is_in_list, |
| 27 | + is_not_empty, |
| 28 | + is_not_null, |
| 29 | +) |
9 | 30 | from databricks.labs.dqx.engine import (
|
10 | 31 | DQEngine,
|
11 | 32 | ExtraParams,
|
12 | 33 | )
|
13 | 34 | from databricks.labs.dqx.rule import DQRule, DQRuleColSet, ColumnArguments
|
14 | 35 | from databricks.labs.dqx.schema import validation_result_schema
|
15 | 36 |
|
16 |
| - |
17 | 37 | SCHEMA = "a: int, b: int, c: int"
|
18 | 38 | REPORTING_COLUMNS = (
|
19 | 39 | f", _errors: {validation_result_schema.simpleString()}, _warnings: {validation_result_schema.simpleString()}"
|
@@ -1866,3 +1886,106 @@ def test_apply_checks_all_checks_as_yaml(ws, spark):
|
1866 | 1886 | expected_schema,
|
1867 | 1887 | )
|
1868 | 1888 | assert_df_equality(checked, expected, ignore_nullable=True)
|
| 1889 | + |
| 1890 | + |
def test_apply_checks_all_checks_using_classes(ws, spark):
    """Exercise every built-in quality-check function through the DQRule class API.

    Builds one ``DQRule`` per check exposed by ``col_functions`` (null/empty checks,
    list membership, numeric/date/timestamp ranges, validity, future/age checks,
    uniqueness, regex, and raw SQL expressions), applies them all with
    ``DQEngine.apply_checks``, and verifies that the crafted input rows pass every
    check (i.e. the reporting columns ``_errors`` and ``_warnings`` are both NULL).
    """
    checks = [
        DQRule(criticality="error", check=is_not_null("col1")),
        DQRule(criticality="error", check=is_not_empty("col1")),
        DQRule(criticality="error", check=is_not_null_and_not_empty("col1", trim_strings=True)),
        DQRule(criticality="error", check=is_in_list("col2", [1, 2, 3])),
        DQRule(criticality="error", check=is_not_null_and_is_in_list("col2", [1, 2, 3])),
        DQRule(criticality="error", check=is_not_null_and_not_empty_array("col4")),
        DQRule(criticality="error", check=is_in_range("col2", min_limit=1, max_limit=10)),
        DQRule(
            criticality="error",
            check=is_in_range("col5", min_limit=datetime(2025, 1, 1).date(), max_limit=datetime(2025, 2, 24).date()),
        ),
        DQRule(
            criticality="error",
            check=is_in_range(
                "col6", min_limit=datetime(2025, 1, 1, 0, 0, 0), max_limit=datetime(2025, 2, 24, 1, 0, 0)
            ),
        ),
        # range limits may also be column expressions, not just literals
        DQRule(criticality="error", check=is_in_range("col3", min_limit="col2", max_limit="col2 * 2")),
        DQRule(criticality="error", check=is_not_in_range("col2", min_limit=11, max_limit=20)),
        DQRule(
            criticality="error",
            check=is_not_in_range(
                "col5", min_limit=datetime(2025, 2, 25).date(), max_limit=datetime(2025, 2, 26).date()
            ),
        ),
        DQRule(
            criticality="error",
            check=is_not_in_range(
                "col6", min_limit=datetime(2025, 2, 25, 0, 0, 0), max_limit=datetime(2025, 2, 26, 1, 0, 0)
            ),
        ),
        DQRule(criticality="error", check=is_not_in_range("col3", min_limit="col2 + 10", max_limit="col2 * 10")),
        DQRule(criticality="error", check=is_not_less_than("col2", limit=0)),
        DQRule(criticality="error", check=is_not_less_than("col5", limit=datetime(2025, 1, 1).date())),
        DQRule(criticality="error", check=is_not_less_than("col6", limit=datetime(2025, 1, 1, 1, 0, 0))),
        DQRule(criticality="error", check=is_not_less_than("col3", limit="col2 - 10")),
        DQRule(criticality="error", check=is_not_greater_than("col2", limit=10)),
        DQRule(criticality="error", check=is_not_greater_than("col5", limit=datetime(2025, 3, 1).date())),
        DQRule(criticality="error", check=is_not_greater_than("col6", limit=datetime(2025, 3, 24, 1, 0, 0))),
        DQRule(criticality="error", check=is_not_greater_than("col3", limit="col2 + 10")),
        DQRule(criticality="error", check=is_valid_date("col5")),
        # explicit name avoids a clash with the default-named is_valid_date rule above
        DQRule(
            criticality="error", check=is_valid_date("col5", date_format="yyyy-MM-dd"), name="col5_is_not_valid_date2"
        ),
        DQRule(criticality="error", check=is_valid_timestamp("col6")),
        DQRule(
            criticality="error",
            check=is_valid_timestamp("col6", timestamp_format="yyyy-MM-dd HH:mm:ss"),
            name="col6_is_not_valid_timestamp2",
        ),
        DQRule(criticality="error", check=is_not_in_future("col6", offset=86400)),
        DQRule(criticality="error", check=is_not_in_near_future("col6", offset=36400)),
        DQRule(criticality="error", check=is_older_than_n_days("col5", days=10000)),
        DQRule(criticality="error", check=is_older_than_col2_for_n_days("col5", "col6", days=2)),
        DQRule(criticality="error", check=is_unique("col1")),
        DQRule(
            criticality="error",
            name="col1_is_not_unique2",
            # provide default value for NULL in the time column of the window spec using coalesce()
            # to prevent rows exclusion!
            check=is_unique(
                "col1", window_spec=F.window(F.coalesce(F.col("col6"), F.lit(datetime(1970, 1, 1))), "10 minutes")
            ),
        ),
        DQRule(criticality="error", check=regex_match("col2", regex="[0-9]+", negate=False)),
        DQRule(
            criticality="error",
            check=sql_expression(
                expression="col3 > col2 and col3 < 10",
                msg="col3 is greater than col2 and col3 less than 10",
                name="custom_output_name",
                negate=False,
            ),
        ),
    ]
    dq_engine = DQEngine(ws)

    # fix: "col4: array<int>" — the colon was missing, unlike every other column
    # spec here and the module-level SCHEMA constant; keep the DDL style uniform.
    schema = "col1: string, col2: int, col3: int, col4: array<int>, col5: date, col6: timestamp"
    test_df = spark.createDataFrame(
        [
            ["val1", 1, 1, [1], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 1, 0, 0)],
            ["val2", 2, 2, [2], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 2, 0, 0)],
            ["val3", 3, 3, [3], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 3, 0, 0)],
        ],
        schema,
    )

    checked = dq_engine.apply_checks(test_df, checks)

    expected_schema = schema + REPORTING_COLUMNS
    # all rows satisfy every check, so _errors and _warnings must both be NULL
    expected = spark.createDataFrame(
        [
            ["val1", 1, 1, [1], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 1, 0, 0), None, None],
            ["val2", 2, 2, [2], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 2, 0, 0), None, None],
            ["val3", 3, 3, [3], datetime(2025, 1, 2).date(), datetime(2025, 1, 2, 3, 0, 0), None, None],
        ],
        expected_schema,
    )
    assert_df_equality(checked, expected, ignore_nullable=True)
0 commit comments