Skip to content

Commit fd7b1dc

Browse files
committed
fix: add run_time to DQEngine extra_params
1 parent 553e02d commit fd7b1dc

File tree

7 files changed

+439
-411
lines changed

7 files changed

+439
-411
lines changed

src/databricks/labs/dqx/engine.py

+4-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from datetime import datetime
21
import copy
32
import logging
43
import os
@@ -55,6 +54,8 @@ def __init__(self, workspace_client: WorkspaceClient, extra_params: ExtraParams
5554
),
5655
}
5756

57+
self.run_time = extra_params.run_time
58+
5859
def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame:
5960
if not checks:
6061
return self._append_empty_checks(df)
@@ -254,8 +255,7 @@ def _append_empty_checks(self, df: DataFrame) -> DataFrame:
254255
F.lit(None).cast(validation_result_schema).alias(self._column_names[ColumnArguments.WARNINGS]),
255256
)
256257

257-
@staticmethod
258-
def _create_results_map(df: DataFrame, checks: list[DQRule], dest_col: str) -> DataFrame:
258+
def _create_results_map(self, df: DataFrame, checks: list[DQRule], dest_col: str) -> DataFrame:
259259
""" ""Create a map from the values of the specified columns, using the column names as a key. This function is
260260
used to collect individual check columns into corresponding errors and/or warnings columns.
261261
@@ -267,7 +267,6 @@ def _create_results_map(df: DataFrame, checks: list[DQRule], dest_col: str) -> D
267267

268268
if len(checks) == 0:
269269
return df.select("*", empty_type)
270-
current_date = datetime.now()
271270
check_cols = []
272271
for check in checks:
273272
result = F.struct(
@@ -276,7 +275,7 @@ def _create_results_map(df: DataFrame, checks: list[DQRule], dest_col: str) -> D
276275
F.lit(check.col_name).alias("col_name"),
277276
F.lit(check.filter or None).cast("string").alias("filter"),
278277
F.lit(check.check_func.__name__).alias("function"),
279-
F.lit(current_date).alias("run_time"),
278+
F.lit(self.run_time).alias("run_time"),
280279
)
281280
check_cols.append(result)
282281

src/databricks/labs/dqx/rule.py

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import functools as ft
44
from typing import Any
55
from collections.abc import Callable
6+
from datetime import datetime
67
from pyspark.sql import Column
78
import pyspark.sql.functions as F
89
from databricks.labs.dqx.utils import get_column_name
@@ -34,6 +35,7 @@ class ExtraParams:
3435
"""Class to represent extra parameters for DQEngine."""
3536

3637
column_names: dict[str, str] = field(default_factory=dict)
38+
run_time: datetime = datetime.now()
3739

3840

3941
@dataclass(frozen=True)

tests/conftest.py

-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
import os
2-
import datetime as dt
3-
from unittest.mock import patch
42
import pytest
53

64
from databricks.labs.pytester.fixtures.baseline import factory
@@ -274,11 +272,3 @@ def delete(workspace_file_path: str) -> None:
274272
ws.workspace.delete(workspace_file_path)
275273

276274
yield from factory("file", create, delete)
277-
278-
279-
@pytest.fixture
280-
def run_time_date():
281-
run_datetime = dt.datetime(2025, 1, 1, 0, 0, 0, 0)
282-
with patch("databricks.labs.dqx.engine.datetime") as mock_date: # pylint: disable=explicit-dependency-required
283-
mock_date.now.return_value = run_datetime
284-
yield run_datetime

tests/integration/conftest.py

-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import os
22
import logging
3-
import datetime as dt
43

54
from collections.abc import Callable, Generator
65
from functools import cached_property
@@ -185,14 +184,6 @@ def installation_ctx(
185184
ctx.workspace_installation.uninstall()
186185

187186

188-
@pytest.fixture
189-
def run_time_date():
190-
run_datetime = dt.datetime(2025, 1, 1, 0, 0, 0, 0)
191-
with patch("databricks.labs.dqx.engine.datetime") as mock_date: # pylint: disable=explicit-dependency-required
192-
mock_date.now.return_value = run_datetime
193-
yield run_datetime
194-
195-
196187
@pytest.fixture
197188
def webbrowser_open():
198189
with patch("webbrowser.open") as mock_open:

0 commit comments

Comments
 (0)