Commit
use different bw fraction for evictions
RobinGeens committed Jan 4, 2025
1 parent 6ec30c8 commit fec0f5c
Showing 3 changed files with 73 additions and 91 deletions.
143 changes: 58 additions & 85 deletions stream/cost_model/scheduler.py
@@ -60,15 +60,6 @@ def __init__(
self.link_energy: dict[TransferCause, float] = defaultdict(lambda: 0)
self.memory_energy: dict[TransferCause, float] = defaultdict(lambda: 0)

# self.total_cn_offchip_link_energy = 0
# self.total_cn_offchip_memory_energy = 0
# self.total_eviction_to_offchip_link_energy = 0
# self.total_eviction_to_offchip_memory_energy = 0
# self.total_sink_layer_output_offchip_link_energy = 0
# self.total_sink_layer_output_offchip_memory_energy = 0
# self.total_core_to_core_link_energy = 0
# self.total_core_to_core_memory_energy = 0

# Remains constant throughout the scheduling
self.sink_layer_nodes = self.get_sink_layer_nodes()
self.offchip_core = accelerator.get_offchip_core()
@@ -77,6 +68,7 @@ def __init__(
# Initialize bookkeeping
self.nb_scheduled_nodes = 0
self.scheduled_nodes: set[ComputationNode] = set()
self.bw_fraction_to_use_for_tensor: dict[Tensor, float] = {}
self.candidates = self.get_initial_candidates()
self.initialize_tensor_priorities()
self.initialize_offchip_tensors()
@@ -132,7 +124,7 @@ def run(self):
while not done:
best_candidate, preds_end = self.pop_best_candidate()
tensors_this_candidate_needs, tensors_operands = self.get_tensors_needed_for_node(best_candidate)
core = self.get_core_for_node(best_candidate)
core = self.get_allocated_core(best_candidate)
transfer_bw_fraction = self.get_transfer_bandwidth_fraction(best_candidate)

# Step 0: get the start time: when core is available or predecessors finished
@@ -176,7 +168,6 @@ def run(self):
output_memory_operand,
timestep,
tensors_this_candidate_needs,
transfer_bandwidth_fraction=transfer_bw_fraction,
)
timestep = transfer_complete_timestep

@@ -214,23 +205,20 @@
)

# Step 7: finish this round
self.bw_fraction_to_use_for_tensor[output_tensor] = transfer_bw_fraction
self.extend_candidates(best_candidate)
nb_scheduled_nodes += 1
done = nb_scheduled_nodes == self.nb_graph_nodes

self.latency = self.get_total_latency()
return self.latency

def get_transfer_bandwidth_fraction(self, node: ComputationNode):
"""Get the fraction of the off-chip bandwidth to be used for the tensor transfers related to this node"""
return 1 / node.get_total_inter_core_splits()

def prefetch_constant_operands(self):
"""Load the `operands_to_prefetch` to the cores they belong to."""
for n in self.G.node_list:
for op, tensor in n.operand_tensors.items():
if op in n.constant_operands and op in self.operands_to_prefetch:
core = self.get_core_for_node(n)
core = self.get_allocated_core(n)
memory_op = n.memory_operand_links.layer_to_mem_op(op)
if not self.accelerator.core_contains_tensor(tensor, core):
self.schedule_tensor_transfer(
@@ -279,12 +267,6 @@ def sync_cores_idle_from(
for core_id in self.cores_idle_from:
self.cores_idle_from[core_id] = max_idle_time

def get_core_for_node(self, node: ComputationNode):
"""Get the core this candidate will be scheduled on"""
core_id = node.chosen_core_allocation
assert core_id is not None
return self.accelerator.get_core(core_id)

def get_tensors_needed_for_node(self, node: ComputationNode):
"""Determine all the tensors needed to compute a node.
The node might need multiple outputs from previous nodes, depending on the graph.
@@ -331,51 +313,29 @@ def clear_memories(
exceptions: list[Tensor] = [],
transfer_bandwidth_fraction: float = 1,
):
for too_large_operand in memory_operands:
timestep = self.remove_all(
core=core,
memory_operand=too_large_operand,
timestep=timestep,
exceptions=exceptions,
transfer_bandwidth_fraction=transfer_bandwidth_fraction,
write_back_to_offchip=True,
)
return timestep

def remove_all(
self,
core: Core,
memory_operand: MemoryOperand,
timestep: int,
exceptions: list[Tensor] = [],
transfer_bandwidth_fraction: float = 1,
write_back_to_offchip: bool = False,
):
"""Remove all tensors from a core's memory with the given memory operand.
If required, the tensors are written back to offchip before removal.
"""Remove all tensors from a core's memory for the given memory operands.
All tensors are written back to offchip before removal.
Args:
core (Core): The Core to remove the tensor from
core: The Core to remove the tensor from
memory_operand: The memory operand for which all tensors should be evicted.
timestep: The timestep to remove the tensor at.
exceptions: A list of tensors that should not be evicted.
transfer_bandwidth_fraction: Fraction of the bandwidth to use for the transfers.
write_back_to_offchip (bool, optional): Write the tensor to offchip before removal. Defaults to False.
"""
stored_tensors = self.accelerator.get_tensors_stored_in_core(core, memory_operand, timestep)

for tensor in stored_tensors:
if tensor not in exceptions:
timestep = self.schedule_tensor_removal(
tensor_to_remove=tensor,
core_to_remove_from=core,
memory_op=memory_operand,
timestep=timestep,
transfer_bandwidth_fraction=transfer_bandwidth_fraction,
write_back_to_offchip=write_back_to_offchip,
transfer_cause=TransferCause.EVICTION,
)

for memory_operand in memory_operands:
stored_tensors = self.accelerator.get_tensors_stored_in_core(core, memory_operand, timestep)
for tensor in stored_tensors:
if tensor not in exceptions:
timestep = self.schedule_tensor_removal(
tensor_to_remove=tensor,
core_to_remove_from=core,
memory_op=memory_operand,
timestep=timestep,
transfer_bandwidth_fraction=transfer_bandwidth_fraction,
write_back_to_offchip=True,
transfer_cause=TransferCause.EVICTION,
)
return timestep

def schedule_tensor_removal(
@@ -432,16 +392,6 @@ def schedule_tensor_transfer(
):
"""Find the earliest time to transfer the tensor to the receiving core, and register the transfer.
Evictions of older tensors might be necessary
Args:
tensor
receiving_core
tensor_operand
transfer_cause
non_evictable_tensors
sending_core
earliest_t
transfer_bandwidth_fraction
"""

if self.accelerator.core_contains_tensor(tensor, receiving_core):
@@ -457,7 +407,6 @@
memory_op=tensor_operand,
timestep=earliest_tensor_addition_t,
tensors_to_avoid_evicting=non_evictable_tensors,
transfer_bandwidth_fraction=transfer_bandwidth_fraction,
)

# Find idle window between sender and receiver cores
@@ -502,44 +451,41 @@ def make_space_for_tensor(
memory_op: MemoryOperand,
timestep: int,
tensors_to_avoid_evicting: list[Tensor] = [],
transfer_bandwidth_fraction: float = 1,
):
"""Make space for the given tensor on the given core by evicting already stored tensors if necessary.
Args:
tensor: The tensor to make space for.
core (Core): The core where the tensor will be stored.
memory_operand: The memory operand on the core.
core: The core where the tensor will be stored.
memory_op: The memory operand on the core.
timestep: The timestep at which to make space for.
transfer_bandwidth_fraction: Fraction of the bandwidth to use for the transfer.
tensors_to_avoid_evicting: A list of tensors that should not be evicted.
"""

top_instance = self.accelerator.get_top_instance_of_core(core, memory_op)

# Earliest timestep when the core has enough space, or the latest timestep if this is never the case
enough_space_timestep = self.accelerator.memory_manager.get_timestep_for_tensor_addition(
tensor=tensor,
core=core,
timestep=timestep,
memory_op=tensor.memory_operand,
memory_op=memory_op,
)

tensors_to_evict = self.accelerator.memory_manager.find_best_tensor_combination_to_evict_fast(
top_instance,
tensor,
enough_space_timestep,
tensors_to_evict = self.accelerator.find_best_tensor_combination_to_evict_fast(
tensor=tensor,
core=core,
timestep=enough_space_timestep,
exceptions=tensors_to_avoid_evicting,
)

if core == self.offchip_core and tensors_to_evict:
raise ValueError("Evictions required in offchip memory. Consider making offchip larger.")

for tensor_to_evict in tensors_to_evict:
transfer_bandwidth_fraction = self.get_transfer_bandwidth_fraction_for_eviction(tensor_to_evict, timestep)
t_eviction_complete = self.schedule_tensor_removal(
tensor_to_remove=tensor_to_evict,
core_to_remove_from=core,
memory_op=memory_op,
timestep=timestep,
timestep=timestep, # TODO should this be `enough_space_timestep`?
transfer_bandwidth_fraction=transfer_bandwidth_fraction,
write_back_to_offchip=True,
transfer_cause=TransferCause.EVICTION,
@@ -648,7 +594,7 @@ def check_for_removal(
for n in self.G.successors(origin)
if n.chosen_core_allocation in core_ids_of_instance and n.id != origin.id
]
end_times = [n.end for n in nodes_that_needed_tensor if n.end is not None]
end_times = [n.end for n in nodes_that_needed_tensor if n.end >= 0]
max_end_time = max(end_times, default=timestep_for_removal)
# assert max_end_time != -1, "There should be at least one successor."
timestep_for_removal = max_end_time
@@ -680,6 +626,33 @@ def get_total_latency(self):
links_end_time = max([event.end for event in self.accelerator.communication_manager.events], default=0)
return max(cns_end_time, links_end_time)

def get_allocated_core(self, node: ComputationNode):
"""Get the core this candidate will be scheduled on"""
core_id = node.chosen_core_allocation
assert core_id is not None
return self.accelerator.get_core(core_id)

def get_transfer_bandwidth_fraction(self, node: ComputationNode):
"""Get the fraction of the off-chip bandwidth to be used for the tensor transfers related to this node"""
return 1 / node.get_total_inter_core_splits()

def get_transfer_bandwidth_fraction_for_eviction(self, tensor: Tensor, timestep: int):
"""Get the fraction of the off-chip bandwidth to be used to evict this tensor at the given timestep.
Instead of using the total inter-core splits of the current node, we use the number of cores that store a tensor
of the same layer and memory operand at the given timestep.
# TODO check for given timestep
"""

def contains_related_tensor(tensors: list[Tensor]):
return any(t.origin.id == tensor.origin.id and t.memory_operand == tensor.memory_operand for t in tensors)

instances_storing_related_tensor = [
instance
for instance, tensors in self.accelerator.memory_manager.top_instance_stored_tensors.items()
if contains_related_tensor(tensors)
]
return 1 / len(instances_storing_related_tensor)

@property
def total_cn_offchip_link_energy(self):
return self.link_energy[TransferCause.OFF_CHIP]
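For intuition, here is a minimal self-contained sketch of the two bandwidth-fraction heuristics that scheduler.py now distinguishes; the helper names and the numbers below are illustrative stand-ins, not part of the Stream API.

def bw_fraction_for_node_transfers(total_inter_core_splits: int) -> float:
    # Mirrors get_transfer_bandwidth_fraction: a layer split across N cores
    # shares the off-chip link, so each core's transfers get 1/N of it.
    return 1 / total_inter_core_splits


def bw_fraction_for_eviction(nb_instances_with_related_tensor: int) -> float:
    # Mirrors get_transfer_bandwidth_fraction_for_eviction: an eviction shares
    # the link with every memory instance that still holds a tensor of the
    # same layer and memory operand at that timestep.
    return 1 / nb_instances_with_related_tensor


# Illustrative numbers: a layer split over 4 cores, of which 2 still hold
# related tensors when the eviction is scheduled.
assert bw_fraction_for_node_transfers(4) == 0.25
assert bw_fraction_for_eviction(2) == 0.5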
12 changes: 12 additions & 0 deletions stream/hardware/architecture/accelerator.py
@@ -172,6 +172,18 @@ def find_tensor(self, tensor: Tensor):
def find_tensor_in_top_instances(self, tensor: Tensor):
return self.memory_manager.find_tensor_in_top_instances(tensor)

def find_best_tensor_combination_to_evict_fast(
self, tensor: Tensor, core: Core, timestep: int, exceptions: list[Tensor] = []
):
top_instance = self.get_top_instance_of_core(core, tensor.memory_operand)
tensors_to_evict = self.memory_manager.find_best_tensor_combination_to_evict_fast(
top_instance=top_instance,
tensor_to_add=tensor,
timestep=timestep,
exceptions=exceptions,
)
return tensors_to_evict

def remove_tensor(
self,
tensor: Tensor,
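The new Accelerator method is a thin facade over the memory manager: callers pass a tensor and a core, and the accelerator resolves the top memory instance itself. A rough sketch of that delegation with placeholder classes (not the real Stream types):

class _MemoryManagerSketch:
    def find_best_tensor_combination_to_evict_fast(self, top_instance, tensor_to_add, timestep, exceptions=()):
        # Placeholder: the real memory manager picks the cheapest set of stored
        # tensors whose removal frees enough space for `tensor_to_add`.
        return []


class _AcceleratorSketch:
    def __init__(self):
        self.memory_manager = _MemoryManagerSketch()

    def get_top_instance_of_core(self, core, memory_operand):
        # Placeholder lookup key standing in for the real top memory instance.
        return (core, memory_operand)

    def find_best_tensor_combination_to_evict_fast(self, tensor, core, timestep, exceptions=()):
        # Resolve the top instance from the tensor's memory operand, then delegate,
        # so the scheduler no longer has to reach into the memory manager directly.
        top_instance = self.get_top_instance_of_core(core, tensor.memory_operand)
        return self.memory_manager.find_best_tensor_combination_to_evict_fast(
            top_instance=top_instance, tensor_to_add=tensor, timestep=timestep, exceptions=exceptions
        )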
9 changes: 3 additions & 6 deletions stream/workload/node.py
@@ -47,13 +47,10 @@ def __init__(
self.core_allocation_is_fixed = core_allocation_is_fixed
self.chosen_core_allocation = chosen_core_allocation
self.input_names = input_names
# will be set by the scheduler
self.start = None
# will be set by the scheduler
self.end = None
# number of data (in bits) only this node consumes (not consumed by any other node)
self.start = -1
self.end = -1
# number of data (in bits) only this node produces/consumes (not produced/consumed by any other node)
self.data_consumed_unique = 0
# number of data (in bits) only this node produces (not produced by any other node)
self.data_produced_unique = 0

def get_total_energy(self) -> float:
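The node.py change replaces the None sentinel for unscheduled start/end times with -1, which is why the scheduler check above became `n.end >= 0`. A tiny illustrative example (the values are made up):

# With -1 as the "not yet scheduled" sentinel, filtering and max() need no
# None checks: `end >= 0` means the node has been scheduled.
end_times = [end for end in (-1, 120, -1, 340) if end >= 0]
assert max(end_times, default=0) == 340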
