From 78fc0788df9717a391e5617a8103a910a832937e Mon Sep 17 00:00:00 2001 From: Samuel Pastva Date: Sat, 24 Aug 2024 21:38:04 +0200 Subject: [PATCH] Proposal for skipping independent blocks. --- .../_sd_algorithms/expand_source_blocks.py | 134 +++++++++++++++++- 1 file changed, 129 insertions(+), 5 deletions(-) diff --git a/biobalm/_sd_algorithms/expand_source_blocks.py b/biobalm/_sd_algorithms/expand_source_blocks.py index 7621bb0..1900920 100644 --- a/biobalm/_sd_algorithms/expand_source_blocks.py +++ b/biobalm/_sd_algorithms/expand_source_blocks.py @@ -5,6 +5,7 @@ from biobalm.types import BooleanSpace from biobalm.interaction_graph_utils import source_nodes +from biobalm.space_utils import is_subspace, intersect if TYPE_CHECKING: from biobalm.succession_diagram import SuccessionDiagram @@ -15,6 +16,7 @@ def expand_source_blocks( check_maa: bool = True, size_limit: int | None = None, optimize_source_nodes: bool = True, + optimize_independent_blocks: bool = True, ) -> bool: """ Base correctness assumptions: @@ -168,6 +170,123 @@ def expand_source_blocks( f" > [{node}] Minimal blocks: {[(len(k), len(v)) for (k, v) in minimal_blocks]}" ) + if len(minimal_blocks) > 1: + + if sd.config["debug"]: + print( + f" > [{node}] Try to fast-forward {len(minimal_blocks)} independent minimal blocks." + ) + + all_min_traps: list[list[BooleanSpace]] = [] + sds = [ + sd.component_subdiagram(list(x[0]), node) for x in minimal_blocks + ] + for i, s in enumerate(sds): + if ( + s.network.variable_count() <= 3 + or s.network.variable_count() > 100 + ): + min_traps = [ + sd.node_data(x)["space"] for x in minimal_blocks[i][1] + ] + else: + s.expand_minimal_spaces() + min_traps = [ + (s.node_data(x)["space"] | node_space) + for x in s.minimal_trap_spaces() + ] + if sd.config["debug"]: + print( + f" > [{node}] Expanded {len(minimal_blocks[i][1])} block to {len(min_traps)} min traps." + ) + all_min_traps.append(min_traps) + + created: set[int] = set() + for combination in it.product(*all_min_traps, repeat=1): + combined_space: BooleanSpace = {} + for x in combination: + intersection = intersect(combined_space, x) + assert intersection is not None + combined_space = intersection + + combined_node = sd._ensure_node(node, combined_space) # type: ignore + created.add(combined_node) + # for x in combination: + sd._ensure_edge(node, combined_node, combined_space) # type: ignore + + # created: set[int] = set() + # block_sd_nodes = [ x[1] for x in minimal_blocks ] + # for combination in it.product(*block_sd_nodes, repeat=1): + # combined_space: BooleanSpace = {} + # for x in combination: + # intersection = intersect(combined_space, sd.node_data(x)["space"]) + # assert intersection is not None + # combined_space = intersection + + # combined_node = sd._ensure_node(combination[0], combined_space) # type: ignore + # created.add(combined_node) + # for x in combination: + # sd._ensure_edge(x, combined_node, combined_space) # type: ignore + + next_level = next_level | created + + if sd.config["debug"]: + print(f" > [{node}] Fast-forwarded to nodes {created}.") + + continue + + # if len(minimal_blocks) > 1: + # # We have multiple independent minimal blocks. We can try to fast-forward the smallest one + # # to reduce the succession diagram size. + + # block, block_nodes = minimal_blocks[0] + # block_sd = sd.component_subdiagram(list(block), node) + + # if sd.config["debug"]: + # print( + # f" > [{node}] Try to fast-forward minimal block with {len(block)} variables." + # ) + + # assert block_sd.expand_block(find_motif_avoidant_attractors=True, optimize_source_nodes=True) + + # if block_sd.node_data(block_sd.root())["attractor_seeds"] == []: + # sd.node_data(node)["attractor_seeds"] = [] + # sd.node_data(node)["attractor_sets"] = [] + + # is_clean = True + # for inner_node in block_sd.expanded_ids(): + # if block_sd.node_is_minimal(inner_node): + # continue + + # inner_data = block_sd.node_data(inner_node) + # if inner_data["attractor_seeds"] != []: + # is_clean = False + + # block_minimal_spaces = [ (block_sd.node_data(x)["space"] | node_space) for x in block_sd.minimal_trap_spaces() ] + + # if sd.config["debug"]: + # print( + # f" > [{node}] Skipped {len(block_sd) - 1 - len(block_nodes)} nodes by fast-forwarding into {len(block_minimal_spaces)} minimal trap spaces." + # ) + + # for block_node in block_nodes: + # block_node_data = sd.node_data(block_node) + # if block_node_data["space"] in block_minimal_spaces: + # # This space is already in here. + # continue + + # for successor in block_minimal_spaces: + # # Add links to all the *appropriate* minimal trap spaces. + # if is_subspace(successor, block_node_data["space"]): + # sd._ensure_node(block_node, successor) # type: ignore + + # block_node_data = sd.node_data(block_node) + # block_node_data["expanded"] = True + + # if is_clean: + # block_node_data["attractor_seeds"] = [] + # block_node_data["attractor_sets"] = [] + if not check_maa: # We will expand all nodes that are in the smallest block. to_expand = minimal_blocks[0][1] @@ -201,10 +320,15 @@ def expand_source_blocks( # MAAs in the problematic nodes while using the nice properties of the expansion to # still disprove MAAs in the remaining nodes. If we used `seeds`, the expansion could # just get stuck on this node and the "partial" results wouldn't be usable. - block_sd_candidates = block_sd.node_attractor_candidates( - block_sd.root(), compute=True - ) - if len(block_sd_candidates) == 0: + try: + block_sd_candidates = block_sd.node_attractor_candidates( + block_sd.root(), compute=True + ) + is_clean = len(block_sd_candidates) == 0 + except: + is_clean = False + + if is_clean: if sd.config["debug"]: print( f" > [{node}] Found clean block with no MAAs ({len(block_nodes)}): {block_nodes}" @@ -217,7 +341,7 @@ def expand_source_blocks( else: if sd.config["debug"]: print( - f"[{node}] > Found {len(block_sd_candidates)} MAA cnadidates in a block. Delaying expansion." + f"[{node}] > Found MAA candidates in a block or could not find candidates at all. Delaying expansion." ) if not clean_block_found: # If all blocks have MAAs, we expand all successors.