Skip to content

Commit

Permalink
Fix custom selector behaviour when the model name contains periods (a…
Browse files Browse the repository at this point in the history
…stronomer#1499)

## Description

If we use select with DBT_MANIFEST load method and model name contains
full stop characters (.), the following listing returns empty task group
(assuming I've added public.customer2 model to altered_jaffle_shop
project).

```
customers_ods = DbtTaskGroup(
        group_id="customers_ods",
        project_config=ProjectConfig((DBT_ROOT_PATH / "altered_jaffle_shop").as_posix(), 
                                     manifest_path = os.path.join(DBT_ROOT_PATH, "altered_jaffle_shop", 'target', 'manifest.json'),
                                     dbt_vars={"var": "2"}),
        render_config=RenderConfig(
            select=["+public.customer2"],
            load_method=LoadMode.DBT_MANIFEST,
            enable_mock_profile=False,
            env_vars={"PURGE": os.getenv("PURGE", "0")},
            airflow_vars_to_purge_dbt_ls_cache=["purge"],
        ),
        execution_config=shared_execution_config,
        operator_args={"install_deps": True},
        profile_config=profile_config,
        default_args={"retries": 2},
    )
```

## Related Issue(s)

Closes astronomer#1498 

Co-authored-by: 60098727 <Vladislav.Iakovlev@lemanapro.ru>
  • Loading branch information
yakovlevvs and 60098727 authored Feb 4, 2025
1 parent 002c919 commit c344eb4
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 2 deletions.
6 changes: 4 additions & 2 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,10 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]:
for node_id, node in nodes.items():
node_by_name[node.name] = node_id

if self.node_name in node_by_name:
root_id = node_by_name[self.node_name]
node_name_patched = self.node_name.replace(".", "_")

if node_name_patched in node_by_name:
root_id = node_by_name[node_name_patched]
root_nodes.add(root_id)
else:
logger.warning(f"Selector {self.node_name} not found.")
Expand Down
86 changes: 86 additions & 0 deletions tests/dbt/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected):
config={"materialized": "table", "tags": ["nightly", "deprecated", "test2"]},
)

sibling3_node = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.public.sibling3",
resource_type=DbtResourceType.MODEL,
depends_on=[parent_node.unique_id],
file_path=SAMPLE_PROJ_PATH / "gen3/models/public.sibling3.sql",
tags=["nightly", "deprecated", "test3"],
config={"materialized": "table", "tags": ["nightly", "deprecated", "test3"]},
)

orphaned_node = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.orphaned",
resource_type=DbtResourceType.MODEL,
Expand All @@ -108,6 +117,7 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected):
child_node.unique_id: child_node,
sibling1_node.unique_id: sibling1_node,
sibling2_node.unique_id: sibling2_node,
sibling3_node.unique_id: sibling3_node,
orphaned_node.unique_id: orphaned_node,
}

Expand All @@ -124,6 +134,7 @@ def test_select_nodes_by_select_config():
child_node.unique_id: child_node,
sibling1_node.unique_id: sibling1_node,
sibling2_node.unique_id: sibling2_node,
sibling3_node.unique_id: sibling3_node,
}
assert selected == expected

Expand Down Expand Up @@ -296,6 +307,7 @@ def test_select_nodes_by_select_union():
child_node.unique_id: child_node,
sibling1_node.unique_id: sibling1_node,
sibling2_node.unique_id: sibling2_node,
sibling3_node.unique_id: sibling3_node,
}
assert selected == expected

Expand All @@ -311,6 +323,7 @@ def test_select_nodes_by_exclude_tag():
child_node.unique_id: child_node,
sibling1_node.unique_id: sibling1_node,
sibling2_node.unique_id: sibling2_node,
sibling3_node.unique_id: sibling3_node,
another_grandparent_node.unique_id: another_grandparent_node,
orphaned_node.unique_id: orphaned_node,
}
Expand Down Expand Up @@ -341,6 +354,7 @@ def test_select_nodes_by_exclude_union_config_test_tags():
parent_node.unique_id: parent_node,
child_node.unique_id: child_node,
orphaned_node.unique_id: orphaned_node,
sibling3_node.unique_id: sibling3_node,
}
assert selected == expected

