minor typehint updates
RobinGeens committed Sep 28, 2024
1 parent ddc883c commit 6c32be4
Showing 10 changed files with 55 additions and 51 deletions.
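Most of the changes below replace the typing.Dict / List / Tuple aliases with the built-in generics available since Python 3.9 and add missing return annotations. A minimal before/after sketch of the pattern (illustrative code, not taken from the diff):

    # Before: typing aliases
    from typing import Dict, List, Tuple

    def invert(ids: Dict[int, int]) -> List[Tuple[int, int]]:
        return [(v, k) for k, v in ids.items()]

    # After: built-in generics (PEP 585, Python >= 3.9), no typing import needed
    def invert(ids: dict[int, int]) -> list[tuple[int, int]]:
        return [(v, k) for k, v in ids.items()]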
5 changes: 3 additions & 2 deletions stream/api.py
@@ -5,6 +5,7 @@
from zigzag.stages.main import MainStage
from zigzag.utils import pickle_load, pickle_save

from stream.cost_model.cost_model import StreamCostModelEvaluation
from stream.stages.allocation.constraint_optimization_allocation import ConstraintOptimizationAllocationStage
from stream.stages.allocation.genetic_algorithm_allocation import GeneticAlgorithmAllocationStage
from stream.stages.estimation.zigzag_core_mapping_estimation import ZigZagCoreMappingEstimationStage
@@ -60,7 +61,7 @@ def optimize_allocation_ga(
experiment_id: str,
output_path: str,
skip_if_exists: bool = False,
):
) -> StreamCostModelEvaluation:
_sanity_check_inputs(hardware, workload, mapping, mode, output_path)

logger = _logging.getLogger(__name__)
@@ -112,7 +113,7 @@ def optimize_allocation_co(
experiment_id: str,
output_path: str,
skip_if_exists: bool = False,
):
) -> StreamCostModelEvaluation:
_sanity_check_inputs(hardware, workload, mapping, mode, output_path)
_sanity_check_gurobi_license()

4 changes: 2 additions & 2 deletions stream/cost_model/scheduler.py
@@ -36,7 +36,7 @@ def initialize_offchip_tensors(workload: ComputationNodeWorkload, accelerator: "
for offchip_top_instance in offchip_top_instances
)
):
memory_op = n.memory_operand_links[op]
memory_op = n.memory_operand_links.layer_to_mem_op(op)
accelerator.spawn(
tensor=tensor,
core=offchip_core,
@@ -113,7 +113,7 @@ def get_tensors_needed_for_node(node: ComputationNode, G: ComputationNodeWorkload
tensors_operands: list[MemoryOperand] = []
# Constant operands
for layer_op in node.constant_operands:
memory_op = node.memory_operand_links[layer_op]
memory_op = node.memory_operand_links.layer_to_mem_op(layer_op)
if memory_op in node.too_large_operands:
continue
tensors_this_candidate_needs.append(node.operand_tensors[layer_op])
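scheduler.py now resolves memory operands through an explicit layer_to_mem_op call instead of indexing memory_operand_links directly. A rough stand-in for the mapping this presumably wraps (hypothetical class and operand names, not Stream's implementation):

    class MemoryOperandLinksSketch:
        """Hypothetical stand-in mapping layer operands (e.g. "W", "I", "O") to memory operands (e.g. "I1", "I2", "O")."""

        def __init__(self, links: dict[str, str]):
            self.links = links

        def layer_to_mem_op(self, layer_op: str) -> str:
            # Same result as the old bracket lookup, but behind an explicit, typed entry point.
            return self.links[layer_op]

    links = MemoryOperandLinksSketch({"W": "I2", "I": "I1", "O": "O"})
    assert links.layer_to_mem_op("W") == "I2"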
6 changes: 4 additions & 2 deletions stream/hardware/architecture/utils.py
@@ -1,5 +1,7 @@
from zigzag.datatypes import MemoryOperand

from stream.hardware.architecture.accelerator import Accelerator


def intersections(a, b):
"""Get the intersections of two lists of ranges.
@@ -38,8 +40,8 @@ def intersections(a, b):
return ranges
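The docstring only says that two lists of ranges are intersected; the body is not shown in this hunk. A self-contained sketch of that idea, assuming half-open (start, end) tuples (Stream's actual range representation may differ):

    def intersections_sketch(a: list[tuple[int, int]], b: list[tuple[int, int]]) -> list[tuple[int, int]]:
        # Pairwise overlap of half-open ranges; keep only non-empty intersections.
        out: list[tuple[int, int]] = []
        for a0, a1 in a:
            for b0, b1 in b:
                lo, hi = max(a0, b0), min(a1, b1)
                if lo < hi:
                    out.append((lo, hi))
        return out

    print(intersections_sketch([(0, 10), (20, 30)], [(5, 25)]))  # [(5, 10), (20, 25)]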


def get_core_capacities(accelerator, mem_op: MemoryOperand, core_ids: list):
core_capacities = {}
def get_core_capacities(accelerator: Accelerator, mem_op: MemoryOperand, core_ids: list[int]):
core_capacities: dict[str, int] = {}
for core_id in core_ids:
core_name = f"Core {core_id}"
core = accelerator.get_core(core_id)
28 changes: 13 additions & 15 deletions stream/opt/allocation/constraint_optimization/allocation.py
@@ -1,5 +1,4 @@
import sys
from typing import Dict, List, Tuple

import gurobipy as gp
from gurobipy import GRB
@@ -24,12 +23,11 @@ def get_optimal_allocations(
iterations: int,
gap: float = 0.5,
time_limit: int = 600,
latency_attr: str = "latency_total1",
) -> List[Tuple[int, int, Tuple[int, int]]]:
core_ids = sorted((core.id for core in accelerator.cores.nodes() if core.id != accelerator.offchip_core_id))
) -> list[tuple[int, int, tuple[int, int]]]:
core_ids = sorted((core.id for core in accelerator.cores.node_list if core.id != accelerator.offchip_core_id))
core_capacities = get_core_capacities(accelerator, MemoryOperand("I2"), core_ids)

nodes = sorted(workload.nodes())
nodes = sorted(workload.node_list)
ids = convert_ids(nodes)

latencies, possible_allocation_splits = get_latencies(
@@ -42,14 +40,14 @@
}

layer_ids = sorted(set(n.id for n in nodes))
groups = {layer_id: [] for layer_id in layer_ids}
groups: dict[int, list] = {layer_id: [] for layer_id in layer_ids}

for node in nodes:
groups[node.id].append(ids[node])

weights = {}

for layer_id, group in groups.items():
for group in groups.values():
assert len(group) > 0, "Empty group given"
for i, node_id in enumerate(group):
node = next(k for k, v in ids.items() if v == node_id)
@@ -84,17 +82,17 @@


def constraint_allocation_optimization(
latencies: Dict[Tuple[int, str, int], int],
energies: Dict[Tuple[int, str], float],
weights_per_id: Dict[int, int],
dependencies: Dict[Tuple[int, int], int],
core_capacities: Dict[str, float],
groups: Dict[int, List[int]],
possible_allocation_splits: Dict[int, Dict[str, Dict[int, int]]],
latencies: dict[tuple[int, str, int], int],
energies: dict[tuple[int, str], float],
weights_per_id: dict[int, int],
dependencies: dict[tuple[int, int], int],
core_capacities: dict[str, float],
groups: dict[int, list[int]],
possible_allocation_splits: dict[int, dict[str, dict[int, int]]],
N: int = 1,
gap: float = 0.5,
time_limit: int = 600,
) -> List[Tuple[int, int, int]]:
) -> list[tuple[int, int, int]]:
"""Get the optimal node-core allocation using constraint optimization.
The timeline is divided into a number of slots. Each node will be assigned to one slot.
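The docstring sketches the model: the timeline is cut into slots and every node is assigned to exactly one slot on one core. A heavily simplified gurobipy sketch of such an assignment problem (toy latencies; the dependency, capacity and splitting constraints of the real formulation are omitted, and a Gurobi license is required):

    import gurobipy as gp
    from gurobipy import GRB

    nodes, cores, slots = range(3), range(2), range(3)
    latency = {(n, c): 10 + 5 * c + n for n in nodes for c in cores}  # toy numbers

    m = gp.Model("slot_assignment_sketch")
    # x[n, c, s] == 1 iff node n runs on core c in slot s
    x = m.addVars(nodes, cores, slots, vtype=GRB.BINARY, name="x")
    # every node gets exactly one (core, slot)
    m.addConstrs((x.sum(n, "*", "*") == 1 for n in nodes), name="assign")
    # a core executes at most one node per slot
    m.addConstrs((x.sum("*", c, s) <= 1 for c in cores for s in slots), name="occupancy")
    m.setObjective(
        gp.quicksum(latency[n, c] * x[n, c, s] for n in nodes for c in cores for s in slots),
        GRB.MINIMIZE,
    )
    m.optimize()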
28 changes: 14 additions & 14 deletions stream/opt/allocation/constraint_optimization/utils.py
@@ -1,40 +1,40 @@
from math import prod
from typing import Dict, List, Tuple, Union

from zigzag.datatypes import LayerDim, LayerOperand, UnrollFactor

from stream.hardware.architecture.accelerator import Accelerator
from stream.utils import CostModelEvaluationLUT
from stream.workload.computation.computation_node import ComputationNode


def convert_id(i: int, j: int) -> int:
k = 1000 * i + j
return k


def invert_id(k: int) -> Tuple[int, int]:
def invert_id(k: int) -> tuple[int, int]:
i, j = divmod(k, 1000)
return i, j


def convert_ids(nodes: List) -> Dict:
ids = {}
def convert_ids(nodes: list[ComputationNode]):
ids: dict[ComputationNode, int] = {}
for node in nodes:
i, j = node.id, node.sub_id
new_id = convert_id(i, j)
ids[node] = new_id
return ids


def invert_ids_list(ids_list: list[tuple[int, int, int]]) -> list[tuple[int, int, Tuple[int, int]]]:
new_l = []
def invert_ids_list(ids_list: list[tuple[int, int, int]]) -> list[tuple[int, int, tuple[int, int]]]:
new_l: list[tuple[int, int, tuple[int, int]]] = []
for slot, core, k in ids_list:
new_l.append((slot, core, invert_id(k)))
return new_l


def invert_ids_dict(d: dict[int, Union[int, float]]) -> dict[Tuple[int, int], Union[int, float]]:
new_d = {}
def invert_ids_dict(d: dict[int, int | float]):
new_d: dict[tuple[int, int], int | float] = {}
for id, val in d.items():
new_d[invert_id(id)] = val
return new_d
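These helpers pack a node's (id, sub_id) pair into a single integer and back: k = 1000 * i + j on the way in, divmod(k, 1000) on the way out (which assumes sub_id stays below 1000). A worked example:

    def convert_id(i: int, j: int) -> int:
        return 1000 * i + j

    def invert_id(k: int) -> tuple[int, int]:
        return divmod(k, 1000)

    k = convert_id(7, 42)
    print(k)             # 7042
    print(invert_id(k))  # (7, 42)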
@@ -45,12 +45,12 @@ def get_loop_size(loops: list[tuple[LayerDim, UnrollFactor]], dims: list[LayerDim]


def get_latencies(
nodes: list,
nodes: list[ComputationNode],
core_ids: list[int],
accelerator: Accelerator,
node_hw_performances: CostModelEvaluationLUT,
impossible_lat: float = 1e11,
ids: Dict = {},
ids: dict[ComputationNode, int] = {},
) -> tuple[dict[tuple[int, str, int], int], dict]:
if not ids:
ids = {node: node.id for node in nodes}
@@ -107,13 +107,13 @@ def get_latencies(


def get_energies(
nodes: List,
core_ids: List[int],
nodes: list[ComputationNode],
core_ids: list[int],
accelerator: Accelerator,
node_hw_performances: CostModelEvaluationLUT,
impossible_energy: float = 1e11,
ids: Dict = {},
) -> Dict[Tuple[int, str], float]:
ids: dict[ComputationNode, int] = {},
) -> dict[tuple[int, str], float]:
if not ids:
ids = {node.id: node.id for node in nodes}
core_names = [f"Core {id}" for id in core_ids]
15 changes: 8 additions & 7 deletions stream/stages/allocation/constraint_optimization_allocation.py
@@ -22,6 +22,7 @@
from stream.stages.set_fixed_allocation_performance import SetFixedAllocationPerformanceStage
from stream.utils import CostModelEvaluationLUT
from stream.visualization.constraint_optimization import visualize_waco
from stream.workload.computation.computation_node import ComputationNode
from stream.workload.onnx_workload import ComputationNodeWorkload

logger = logging.getLogger(__name__)
@@ -40,10 +41,10 @@ def __init__(
workload: ComputationNodeWorkload,
accelerator: Accelerator,
node_hw_performances: CostModelEvaluationLUT,
layer_stacks: list[tuple],
layer_stacks: list[tuple[range, ...]],
hint_loops: Any,
node_hw_performances_path_with_split: str,
**kwargs,
**kwargs: dict[str, Any],
):
"""Initialize the ResourceAllocationStage.
@@ -108,7 +109,7 @@ def run(self):
def run_coala(self):
combined_allocation = []
timestep_offset = 0
for stack, optimal_allocation in self.optimal_allocation_per_stack.items():
for optimal_allocation in self.optimal_allocation_per_stack.values():
# Update all timesteps in this allocation with the offset and add it to the combined allocation
for t, a, id in optimal_allocation:
new_t = t + timestep_offset
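run_coala chains the per-stack allocations by shifting every stack's timesteps with a running offset, as in the loop above. A toy illustration of that merging step (the offset update rule lies outside the shown hunk, so the one used here is an assumption):

    stack_a = [(0, 0, 1000), (1, 1, 1001)]  # (timestep, core id, packed node id)
    stack_b = [(0, 0, 2000)]
    combined: list[tuple[int, int, int]] = []
    timestep_offset = 0
    for optimal_allocation in (stack_a, stack_b):
        combined += [(t + timestep_offset, a, id) for t, a, id in optimal_allocation]
        timestep_offset = max(t for t, _, _ in combined) + 1  # assumed: next stack starts after the last used slot
    print(combined)  # [(0, 0, 1000), (1, 1, 1001), (2, 0, 2000)]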
@@ -296,10 +297,10 @@ def get_order_steady_state(self, to_compute, layer_order_steady_state):
order.append((first_node.id, first_node.sub_id))
return order

def get_order_non_steady_state(self, to_compute):
def get_order_non_steady_state(self, to_compute: list[ComputationNode]):
return [(n.id, n.sub_id) for n in sorted(to_compute, key=lambda x: (-x.id, -x.sub_id))]

def schedule_allocation(self, allocation):
def schedule_allocation(self, allocation) -> StreamCostModelEvaluationStage:
# Get the involved layer ids we want to schedule and their core allocations
layer_ids = sorted(set(id[0] for _, _, id in allocation))
core_strs = [sorted(set((c for _, c, id in allocation if id[0] == layer_id))) for layer_id in layer_ids]
@@ -347,7 +348,7 @@ def set_fixed_allocations_for_workload(
"""! Modify the workload to fix the core allocations to the given core_ids for the given layer_ids."""
assert len(layer_ids) == len(core_ids)
for layer_id, cores in zip(layer_ids, core_ids):
n = next(n for n in workload.nodes() if n.id == layer_id)
n = next(n for n in workload.node_list if n.id == layer_id)
n.chosen_core_allocation = list(cores)
n.possible_core_allocation = list(cores)
n.core_allocation_is_fixed = True
@@ -358,7 +359,7 @@ def set_fixed_allocations_for_workload(
]
for layer_id_not_in_ss in layer_ids_not_in_ss:
layer_ids_idx = np.searchsorted(layer_ids, layer_id_not_in_ss)
for n in filter(lambda n: n.id == layer_id_not_in_ss, workload.nodes()):
for n in filter(lambda n: n.id == layer_id_not_in_ss, workload.node_list):
assert isinstance(n.core_allocation, list)
n.chosen_core_allocation = n.core_allocation
n.possible_core_allocation = n.core_allocation
4 changes: 2 additions & 2 deletions stream/stages/estimation/stream_cost_model_evaluation.py
@@ -1,5 +1,5 @@
import logging
from typing import Any
from typing import Any, Generator

from zigzag.datatypes import LayerOperand
from zigzag.stages.stage import Stage, StageCallable
@@ -43,7 +43,7 @@ def __init__(

self.check_chosen_core_allocation()

def run(self):
def run(self) -> Generator[tuple[StreamCostModelEvaluation, Any], None, None]:
"""! Run the StreamCostModelEvaluation."""
logger.info("Start StreamCostModelEvaluationStage.")
scme = StreamCostModelEvaluation(
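The new return annotation spells out that run only yields (scme, extra_info) pairs and neither accepts sent values nor returns one. A minimal sketch of the same Generator annotation pattern (stand-in types, not the actual stage):

    from typing import Any, Generator

    def run_sketch() -> Generator[tuple[str, Any], None, None]:
        # Generator[YieldType, SendType, ReturnType]; only the yield type is used here.
        yield "scme_placeholder", {"extra": "info"}

    for scme, extra_info in run_sketch():
        print(scme, extra_info)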
4 changes: 2 additions & 2 deletions stream/stages/set_fixed_allocation_performance.py
@@ -48,7 +48,7 @@ def run(self):
yield cme, extra_info

def set_fixed_allocation_performance(self):
for node in self.workload.nodes():
for node in self.workload.node_list:
if isinstance(node.core_allocation, list) and len(node.core_allocation) == 1:
node.core_allocation_is_fixed = True
node.set_chosen_core_allocation(node.core_allocation[0])
@@ -90,7 +90,7 @@ def get_offchip_bandwidth(
# If there was offchip memory added for some operands, get the offchip bandwidth required
assert self.accelerator.offchip_core_id is not None, "Off-chip core id is not set."
offchip_core = self.accelerator.get_core(self.accelerator.offchip_core_id)
offchip_instance = next(v for k, v in offchip_core.mem_hierarchy_dict.items())[-1].memory_instance
offchip_instance = next(iter(offchip_core.mem_hierarchy_dict.values()))[-1].memory_instance
offchip_bandwidth = cme.get_total_inst_bandwidth(offchip_instance)
return offchip_bandwidth
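The switch from next(v for k, v in ...items()) to next(iter(...values())) is purely idiomatic: both take the first value of the dict in insertion order, the new form just drops the unused key. For example:

    mem_hierarchy = {"op1": ["rf", "sram", "dram"], "op2": ["rf", "sram", "dram"]}  # illustrative shape only
    first_old = next(v for k, v in mem_hierarchy.items())
    first_new = next(iter(mem_hierarchy.values()))
    assert first_old == first_new
    print(first_new[-1])  # 'dram': the diff likewise picks the last (off-chip) level with [-1]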

4 changes: 2 additions & 2 deletions stream/utils.py
@@ -38,13 +38,13 @@ def get_too_large_operands(cme: CostModelEvaluation, accelerator: "Accelerator",
return too_large_operands


# TODO: Update this function to work with new mapping definition
def save_core_allocation(
workload: "ComputationNodeWorkload", path: str, type: str = "fixed", format: str = "py"
) -> dict:
"""Saves the core allocations of a workload to a python or pickle file.
In fixed mode: if a layer has been split into multiple groups, the allocation of each group is saved to a tuple.
In flexible mode: for each layer, the possible allocations are saved to a list.
# TODO: Update this function to work with new mapping definition
Args:
workload (DiGraph): The graph of CNs
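The docstring distinguishes two output shapes. A hedged sketch of what the saved dict might look like (layer keys and core ids are hypothetical; only the tuple-versus-list distinction comes from the docstring):

    # fixed mode: a layer split into groups gets one tuple entry with the allocation of each group
    fixed_allocations = {"conv1": (0, 1), "conv2": (2,)}
    # flexible mode: a list of possible core allocations per layer
    flexible_allocations = {"conv1": [0, 1, 2, 3], "conv2": [0, 1]}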
@@ -177,7 +177,7 @@ def get_nodes(self):
def get_cores(self, node: "ComputationNode"):
return list(self.lut.get(node, {}).keys())

def remove_cores_with_same_id(self, node, core):
def remove_cores_with_same_id(self, node: "ComputationNode", core: Core):
"""! Removes cores with the same id as core for node from the look-up table."""
if node in self.lut:
self.lut[node] = {c: v for c, v in self.lut[node].items() if c.id != core.id}
8 changes: 5 additions & 3 deletions stream/visualization/activation_distribution.py
@@ -1,21 +1,23 @@
import networkx as nx
import pandas as pd
import plotly.express as px
from networkx import DiGraph

from stream.workload.computation.computation_node import ComputationNode
from stream.workload.onnx_workload import ComputationNodeWorkload
from stream.workload.utils import prune_workload


def plot_activation_distribution(workload: DiGraph, order: list = None, fig_path="outputs/distribution.html"):
def plot_activation_distribution(
workload: ComputationNodeWorkload, order: list = None, fig_path: str = "outputs/distribution.html"
):
"""
Plot the output tensor sizes throughout the network depth.
The depth is determined through the topological generations sort of the workload.
The output tensor size at depth d is defined as the alive tensors before processing
the nodes in the d'th topological generation.
"""
# Generate order of processing if not provided
order = order or [node.id for gen in nx.topological_generations(workload) for node in gen]
order = order or [node.id for gen in nx.topological_generations(workload) for node in gen]
# Get the activation size per processed node
df, max_size = get_sizes_per_node(workload, order=order)
# Plot the sizes
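The default processing order comes from networkx's topological generations: nodes are grouped so that every node's predecessors sit in earlier generations. A small self-contained example:

    import networkx as nx

    g = nx.DiGraph([(0, 1), (0, 2), (1, 3), (2, 3)])
    print(list(nx.topological_generations(g)))  # [[0], [1, 2], [3]]
    order = [node for gen in nx.topological_generations(g) for node in gen]
    print(order)  # [0, 1, 2, 3]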
