Skip to content

Commit

Permalink
Merge pull request #69 from causy-dev/track-all-proposed-actions
Browse files Browse the repository at this point in the history
feat(actions): track all proposed actions separately
  • Loading branch information
this-is-sofia authored Jan 13, 2025
2 parents 899b4ef + e8732d6 commit 161d0d0
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 22 deletions.
20 changes: 14 additions & 6 deletions causy/common_pipeline_steps/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,19 @@ def execute(
actions_taken = []
for pipeline_step in self.pipeline_steps:
started = time.time()
result = graph_model_instance_.execute_pipeline_step(pipeline_step)
(
actions_taken,
all_proposed_actions,
) = graph_model_instance_.execute_pipeline_step(pipeline_step)
steps.append(
ActionHistoryStep(
name=pipeline_step.name,
actions=result,
actions=actions_taken,
all_proposed_actions=all_proposed_actions,
duration=time.time() - started,
)
)
actions_taken.extend(result)
actions_taken.extend(actions_taken)
n += 1
return ActionHistoryStep(
name=self.name,
Expand Down Expand Up @@ -80,17 +84,21 @@ def execute(
loop_started = time.time()
for pipeline_step in self.pipeline_steps:
started = time.time()
result = graph_model_instance_.execute_pipeline_step(
(
actions_taken,
all_proposed_actions,
) = graph_model_instance_.execute_pipeline_step(
pipeline_step, apply_to_graph=False
)
steps.append(
ActionHistoryStep(
name=pipeline_step.name,
actions=result,
actions=actions_taken,
all_proposed_actions=all_proposed_actions,
duration=time.time() - started,
)
)
actions.extend(result)
actions.extend(actions_taken)

graph_model_instance_._take_action(actions)

Expand Down
43 changes: 28 additions & 15 deletions causy/graph_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from abc import ABC
from copy import deepcopy
import time
from typing import Optional, List, Dict, Callable, Union, Any, Generator
from typing import Optional, List, Dict, Callable, Union, Any, Generator, Tuple

import torch.multiprocessing as mp

Expand Down Expand Up @@ -215,11 +215,12 @@ def execute_pipeline_step_with_progress(self) -> Generator:
"previous_duration": time.time() - started,
}
started = time.time()
actions_taken = self.execute_pipeline_step(filter)
actions_taken, all_actions = self.execute_pipeline_step(filter)
self.graph.graph.action_history.append(
ActionHistoryStep(
name=filter.name,
actions=actions_taken,
all_proposed_actions=all_actions,
duration=time.time() - started,
)
)
Expand Down Expand Up @@ -289,18 +290,22 @@ def _execute_hook(self, hook: str, i: TestResultAction):
except Exception as e:
logger.error(f"Error in hook ({str(hook)}): {e}")

def _take_action(self, results, dry_run=False):
def _take_action(
self, results, dry_run=False
) -> Tuple[List[TestResultAction], List[TestResultAction]]:
"""
Take the actions returned by the test
In causy changes on the graph are not executed directly. Instead, the test returns an action which should be executed on the graph.
This is done to make it possible to execute the tests in parallel as well as to decide proactively at which point in the decisions taken by the pipeline step should be executed.
Actions are returned by the test and are executed on the graph. The actions are stored in the action history to make it possible to revert the actions or use them in a later step.
:param results:
:return:
:param results: the results
:return: actions taken, all actions taken
"""
all_actions_taken = []
all_actions = []
for result_items in results:
if result_items is None:
continue
Expand All @@ -310,6 +315,7 @@ def _take_action(self, results, dry_run=False):
self.algorithm.pre_graph_update_hooks, self.graph, result_items
)
actions_taken = []
all_actions.extend(result_items)
for i in result_items:
if i.u is not None and i.v is not None:
logger.debug(f"Action: {i.action} on {i.u.name} and {i.v.name}")
Expand Down Expand Up @@ -421,11 +427,11 @@ def _take_action(self, results, dry_run=False):
self._execute_post_graph_update_hooks(
self.algorithm.post_graph_update_hooks, self.graph, actions_taken
)
return all_actions_taken
return all_actions_taken, all_actions

def execute_pipeline_step(
self, test_fn: PipelineStepInterface, apply_to_graph=True
):
) -> Tuple[List[TestResultAction], List[TestResultAction]]:
"""
Execute a single pipeline_step on the graph. either in parallel or in a single process depending on the test_fn.parallel flag
:param apply_to_graph: if the action should be applied to the graph
Expand All @@ -434,6 +440,7 @@ def execute_pipeline_step(
:return:
"""
actions_taken = []
all_actions = []
# initialize the worker pool (we currently use all available cores * 2)

# run all combinations in parallel except if the number of combinations is smaller then the chunk size
Expand All @@ -456,19 +463,24 @@ def execute_pipeline_step(
):
if not isinstance(result, list):
result = [result]
actions_taken.extend(
self._take_action(result, dry_run=not apply_to_graph)

actions_taken_current, all_actions_current = self._take_action(
result, dry_run=not apply_to_graph
)
actions_taken.extend(actions_taken_current)
all_actions.extend(all_actions_current)
else:
if test_fn.generator.chunked:
for chunk in test_fn.generator.generate(self.graph.graph, self):
iterator = [
unpack_run(i)
for i in [[test_fn, [*c], self.graph.graph] for c in chunk]
]
actions_taken.extend(
self._take_action(iterator, dry_run=not apply_to_graph)
actions_taken_current, all_actions_current = self._take_action(
iterator, dry_run=not apply_to_graph
)
actions_taken.extend(actions_taken_current)
all_actions.extend(all_actions_current)
else:
# this is the only mode which supports unapplied actions to be passed to the next pipeline step (for now)
# which are sometimes needed for e.g. conflict resolution
Expand All @@ -487,12 +499,13 @@ def execute_pipeline_step(
if rn_fn.needs_unapplied_actions:
i.append(local_results)
local_results.append(unpack_run(i))

actions_taken.extend(
self._take_action(local_results, dry_run=not apply_to_graph)
actions_taken_current, all_actions_current = self._take_action(
local_results, dry_run=not apply_to_graph
)
actions_taken.extend(actions_taken_current)
all_actions.extend(all_actions_current)

return actions_taken
return actions_taken, all_actions


def graph_model_factory(
Expand Down
3 changes: 3 additions & 0 deletions causy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class ActionHistoryStep(BaseModel):
name: str
duration: Optional[float] = None # seconds
actions: Optional[List[TestResult]] = []
all_proposed_actions: Optional[
List[TestResult]
] = [] # all actions and not only the ones that were executed
steps: Optional[List["ActionHistoryStep"]] = []


Expand Down
110 changes: 109 additions & 1 deletion tests/test_pc_e2e.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from causy.causal_discovery.constraint.algorithms.pc import PC_EDGE_TYPES
from causy.causal_discovery.constraint.algorithms.pc import PC_EDGE_TYPES, PC
from causy.common_pipeline_steps.calculation import CalculatePearsonCorrelations
from causy.graph_model import graph_model_factory
from causy.causal_discovery.constraint.independence_tests.common import (
Expand Down Expand Up @@ -29,6 +29,114 @@ def _sample_generator(self):
random=lambda: rdnv(0, 1),
)

def test_pc_number_of_all_proposed_actions_two_nodes(self):
    """
    Verify the count of proposed actions recorded per pipeline step
    for a two-node graph with a single edge X -> Y: the first two
    steps each propose one action, the third proposes none.
    """
    rdnv = self.seeded_random.normalvariate
    generator = IIDSampleGenerator(
        edges=[SampleEdge(NodeReference("X"), NodeReference("Y"), 5)],
        random=lambda: rdnv(0, 1),
    )
    data, _graph = generator.generate(1000)
    model = PC()
    model.create_graph_from_data(data)
    model.create_all_possible_edges()
    results = model.execute_pipeline_steps()
    # One expected proposal count per pipeline step, in order.
    for step_index, expected in enumerate([1, 1, 0]):
        self.assertEqual(len(results[step_index].all_proposed_actions), expected)

def test_pc_number_of_actions_two_nodes(self):
    """
    Verify the count of actions actually taken per pipeline step for a
    two-node graph with a single edge X -> Y: only the first step
    executes an action.
    """
    rdnv = self.seeded_random.normalvariate
    generator = IIDSampleGenerator(
        edges=[SampleEdge(NodeReference("X"), NodeReference("Y"), 5)],
        random=lambda: rdnv(0, 1),
    )
    data, _graph = generator.generate(1000)
    model = PC()
    model.create_graph_from_data(data)
    model.create_all_possible_edges()
    results = model.execute_pipeline_steps()
    # One expected executed-action count per pipeline step, in order.
    for step_index, expected in enumerate([1, 0, 0]):
        self.assertEqual(len(results[step_index].actions), expected)

def test_pc_number_of_all_proposed_actions_three_nodes(self):
    """
    Verify the count of proposed actions recorded per pipeline step
    for a three-node chain X -> Y -> Z.
    """
    rdnv = self.seeded_random.normalvariate
    generator = IIDSampleGenerator(
        edges=[
            SampleEdge(NodeReference("X"), NodeReference("Y"), 5),
            SampleEdge(NodeReference("Y"), NodeReference("Z"), 6),
        ],
        random=lambda: rdnv(0, 1),
    )
    # NOTE: deliberately small sample (10) — this test only counts
    # proposals, not statistical correctness.
    data, _graph = generator.generate(10)
    model = PC()
    model.create_graph_from_data(data)
    model.create_all_possible_edges()
    results = model.execute_pipeline_steps()
    self.assertEqual(len(results[0].all_proposed_actions), 3)
    self.assertEqual(len(results[1].all_proposed_actions), 3)
    # TODO: think about whether the pairs-with-neighbours generator
    # returns what we want, but the counting seems correct
    self.assertEqual(len(results[2].all_proposed_actions), 4)

def test_pc_number_of_actions_three_nodes(self):
    """
    test if the number of actions actually taken (not merely proposed)
    is correct for a three-node chain X -> Y -> Z
    """
    rdnv = self.seeded_random.normalvariate
    sample_generator = IIDSampleGenerator(
        edges=[
            SampleEdge(NodeReference("X"), NodeReference("Y"), 5),
            SampleEdge(NodeReference("Y"), NodeReference("Z"), 6),
        ],
        random=lambda: rdnv(0, 1),
    )
    test_data, graph = sample_generator.generate(1000)
    tst = PC()
    tst.create_graph_from_data(test_data)
    tst.create_all_possible_edges()
    pc_results = tst.execute_pipeline_steps()
    # Executed actions per step; compare with the sibling test above,
    # which asserts on all_proposed_actions for the same graph.
    self.assertEqual(len(pc_results[0].actions), 3)
    self.assertEqual(len(pc_results[1].actions), 0)
    self.assertEqual(len(pc_results[2].actions), 1)

def test_pc_number_of_all_proposed_actions_four_nodes(self):
    """
    Verify the count of proposed actions recorded per pipeline step
    for a four-node graph (X, Y, Z, W) with five edges.
    """
    rdnv = self.seeded_random.normalvariate
    generator = IIDSampleGenerator(
        edges=[
            SampleEdge(NodeReference("X"), NodeReference("Y"), 5),
            SampleEdge(NodeReference("Y"), NodeReference("Z"), 6),
            SampleEdge(NodeReference("X"), NodeReference("W"), 7),
            SampleEdge(NodeReference("W"), NodeReference("Y"), 5),
            SampleEdge(NodeReference("W"), NodeReference("Z"), 6),
        ],
        random=lambda: rdnv(0, 1),
    )
    data, _graph = generator.generate(1000)
    model = PC()
    model.create_graph_from_data(data)
    model.create_all_possible_edges()
    results = model.execute_pipeline_steps()
    self.assertEqual(len(results[0].all_proposed_actions), 6)
    self.assertEqual(len(results[1].all_proposed_actions), 6)
    # TODO: think about whether the pairs-with-neighbours generator
    # returns what we want, but the counting seems correct
    # NOTE(review): step index 2 is not asserted here — presumably
    # intentional, but confirm it wasn't an off-by-one oversight.
    self.assertEqual(len(results[3].all_proposed_actions), 7)

def test_pc_calculate_pearson_correlations(self):
"""
Test conditional independence of ordered pairs given pairs of other variables works.
Expand Down

0 comments on commit 161d0d0

Please sign in to comment.