Expand All @@ -351,6 +365,7 @@ def test_select_nodes_by_path_dir():
child_node.unique_id: child_node,
sibling1_node.unique_id: sibling1_node,
sibling2_node.unique_id: sibling2_node,
sibling3_node.unique_id: sibling3_node,
orphaned_node.unique_id: orphaned_node,
}
assert selected == expected
Expand Down Expand Up @@ -431,6 +446,7 @@ def test_select_node_by_descendants():
"model.dbt-proj.child",
"model.dbt-proj.grandparent",
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
]
Expand All @@ -452,6 +468,7 @@ def test_select_node_by_descendants_union():
"model.dbt-proj.child",
"model.dbt-proj.grandparent",
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
]
Expand Down Expand Up @@ -489,6 +506,7 @@ def test_exclude_by_graph_selector():
expected = [
"model.dbt-proj.child",
"model.dbt-proj.orphaned",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
]
Expand Down Expand Up @@ -547,6 +565,7 @@ def test_should_include_node_without_depends_on(selector_config):
[
"model.dbt-proj.child",
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
Expand All @@ -556,6 +575,7 @@ def test_should_include_node_without_depends_on(selector_config):
[
"model.dbt-proj.child",
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
Expand All @@ -572,6 +592,7 @@ def test_should_include_node_without_depends_on(selector_config):
["1+tag:deprecated"],
[
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
Expand All @@ -580,6 +601,7 @@ def test_should_include_node_without_depends_on(selector_config):
["1+config.tags:deprecated"],
[
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
Expand All @@ -588,6 +610,7 @@ def test_should_include_node_without_depends_on(selector_config):
["config.materialized:table+"],
[
"model.dbt-proj.child",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
Expand All @@ -607,6 +630,7 @@ def test_select_nodes_by_at_operator():
"model.dbt-proj.child",
"model.dbt-proj.grandparent",
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
]
Expand All @@ -633,6 +657,7 @@ def test_select_nodes_by_at_operator_root_node():
"model.dbt-proj.child",
"model.dbt-proj.grandparent",
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
]
Expand All @@ -659,6 +684,7 @@ def test_select_nodes_by_at_operator_with_path():
"model.dbt-proj.child",
"model.dbt-proj.grandparent",
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
]
Expand All @@ -677,3 +703,63 @@ def test_exclude_with_at_operator():
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["@parent"])
expected = ["model.dbt-proj.orphaned"]
assert sorted(selected.keys()) == expected


def test_select_nodes_with_period():
"""Test @ operator with a node that doesn't exist"""
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["public.sibling3"])
expected = ["model.dbt-proj.public.sibling3"]
assert sorted(selected.keys()) == expected


def test_exclude_nodes_with_period():
"""Test @ operator with a node that doesn't exist"""
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["public.sibling3"])
expected = [
"model.dbt-proj.another_grandparent_node",
"model.dbt-proj.child",
"model.dbt-proj.grandparent",
"model.dbt-proj.orphaned",
"model.dbt-proj.parent",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
]
assert sorted(selected.keys()) == expected


def test_select_nodes_with_period_by_graph():
"""Test @ operator with a node that doesn't exist"""
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+public.sibling3"])
expected = [
"model.dbt-proj.another_grandparent_node",
"model.dbt-proj.grandparent",
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
]
assert sorted(selected.keys()) == expected


def test_exclude_nodes_with_period_by_graph():
"""Test @ operator with a node that doesn't exist"""
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["+public.sibling3"])
expected = ["model.dbt-proj.child", "model.dbt-proj.orphaned", "model.dbt-proj.sibling1", "model.dbt-proj.sibling2"]
assert sorted(selected.keys()) == expected


def test_select_nodes_with_period_with_at_operator():
"""Test @ operator with a node that doesn't exist"""
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["@public.sibling3"])
expected = [
"model.dbt-proj.another_grandparent_node",
"model.dbt-proj.grandparent",
"model.dbt-proj.parent",
"model.dbt-proj.public.sibling3",
]
assert sorted(selected.keys()) == expected


def test_exclude_nodes_with_period_with_at_operator():
"""Test @ operator with a node that doesn't exist"""
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["@public.sibling3"])
expected = ["model.dbt-proj.child", "model.dbt-proj.orphaned", "model.dbt-proj.sibling1", "model.dbt-proj.sibling2"]
assert sorted(selected.keys()) == expected

0 comments on commit c344eb4

Please sign in to comment.