Commit 5b06bd0

Merge branch 'main' into dependencies_update
2 parents: 91aa7d1 + 3ca34dc · commit 5b06bd0

22 files changed: +591, -147 lines

.github/scripts/setup_spark_remote.sh

+61
@@ -0,0 +1,61 @@
+#!/usr/bin/env bash
+
+set -xve
+echo "Setting up spark-connect"
+
+mkdir -p "$HOME"/spark
+cd "$HOME"/spark || exit 1
+
+version=$(wget -O - https://dlcdn.apache.org/spark/ | grep 'href="spark' | grep -v 'preview' | sed 's:</a>:\n:g' | sed -n 's/.*>//p' | tr -d spark- | tr -d / | sort -r --version-sort | head -1)
+if [ -z "$version" ]; then
+  echo "Failed to extract Spark version"
+  exit 1
+fi
+
+spark=spark-${version}-bin-hadoop3
+spark_connect="spark-connect_2.12"
+
+mkdir -p "${spark}"
+
+
+SERVER_SCRIPT=$HOME/spark/${spark}/sbin/start-connect-server.sh
+
+## check the spark version already exist, if not download the respective version
+if [ -f "${SERVER_SCRIPT}" ];then
+  echo "Spark Version already exists"
+else
+  if [ -f "${spark}.tgz" ];then
+    echo "${spark}.tgz already exists"
+  else
+    wget "https://dlcdn.apache.org/spark/spark-${version}/${spark}.tgz"
+  fi
+  tar -xvf "${spark}.tgz"
+fi
+
+cd "${spark}" || exit 1
+## check spark remote is running,if not start the spark remote
+result=$(${SERVER_SCRIPT} --packages org.apache.spark:${spark_connect}:"${version}" > "$HOME"/spark/log.out; echo $?)
+
+if [ "$result" -ne 0 ]; then
+  count=$(tail "${HOME}"/spark/log.out | grep -c "SparkConnectServer running as process")
+  if [ "${count}" == "0" ]; then
+    echo "Failed to start the server"
+    exit 1
+  fi
+  # Wait for the server to start by pinging localhost:4040
+  echo "Waiting for the server to start..."
+  for i in {1..30}; do
+    if nc -z localhost 4040; then
+      echo "Server is up and running"
+      break
+    fi
+    echo "Server not yet available, retrying in 5 seconds..."
+    sleep 5
+  done
+
+  if ! nc -z localhost 4040; then
+    echo "Failed to start the server within the expected time"
+    exit 1
+  fi
+fi
+echo "Started the Server"

.github/workflows/acceptance.yml

-10
@@ -42,11 +42,6 @@ jobs:
       - name: Install hatch
         run: pip install hatch==1.9.4
 
-      - name: Fetch relevant branches
-        run: |
-          git fetch origin $GITHUB_BASE_REF:$GITHUB_BASE_REF
-          git fetch origin $GITHUB_HEAD_REF:$GITHUB_HEAD_REF
-
       - name: Run integration tests
         uses: databrickslabs/sandbox/acceptance@acceptance/v0.4.2
         with:
@@ -82,11 +77,6 @@ jobs:
       - name: Install hatch
         run: pip install hatch==1.9.4
 
-      - name: Fetch relevant branches
-        run: |
-          git fetch origin $GITHUB_BASE_REF:$GITHUB_BASE_REF
-          git fetch origin $GITHUB_HEAD_REF:$GITHUB_HEAD_REF
-
       - name: Run integration tests on serverless cluster
         uses: databrickslabs/sandbox/acceptance@acceptance/v0.4.2
         with:

.github/workflows/push.yml

+6-1
@@ -35,10 +35,15 @@ jobs:
           cache-dependency-path: '**/pyproject.toml'
           python-version: ${{ matrix.pyVersion }}
 
+      - name: Setup Spark Remote
+        run: |
+          pip install hatch==1.9.4
+          make setup_spark_remote
+
       - name: Run unit tests
         run: |
           pip install hatch==1.9.4
-          make test
+          make ci-test
 
       - name: Publish test coverage
         uses: codecov/codecov-action@v5

Makefile

+6-1
@@ -17,12 +17,17 @@ lint:
 fmt:
 	hatch run fmt
 
-test:
+ci-test:
 	hatch run test
 
 integration:
 	hatch run integration
 
+setup_spark_remote:
+	.github/scripts/setup_spark_remote.sh
+
+test: setup_spark_remote ci-test
+
 coverage:
 	hatch run coverage && open htmlcov/index.html
 

demos/dqx_demo_library.py

+53-7
@@ -59,7 +59,7 @@
 print(dlt_expectations)
 
 # save generated checks in a workspace file
-user_name = spark.sql('select current_user() as user').collect()[0]['user']
+user_name = spark.sql("select current_user() as user").collect()[0]["user"]
 checks_file = f"/Workspace/Users/{user_name}/dqx_demo_checks.yml"
 dq_engine = DQEngine(ws)
 dq_engine.save_checks_in_workspace_file(checks, workspace_path=checks_file)
@@ -142,6 +142,13 @@
     arguments:
       col_name: col3
 
+- criticality: error
+  filter: col1 < 3
+  check:
+    function: is_not_null_and_not_empty
+    arguments:
+      col_name: col4
+
 - criticality: warn
   check:
     function: value_is_in_list
@@ -186,12 +193,17 @@
         criticality="error",
         check_func=is_not_null).get_rules() + [
     DQRule( # define rule for a single column
-        name='col3_is_null_or_empty',
-        criticality='error',
-        check=is_not_null_and_not_empty('col3')),
+        name="col3_is_null_or_empty",
+        criticality="error",
+        check=is_not_null_and_not_empty("col3")),
+    DQRule( # define rule with a filter
+        name="col_4_is_null_or_empty",
+        criticality="error",
+        filter="col1 < 3",
+        check=is_not_null_and_not_empty("col4")),
     DQRule( # name auto-generated if not provided
-        criticality='warn',
-        check=value_is_in_list('col4', ['1', '2']))
+        criticality="warn",
+        check=value_is_in_list("col4", ["1", "2"]))
     ]
 
 schema = "col1: int, col2: int, col3: int, col4 int"
@@ -344,4 +356,38 @@ def ends_with_foo(col_name: str) -> Column:
 dq_engine = DQEngine(WorkspaceClient())
 
 valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals())
-display(valid_and_quarantined_df)
+display(valid_and_quarantined_df)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Applying custom column names
+
+# COMMAND ----------
+
+from databricks.sdk import WorkspaceClient
+from databricks.labs.dqx.engine import (
+    DQEngine,
+    ExtraParams,
+    DQRule
+)
+
+from databricks.labs.dqx.col_functions import is_not_null_and_not_empty
+
+# using ExtraParams class to configure the engine with custom column names
+extra_parameters = ExtraParams(column_names={"errors": "dq_errors", "warnings": "dq_warnings"})
+
+ws = WorkspaceClient()
+dq_engine = DQEngine(ws, extra_params=extra_parameters)
+
+schema = "col1: string"
+input_df = spark.createDataFrame([["str1"], ["foo"], ["str3"]], schema)
+
+checks = [ DQRule(
+        name="col_1_is_null_or_empty",
+        criticality="error",
+        check=is_not_null_and_not_empty("col1")),
+    ]
+
+valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
+display(valid_and_quarantined_df)
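
The filtered check added to the demo YAML can also be exercised through the metadata API the demo already uses. A small sketch, with the sample schema and rows assumed for illustration:

```python
# Sketch: apply the filtered metadata check from the demo to a tiny DataFrame.
# The schema and rows below are illustrative assumptions, not part of the demo.
import yaml
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

checks = yaml.safe_load("""
- criticality: error
  filter: col1 < 3
  check:
    function: is_not_null_and_not_empty
    arguments:
      col_name: col4
""")

dq_engine = DQEngine(WorkspaceClient())
input_df = spark.createDataFrame([[1, None], [5, None]], "col1: int, col4: string")

# valid rows and quarantined rows are returned separately
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
```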

demos/dqx_demo_tool.py

+2-2
@@ -45,7 +45,7 @@
 import glob
 import os
 
-user_name = spark.sql('select current_user() as user').collect()[0]['user']
+user_name = spark.sql("select current_user() as user").collect()[0]["user"]
 dqx_wheel_files = glob.glob(f"/Workspace/Users/{user_name}/.dqx/wheels/databricks_labs_dqx-*.whl")
 dqx_latest_wheel = max(dqx_wheel_files, key=os.path.getctime)
 %pip install {dqx_latest_wheel}
@@ -210,7 +210,7 @@
 # COMMAND ----------
 
 print(f"Saving quarantined data to {run_config.quarantine_table}")
-quarantine_catalog, quarantine_schema, _ = run_config.quarantine_table.split('.')
+quarantine_catalog, quarantine_schema, _ = run_config.quarantine_table.split(".")
 
 spark.sql(f"CREATE CATALOG IF NOT EXISTS {quarantine_catalog}")
 spark.sql(f"CREATE SCHEMA IF NOT EXISTS {quarantine_catalog}.{quarantine_schema}")

docs/dqx/docs/dev/contributing.mdx

+8-3
@@ -86,13 +86,18 @@ Before every commit, apply the consistent formatting of the code, as we want our
 make fmt
 ```
 
-Before every commit, run automated bug detector (`make lint`) and unit tests (`make test`) to ensure that automated
-pull request checks do pass, before your code is reviewed by others:
+Before every commit, run the automated bug detector and unit tests to ensure that automated
+pull request checks pass before your code is reviewed by others:
 ```shell
 make lint
+make setup_spark_remote
 make test
 ```
 
+The command `make setup_spark_remote` sets up the environment for running unit tests and is required one time only.
+DQX uses Databricks Connect as a test dependency, which restricts the creation of a Spark session in local mode.
+To enable local Spark execution for unit testing, the command installs a local Spark Connect (Spark remote) server.
+
 ### Local setup for integration tests and code coverage
 
 Note that integration tests and code coverage are run automatically when you create a Pull Request in Github.
@@ -215,7 +220,7 @@ Here are the example steps to submit your first contribution:
 7. `make fmt`
 8. `make lint`
 9. .. fix if any
-10. `make test` and `make integration`, optionally `make coverage` to get test coverage report
+10. `make setup_spark_remote`, `make test` and `make integration`, optionally `make coverage` to get test coverage report
 11. .. fix if any issues
 12. `git commit -S -a -m "message"`
 
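
As an illustration of the workflow above, a conftest.py-style fixture could hand unit tests a Spark session backed by the locally started Spark Connect server. This is a sketch only; the fixture name and port are assumptions, not part of the repository:

```python
# Sketch of a session-scoped pytest fixture for local unit tests.
# Assumes `make setup_spark_remote` has already started the Spark Connect
# server on localhost; port 15002 is the Spark Connect default (assumed).
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    session = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
    yield session
    session.stop()
```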

docs/dqx/docs/guide.mdx

+50-14
@@ -55,7 +55,7 @@ dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_D
 print(dlt_expectations)
 ```
 
-### Using CLI 
+### Using CLI
 
 You can optionally install DQX in the workspace, see the [Installation Guide](/docs/installation#dqx-installation-in-a-databricks-workspace).
 As part of the installation, a config, dashboards and profiler workflow is installed. The workflow can be run manually in the workspace UI or using the CLI as below.
@@ -116,7 +116,7 @@ print(status)
 ```
 
 Note that checks are validated automatically when applied as part of the
-`apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods 
+`apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods
 (see [Quality rules defined as config](#quality-rules-defined-as-config)).
 
 ### Using CLI
@@ -178,7 +178,7 @@ checks = dq_engine.load_checks_from_installation(assume_user=True, run_config_na
 
 input_df = spark.read.table("catalog1.schema1.table1")
 
-# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes 
+# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
 valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
 
 # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
@@ -198,7 +198,7 @@ checks = dq_engine.load_checks_from_workspace_file(workspace_path="/Shared/App1/
 
 input_df = spark.read.table("catalog1.schema1.table1")
 
-# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes 
+# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
 valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
 
 # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
@@ -220,7 +220,7 @@ dq_engine = DQEngine(WorkspaceClient())
 
 input_df = spark.read.table("catalog1.schema1.table1")
 
-# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes 
+# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
 valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
 
 # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
@@ -241,21 +241,26 @@ from databricks.sdk import WorkspaceClient
 dq_engine = DQEngine(WorkspaceClient())
 
 checks = DQRuleColSet( # define rule for multiple columns at once
-        columns=["col1", "col2"], 
-        criticality="error", 
+        columns=["col1", "col2"],
+        criticality="error",
         check_func=is_not_null).get_rules() + [
     DQRule( # define rule for a single column
-        name='col3_is_null_or_empty',
-        criticality='error',
-        check=is_not_null_and_not_empty('col3')),
+        name="col3_is_null_or_empty",
+        criticality="error",
+        check=is_not_null_and_not_empty("col3")),
+    DQRule( # define rule with a filter
+        name="col_4_is_null_or_empty",
+        criticality="error",
+        filter="col1 < 3",
+        check=is_not_null_and_not_empty("col4")),
     DQRule( # name auto-generated if not provided
-        criticality='warn',
-        check=value_is_in_list('col4', ['1', '2']))
+        criticality="warn",
+        check=value_is_in_list("col4", ["1", "2"]))
     ]
 
 input_df = spark.read.table("catalog1.schema1.table1")
 
-# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes 
+# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
 valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)
 
 # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
@@ -288,6 +293,13 @@ checks = yaml.safe_load("""
     arguments:
      col_name: col3
 
+- criticality: error
+  filter: col1 < 3
+  check:
+    function: is_not_null_and_not_empty
+    arguments:
+      col_name: col4
+
 - criticality: warn
   check:
     function: value_is_in_list
@@ -300,7 +312,7 @@ checks = yaml.safe_load("""
 
 input_df = spark.read.table("catalog1.schema1.table1")
 
-# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes 
+# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
 valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
 
 # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
@@ -411,3 +423,27 @@ dq_engine = DQEngine(ws)
 For details on the specific methods available in the engine, visit to the [reference](/docs/reference#dq-engine-methods) section.
 
 Information on testing applications that use `DQEngine` can be found [here](/docs/reference#testing-applications-using-dqx).
+
+## Additional Configuration
+
+### Customizing Reporting Error and Warning Columns
+
+By default, DQX appends `_error` and `_warning` reporting columns to the output DataFrame to flag quality issues.
+
+You can customize the names of these reporting columns by specifying additional configurations in the engine.
+
+```python
+from databricks.sdk import WorkspaceClient
+from databricks.labs.dqx.engine import (
+    DQEngine,
+    ExtraParams,
+)
+
+# customize reporting column names
+extra_parameters = ExtraParams(column_names={"errors": "dq_errors", "warnings": "dq_warnings"})
+
+ws = WorkspaceClient()
+dq_engine = DQEngine(ws, extra_params=extra_parameters)
+```
+
+
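
Taken together, the two guide additions above can be combined in one short example: a filtered rule reported under the customized column names. The sample data, and the expectation that only rows matching the filter are flagged, are assumptions for illustration:

```python
# Sketch: a filtered rule plus custom reporting column names.
# Sample data is assumed; only rows where col1 < 3 should be checked.
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine, DQRule, ExtraParams
from databricks.labs.dqx.col_functions import is_not_null_and_not_empty

extra_parameters = ExtraParams(column_names={"errors": "dq_errors", "warnings": "dq_warnings"})
dq_engine = DQEngine(WorkspaceClient(), extra_params=extra_parameters)

input_df = spark.createDataFrame([[1, None], [5, None]], "col1: int, col4: string")

checks = [
    DQRule(
        name="col_4_is_null_or_empty",
        criticality="error",
        filter="col1 < 3",
        check=is_not_null_and_not_empty("col4")),
]

result_df = dq_engine.apply_checks(input_df, checks)
# issues appear in the renamed reporting columns
result_df.select("col1", "col4", "dq_errors", "dq_warnings").show()
```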
