Skip to content

Commit

Permalink
Added unit test coverage for data quality expectations (expect_all, expect_all_or_drop, expect_all_or_fail) in the silver and bronze pipeline layers
Browse files Browse the repository at this point in the history
  • Loading branch information
ravi-databricks committed Mar 18, 2024
1 parent 27989a2 commit 965d075
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 8 deletions.
3 changes: 3 additions & 0 deletions tests/resources/dqe/product/products.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"expect":{
"valid_product_id": "product_id IS NOT NULL"
},
"expect_all":{
"valid_product_id": "product_id IS NOT NULL"
},
"expect_or_fail":{
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
Expand Down
11 changes: 11 additions & 0 deletions tests/resources/dqe/product/silver_data_quality_expectations.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,21 @@
"expect":{
"valid_product_id": "product_id IS NOT NULL"
},
"expect_all":{
"valid_product_id": "product_id IS NOT NULL"
},
"expect_all_or_fail":{
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
},
"expect_or_fail":{
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
},
"expect_all_or_drop": {
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
},
"expect_or_drop": {
"no_rescued_data": "_rescued_data IS NULL",
"valid_product_id": "product_id IS NOT NULL"
Expand Down
3 changes: 2 additions & 1 deletion tests/resources/onboarding_ac_type2.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
]
},
"silver_table_path_dev": "tests/resources/delta/silver/products",
"silver_transformation_json_dev": "tests/resources/silver_transformations_ac_type2.json"
"silver_transformation_json_dev": "tests/resources/silver_transformations_ac_type2.json",
"silver_data_quality_expectations_json_dev": "tests/resources/dqe/product/silver_data_quality_expectations.json"
},
{
"data_flow_id": "202",
Expand Down
36 changes: 29 additions & 7 deletions tests/test_dataflow_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,8 +624,11 @@ def test_dataflowpipeline_bronze_dqe(self,
view_name = f"{bronze_dataflow_spec.targetDetails['table']}_inputView"
pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, view_name, None)
data_quality_expectations_json = json.loads(bronze_dataflow_spec.dataQualityExpectations)
if "expect" in data_quality_expectations_json:
expect_dict = data_quality_expectations_json["expect"]
expect_dict = {}
if "expect" in data_quality_expectations_json or "expect_all" in data_quality_expectations_json:
expect_dict.update(data_quality_expectations_json["expect"])
if "expect_all" in data_quality_expectations_json:
expect_dict.update(data_quality_expectations_json["expect_all"])
if "expect_or_fail" in data_quality_expectations_json:
expect_or_fail_dict = data_quality_expectations_json["expect_or_fail"]
if "expect_or_drop" in data_quality_expectations_json:
Expand Down Expand Up @@ -670,6 +673,22 @@ def test_dataflowpipeline_silver_cdc_apply_changes(self,
)
bronze_df_row = silver_dataflowSpec_df.filter(silver_dataflowSpec_df.dataFlowId == "201").collect()[0]
silver_dataflow_spec = SilverDataflowSpec(**bronze_df_row.asDict())
data_quality_expectations_json = json.loads(silver_dataflow_spec.dataQualityExpectations)
expect_dict = {}
expect_or_fail_dict = {}
expect_or_drop_dict = {}
if "expect" in data_quality_expectations_json or "expect_all" in data_quality_expectations_json:
expect_dict.update(data_quality_expectations_json["expect"])
if "expect_all" in data_quality_expectations_json:
expect_dict.update(data_quality_expectations_json["expect_all"])
if "expect_or_fail" in data_quality_expectations_json:
expect_or_fail_dict.update(data_quality_expectations_json["expect_or_fail"])
if "expect_all_or_fail" in data_quality_expectations_json:
expect_or_fail_dict.update(data_quality_expectations_json["expect_all_or_fail"])
if "expect_all_or_drop" in data_quality_expectations_json:
expect_or_drop_dict.update(data_quality_expectations_json["expect_all_or_drop"])
if "expect_or_drop" in data_quality_expectations_json:
expect_or_drop_dict.update(data_quality_expectations_json["expect_or_drop"])
view_name = f"{silver_dataflow_spec.targetDetails['table']}_inputView"
pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, view_name, None)
target_path = silver_dataflow_spec.targetDetails["path"]
Expand All @@ -689,7 +708,10 @@ def test_dataflowpipeline_silver_cdc_apply_changes(self,
name=f"{silver_dataflowSpec_df.targetDetails['table']}",
table_properties=silver_dataflowSpec_df.tableProperties,
path=target_path,
schema=struct_schema
schema=struct_schema,
expect_all=expect_dict,
expect_all_or_drop=expect_or_drop_dict,
expect_all_or_fail=expect_or_fail_dict
)
assert mock_apply_changes.called_once_with(
name=f"{silver_dataflowSpec_df.targetDetails['table']}",
Expand All @@ -707,12 +729,12 @@ def test_dataflowpipeline_silver_cdc_apply_changes(self,
track_history_except_column_list=cdc_apply_changes.track_history_except_column_list
)

@patch('dlt.create_streaming_live_table', new_callable=MagicMock)
@patch('dlt.create_streaming_table', new_callable=MagicMock)
@patch('dlt.apply_changes', new_callable=MagicMock)
def test_bronze_cdc_apply_changes(self,
mock_create_streaming_live_table,
mock_create_streaming_table,
mock_apply_changes):
mock_create_streaming_live_table.return_value = None
mock_create_streaming_table.return_value = None
mock_apply_changes.apply_changes.return_value = None
onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_map)
onboarding_params_map['onboarding_file_path'] = self.onboarding_bronze_type2_json_file
Expand All @@ -736,7 +758,7 @@ def test_bronze_cdc_apply_changes(self,
apply_as_truncates = expr(cdc_apply_changes.apply_as_truncates)
struct_schema = json.loads(bronze_dataflow_spec.schema)
pipeline.write_bronze()
assert mock_create_streaming_live_table.called_once_with(
assert mock_create_streaming_table.called_once_with(
name=f"{bronze_dataflowSpec_df.targetDetails['table']}",
table_properties=bronze_dataflowSpec_df.tableProperties,
path=target_path,
Expand Down

0 comments on commit 965d075

Please sign in to comment.