diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d786ccda3..28432b252 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,14 @@ Changelog ========= +1.9.0a1 (2025-01-20) +-------------------- + +Bug Fixes + +* Fix select complex intersection of three tag-based graph selectors by @tatiana in #1466 + + 1.8.2 (2025-01-15) -------------------- diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 91263f96b..d2c6289ef 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.8.2" +__version__ = "1.9.0a1" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index ba8ab6e3f..15fb4b07e 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -166,7 +166,6 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: """ selected_nodes: set[str] = set() root_nodes: set[str] = set() - # Index nodes by name, we can improve performance by doing this once # for multiple GraphSelectors if PATH_SELECTOR in self.node_name: @@ -367,25 +366,36 @@ def select_nodes_ids_by_intersection(self) -> set[str]: selected_nodes: set[str] = set() self.visited_nodes: set[str] = set() - for node_id, node in self.nodes.items(): - if self._should_include_node(node_id, node): - selected_nodes.add(node_id) - if self.config.graph_selectors: - nodes_by_graph_selector = self.select_by_graph_operator() - selected_nodes = selected_nodes.intersection(nodes_by_graph_selector) + graph_selected_nodes = self.select_by_graph_operator() + for node_id in graph_selected_nodes: + node = self.nodes[node_id] + # Since the method below changes the tags of test nodes, it can lead to incorrect + # results during the application of graph selectors. Therefore, it is being run within + # nodes previously selected + # This solves https://github.com/astronomer/astronomer-cosmos/pull/1466 + if self._should_include_node(node_id, node): + selected_nodes.add(node_id) + else: + for node_id, node in self.nodes.items(): + if self._should_include_node(node_id, node): + selected_nodes.add(node_id) self.selected_nodes = selected_nodes return selected_nodes def _should_include_node(self, node_id: str, node: DbtNode) -> bool: - """Checks if a single node should be included. Only runs once per node with caching.""" + """ + Checks if a single node should be included. Only runs once per node with caching.""" logger.debug("Inspecting if the node <%s> should be included.", node_id) if node_id in self.visited_nodes: return node_id in self.selected_nodes self.visited_nodes.add(node_id) + # Disclaimer: this method currently copies the tags from parent nodes to children nodes + # that are tests. This can lead to incorrect results in graph node selectors such as reported in + # https://github.com/astronomer/astronomer-cosmos/pull/1466 if node.resource_type == DbtResourceType.TEST and node.depends_on and len(node.depends_on) > 0: node.tags = getattr(self.nodes.get(node.depends_on[0]), "tags", []) logger.debug( @@ -498,7 +508,6 @@ def select_nodes( exclude = exclude or [] if not select and not exclude: return nodes - validate_filters(exclude, select) subset_ids = apply_select_filter(nodes, project_dir, select) if select: diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index 4574bd255..51725570d 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -64,6 +64,7 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected): config={"materialized": "view", "tags": ["has_child", "is_child"]}, ) + child_node = DbtNode( unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child", resource_type=DbtResourceType.MODEL, @@ -183,6 +184,94 @@ def test_select_nodes_by_select_intersection_config_tag(): assert selected == expected +def test_select_nodes_by_select_intersection_config_graph_selector_includes_ancestors(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child,+sibling1"]) + expected = { + grandparent_node.unique_id: grandparent_node, + another_grandparent_node.unique_id: another_grandparent_node, + parent_node.unique_id: parent_node, + } + assert selected == expected + + +def test_select_nodes_by_select_intersection_config_graph_selector_none(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child,+orphaned"]) + expected = {} + assert selected == expected + + +def test_select_nodes_by_intersection_and_tag_ancestry(): + parent_sibling_node = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent_sibling", + resource_type=DbtResourceType.MODEL, + depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id], + file_path=SAMPLE_PROJ_PATH / "gen2/models/parent_sibling.sql", + tags=["is_adopted"], + config={"materialized": "view", "tags": ["is_adopted"]}, + ) + sample_nodes_with_parent_sibling = dict(sample_nodes) + sample_nodes_with_parent_sibling[parent_sibling_node.unique_id] = parent_sibling_node + selected = select_nodes( + project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes_with_parent_sibling, select=["+tag:is_child,+tag:is_adopted"] + ) + expected = { + grandparent_node.unique_id: grandparent_node, + another_grandparent_node.unique_id: another_grandparent_node, + } + assert selected == expected + + +def test_select_nodes_by_tag_ancestry(): + parent_sibling_node = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent_sibling", + resource_type=DbtResourceType.MODEL, + depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id], + file_path=SAMPLE_PROJ_PATH / "gen2/models/parent_sibling.sql", + tags=["is_adopted"], + config={"materialized": "view", "tags": ["is_adopted"]}, + ) + sample_nodes_with_parent_sibling = dict(sample_nodes) + sample_nodes_with_parent_sibling[parent_sibling_node.unique_id] = parent_sibling_node + selected = select_nodes( + project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes_with_parent_sibling, select=["+tag:is_adopted"] + ) + expected = { + grandparent_node.unique_id: grandparent_node, + another_grandparent_node.unique_id: another_grandparent_node, + parent_sibling_node.unique_id: parent_sibling_node, + } + assert selected == expected + + +def test_select_nodes_with_test_by_intersection_and_tag_ancestry(): + parent_sibling_node = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent_sibling", + resource_type=DbtResourceType.MODEL, + depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id], + file_path="", + tags=["is_adopted"], + config={"materialized": "view", "tags": ["is_adopted"]}, + ) + test_node = DbtNode( + unique_id=f"{DbtResourceType.TEST.value}.{SAMPLE_PROJ_PATH.stem}.test", + resource_type=DbtResourceType.TEST, + depends_on=[parent_node.unique_id, parent_sibling_node.unique_id], + file_path="", + config={}, + ) + new_sample_nodes = dict(sample_nodes) + new_sample_nodes[parent_sibling_node.unique_id] = parent_sibling_node + new_sample_nodes[test_node.unique_id] = test_node + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=new_sample_nodes, select=["+tag:has_child"]) + # Expected must not include `parent_sibling_node` nor `test_node` + expected = { + parent_node.unique_id: parent_node, + grandparent_node.unique_id: grandparent_node, + another_grandparent_node.unique_id: another_grandparent_node, + } + assert selected == expected + + def test_select_nodes_by_select_path(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models"]) expected = {