diff --git a/causy/common_pipeline_steps/logic.py b/causy/common_pipeline_steps/logic.py
index 11c4ed8..e50e355 100644
--- a/causy/common_pipeline_steps/logic.py
+++ b/causy/common_pipeline_steps/logic.py
@@ -42,15 +42,19 @@ def execute(
         actions_taken = []
         for pipeline_step in self.pipeline_steps:
             started = time.time()
-            result = graph_model_instance_.execute_pipeline_step(pipeline_step)
+            (
+                actions_taken_current,
+                all_proposed_actions,
+            ) = graph_model_instance_.execute_pipeline_step(pipeline_step)
             steps.append(
                 ActionHistoryStep(
                     name=pipeline_step.name,
-                    actions=result,
+                    actions=actions_taken_current,
+                    all_proposed_actions=all_proposed_actions,
                     duration=time.time() - started,
                 )
             )
-            actions_taken.extend(result)
+            actions_taken.extend(actions_taken_current)
             n += 1
         return ActionHistoryStep(
             name=self.name,
@@ -80,17 +84,21 @@ def execute(
         loop_started = time.time()
         for pipeline_step in self.pipeline_steps:
             started = time.time()
-            result = graph_model_instance_.execute_pipeline_step(
+            (
+                actions_taken,
+                all_proposed_actions,
+            ) = graph_model_instance_.execute_pipeline_step(
                 pipeline_step, apply_to_graph=False
             )
             steps.append(
                 ActionHistoryStep(
                     name=pipeline_step.name,
-                    actions=result,
+                    actions=actions_taken,
+                    all_proposed_actions=all_proposed_actions,
                     duration=time.time() - started,
                 )
             )
-            actions.extend(result)
+            actions.extend(actions_taken)
 
         graph_model_instance_._take_action(actions)
 
diff --git a/causy/graph_model.py b/causy/graph_model.py
index 8bc197c..074ef81 100644
--- a/causy/graph_model.py
+++ b/causy/graph_model.py
@@ -4,7 +4,7 @@ from abc import ABC
 from copy import deepcopy
 import time
 
-from typing import Optional, List, Dict, Callable, Union, Any, Generator
+from typing import Optional, List, Dict, Callable, Union, Any, Generator, Tuple
 
 import torch.multiprocessing as mp
 
@@ -215,11 +215,12 @@ def execute_pipeline_step_with_progress(self) -> Generator:
                 "previous_duration": time.time() - started,
             }
             started = time.time()
-            actions_taken = self.execute_pipeline_step(filter)
+            actions_taken, all_actions = self.execute_pipeline_step(filter)
             self.graph.graph.action_history.append(
                 ActionHistoryStep(
                     name=filter.name,
                     actions=actions_taken,
+                    all_proposed_actions=all_actions,
                     duration=time.time() - started,
                 )
             )
@@ -289,7 +290,9 @@ def _execute_hook(self, hook: str, i: TestResultAction):
         except Exception as e:
             logger.error(f"Error in hook ({str(hook)}): {e}")
 
-    def _take_action(self, results, dry_run=False):
+    def _take_action(
+        self, results, dry_run=False
+    ) -> Tuple[List[TestResultAction], List[TestResultAction]]:
         """
         Take the actions returned by the test
 
@@ -297,10 +300,12 @@ def _take_action(self, results, dry_run=False):
         Take the actions returned by the test and execute them on the graph. This is done to make it possible to execute the tests in parallel as well as to decide proactively at which point in the decisions taken by the pipeline step should be executed.
         Actions are returned by the test and are executed on the graph. The actions are stored in the action history to make it possible to revert the actions or use them in a later step.
 
-        :param results:
-        :return:
+        :param results: the results
+
+        :return: actions taken, all actions taken
         """
         all_actions_taken = []
+        all_actions = []
         for result_items in results:
             if result_items is None:
                 continue
@@ -310,6 +315,7 @@ def _take_action(self, results, dry_run=False):
                 self.algorithm.pre_graph_update_hooks, self.graph, result_items
             )
             actions_taken = []
+            all_actions.extend(result_items)
             for i in result_items:
                 if i.u is not None and i.v is not None:
                     logger.debug(f"Action: {i.action} on {i.u.name} and {i.v.name}")
@@ -421,11 +427,11 @@ def _take_action(self, results, dry_run=False):
         self._execute_post_graph_update_hooks(
             self.algorithm.post_graph_update_hooks, self.graph, actions_taken
         )
-        return all_actions_taken
+        return all_actions_taken, all_actions
 
     def execute_pipeline_step(
         self, test_fn: PipelineStepInterface, apply_to_graph=True
-    ):
+    ) -> Tuple[List[TestResultAction], List[TestResultAction]]:
         """
         Execute a single pipeline_step on the graph. either in parallel or in a single process depending on the test_fn.parallel flag
         :param apply_to_graph: if the action should be applied to the graph
@@ -434,6 +440,7 @@ def execute_pipeline_step(
         :return:
         """
         actions_taken = []
+        all_actions = []
 
         # initialize the worker pool (we currently use all available cores * 2)
         # run all combinations in parallel except if the number of combinations is smaller then the chunk size
@@ -456,9 +463,12 @@ def execute_pipeline_step(
             ):
                 if not isinstance(result, list):
                     result = [result]
-                actions_taken.extend(
-                    self._take_action(result, dry_run=not apply_to_graph)
+
+                actions_taken_current, all_actions_current = self._take_action(
+                    result, dry_run=not apply_to_graph
                 )
+                actions_taken.extend(actions_taken_current)
+                all_actions.extend(all_actions_current)
         else:
             if test_fn.generator.chunked:
                 for chunk in test_fn.generator.generate(self.graph.graph, self):
@@ -466,9 +476,11 @@ def execute_pipeline_step(
                         unpack_run(i)
                         for i in [[test_fn, [*c], self.graph.graph] for c in chunk]
                     ]
-                    actions_taken.extend(
-                        self._take_action(iterator, dry_run=not apply_to_graph)
+                    actions_taken_current, all_actions_current = self._take_action(
+                        iterator, dry_run=not apply_to_graph
                     )
+                    actions_taken.extend(actions_taken_current)
+                    all_actions.extend(all_actions_current)
             else:
                 # this is the only mode which supports unapplied actions to be passed to the next pipeline step (for now)
                 # which are sometimes needed for e.g. conflict resolution
@@ -487,12 +499,13 @@ def execute_pipeline_step(
                         if rn_fn.needs_unapplied_actions:
                             i.append(local_results)
                         local_results.append(unpack_run(i))
-
-                    actions_taken.extend(
-                        self._take_action(local_results, dry_run=not apply_to_graph)
+                    actions_taken_current, all_actions_current = self._take_action(
+                        local_results, dry_run=not apply_to_graph
                     )
+                    actions_taken.extend(actions_taken_current)
+                    all_actions.extend(all_actions_current)
 
-        return actions_taken
+        return actions_taken, all_actions
 
 
 def graph_model_factory(
diff --git a/causy/models.py b/causy/models.py
index 8acf543..96f5f13 100644
--- a/causy/models.py
+++ b/causy/models.py
@@ -88,6 +88,9 @@ class ActionHistoryStep(BaseModel):
     name: str
     duration: Optional[float] = None  # seconds
     actions: Optional[List[TestResult]] = []
+    all_proposed_actions: Optional[
+        List[TestResult]
+    ] = []  # all actions and not only the ones that were executed
     steps: Optional[List["ActionHistoryStep"]] = []
 
 
diff --git a/tests/test_pc_e2e.py b/tests/test_pc_e2e.py
index 35fc21e..362cd6b 100644
--- a/tests/test_pc_e2e.py
+++ b/tests/test_pc_e2e.py
@@ -1,4 +1,4 @@
-from causy.causal_discovery.constraint.algorithms.pc import PC_EDGE_TYPES
+from causy.causal_discovery.constraint.algorithms.pc import PC_EDGE_TYPES, PC
 from causy.common_pipeline_steps.calculation import CalculatePearsonCorrelations
 from causy.graph_model import graph_model_factory
 from causy.causal_discovery.constraint.independence_tests.common import (
@@ -29,6 +29,114 @@ def _sample_generator(self):
             random=lambda: rdnv(0, 1),
         )
 
+    def test_pc_number_of_all_proposed_actions_two_nodes(self):
+        """
+        test if the number of all proposed actions is correct
+        """
+        rdnv = self.seeded_random.normalvariate
+        sample_generator = IIDSampleGenerator(
+            edges=[
+                SampleEdge(NodeReference("X"), NodeReference("Y"), 5),
+            ],
+            random=lambda: rdnv(0, 1),
+        )
+        test_data, graph = sample_generator.generate(1000)
+        tst = PC()
+        tst.create_graph_from_data(test_data)
+        tst.create_all_possible_edges()
+        pc_results = tst.execute_pipeline_steps()
+        self.assertEqual(len(pc_results[0].all_proposed_actions), 1)
+        self.assertEqual(len(pc_results[1].all_proposed_actions), 1)
+        self.assertEqual(len(pc_results[2].all_proposed_actions), 0)
+
+    def test_pc_number_of_actions_two_nodes(self):
+        """
+        test if the number of actions actually taken is correct
+        """
+        rdnv = self.seeded_random.normalvariate
+        sample_generator = IIDSampleGenerator(
+            edges=[
+                SampleEdge(NodeReference("X"), NodeReference("Y"), 5),
+            ],
+            random=lambda: rdnv(0, 1),
+        )
+        test_data, graph = sample_generator.generate(1000)
+        tst = PC()
+        tst.create_graph_from_data(test_data)
+        tst.create_all_possible_edges()
+        pc_results = tst.execute_pipeline_steps()
+        self.assertEqual(len(pc_results[0].actions), 1)
+        self.assertEqual(len(pc_results[1].actions), 0)
+        self.assertEqual(len(pc_results[2].actions), 0)
+
+    def test_pc_number_of_all_proposed_actions_three_nodes(self):
+        """
+        test if the number of all proposed actions is correct
+        """
+        rdnv = self.seeded_random.normalvariate
+        sample_generator = IIDSampleGenerator(
+            edges=[
+                SampleEdge(NodeReference("X"), NodeReference("Y"), 5),
+                SampleEdge(NodeReference("Y"), NodeReference("Z"), 6),
+            ],
+            random=lambda: rdnv(0, 1),
+        )
+        test_data, graph = sample_generator.generate(10)
+        tst = PC()
+        tst.create_graph_from_data(test_data)
+        tst.create_all_possible_edges()
+        pc_results = tst.execute_pipeline_steps()
+        self.assertEqual(len(pc_results[0].all_proposed_actions), 3)
+        self.assertEqual(len(pc_results[1].all_proposed_actions), 3)
+        # TODO: think about whether the pairs-with-neighbours generator returns what we want, but the counting seems correct
+        self.assertEqual(len(pc_results[2].all_proposed_actions), 4)
+
+    def test_pc_number_of_actions_three_nodes(self):
+        """
+        test if the number of actions actually taken is correct
+        """
+        rdnv = self.seeded_random.normalvariate
+        sample_generator = IIDSampleGenerator(
+            edges=[
+                SampleEdge(NodeReference("X"), NodeReference("Y"), 5),
+                SampleEdge(NodeReference("Y"), NodeReference("Z"), 6),
+            ],
+            random=lambda: rdnv(0, 1),
+        )
+        test_data, graph = sample_generator.generate(1000)
+        tst = PC()
+        tst.create_graph_from_data(test_data)
+        tst.create_all_possible_edges()
+        pc_results = tst.execute_pipeline_steps()
+        self.assertEqual(len(pc_results[0].actions), 3)
+        self.assertEqual(len(pc_results[1].actions), 0)
+        self.assertEqual(len(pc_results[2].actions), 1)
+
+    def test_pc_number_of_all_proposed_actions_four_nodes(self):
+        """
+        test if the number of all proposed actions is correct
+        """
+        rdnv = self.seeded_random.normalvariate
+        sample_generator = IIDSampleGenerator(
+            edges=[
+                SampleEdge(NodeReference("X"), NodeReference("Y"), 5),
+                SampleEdge(NodeReference("Y"), NodeReference("Z"), 6),
+                SampleEdge(NodeReference("X"), NodeReference("W"), 7),
+                SampleEdge(NodeReference("W"), NodeReference("Y"), 5),
+                SampleEdge(NodeReference("W"), NodeReference("Z"), 6),
+            ],
+            random=lambda: rdnv(0, 1),
+        )
+        test_data, graph = sample_generator.generate(1000)
+        tst = PC()
+        tst.create_graph_from_data(test_data)
+        tst.create_all_possible_edges()
+        pc_results = tst.execute_pipeline_steps()
+        self.assertEqual(len(pc_results[0].all_proposed_actions), 6)
+        self.assertEqual(len(pc_results[1].all_proposed_actions), 6)
+        # TODO: think about whether the pairs-with-neighbours generator returns what we want, but the counting seems correct
+        self.assertEqual(len(pc_results[3].all_proposed_actions), 7)
+
     def test_pc_calculate_pearson_correlations(self):
         """
         Test conditional independence of ordered pairs given pairs of other variables works.
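Reviewer note, not part of the patch: all_proposed_actions records everything a pipeline step proposed, while actions keeps only what was actually applied to the graph, and execute_pipeline_step / _take_action now return a (taken, proposed) tuple instead of a single list, so any external caller that unpacks the old single return value needs updating. Below is a minimal usage sketch mirroring the two-node tests above. It assumes the causy.sample_generator import path, a seeded random.Random as a stand-in for the test suite's self.seeded_random, and that execute_pipeline_steps() returns the ActionHistoryStep entries the tests index into; none of these details are taken from this diff.

import random

from causy.causal_discovery.constraint.algorithms.pc import PC
from causy.sample_generator import IIDSampleGenerator, NodeReference, SampleEdge  # assumed import path

# Build a small linear dataset X -> Y, as in the two-node tests above.
rdnv = random.Random(42).normalvariate  # assumed stand-in for the suite's self.seeded_random
sample_generator = IIDSampleGenerator(
    edges=[SampleEdge(NodeReference("X"), NodeReference("Y"), 5)],
    random=lambda: rdnv(0, 1),
)
test_data, graph = sample_generator.generate(1000)

# Run PC exactly as the new tests do.
tst = PC()
tst.create_graph_from_data(test_data)
tst.create_all_possible_edges()
pc_results = tst.execute_pipeline_steps()

# Per pipeline step: how many actions were applied vs. how many were proposed.
for step in pc_results:
    print(
        step.name,
        len(step.actions or []),
        len(step.all_proposed_actions or []),
    )

For this setup the expected counts are the ones asserted in test_pc_number_of_all_proposed_actions_two_nodes and test_pc_number_of_actions_two_nodes.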