Skip to content

Commit

Permalink
refactor VcdScope ; improve test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
cxzzzz committed Dec 1, 2024
1 parent 2fe1242 commit 76ba373
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 59 deletions.
16 changes: 7 additions & 9 deletions src/wavekit/fsdb_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import importlib
from collections import defaultdict
from typing import Optional
from functools import cached_property
from .waveform import Waveform
from .reader import Reader, Scope
from .npi_fsdb_reader import NpiFsdbReader, NpiFsdbScope
Expand All @@ -12,12 +13,15 @@ def __init__(self, handle: NpiFsdbScope, parent_scope: FsdbScope):
super().__init__(name=handle.name())
self.handle = handle
self.parent_scope = parent_scope
self.child_scope_list = [FsdbScope(c, self) for c in self.handle.child_scope_list()]

@property
@cached_property
def signal_list(self) -> list[str]:
return [s for s in self.handle.signal_list()]

@cached_property
def child_scope_list(self) -> list[FsdbScope]:
return [FsdbScope(c, self) for c in self.handle.child_scope_list()]

@property
def type(self) -> str:
if not hasattr(self, '_type'):
Expand All @@ -30,13 +34,7 @@ def def_name(self) -> Optional[str]:
self._def_name = self.handle.def_name()
return self._def_name

def find_module_scope(self, module_name: str, depth: int = 0) -> list[Scope]:
#if self.type == 'npiFsdbScopeSvModule' and self.def_name == module_name:
# return [self]
#elif depth == 1:
# return []
#else: # depth == 0 or depth > 1
# return list(reduce(lambda a, b: a + b, [c.find_module_scope(module_name, depth - 1) for c in self.child_scope_list], []))
def find_scope_by_module(self, module_name: str, depth: int = 0) -> list[Scope]:
if not hasattr(self, '_preloaded_module_scope'):
self.preload_module_scope()
return self._preloaded_module_scope[module_name]
Expand Down
19 changes: 12 additions & 7 deletions src/wavekit/reader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations
import numpy as np
import re
from functools import reduce
from functools import reduce, cached_property
from abc import abstractmethod
from typing import Optional
from .waveform import Waveform
Expand Down Expand Up @@ -34,7 +34,7 @@ def traverse_signal(
key = (match.groups(),)
if key in res:
raise Exception(
f"pattern {pattern} match more than one signal")
f"pattern {p[1:]} match more than one signal")
res[key] = f"{signal}{range_expr}"
elif p == signal:
key = k
Expand All @@ -61,7 +61,7 @@ def traverse_scope(
depth = 1

module_scopes = scope._module_cache[p] if (
p in scope._module_cache) else scope.find_module_scope(module_name=module_name, depth=depth)
p in scope._module_cache) else scope.find_scope_by_module(module_name=module_name, depth=depth)
scope._module_cache[p] = module_scopes

if len(module_scopes) == 1 and module_scopes[0] == scope:
Expand Down Expand Up @@ -100,15 +100,15 @@ def traverse_scope(
key = new_k + sk
if key in res:
raise Exception(
f"pattern {pattern} match more than one signal")
f"pattern {p} match more than one signal")
res[key] = f"{scope.name}.{ss}"

for child_scope in scope.child_scope_list:
for ck, cs in traverse_scope(child_scope, descendant_scope_pattern_list[1:]).items():
key = new_k + ck
if key in res:
raise Exception(
f"pattern {pattern} match more than one signal")
f"pattern {p} match more than one signal")
res[key] = f"{scope.name}.{cs}"
return res

Expand All @@ -120,11 +120,16 @@ def __init__(self, name: str):
self.name = name
self._module_cache = dict()

@property
@cached_property
@abstractmethod
def signal_list(self) -> list[str]:
pass

@cached_property
@abstractmethod
def child_scope_list(self) -> list[Scope]:
pass

def full_name(self, root: Scope = None) -> list[str]:
ancestors, parent = [], self
while parent is not None:
Expand All @@ -134,7 +139,7 @@ def full_name(self, root: Scope = None) -> list[str]:
parent = parent.parent_scope
return ".".join([x.name for x in reversed(ancestors)])

def find_module_scope(self, module_name: str, depth: int = 0) -> list[Scope]:
def find_scope_by_module(self, module_name: str, depth: int = 0) -> list[Scope]:
raise NotImplementedError()

@property
Expand Down
47 changes: 12 additions & 35 deletions src/wavekit/vcd_reader.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,35 @@
from __future__ import annotations
import re
import numpy as np
from functools import cached_property
from vcdvcd import VCDVCD, Scope as VcdVcdScope, Signal as VcdVcdSignal
from typing import Optional
from .waveform import Waveform
from .reader import Reader, Scope

