diff --git a/src/finn/custom_op/fpgadataflow/rtl/thresholding_rtl.py b/src/finn/custom_op/fpgadataflow/rtl/thresholding_rtl.py
index dda04f70f3..ec875858ff 100644
--- a/src/finn/custom_op/fpgadataflow/rtl/thresholding_rtl.py
+++ b/src/finn/custom_op/fpgadataflow/rtl/thresholding_rtl.py
@@ -175,20 +175,23 @@ def prepare_codegen_rtl_values(self, model):
         o_bitwidth = DataType[output_data_type].bitwidth()
 
         # The RTL expects 2^N-1 thresholds, but narrow range quantization will result in
-        # one less threshold, prepending a dummy threshold and reducing bias by 1 to compensate.
+        # one less threshold, prepending a dummy threshold (the minimal possible value,
+        # determined by the input data type) and decreasing the bias by 1 to compensate.
+        # Additionally, the number of threshold steps is increased to reflect the new shape.
         expected_thresholds = 2**o_bitwidth - 1
         n_thres_steps = self.get_nodeattr("numSteps")
-        if expected_thresholds != n_thres_steps and DataType[input_data_type].signed() is not True:
-            min_val = np.amin(thresholds, axis=1)
+        if expected_thresholds != n_thres_steps:
+            min_val = DataType[input_data_type].min()
             thresholds = np.insert(thresholds, 0, min_val, axis=1)
             bias = bias - 1
+            n_thres_steps += 1
 
         # add dummy dimension as final dimension (that's what gets packed with next call)
-        thresholds = np.expand_dims(thresholds, axis=-1)
+        t_expand = np.expand_dims(thresholds, axis=-1)
         wdt = self.get_weight_datatype()
         bw_hexdigit = roundup_to_integer_multiple(wdt.bitwidth(), 4)
         t_packed = pack_innermost_dim_as_hex_string(
-            thresholds,
+            t_expand,
             wdt,
             bw_hexdigit,
             prefix="",
@@ -199,8 +202,8 @@ def prepare_codegen_rtl_values(self, model):
         num_channels = self.get_nodeattr("NumChannels")  # number of channels
 
         # If a single threshold value is found, broadcast the value
-        expected_shape = (num_channels, n_thres_steps)
-        if t_packed.shape == (1, 1):
+        expected_shape = (num_channels, expected_thresholds)
+        if t_packed.shape != expected_shape:
             t_packed = np.broadcast_to(t_packed, expected_shape)
 
         channel_fold = int(num_channels / pe)
@@ -224,6 +227,10 @@ def prepare_codegen_rtl_values(self, model):
                     f.write(val + "\n")
 
         code_gen_dict["$THRESHOLDS_PATH$"] = ['"./%s_"' % self.onnx_node.name]
 
+        if self.get_nodeattr("runtime_writeable_weights") == 1:
+            thresh_file_name = f"{t_path}/memblock.dat"
+            self.make_weight_file(thresholds, "decoupled", thresh_file_name)
+
         # Identify the module name
         code_gen_dict["$MODULE_NAME_AXI_WRAPPER$"] = [
             self.get_verilog_top_module_name() + "_axi_wrapper"
@@ -255,7 +262,6 @@ def prepare_codegen_rtl_values(self, model):
         o_bits = 1 + math.ceil(
             math.log2(-bias if -bias >= 2 ** (o_bitwidth - 1) else 2**o_bitwidth + bias)
         )
-
         code_gen_dict["$O_BITS$"] = [str(int(o_bits))]
 
         rt_weights = self.get_nodeattr("runtime_writeable_weights")
@@ -322,10 +328,6 @@ def generate_hdl(self, model, fpgapart, clk):
         # by PyVerilator and IPI generation
         self.set_nodeattr("gen_top_module", code_gen_dict["$TOP_MODULE$"][0])
 
-        weights = model.get_initializer(self.onnx_node.input[1])
-        weights_fname = f"{code_gen_dir}/memblock.dat"
-        self.make_weight_file(weights, "decoupled", weights_fname)
-
         for rtl_file_path in self.get_rtl_file_paths():
             # read in original RTL template file
             template_data = self.get_rtl_template_data(rtl_file_path)
@@ -513,27 +515,16 @@ def make_weight_file(self, weights, weight_file_mode, weight_file_name):
         * weight_file_name : filename for the weight file to be generated
 
         """
-        threshold_tensor = self.get_hw_compatible_threshold_tensor(weights)
-        tdt = self.get_weight_datatype()
-        assert np.vectorize(tdt.allowed)(
-            threshold_tensor
-        ).all(), "Thresholds can't be expressed with type %s" % str(tdt)
-
+        thresholds = weights
         pe = self.get_nodeattr("PE")
         ch = self.get_nodeattr("NumChannels")
-        n_thres_steps = self.get_nodeattr("numSteps")
-
-        # If a single threshold value is found, broadcast the value
-        n_thres_steps = self.get_nodeattr("numSteps")
-        expected_shape = (ch, n_thres_steps)
-        if weights.shape == (1, 1):
-            weights = np.broadcast_to(weights, expected_shape)
-
-        odt = self.get_output_datatype().bitwidth()
-        width_padded = roundup_to_integer_multiple(weights.shape[1], 2**odt)
-        weight_padded = np.zeros((weights.shape[0], width_padded))
-        weight_padded[: weights.shape[0], :n_thres_steps] = weights
-        weight_stream = []
+        output_data_type = self.get_nodeattr("outputDataType")  # output precision
+        o_bitwidth = DataType[output_data_type].bitwidth()
+        n_thres_steps = 2**o_bitwidth - 1
+        width_padded = roundup_to_integer_multiple(thresholds.shape[1], 2**o_bitwidth)
+        thresh_padded = np.zeros((thresholds.shape[0], width_padded))
+        thresh_padded[: thresholds.shape[0], :n_thres_steps] = thresholds
+        thresh_stream = []
         wdt = self.get_weight_datatype()
         bw_hexdigit = roundup_to_integer_multiple(wdt.bitwidth(), 32)
         padding = np.zeros(width_padded, dtype=np.int32)
@@ -543,18 +534,18 @@ def make_weight_file(self, weights, weight_file_mode, weight_file_name):
         for fold in range(cf):
             for c in range(2 ** (pe - 1).bit_length()):
                 if (c == 0 or c % pe != 0) and c < pe:
-                    for w in weight_padded[chan_ind]:
-                        w_packed = pack_innermost_dim_as_hex_string(
-                            [w], wdt, bw_hexdigit, prefix=""
+                    for t in thresh_padded[chan_ind]:
+                        t_packed = pack_innermost_dim_as_hex_string(
+                            [t], wdt, bw_hexdigit, prefix=""
                         ).item()
-                        weight_stream.append(w_packed)
+                        thresh_stream.append(t_packed)
                     chan_ind += 1
                 else:
                     for z in padding:
-                        w_packed = pack_innermost_dim_as_hex_string(
+                        t_packed = pack_innermost_dim_as_hex_string(
                             [z], wdt, bw_hexdigit, prefix=""
                         ).item()
-                        weight_stream.append(w_packed)
+                        thresh_stream.append(t_packed)
         with open(weight_file_name, "w") as f:
-            for val in weight_stream:
+            for val in thresh_stream:
                 f.write(val + "\n")
diff --git a/src/finn/custom_op/fpgadataflow/thresholding.py b/src/finn/custom_op/fpgadataflow/thresholding.py
index dde813a293..12cb76be4e 100644
--- a/src/finn/custom_op/fpgadataflow/thresholding.py
+++ b/src/finn/custom_op/fpgadataflow/thresholding.py
@@ -242,6 +242,7 @@ def execute_node(self, context, graph):
         node = self.onnx_node
         inp_values = context[node.input[0]]
         th_val = context[node.input[1]]
+        out_bias = self.get_nodeattr("ActVal")
         # MT expects inputs to be in the shape (N,C,H,W) or (N, C)
         # if 4D then input values in context are (N,H,W,C) and need to
         # be transposed.
@@ -249,16 +250,13 @@ def execute_node(self, context, graph):
         is_4d = len(inp_values.shape) == 4
         if is_4d:
             inp_values = np.transpose(inp_values, (0, 3, 1, 2))
-        y = multithreshold(inp_values, th_val)
+        y = multithreshold(inp_values, th_val, out_bias=out_bias)
         if is_4d:
             y = y.transpose(0, 2, 3, 1)
         act = DataType[self.get_nodeattr("outputDataType")]
         if act == DataType["BIPOLAR"]:
             # binary to bipolar
             y = 2 * y - 1
-        else:
-            # signed offset
-            y += act.min()
         context[node.output[0]] = y
 
     def calc_tmem(self):
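The dummy-threshold trick introduced above can be sanity-checked in isolation: qonnx's multithreshold computes y = #{t : x >= t} + out_bias, so a threshold pinned at the input type's minimum fires for every input, and the decremented bias cancels the extra count. A minimal sketch of that equivalence (illustrative only, not part of this patch; the INT8 input and INT2 threshold values are assumptions):

import numpy as np
from qonnx.core.datatype import DataType
from qonnx.custom_op.general.multithreshold import multithreshold

idt = DataType["INT8"]
# every representable input value, shape (N, C) with C = 1
x = np.arange(idt.min(), idt.max() + 1, dtype=np.float32).reshape(-1, 1)

# narrow-range INT2 activation: 2 steps instead of the 2**2 - 1 = 3 the RTL expects
t_narrow = np.asarray([[-5.0, 42.0]], dtype=np.float32)
bias_narrow = DataType["INT2"].min() + 1  # narrow range raises the bias by one

# prepend a dummy threshold at the input type's minimum, give the bias step back
t_padded = np.insert(t_narrow, 0, idt.min(), axis=1)
bias_padded = bias_narrow - 1

y_narrow = multithreshold(x, t_narrow, out_bias=float(bias_narrow))
y_padded = multithreshold(x, t_padded, out_bias=float(bias_padded))
# x >= idt.min() always holds, so the extra count and the bias shift cancel
assert (y_narrow == y_padded).all()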
diff --git a/tests/fpgadataflow/test_convert_to_hw_thresholding.py b/tests/fpgadataflow/test_convert_to_hw_thresholding.py
deleted file mode 100755
index 63cb5986e1..0000000000
--- a/tests/fpgadataflow/test_convert_to_hw_thresholding.py
+++ /dev/null
@@ -1,205 +0,0 @@
-# Copyright (C) 2024, Advanced Micro Devices, Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#
-# * Redistributions of source code must retain the above copyright notice, this
-#   list of conditions and the following disclaimer.
-#
-# * Redistributions in binary form must reproduce the above copyright notice,
-#   this list of conditions and the following disclaimer in the documentation
-#   and/or other materials provided with the distribution.
-#
-# * Neither the name of FINN nor the names of its
-#   contributors may be used to endorse or promote products derived from
-#   this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import pytest
-
-import numpy as np
-from onnx import TensorProto, helper
-from qonnx.core.datatype import DataType
-from qonnx.core.modelwrapper import ModelWrapper
-from qonnx.custom_op.general.multithreshold import multithreshold
-from qonnx.custom_op.registry import getCustomOp
-from qonnx.transformation.general import GiveUniqueNodeNames
-from qonnx.transformation.infer_datatypes import InferDataTypes
-from qonnx.transformation.infer_shapes import InferShapes
-from qonnx.util.basic import gen_finn_dt_tensor
-
-import finn.core.onnx_exec as oxe
-from finn.transformation.fpgadataflow.convert_to_hw_layers import InferThresholdingLayer
-from finn.transformation.fpgadataflow.specialize_layers import SpecializeLayers
-
-test_fpga_part = "xczu3eg-sbva484-1-e"
-target_clk_ns = 5
-
-
-# Helper functions
-def sort_thresholds_increasing(thresholds):
-    return np.sort(thresholds, axis=1)
-
-
-def prepare_inputs(input_tensor):
-    return {"inp": input_tensor}
-
-
-# n = batch, c = channel, h = height, w = width of feature map
-# Standard = NCHW; FINN = NHWC
-# Convert from NHWC(FINN) to NCHW(Standard)
-def layout_FINN2NCHW(data):
-    return np.transpose(data, (0, 3, 1, 2))
-
-
-# Convert from NCHW(Standard) to NHWC(FINN)
-def layout_NCHW2FINN(data):
-    return np.transpose(data, (0, 2, 3, 1))
-
-
-def generate_random_threshold_values(input_data_type, num_input_channels, num_steps):
-    return np.random.randint(
-        input_data_type.min(),
-        input_data_type.max() + 1,
-        (num_input_channels, num_steps),
-    ).astype(np.float32)
-
-
-def generate_pe_value(fold, num_input_channels):
-    if fold == -1:
-        fold = num_input_channels
-    pe = num_input_channels // fold
-    assert num_input_channels % pe == 0
-    return pe
-
-
-def make_single_multithresholding_modelwrapper(
-    thresholds,
-    pe,
-    input_data_type,
-    output_data_type,
-    activation_bias,
-    num_input_vecs,
-):
-    NumChannels = thresholds.shape[0]
-
-    inp = helper.make_tensor_value_info("inp", TensorProto.FLOAT, num_input_vecs + [NumChannels])
-    outp = helper.make_tensor_value_info("outp", TensorProto.FLOAT, num_input_vecs + [NumChannels])
-
-    node_inp_list = ["inp", "thresh"]
-
-    Multithresholding_node = helper.make_node(
-        "MultiThreshold",
-        node_inp_list,
-        ["outp"],
-        domain="qonnx.custom_op.general",
-        out_dtype=output_data_type.name,
-        out_bias=float(activation_bias),
-        out_scale=1.0,
-    )
-
-    graph = helper.make_graph(
-        nodes=[Multithresholding_node],
-        name="multithresholding_graph",
-        inputs=[inp],
-        outputs=[outp],
-    )
-
-    model = helper.make_model(graph, producer_name="multithresholding-model")
-    model = ModelWrapper(model)
-    model = model.transform(InferShapes())
-    model = model.transform(InferDataTypes())
-    model = model.transform(GiveUniqueNodeNames())
-
-    model.set_tensor_datatype("inp", input_data_type)
-    model.set_tensor_datatype("outp", output_data_type)
-
-    model.set_tensor_datatype("thresh", input_data_type)
-    model.set_initializer("thresh", thresholds)
-    return model
-
-
-# N.B. Fold values where C % PE != 0 fail
-@pytest.mark.parametrize("activation", [DataType["INT4"], DataType["BIPOLAR"]])
-@pytest.mark.parametrize("input_data_type", [DataType["INT16"], DataType["UINT16"]])
-@pytest.mark.parametrize("fold", [-1, 1, 2, 4, 6])
-@pytest.mark.parametrize("num_input_channels", [16])
-@pytest.mark.parametrize("impl_style", ["hls", "rtl"])
-@pytest.mark.fpgadataflow
-@pytest.mark.vivado
-def test_convert_multithreshold_to_hardware(
-    impl_style,
-    activation,
-    input_data_type,
-    fold,
-    num_input_channels,
-):
-    # Handle inputs to the test
-    pe = generate_pe_value(fold, num_input_channels)
-    num_steps = activation.get_num_possible_values() - 1
-
-    # Other non-input parameters
-    num_input_vecs = [1, 2, 2]
-    output_data_type = activation
-    if output_data_type == DataType["BIPOLAR"]:
-        activation_bias = 0
-    else:
-        activation_bias = output_data_type.min()
-
-    # Generate random thresholds and sort in ascending order
-    thresholds = generate_random_threshold_values(input_data_type, num_input_channels, num_steps)
-
-    # provide non-decreasing/ascending thresholds
-    thresholds = sort_thresholds_increasing(thresholds)
-
-    # Make a Multithreshold graph and convert to thresholding binary search node
-    model = make_single_multithresholding_modelwrapper(
-        thresholds,
-        pe,
-        input_data_type,
-        output_data_type,
-        activation_bias,
-        num_input_vecs,
-    )
-
-    model = model.transform(InferThresholdingLayer())
-
-    # Perform functional validation of the InferThresholdingLayer transform
-    x = gen_finn_dt_tensor(input_data_type, tuple(num_input_vecs + [num_input_channels]))
-
-    x_nchw = layout_FINN2NCHW(x)
-    y_expected = multithreshold(x_nchw, thresholds)
-
-    # convert back to NHWC for comparison to hw outputs
-    y_expected = layout_NCHW2FINN(y_expected)
-    if activation == DataType["BIPOLAR"]:
-        # binary to bipolar
-        y_expected = 2 * y_expected - 1
-    else:
-        # signed offset
-        y_expected += activation.min()
-
-    input_dict = prepare_inputs(x)
-    y_produced = oxe.execute_onnx(model, input_dict)["outp"]
-
-    assert (y_produced == y_expected).all()
-
-    # Transform to the specified implementation style, either the
-    # RTL or HLS according to test parameters
-    node = model.get_nodes_by_op_type(model.graph.node[0].op_type)[0]
-    inst = getCustomOp(node)
-    inst.set_nodeattr("preferred_impl_style", impl_style)
-    model = model.transform(SpecializeLayers())
-    model = model.transform(InferShapes())
-    assert model.graph.node[0].op_type == "Thresholding_" + str(impl_style)
diff --git a/tests/fpgadataflow/test_fpgadataflow_thresholding.py b/tests/fpgadataflow/test_fpgadataflow_thresholding.py
index a6e7e41596..88e4247c2a 100644
--- a/tests/fpgadataflow/test_fpgadataflow_thresholding.py
+++ b/tests/fpgadataflow/test_fpgadataflow_thresholding.py
@@ -29,24 +29,21 @@
 import pytest
 
 import numpy as np
-import os
 from onnx import TensorProto, helper
-from pyverilator.util.axi_utils import axilite_read, axilite_write
 from qonnx.core.datatype import DataType
 from qonnx.core.modelwrapper import ModelWrapper
-from qonnx.custom_op.general.multithreshold import multithreshold
 from qonnx.custom_op.registry import getCustomOp
 from qonnx.transformation.general import GiveUniqueNodeNames
-from qonnx.util.basic import gen_finn_dt_tensor, qonnx_make_model
+from qonnx.transformation.infer_datatypes import InferDataTypes
+from qonnx.transformation.infer_shapes import InferShapes
+from qonnx.util.basic import gen_finn_dt_tensor
 
 import finn.core.onnx_exec as oxe
 from finn.analysis.fpgadataflow.exp_cycles_per_layer import exp_cycles_per_layer
 from finn.analysis.fpgadataflow.hls_synth_res_estimation import hls_synth_res_estimation
-from finn.core.rtlsim_exec import rtlsim_exec
 from finn.transformation.fpgadataflow.compile_cppsim import CompileCppSim
-from finn.transformation.fpgadataflow.create_stitched_ip import CreateStitchedIP
+from finn.transformation.fpgadataflow.convert_to_hw_layers import InferThresholdingLayer
 from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP
-from finn.transformation.fpgadataflow.insert_fifo import InsertFIFO
 from finn.transformation.fpgadataflow.prepare_cppsim import PrepareCppSim
 from finn.transformation.fpgadataflow.prepare_ip import PrepareIP
 from finn.transformation.fpgadataflow.prepare_rtlsim import PrepareRTLSim
@@ -57,7 +54,14 @@
 target_clk_ns = 5
 
 
-def generate_random_threshold_values(input_data_type, num_input_channels, num_steps):
+def generate_random_threshold_values(
+    input_data_type, num_input_channels, num_steps, narrow=False, per_tensor=False
+):
+    if per_tensor:
+        num_input_channels = 1
+    if narrow:
+        num_steps -= 1
+
     return np.random.randint(
         input_data_type.min(),
         input_data_type.max() + 1,
@@ -69,76 +73,84 @@ def sort_thresholds_increasing(thresholds):
     return np.sort(thresholds, axis=1)
 
 
-# n = batch, c = channel, h = height, w = width of feature map
-# Standard = NCHW; FINN = NHWC
-# Convert from NHWC(FINN) to NCHW(Standard)
-def layout_FINN2NCHW(data):
-    return np.transpose(data, (0, 3, 1, 2))
-
-
-# Convert from NCHW(Standard) to NHWC(FINN)
-def layout_NCHW2FINN(data):
-    return np.transpose(data, (0, 2, 3, 1))
-
-
-def make_single_thresholding_modelwrapper(impl_style, T, idt, odt, actval, n_inp_vecs):
-    NumChannels = T.shape[0]
-
-    inp = helper.make_tensor_value_info("inp", TensorProto.FLOAT, n_inp_vecs + [NumChannels])
-    outp = helper.make_tensor_value_info("outp", TensorProto.FLOAT, n_inp_vecs + [NumChannels])
+def make_single_multithresholding_modelwrapper(
+    thresholds,
+    input_data_type,
+    output_data_type,
+    activation_bias,
+    num_input_vecs,
+    num_channels,
+):
+    inp = helper.make_tensor_value_info("inp", TensorProto.FLOAT, num_input_vecs + [num_channels])
+    thresh = helper.make_tensor_value_info("thresh", TensorProto.FLOAT, thresholds.shape)
+    outp = helper.make_tensor_value_info("outp", TensorProto.FLOAT, num_input_vecs + [num_channels])
 
     node_inp_list = ["inp", "thresh"]
 
-    Thresholding_node = helper.make_node(
-        "Thresholding",
+    Multithresholding_node = helper.make_node(
+        "MultiThreshold",
         node_inp_list,
         ["outp"],
-        domain="finn.custom_op.fpgadataflow",
-        backend="fpgadataflow",
-        NumChannels=NumChannels,
-        numSteps=T.shape[1],
-        inputDataType=idt.name,
-        weightDataType=idt.name,  # will be set by MinimizeAccumulatorWidth
-        outputDataType=odt.name,
-        ActVal=actval,
-        numInputVectors=n_inp_vecs,
-        preferred_impl_style=impl_style,
+        domain="qonnx.custom_op.general",
+        out_dtype=output_data_type.name,
+        out_bias=float(activation_bias),
+        out_scale=1.0,
+        data_layout="NHWC",
     )
+
     graph = helper.make_graph(
-        nodes=[Thresholding_node],
-        name="thresholding_graph",
+        nodes=[Multithresholding_node],
+        name="multithresholding_graph",
         inputs=[inp],
         outputs=[outp],
+        value_info=[thresh],
     )
 
-    model = qonnx_make_model(graph, producer_name="thresholding-model")
+    model = helper.make_model(graph, producer_name="multithresholding-model")
     model = ModelWrapper(model)
+    model = model.transform(InferShapes())
+    model = model.transform(InferDataTypes())
+    model = model.transform(GiveUniqueNodeNames())
 
-    model.set_tensor_datatype("inp", idt)
-    model.set_tensor_datatype("outp", odt)
+    model.set_tensor_datatype("inp", input_data_type)
+    model.set_tensor_datatype("outp", output_data_type)
 
-    model.set_tensor_datatype("thresh", idt)
-    model.set_initializer("thresh", T)
+    model.set_tensor_datatype("thresh", input_data_type)
+    model.set_initializer("thresh", thresholds)
     return model
 
 
-# activation: None or DataType
-@pytest.mark.parametrize("act", [DataType["INT4"], DataType["BIPOLAR"]])
-# input datatype
-@pytest.mark.parametrize("idt", [DataType["INT16"], DataType["UINT16"]])
-# folding, -1 is maximum possible
-@pytest.mark.parametrize("nf", [-1, 2, 1])
-# number of input features
-@pytest.mark.parametrize("ich", [16])
-# execution mode
+@pytest.mark.parametrize("num_input_channels", [6, 16])
+@pytest.mark.parametrize(
+    "num_input_vecs",
+    [
+        [1],
+        [1, 2, 2],
+    ],
+)
+@pytest.mark.parametrize("activation", [DataType["INT4"], DataType["BIPOLAR"]])
+@pytest.mark.parametrize("input_data_type", [DataType["INT8"], DataType["UINT8"]])
+@pytest.mark.parametrize("fold", [-1, 1, 2])
+@pytest.mark.parametrize("narrow", [True, False])
+@pytest.mark.parametrize("per_tensor", [True, False])
+@pytest.mark.parametrize("impl_style", ["hls", "rtl"])
 @pytest.mark.parametrize("exec_mode", ["cppsim", "rtlsim"])
-# memory mode
 @pytest.mark.parametrize("mem_mode", ["internal_embedded", "internal_decoupled"])
-@pytest.mark.parametrize("impl_style", ["rtl", "hls"])
 @pytest.mark.fpgadataflow
 @pytest.mark.vivado
 @pytest.mark.slow
-def test_fpgadataflow_thresholding(impl_style, idt, act, nf, ich, exec_mode, mem_mode):
+def test_fpgadataflow_thresholding(
+    num_input_channels,
+    num_input_vecs,
+    activation,
+    input_data_type,
+    fold,
+    narrow,
+    per_tensor,
+    impl_style,
+    exec_mode,
+    mem_mode,
+):
     # the mem_mode parameter can only be used for the hls thresholding
     # so the test will only be executed once for impl_style=rtl and once skipped
     # when the mem_mode is varied. Otherwise, the same test configuration would always
@@ -147,66 +159,72 @@ def test_fpgadataflow_thresholding(impl_style, idt, act, nf, ich, exec_mode, mem
         pytest.skip(
             "Skip, because test is identical to impl_style=rtl and mem_mode=internal_embedded"
         )
-    if nf == -1:
-        nf = ich
-    pe = ich // nf
-    n_inp_vecs = [1, 2, 2]
-    assert ich % pe == 0
-
-    # generate input data, data layout is NHWC for FINN
-    x = gen_finn_dt_tensor(idt, tuple(n_inp_vecs + [ich]))
-
-    odt = act
-    n_steps = act.get_num_possible_values() - 1
-
-    # Generate random, non-decreasing thresholds
-    thresholds = generate_random_threshold_values(idt, ich, n_steps)
-
-    thresholds = sort_thresholds_increasing(thresholds)
-
-    if odt == DataType["BIPOLAR"]:
-        actval = 0
+    if narrow and activation == DataType["BIPOLAR"]:
+        pytest.skip("Narrow needs to be false with bipolar activation.")
+    num_steps = activation.get_num_possible_values() - 1
+
+    if fold == -1:
+        fold = num_input_channels
+    pe = num_input_channels // fold
+    if num_input_channels % pe != 0:
+        pytest.skip("Invalid folding configuration. Skipping test.")
+
+    output_data_type = activation
+    if activation == DataType["BIPOLAR"]:
+        activation_bias = 0
     else:
-        actval = odt.min()
+        activation_bias = activation.min()
+        if narrow:
+            activation_bias += 1
 
-    # Build DUT
-    model = make_single_thresholding_modelwrapper(
-        impl_style, thresholds, idt, odt, actval, n_inp_vecs
+    # Generate random thresholds and sort in ascending order
+    thresholds = generate_random_threshold_values(
+        input_data_type, num_input_channels, num_steps, narrow, per_tensor
     )
 
-    # Expected Reference output
-    # multithreshold util fxn wants NCHW input, not NHWC
-    x_nchw = layout_FINN2NCHW(x)
-    y = multithreshold(x_nchw, thresholds)
+    # provide non-decreasing/ascending thresholds
+    thresholds = sort_thresholds_increasing(thresholds)
 
-    # convert back to NHWC for comparison to hw outputs
-    y = layout_NCHW2FINN(y)
-    if act == DataType["BIPOLAR"]:
-        # binary to bipolar
-        y = 2 * y - 1
-    else:
-        # signed offset
-        y += act.min()
+    # Make a MultiThreshold graph and convert it to a hardware Thresholding node
+    model = make_single_multithresholding_modelwrapper(
+        thresholds,
+        input_data_type,
+        output_data_type,
+        activation_bias,
+        num_input_vecs,
+        num_input_channels,
+    )
 
-    oshape = model.get_tensor_shape("outp")
-    y_expected = y.reshape(oshape)
+    # calculate reference output
+    x = gen_finn_dt_tensor(input_data_type, tuple(num_input_vecs + [num_input_channels]))
 
-    # package input data as dictionary
-    input_dict = {"inp": x}
+    input_dict = {model.graph.input[0].name: x}
+    y_expected = oxe.execute_onnx(model, input_dict)[model.graph.output[0].name]
 
-    # execute DUT
-    y_produced = oxe.execute_onnx(model, input_dict)["outp"]
+    if output_data_type == DataType["BIPOLAR"]:
+        # binary to bipolar
+        y_expected = 2 * y_expected - 1
 
-    y_produced = y_produced.reshape(y_expected.shape)
+    model = model.transform(InferThresholdingLayer())
 
+    # Perform functional validation of the InferThresholdingLayer transform
+    y_produced = oxe.execute_onnx(model, input_dict)[model.graph.output[0].name]
     assert (y_produced == y_expected).all()
 
+    # Transform to the specified implementation style, either the
+    # RTL or HLS according to test parameters
+    node = model.get_nodes_by_op_type(model.graph.node[0].op_type)[0]
+    inst = getCustomOp(node)
+    inst.set_nodeattr("preferred_impl_style", impl_style)
     model = model.transform(SpecializeLayers())
-    # Make sure that SpecializeLayers did not default to HLS implementation unexpectedly
+    model = model.transform(InferShapes())
     assert model.graph.node[0].op_type == "Thresholding_" + str(impl_style)
-    node = model.graph.node[0]
+
+    node = model.get_nodes_by_op_type(model.graph.node[0].op_type)[0]
     inst = getCustomOp(node)
     inst.set_nodeattr("PE", pe)
+    model = model.transform(GiveUniqueNodeNames())
+
     if impl_style == "hls":
         inst.set_nodeattr("mem_mode", mem_mode)
@@ -215,19 +233,12 @@ def test_fpgadataflow_thresholding(impl_style, idt, act, nf, ich, exec_mode, mem
         model = model.transform(CompileCppSim())
         model = model.transform(SetExecMode("cppsim"))
     elif exec_mode == "rtlsim":
-        model = model.transform(SetExecMode("rtlsim"))
-        model = model.transform(GiveUniqueNodeNames())
         model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
+        model = model.transform(SetExecMode("rtlsim"))
        model = model.transform(HLSSynthIP())
         model = model.transform(PrepareRTLSim())
-    else:
-        raise Exception("Unknown exec_mode")
-
-    # execute model
-    y_produced = oxe.execute_onnx(model, input_dict)["outp"]
-
-    y_produced = y_produced.reshape(y_expected.shape)
 
+    y_produced = oxe.execute_onnx(model, input_dict)[model.graph.output[0].name]
     assert (y_produced == y_expected).all()
 
     if exec_mode == "rtlsim":
@@ -241,219 +252,3 @@ def test_fpgadataflow_thresholding(impl_style, idt, act, nf, ich, exec_mode, mem
         exp_cycles = exp_cycles_dict[node.name]
         assert np.isclose(exp_cycles, cycles_rtlsim, atol=15)
         assert exp_cycles != 0
-
-
-@pytest.mark.parametrize("impl_style", ["rtl", "hls"])
-# configuration (ch, pe)
-@pytest.mark.parametrize("cfg", [(1, 1), (6, 2), (6, 3), (8, 4)])
-@pytest.mark.fpgadataflow
-@pytest.mark.vivado
-def test_runtime_thresholds_read(impl_style, cfg):
-    """Read back threshold weights during runtime
-
-    1. Create random initial weights T
-    2. Execute model
-    3. Read back weights via AXI
-    4. Compare with initial weights T
-    """
-    ch = cfg[0]
-    pe = cfg[1]
-    n_inp_vecs = [1, 2, 2]
-    hls_mem_mode = "internal_decoupled"
-    act = DataType["INT4"]
-    idt = DataType["INT16"]
-    odt = act
-    n_steps = act.get_num_possible_values() - 1
-    np.random.seed(2)
-    T = np.random.randint(idt.min(), idt.max() + 1, (ch, n_steps)).astype(np.float32)
-    # provide non-decreasing thresholds
-    T = np.sort(T, axis=1)
-
-    if odt == DataType["BIPOLAR"]:
-        actval = 0
-    else:
-        actval = odt.min()
-
-    model = make_single_thresholding_modelwrapper(impl_style, T, idt, odt, actval, n_inp_vecs)
-    model = model.transform(SpecializeLayers())
-
-    # Make sure that specialize layer did not default to HLS implementation
-    assert model.graph.node[0].op_type == "Thresholding_" + str(impl_style)
-
-    node = model.get_nodes_by_op_type(f"Thresholding_{impl_style}")[0]
-    op_inst = getCustomOp(node)
-    op_inst.set_nodeattr("PE", pe)
-    if impl_style == "hls":
-        op_inst.set_nodeattr("mem_mode", hls_mem_mode)
-    op_inst.set_nodeattr("runtime_writeable_weights", 1)
-
-    dat_fname = f"old_weights_{cfg}.dat"
-    op_inst.make_weight_file(T, "decoupled_runtime", dat_fname)
-    with open(dat_fname, "r") as f:
-        old_weight_stream = f.read().strip()
-    os.remove(dat_fname)
-    old_weight_stream = map(lambda x: int(x, 16), old_weight_stream.split("\n"))
-    old_weight_stream = list(old_weight_stream)
-    # need to create stitched IP for runtime weight testing
-    model = model.transform(InsertFIFO(True))
-    model = model.transform(SpecializeLayers())
-    model = model.transform(GiveUniqueNodeNames())
-    model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
-    model = model.transform(HLSSynthIP())
-    model = model.transform(CreateStitchedIP(test_fpga_part, target_clk_ns))
-    model = model.transform(PrepareRTLSim())
-    model.set_metadata_prop("exec_mode", "rtlsim")
-    # add two copies of the input tensor as the first one is just used to
-    # "flush out" the pipeline (as mvau already starts receiving old weights while
-    # we read/write new ones and reads seem to cause a disturbance too)
-    # generate input data
-    in_tensor = gen_finn_dt_tensor(idt, tuple(n_inp_vecs + [ch]))
-    in_tensor = np.tile(in_tensor, (2, 1, 1, 1))
-
-    exec_ctx = {"inp": in_tensor}
-    extracted_weight_stream = []
-
-    def read_weights(sim):
-        addr = 0
-        for i in range(len(old_weight_stream)):
-            extracted_weight_stream.append(axilite_read(sim, addr, basename="s_axilite_0_"))
-            addr += 4
-
-    rtlsim_exec(model, exec_ctx, pre_hook=read_weights)
-
-    # Validate the AXI Read weights
-    assert extracted_weight_stream == old_weight_stream
-
-    y = exec_ctx["outp"][0]
-
-    # multithreshold util fxn wants NCHW input, not NHWC
-    expected = multithreshold(np.transpose(in_tensor, (0, 3, 1, 2)), T)
-    # convert back to NHWC for comparison to hw outputs
-    expected = np.transpose(expected, (0, 2, 3, 1))[1]
-
-    if act == DataType["BIPOLAR"]:
-        # binary to bipolarW
-        expected = 2 * expected - 1
-    else:
-        # signed offset
-        expected += act.min()
-
-    # Validate the output is as expected
-    assert (y == expected).all()
-
-
-@pytest.mark.parametrize("impl_style", ["hls", "rtl"])
-# configuration (ch, pe)
-@pytest.mark.parametrize("cfg", [(1, 1), (6, 2), (6, 3), (8, 4)])
-@pytest.mark.fpgadataflow
-@pytest.mark.vivado
-def test_runtime_thresholds_write(impl_style, cfg):
-    """Write threshold weights during runtime
-
-    1. Create random initial weights T_init
-    2. Create model with initial weights
-    3. Create new set of weights T_write
-    4. Write T_write using AXI bus
-    5. Read back using AXI bus to T_read
-    6. Compare T_write and T_read
-    7. Validate outputs with expected vectors
-    """
-    ch = cfg[0]
-    pe = cfg[1]
-
-    n_inp_vecs = [1, 2, 2]
-    hls_mem_mode = "internal_decoupled"
-    act = DataType["INT4"]
-    idt = DataType["INT16"]
-
-    odt = act
-    n_steps = act.get_num_possible_values() - 1
-    np.random.seed(2)
-    T_init = np.random.randint(idt.min(), idt.max() + 1, (ch, n_steps)).astype(np.float32)
-    # provide non-decreasing thresholds
-    T_init = np.sort(T_init, axis=1)
-
-    if odt == DataType["BIPOLAR"]:
-        actval = 0
-    else:
-        actval = odt.min()
-
-    model = make_single_thresholding_modelwrapper(impl_style, T_init, idt, odt, actval, n_inp_vecs)
-    model = model.transform(SpecializeLayers())
-
-    # Validate that specialize layer did not default to HLS implementation
-    assert model.graph.node[0].op_type == "Thresholding_" + str(impl_style)
-
-    op_inst = getCustomOp(model.graph.node[0])
-    op_inst.set_nodeattr("PE", pe)
-    if impl_style == "hls":
-        op_inst.set_nodeattr("mem_mode", hls_mem_mode)
-    op_inst.set_nodeattr("runtime_writeable_weights", 1)
-
-    # Make new weights for runtime write
-    np.random.seed(4)
-    T_write = np.random.randint(idt.min(), idt.max() + 1, (ch, n_steps)).astype(np.float32)
-    # provide non-decreasing thresholds
-    T_write = np.sort(T_write, axis=1)
-
-    dat_fname = f"T_write_{cfg}.dat"  # distinguish fname per paramter for distributed testing
-    op_inst.make_weight_file(T_write, "decoupled_runtime", dat_fname)
-    with open(dat_fname, "r") as f:
-        T_write_stream = f.read().strip()
-    os.remove(dat_fname)
-
-    T_write_stream = map(lambda x: int(x, 16), T_write_stream.split("\n"))
-    T_write_stream = list(T_write_stream)
-
-    # need to create stitched IP for runtime weight testing
-    model = model.transform(InsertFIFO(True))
-    model = model.transform(SpecializeLayers())
-    model = model.transform(GiveUniqueNodeNames())
-    model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
-    model = model.transform(HLSSynthIP())
-    model = model.transform(CreateStitchedIP(test_fpga_part, target_clk_ns))
-    model = model.transform(PrepareRTLSim())
-    model.set_metadata_prop("exec_mode", "rtlsim")
-    # add two copies of the input tensor as the first one is just used to
-    # "flush out" the pipeline (as mvau already starts receiving old weights while
-    # we read/write new ones and reads seem to cause a disturbance too)
-    # generate input data
-    in_tensor = gen_finn_dt_tensor(idt, tuple(n_inp_vecs + [ch]))
-    in_tensor = np.tile(in_tensor, (2, 1, 1, 1))
-
-    exec_ctx_write = {"inp": in_tensor}
-
-    def write_weights(sim):
-        addr = 0
-        for nw in T_write_stream:
-            axilite_write(sim, addr, nw, basename="s_axilite_0_")
-            addr += 4
-
-    T_read_stream = []
-
-    def read_weights(sim):
-        addr = 0
-        for i in range(len(T_write_stream)):
-            T_read_stream.append(axilite_read(sim, addr, basename="s_axilite_0_"))
-            addr += 4
-
-    rtlsim_exec(model, exec_ctx_write, pre_hook=write_weights, post_hook=read_weights)
-
-    y = exec_ctx_write["outp"][1]
-
-    assert T_read_stream == T_write_stream
-
-    # multithreshold util fxn wants NCHW input, not NHWC
-    expected = multithreshold(np.transpose(in_tensor, (0, 3, 1, 2)), T_write)
-    # convert back to NHWC for comparison to hw outputs
-    expected = np.transpose(expected, (0, 2, 3, 1))[1]
-
-    if act == DataType["BIPOLAR"]:
-        # binary to bipolarW
-        expected = 2 * expected - 1
-    else:
-        # signed offset
-        expected += act.min()
-
-    # Validate the output is as expected
-    assert (y == expected).all()
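The new narrow and per_tensor parameters above change only the shape and bias of the generated thresholds: narrow drops one threshold step and raises the activation bias by one, while per_tensor collapses the thresholds to a single shared row that the hardware conversion later broadcasts across channels. A small sketch of the shapes and bias this produces (illustrative only; the INT4 activation and INT8 input type mirror the test parametrization):

import numpy as np
from qonnx.core.datatype import DataType

activation = DataType["INT4"]
input_data_type = DataType["INT8"]
num_input_channels = 16
narrow, per_tensor = True, True

num_steps = activation.get_num_possible_values() - 1  # 15 steps for INT4
rows = 1 if per_tensor else num_input_channels  # per-tensor: one shared threshold row
steps = num_steps - 1 if narrow else num_steps  # narrow range drops one step

thresholds = np.random.randint(
    input_data_type.min(), input_data_type.max() + 1, (rows, steps)
).astype(np.float32)
thresholds = np.sort(thresholds, axis=1)  # thresholds must be non-decreasing

activation_bias = activation.min() + (1 if narrow else 0)  # BIPOLAR would use 0 instead
assert thresholds.shape == (1, 14) and activation_bias == -7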
diff --git a/tests/fpgadataflow/test_fpgadataflow_thresholding_runtime.py b/tests/fpgadataflow/test_fpgadataflow_thresholding_runtime.py
new file mode 100644
index 0000000000..a9a2c79551
--- /dev/null
+++ b/tests/fpgadataflow/test_fpgadataflow_thresholding_runtime.py
@@ -0,0 +1,332 @@
+# Copyright (C) 2024, Advanced Micro Devices, Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright notice, this
+#   list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+#
+# * Neither the name of FINN nor the names of its
+#   contributors may be used to endorse or promote products derived from
+#   this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import pytest
+
+import numpy as np
+import os
+from onnx import TensorProto, helper
+from pyverilator.util.axi_utils import axilite_read, axilite_write
+from qonnx.core.datatype import DataType
+from qonnx.core.modelwrapper import ModelWrapper
+from qonnx.custom_op.general.multithreshold import multithreshold
+from qonnx.custom_op.registry import getCustomOp
+from qonnx.transformation.general import GiveUniqueNodeNames
+from qonnx.util.basic import gen_finn_dt_tensor, qonnx_make_model
+
+from finn.core.rtlsim_exec import rtlsim_exec
+from finn.transformation.fpgadataflow.create_stitched_ip import CreateStitchedIP
+from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP
+from finn.transformation.fpgadataflow.insert_fifo import InsertFIFO
+from finn.transformation.fpgadataflow.prepare_ip import PrepareIP
+from finn.transformation.fpgadataflow.prepare_rtlsim import PrepareRTLSim
+from finn.transformation.fpgadataflow.specialize_layers import SpecializeLayers
+
+test_fpga_part = "xczu3eg-sbva484-1-e"
+target_clk_ns = 5
+
+
+def generate_random_threshold_values(input_data_type, num_input_channels, num_steps):
+    return np.random.randint(
+        input_data_type.min(),
+        input_data_type.max() + 1,
+        (num_input_channels, num_steps),
+    ).astype(np.float32)
+
+
+def sort_thresholds_increasing(thresholds):
+    return np.sort(thresholds, axis=1)
+
+
+# n = batch, c = channel, h = height, w = width of feature map
+# Standard = NCHW; FINN = NHWC
+# Convert from NHWC(FINN) to NCHW(Standard)
+def layout_FINN2NCHW(data):
+    return np.transpose(data, (0, 3, 1, 2))
+
+
+# Convert from NCHW(Standard) to NHWC(FINN)
+def layout_NCHW2FINN(data):
+    return np.transpose(data, (0, 2, 3, 1))
+
+
+def make_single_thresholding_modelwrapper(impl_style, T, idt, odt, actval, n_inp_vecs):
+    NumChannels = T.shape[0]
+
+    inp = helper.make_tensor_value_info("inp", TensorProto.FLOAT, n_inp_vecs + [NumChannels])
+    outp = helper.make_tensor_value_info("outp", TensorProto.FLOAT, n_inp_vecs + [NumChannels])
+
+    node_inp_list = ["inp", "thresh"]
+
+    Thresholding_node = helper.make_node(
+        "Thresholding",
+        node_inp_list,
+        ["outp"],
+        domain="finn.custom_op.fpgadataflow",
+        backend="fpgadataflow",
+        NumChannels=NumChannels,
+        numSteps=T.shape[1],
+        inputDataType=idt.name,
+        weightDataType=idt.name,  # will be set by MinimizeAccumulatorWidth
+        outputDataType=odt.name,
+        ActVal=actval,
+        numInputVectors=n_inp_vecs,
+        preferred_impl_style=impl_style,
+    )
+    graph = helper.make_graph(
+        nodes=[Thresholding_node],
+        name="thresholding_graph",
+        inputs=[inp],
+        outputs=[outp],
+    )
+
+    model = qonnx_make_model(graph, producer_name="thresholding-model")
+    model = ModelWrapper(model)
+
+    model.set_tensor_datatype("inp", idt)
+    model.set_tensor_datatype("outp", odt)
+
+    model.set_tensor_datatype("thresh", idt)
+    model.set_initializer("thresh", T)
+    return model
+
+
+@pytest.mark.parametrize("impl_style", ["rtl", "hls"])
+# configuration (ch, pe)
+@pytest.mark.parametrize("cfg", [(1, 1), (6, 2), (6, 3), (8, 4)])
+@pytest.mark.fpgadataflow
+@pytest.mark.vivado
+def test_runtime_thresholds_read(impl_style, cfg):
+    """Read back threshold weights during runtime
+
+    1. Create random initial weights T
+    2. Execute model
+    3. Read back weights via AXI
+    4. Compare with initial weights T
+    """
+    ch = cfg[0]
+    pe = cfg[1]
+    n_inp_vecs = [1, 2, 2]
+    hls_mem_mode = "internal_decoupled"
+    act = DataType["INT4"]
+    idt = DataType["INT16"]
+    odt = act
+    n_steps = act.get_num_possible_values() - 1
+    np.random.seed(2)
+    T = np.random.randint(idt.min(), idt.max() + 1, (ch, n_steps)).astype(np.float32)
+    # provide non-decreasing thresholds
+    T = np.sort(T, axis=1)
+
+    if odt == DataType["BIPOLAR"]:
+        actval = 0
+    else:
+        actval = odt.min()
+
+    model = make_single_thresholding_modelwrapper(impl_style, T, idt, odt, actval, n_inp_vecs)
+    model = model.transform(SpecializeLayers())
+
+    # Make sure that specialize layer did not default to HLS implementation
+    assert model.graph.node[0].op_type == "Thresholding_" + str(impl_style)
+
+    node = model.get_nodes_by_op_type(f"Thresholding_{impl_style}")[0]
+    op_inst = getCustomOp(node)
+    op_inst.set_nodeattr("PE", pe)
+    if impl_style == "hls":
+        op_inst.set_nodeattr("mem_mode", hls_mem_mode)
+    op_inst.set_nodeattr("runtime_writeable_weights", 1)
+
+    dat_fname = f"old_weights_{cfg}.dat"
+    op_inst.make_weight_file(T, "decoupled_runtime", dat_fname)
+    with open(dat_fname, "r") as f:
+        old_weight_stream = f.read().strip()
+    os.remove(dat_fname)
+    old_weight_stream = map(lambda x: int(x, 16), old_weight_stream.split("\n"))
+    old_weight_stream = list(old_weight_stream)
+    # need to create stitched IP for runtime weight testing
+    model = model.transform(InsertFIFO(True))
+    model = model.transform(SpecializeLayers())
+    model = model.transform(GiveUniqueNodeNames())
+    model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
+    model = model.transform(HLSSynthIP())
+    model = model.transform(CreateStitchedIP(test_fpga_part, target_clk_ns))
+    model = model.transform(PrepareRTLSim())
+    model.set_metadata_prop("exec_mode", "rtlsim")
+    # add two copies of the input tensor as the first one is just used to
+    # "flush out" the pipeline (as mvau already starts receiving old weights while
+    # we read/write new ones and reads seem to cause a disturbance too)
+    # generate input data
+    in_tensor = gen_finn_dt_tensor(idt, tuple(n_inp_vecs + [ch]))
+    in_tensor = np.tile(in_tensor, (2, 1, 1, 1))
+
+    exec_ctx = {"inp": in_tensor}
+    extracted_weight_stream = []
+
+    def read_weights(sim):
+        addr = 0
+        for i in range(len(old_weight_stream)):
+            extracted_weight_stream.append(axilite_read(sim, addr, basename="s_axilite_0_"))
+            addr += 4
+
+    rtlsim_exec(model, exec_ctx, pre_hook=read_weights)
+
+    # Validate the AXI Read weights
+    assert extracted_weight_stream == old_weight_stream
+
+    y = exec_ctx["outp"][0]
+
+    # multithreshold util fxn wants NCHW input, not NHWC
+    expected = multithreshold(np.transpose(in_tensor, (0, 3, 1, 2)), T)
+    # convert back to NHWC for comparison to hw outputs
+    expected = np.transpose(expected, (0, 2, 3, 1))[1]
+
+    if act == DataType["BIPOLAR"]:
+        # binary to bipolar
+        expected = 2 * expected - 1
+    else:
+        # signed offset
+        expected += act.min()
+
+    # Validate the output is as expected
+    assert (y == expected).all()
+
+
+@pytest.mark.parametrize("impl_style", ["hls", "rtl"])
+# configuration (ch, pe)
+@pytest.mark.parametrize("cfg", [(1, 1), (6, 2), (6, 3), (8, 4)])
+@pytest.mark.fpgadataflow
+@pytest.mark.vivado
+def test_runtime_thresholds_write(impl_style, cfg):
+    """Write threshold weights during runtime
+
+    1. Create random initial weights T_init
+    2. Create model with initial weights
+    3. Create new set of weights T_write
+    4. Write T_write using AXI bus
+    5. Read back using AXI bus to T_read
+    6. Compare T_write and T_read
+    7. Validate outputs with expected vectors
+    """
+    ch = cfg[0]
+    pe = cfg[1]
+
+    n_inp_vecs = [1, 2, 2]
+    hls_mem_mode = "internal_decoupled"
+    act = DataType["INT4"]
+    idt = DataType["INT16"]
+
+    odt = act
+    n_steps = act.get_num_possible_values() - 1
+    np.random.seed(2)
+    T_init = np.random.randint(idt.min(), idt.max() + 1, (ch, n_steps)).astype(np.float32)
+    # provide non-decreasing thresholds
+    T_init = np.sort(T_init, axis=1)
+
+    if odt == DataType["BIPOLAR"]:
+        actval = 0
+    else:
+        actval = odt.min()
+
+    model = make_single_thresholding_modelwrapper(impl_style, T_init, idt, odt, actval, n_inp_vecs)
+    model = model.transform(SpecializeLayers())
+
+    # Validate that specialize layer did not default to HLS implementation
+    assert model.graph.node[0].op_type == "Thresholding_" + str(impl_style)
+
+    op_inst = getCustomOp(model.graph.node[0])
+    op_inst.set_nodeattr("PE", pe)
+    if impl_style == "hls":
+        op_inst.set_nodeattr("mem_mode", hls_mem_mode)
+    op_inst.set_nodeattr("runtime_writeable_weights", 1)
+
+    # Make new weights for runtime write
+    np.random.seed(4)
+    T_write = np.random.randint(idt.min(), idt.max() + 1, (ch, n_steps)).astype(np.float32)
+    # provide non-decreasing thresholds
+    T_write = np.sort(T_write, axis=1)
+
+    dat_fname = f"T_write_{cfg}.dat"  # distinguish fname per parameter for distributed testing
+    op_inst.make_weight_file(T_write, "decoupled_runtime", dat_fname)
+    with open(dat_fname, "r") as f:
+        T_write_stream = f.read().strip()
+    os.remove(dat_fname)
+
+    T_write_stream = map(lambda x: int(x, 16), T_write_stream.split("\n"))
+    T_write_stream = list(T_write_stream)
+
+    # need to create stitched IP for runtime weight testing
+    model = model.transform(InsertFIFO(True))
+    model = model.transform(SpecializeLayers())
+    model = model.transform(GiveUniqueNodeNames())
+    model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
+    model = model.transform(HLSSynthIP())
+    model = model.transform(CreateStitchedIP(test_fpga_part, target_clk_ns))
+    model = model.transform(PrepareRTLSim())
+    model.set_metadata_prop("exec_mode", "rtlsim")
+    # add two copies of the input tensor as the first one is just used to
+    # "flush out" the pipeline (as mvau already starts receiving old weights while
+    # we read/write new ones and reads seem to cause a disturbance too)
+    # generate input data
+    in_tensor = gen_finn_dt_tensor(idt, tuple(n_inp_vecs + [ch]))
+    in_tensor = np.tile(in_tensor, (2, 1, 1, 1))
+
+    exec_ctx_write = {"inp": in_tensor}
+
+    def write_weights(sim):
+        addr = 0
+        for nw in T_write_stream:
+            axilite_write(sim, addr, nw, basename="s_axilite_0_")
+            addr += 4
+
+    T_read_stream = []
+
+    def read_weights(sim):
+        addr = 0
+        for i in range(len(T_write_stream)):
+            T_read_stream.append(axilite_read(sim, addr, basename="s_axilite_0_"))
+            addr += 4
+
+    rtlsim_exec(model, exec_ctx_write, pre_hook=write_weights, post_hook=read_weights)
+
+    y = exec_ctx_write["outp"][1]
+
+    assert T_read_stream == T_write_stream
+
+    # multithreshold util fxn wants NCHW input, not NHWC
+    expected = multithreshold(np.transpose(in_tensor, (0, 3, 1, 2)), T_write)
+    # convert back to NHWC for comparison to hw outputs
+    expected = np.transpose(expected, (0, 2, 3, 1))[1]
+
+    if act == DataType["BIPOLAR"]:
+        # binary to bipolar
+        expected = 2 * expected - 1
+    else:
+        # signed offset
+        expected += act.min()
+
+    # Validate the output is as expected
+    assert (y == expected).all()
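Both runtime tests rely on the same file convention: make_weight_file emits one hex word per line, and each word corresponds to one 32-bit AXI-lite transaction at consecutive 4-byte addresses. A minimal sketch of that round trip (the helper names are hypothetical; the parsing mirrors what the tests do inline):

def load_weight_stream(fname):
    # one hex word per line, as written by make_weight_file(..., "decoupled_runtime", ...)
    with open(fname, "r") as f:
        return [int(line, 16) for line in f.read().strip().split("\n")]


def axilite_addresses(stream):
    # word i of the stream is read or written at byte address 4 * i
    return [4 * i for i in range(len(stream))]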