Skip to content

Commit

Permalink
pre-allocate tensor_cns
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinGeens committed Aug 19, 2024
1 parent 4d3d1b3 commit a548718
Showing 1 changed file with 51 additions and 15 deletions.
66 changes: 51 additions & 15 deletions stream/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,32 @@ class NodeTensor(np.ndarray[Any, Any]):
the workload size.
"""

def __new__(cls, x: np.ndarray[Any, Any]):
def __new__(cls, x: np.ndarray[Any, Any], pre_allocation_size: int):
    # View-cast the given array to this subclass. `pre_allocation_size` is
    # intentionally unused here: for explicit construction, Python passes the
    # same arguments to __init__, which consumes it.
    return x.view(cls)

def __init__(self, x: np.ndarray[Any, Any], pre_allocation_size: int):
    # Total number of node slots reserved in the last dimension.
    self.__pre_allocation_size = pre_allocation_size
    # Index of the next free slot in the last dimension (acts as a write pointer).
    self.__node_count = 0
    # NOTE(review): instances created via `.view(NodeTensor)` (e.g. in reshape/
    # transpose/gather) bypass __init__, so these attributes are not set on such
    # views — confirm callers never call pointer-dependent methods on them.

def as_ndarray(self) -> NDArray[Any]:
    """Typecast to superclass. This is necessary because numpy dispatches methods based on the instance's type"""
    # Returns a zero-copy view of the same data as a plain np.ndarray.
    return self.view(np.ndarray)  # type: ignore

@staticmethod
def initialize_empty(shape: tuple[int, ...], pre_allocation_size: int = 8):
    """Create an all-empty NodeTensor for storing ComputationNodes.

    The underlying storage has shape ``shape + (pre_allocation_size,)``:
    nodes are accumulated along the last axis, and space for
    ``pre_allocation_size`` of them is reserved up front for performance.
    """
    storage_shape = shape + (pre_allocation_size,)
    storage = np.zeros(storage_shape, dtype=object)
    return NodeTensor(storage, pre_allocation_size)  # type: ignore

def _get_and_increment_pointer(self):
    """Return the index in the last dimension that points to the next free
    slot to allocate nodes, then advance the pointer.

    Raises:
        IndexError: if all pre-allocated slots are already in use. The caller
            (`extend_with_node`) catches this and grows the allocation.
    """
    pointer = self.__node_count
    if pointer >= self.__pre_allocation_size:
        # Message added for easier debugging; callers only catch the type.
        raise IndexError("NodeTensor pre-allocated node space is full")
    self.__node_count += 1
    return pointer

@property
def shape(self) -> None: # type: ignore
Expand Down Expand Up @@ -158,25 +173,46 @@ def convert_to_full_shape(self, tensor_shape: tuple[int, ...]):
def get_nb_empty_elements(self, slices: tuple[slice, ...]):
    """Returns the number of points for which there are no ComputationNodes.

    Only the first `__node_count` entries of the last (node-accumulation) axis
    are inspected: slots past the write pointer are unused pre-allocated space
    and must not count as stored nodes.
    """
    assert self.is_valid_shape_dimension(slices), "Last dimension of tensor is reserved for CNs"
    extended_slices = slices + (slice(0, self.__node_count),)
    tensor_slice = self.as_ndarray()[extended_slices]
    # A point is empty iff every occupied node slot at that point is 0.
    # (When __node_count == 0 the reduction over an empty axis yields True,
    # so all points are correctly reported as empty.)
    all_empty_mask = np.logical_and.reduce(tensor_slice == 0, axis=-1)
    return int(np.sum(all_empty_mask))

def extend_with_node(self, slices: tuple[slice, ...], node: object) -> "NodeTensor":
    """Store `node` at all points within `slices`, in the next free slot of the
    last (node-accumulation) axis.

    Returns this tensor (mutated in place) when a pre-allocated slot is free.
    Otherwise returns a NEW NodeTensor with doubled allocation containing all
    previously stored nodes plus `node`; callers must use the returned tensor.
    """
    assert self.is_valid_shape_dimension(slices), "Last dimension of tensor is reserved for CNs"
    # Keep the try-body minimal so an IndexError from the slice assignment
    # itself is not mistaken for "allocation exhausted".
    try:
        idx = self._get_and_increment_pointer()
    except IndexError:
        # All allocated space has been used up: double the allocated space.
        assert self.__node_count == self.__pre_allocation_size
        # np.concatenate instead of np.concat: the latter is a numpy>=2.0-only alias.
        grown = np.concatenate((self, np.zeros(self.full_shape, dtype=object)), axis=-1)
        assert grown.shape[-1] == 2 * self.__pre_allocation_size
        new_tensor = NodeTensor(grown, pre_allocation_size=2 * self.__pre_allocation_size)
        # Carry the write pointer over to the new tensor, then store the node there.
        new_tensor.__node_count = self.__node_count
        return new_tensor.extend_with_node(slices, node)
    extended_slices = slices + (slice(idx, idx + 1),)
    self[extended_slices] = node
    return self

def reshape(self, new_shape: tuple[int, ...] | None) -> "NodeTensor":  # type: ignore
    """Wrap the numpy reshape method such that the user is agnostic to the last
    dimension on which nodes are accumulated."""
    if not new_shape:
        return self
    full_shape = self.convert_to_full_shape(new_shape)
    reshaped = np.reshape(self.as_ndarray(), full_shape)
    return reshaped.view(NodeTensor)

def transpose(self, axes: list[int] | None):
if axes is not None:
Expand All @@ -186,14 +222,14 @@ def transpose(self, axes: list[int] | None):
# Leave last dimension unchanged
axes = axes + [len(self.tensor_shape)]

return NodeTensor(np.transpose(self.as_ndarray(), axes=axes))
return (np.transpose(self.as_ndarray(), axes=axes)).view(NodeTensor)

def gather(self, gather_indices: int | list[int], axis: int) -> "NodeTensor":
    """Take entries along `axis`, keeping the hidden last (node) axis intact."""
    # Shift negative axes by one so they do not address the node-accumulation axis.
    if axis < 0:
        axis = axis - 1
    gathered = np.take(self.as_ndarray(), gather_indices, axis=axis)
    return gathered.view(NodeTensor)

def __repr__(self):
    """Debug representation: visible shape plus the number of stored nodes."""
    # Use the actual class name: the original hard-coded "TensorNode", which
    # does not match the class name NodeTensor.
    return f"{type(self).__name__}{self.tensor_shape}[depth={self.__node_count}]"

def __reduce__(self):
    # Pickle as a plain ndarray (delegates to the base-class reduce).
    # NOTE(review): this drops the private write-pointer state
    # (__node_count / __pre_allocation_size) on a pickle round-trip —
    # confirm no caller unpickles a NodeTensor and keeps extending it.
    return self.as_ndarray().__reduce__()
Expand Down

0 comments on commit a548718

Please sign in to comment.