class VcdScope(Scope):
@staticmethod
def from_signal_list(signal_list: list, scope_list: list) -> list[VcdScope]:
scopes = {}
for scope in scope_list:
ancestors = scope.split(".")
full_name = ""
parent_scope = None
for scope_name in ancestors:
full_name = full_name + scope_name
if full_name not in scopes:
if full_name not in scopes:
new_scope = VcdScope(
scope_name, parent_scope=parent_scope)
if parent_scope is not None:
parent_scope._child_scopes[scope_name] = new_scope
scopes[full_name] = new_scope

parent_scope = scopes[full_name]
full_name = full_name + "."

for signal in signal_list:
scope_name = ".".join(signal.split(".")[:-1])
scopes[scope_name]._signals.add(signal.split(".")[-1])

return [scope for scope in scopes.values() if scope.parent_scope is None]

def __init__(self, name: str, parent_scope: VcdScope):
super().__init__(name=name)
def __init__(self, vcdvcd_scope: VcdVcdScope, parent_scope: Scope):
super().__init__(name = vcdvcd_scope.name.split(".")[-1])
self.vcdvcd_scope = vcdvcd_scope
self.parent_scope = parent_scope
self._child_scopes = {}
self._signals = set()
self.child_scope_list = list(self._child_scopes.values())

@property
@cached_property
def signal_list(self) -> list[str]:
return list(self._signals)
return [k for k,v in self.vcdvcd_scope.subElements.items() if isinstance(v, str)]

@cached_property
def child_scope_list(self) -> list[Scope]:
return [VcdScope(v, self) for k,v in self.vcdvcd_scope.subElements.items() if isinstance(v, VcdVcdScope)]

class VcdReader(Reader):

def __init__(self, file: str):
super().__init__()
self.file = file
self.file_handle = VCDVCD(file, store_scopes=True)
self._top_scope_list = VcdScope.from_signal_list(
self.file_handle.signals, self.file_handle.scopes
)
self._top_scope_list = [VcdScope(v, None) for k,v in self.file_handle.scopes.items() if '.' not in k]

def top_scope_list(self) -> list[Scope]:
return self._top_scope_list
Expand Down Expand Up @@ -81,8 +58,8 @@ def load_wave(
if end_time is not None:
raise NotImplementedError("end_time is not supported")

width = int(signal_handle.size)
signal_handle = self.file_handle[signal]
width = int(signal_handle.size)
signal_value_change = np.array([
(v[0], int(re.sub(r"[xXzZ]", str(xz_value), v[1]), 2))
for v in signal_handle.tv
Expand Down
6 changes: 5 additions & 1 deletion tests/test_vcdreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ def test_vcdreader():
assert (j_state.signed == True)

j_regex = vcd_reader.load_waves(
"tb.u0.@J_([a-z]+)\[3:0\]", "tb.tck", signed=True, sample_on_posedge=False
r"tb.u0.@J_([a-z]+\[3:0\])", "tb.tck", signed=True, sample_on_posedge=False
)

print(j_regex)
assert len(j_regex) == 2
for k,v in j_regex.items():
assert(len(k) == 1 and len(k[0]) == 1)
assert(k[0][0] in ['next[3:0]','state[3:0]'])
assert(v.width == 4)
14 changes: 7 additions & 7 deletions tests/test_waveform.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest
import numpy as np
from src.wavekit.waveform import Waveform
from wavekit.waveform import Waveform


def test_waveform():
Expand Down Expand Up @@ -162,14 +162,14 @@ def test_waveform():
time = clock*10
a = Waveform(value, clock, time, width=width, signed=False)
b = a.rise()
assert np.all(b.value == np.array([1, 1, 1]))
assert np.all(b.clock == np.array([1, 3, 10]))
assert np.all(b.time == np.array([1, 3, 10]) * 10)
assert np.all(b.value == np.array([0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0]))
assert np.all(b.clock == a.clock)
assert np.all(b.time == a.time)

b = a.fall()
assert np.all(b.value == np.array([0, 0, 0]))
assert np.all(b.clock == np.array([2, 7, 11]))
assert np.all(b.time == np.array([2, 7, 11]) * 10)
assert np.all(b.value == np.array([0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1]))
assert np.all(b.clock == a.clock)
assert np.all(b.time == a.time)

assert (np.all(a.compress().value == np.array([0, 1, 0, 1, 0, 1, 0])))

Expand Down

0 comments on commit 76ba373

Please sign in to comment.