diff --git a/tests/analog_functions.py b/tests/analog_functions.py index e5f230f5..8881f049 100644 --- a/tests/analog_functions.py +++ b/tests/analog_functions.py @@ -13,9 +13,12 @@ import random import sys import reset_def_values as reset +from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file, plot_to_file_multiline from open_context import ctx_timeout, ctx from create_files import results_file, results_dir, csv +from shapefile import shape_gen, Shape + # dicts that will be saved to csv files shape_csv_vals = {} ampl_csv_vals = {} @@ -133,16 +136,8 @@ def test_amplitude(out_data, ref_data, n, ain, aout, channel, trig): # corr_amplitude_min -- correlation coefficient between the vector that holds the minimum amplitude values in the # input signal and the vector that holds the maximum amplitude values in the reference signal - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) + reset.analog_in(ain) reset.analog_out(aout) @@ -204,7 +199,7 @@ def test_amplitude(out_data, ref_data, n, ain, aout, channel, trig): data_string.append("Minimum amplitude computed:") data_string.append(str(min_in)) if gen_reports: - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) aout.stop(channel) return corr_amplitude_max, corr_amplitude_min @@ -229,16 +224,7 @@ def test_shape(channel, out_data, ref_data, ain, aout, trig, ch_ratio, shapename # phase_diff_vect-- vector that holds the phase difference between the input data and the reference data for each # signal shape - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) reset.analog_in(ain) reset.analog_out(aout) @@ -284,7 +270,7 @@ def test_shape(channel, out_data, ref_data, ain, aout, trig, ch_ratio, shapename "Correlation coefficient between " + shapename[i] + "signal and its reference:" + str(corr_shape)) data_string.append("Phase difference between " + shapename[i] + "signal and its reference:" + str(phase_diff)) if gen_reports: - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) aout.stop(channel) return corr_shape_vect, phase_diff_vect @@ -304,16 +290,8 @@ def phase_diff_ch0_ch1(aout, ain, trig): # Returns: # phase_diff_between_channels-- the phase difference between channels in degrees # - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + + file_name, dir_name, csv_path = get_result_files(gen_reports) reset.analog_in(ain) reset.analog_out(aout) @@ -359,7 +337,7 @@ def phase_diff_ch0_ch1(aout, ain, trig): phasediff_csv['Ch1, ADCsr=' + str(sr)] = input_data[libm2k.ANALOG_IN_CHANNEL_2] if gen_reports: channel = 0 - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) save_data_to_csv(phasediff_csv, csv_path + 'ph_diff_channels.csv') aout.stop() @@ -380,16 +358,7 @@ def test_analog_trigger(channel, trig, aout, ain): # trig_test -- Vector that holds 1 for each trigger condition fulfilled and 0 otherwise # condition_name-- Vector that holds names of the trigger conditions - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) reset.analog_in(ain) reset.analog_out(aout) @@ -531,7 +500,7 @@ def test_analog_trigger(channel, trig, aout, ain): data_string.append("level set: " + str(high)) data_string.append("\nlevel read: " + str(input_data[delay])) if gen_reports: - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) aout.stop(channel) return trig_test, condition_name @@ -552,16 +521,7 @@ def test_offset(out_data, n, ain, aout, trig, channel): # Returns # corr_offset -- Correlation coefficient between the computed offset vector and the defined offset vector - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) reset.analog_in(ain) reset.analog_out(aout) @@ -604,7 +564,7 @@ def test_offset(out_data, n, ain, aout, trig, channel): data_string.append(str(in_offset)) if gen_reports: - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) corr_offset, _ = pearsonr(offset, in_offset) # compare the original offset vector with the average values obtained aout.stop(channel) @@ -623,16 +583,7 @@ def test_voltmeter_functionality(channel, ain, aout, ctx): # Returns: # voltmeter_ -- Vector thet holds 1 if the read voltage is in specified range and 0 otherwise - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) reset.analog_in(ain) reset.analog_out(aout) @@ -666,7 +617,7 @@ def test_voltmeter_functionality(channel, ain, aout, ctx): else: voltmeter_ = np.append(voltmeter_, 0) # the voltage is out of range if gen_reports: - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) aout.stop(channel) return voltmeter_ @@ -699,16 +650,7 @@ def cyclic_buffer_test(aout, ain, channel, trig): # Returns: # cyclic_false -- Must be 1 if a single buffer was sent and succesfully recieved, 0 otherwise - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) dac_sr = 75000 adc_sr = 10000 min_periods = 3 @@ -754,16 +696,7 @@ def noncyclic_buffer_test(aout, ain, channel, trig, ctx): # Returns: # cyclic_false -- Must be 1 if a single buffer was sent and succesfully recieved, 0 otherwise - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) reset.analog_in(ain) reset.analog_out(aout) @@ -855,16 +788,7 @@ def compute_frequency(channel, ain, aout, trig): # ofreqs-- Vector that holds the frequencies of the output buffers # ifreqs-- Vector that holds the frequencies of the input buffers - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) test_name = "freq" data_string = [] @@ -924,7 +848,7 @@ def compute_frequency(channel, ain, aout, trig): data_string.append("Out signal frequency:" + str(out_freq)) data_string.append("In singal frequency:" + str(in_freq)) if gen_reports: - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) return ofreqs, ifreqs @@ -987,16 +911,7 @@ def test_oversampling_ratio(channel, ain, aout, trig): # test_osr -- Must be 1 if the computed oversampling ratio is equal with the set oversampling ratio # and 0 otherwise - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) reset.analog_in(ain) reset.analog_out(aout) @@ -1044,7 +959,7 @@ def test_oversampling_ratio(channel, ain, aout, trig): data_string.append("Oversampling ratios computed: \n" + str(verify_osr)) if gen_reports: - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) test_osr = 1 for i in range(len(osr)): if osr[i] != verify_osr[i]: @@ -1053,44 +968,6 @@ def test_oversampling_ratio(channel, ain, aout, trig): return test_osr -def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None, data_marked=None): - # Saves the plots in a separate folder - # Arguments: - # title -- Title of the plot - # data -- Data to be plotted - # filename -- Name of the file with the plot - # Keyword Arguments: - # xlabel -- Label of x-Axis (default: {None}) - # ylabel -- Label of y-Axis(default: {None}) - # data1 -- Data that should be plotted on the same plot(default: {None}) - # data_marked -- Data that represents specific points on the plot(default: {None}) - # plot the signals in a separate folder - plt.title(title) - if xlabel is not None: # if xlabel and ylabel are not specified there will be default values - plt.xlabel(xlabel) - else: - plt.xlabel('Samples') - if ylabel is not None: - plt.ylabel(ylabel) - else: - plt.ylabel('Voltage [V]') - plt.grid(visible=True) - plt.plot(data) # if a second set of data must be printed (for ch0 and ch1 phase difference in this case) - if data1 is not None: - plt.plot(data1) - if data_marked is not None: - plt.plot(data_marked, data[data_marked], 'xr') - plt.savefig(dir_name + "/" + filename) - plt.close() - return - - -def save_data_to_csv(csv_vals, csv_file): - df = DataFrame(csv_vals) - df.to_csv(csv_file) - return - - def channels_diff_in_samples(trig, channel, aout, ain): # Find if there is a sample delay between channels for the same signal and trigger # Arguments: @@ -1107,16 +984,7 @@ def channels_diff_in_samples(trig, channel, aout, ain): # dac_osr-- oversampling ratios set for Aout # freq -- frequency of the signals used for the test - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) reset.analog_in(ain) reset.analog_out(aout) # /len(samples_diff1[i]) reset.trigger(trig) @@ -1172,7 +1040,7 @@ def channels_diff_in_samples(trig, channel, aout, ain): data_string.append('Samples difference: ' + str(diff_osr)) if gen_reports: - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) save_data_to_csv(diff_in_samples0, csv_path + 'diffSamples_ch0_trigSrc_' + str(channel) + '.csv') save_data_to_csv(diff_in_samples1, csv_path + 'diffSamples_ch1_trigSrc_' + str(channel) + '.csv') aout.stop() @@ -1200,6 +1068,8 @@ def write_file(file, test_name, channel, data_string): file.write("\n\nAmplitude test on channel " + str(channel) + ": \n") elif test_name == "buffer_transition_glitch": file.write("\n\nTest buffer transition glitch on channel " + str(channel) + ": \n") + elif test_name == "aout_triggering": + file.write("\n\nTest aout start with trigger event on channel = " + str(channel) + ": \n") for i in range(len(data_string)): file.write(str(data_string[i]) + '\n') @@ -1317,16 +1187,7 @@ def is_spike(data, peak, threshold = 0.25): return percentage_dif > threshold def test_buffer_transition_glitch(channel, ain, aout, trig, waveform, amplitude=1): - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) BUFFER_SIZE = 5_00_000 @@ -1401,11 +1262,416 @@ def test_buffer_transition_glitch(channel, ain, aout, trig, waveform, amplitude= "Number of glitch peaks found in " + waveform + " signal :" + str(num_peaks)) if gen_reports: - write_file(file, test_name, channel, data_string) + write_file(file_name, test_name, channel, data_string) plot_to_file(f'Buffer Glitch , channel{channel}', data, dir_name, f'buffer_glitch_plot_ch{channel}_{waveform}.png', data_marked=filtered_peaks) - return num_peaks \ No newline at end of file + return num_peaks + + +def get_experiment_config_for_sample_hold(dac_sr): + cfg = {} + if dac_sr == 75_000_000: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 100_000_000 + cfg["buffer_size"] = 20_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 1024 * 8 + cfg["offset"] = 0 + elif dac_sr == 7_500_000: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 100_000_000 + cfg["buffer_size"] = 30_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 1024 + cfg["offset"] = 0 + elif dac_sr == 750_000: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 10_000_000 + cfg["buffer_size"] = 30_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 1024 + cfg["offset"] = 0 + elif dac_sr == 75_000: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 1_000_000 + cfg["buffer_size"] = 30_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 1024 + cfg["offset"] = 0 + elif dac_sr == 7_500: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 1_000_000 + cfg["buffer_size"] = 30_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 128 + cfg["offset"] = 0 + # 750 Hz ommited to avoid long test duration + else: + raise ValueError("Invalid DAC sample rate.") + return cfg + +def are_values_within_range(data: np.ndarray, lower_bound, upper_bound, chn=None): + assert lower_bound < upper_bound, "Invalid bounds" + is_CH0_in_range = np.all((lower_bound <= data[0]) & (data[0] <= upper_bound)) + is_CH1_in_range = np.all((lower_bound <= data[1]) & (data[1] <= upper_bound)) + if chn is None: + return is_CH0_in_range and is_CH1_in_range + elif chn == libm2k.ANALOG_IN_CHANNEL_1: + return is_CH0_in_range + elif chn == libm2k.ANALOG_IN_CHANNEL_2: + return is_CH1_in_range + else: + raise ValueError(f"Unknown channel: {chn}") +def test_last_sample_hold( + ain: libm2k.M2kAnalogIn, + aout: libm2k.M2kAnalogOut, + trig: libm2k.M2kHardwareTrigger, + ctx: libm2k.M2k, + cfg, channel +): + def step_ramp_rising(aout_chn, trig_chn, buffer_ramp_up): + set_trig(trig, trig_chn, 8192, libm2k.RISING_EDGE_ANALOG, -cfg.get("trig_threshold")) + ain.startAcquisition(cfg.get("buffer_size")) + if aout_chn is None: + aout.push([buffer_ramp_up, buffer_ramp_up]) + else: + aout.push(aout_chn, buffer_ramp_up) + data = np.array(ain.getSamples(cfg.get("buffer_size"))) + # Flush values from previous buffer + ain.stopAcquisition() + return data + + def step_ramp_falling(aout_chn, trig_chn, buffer_ramp_down): + set_trig(trig, trig_chn, 8192, libm2k.FALLING_EDGE_ANALOG, cfg.get("trig_threshold")) + ain.startAcquisition(cfg.get("buffer_size")) + if aout_chn is None: + aout.push([buffer_ramp_down, buffer_ramp_down]) + else: + aout.push(aout_chn, buffer_ramp_down) + data = np.array(ain.getSamples(cfg.get("buffer_size"))) + # Flush values from previous buffer + ain.stopAcquisition() + return data + + def check_for_glitch(data, threshold=0.3): + # The glitch is unwanted and happened in between the last sample of the previous buffer and the first sample of the new buffer. + # NOTE: At DAC_SR <= 7.5 KHz we see oscilations due to the response of the HDL filter + glitch_found = False + for chn_samples in data: + if any(abs(left - right) >= threshold for left, right in zip(chn_samples, chn_samples[1:])): + glitch_found = True + return glitch_found + + file_name, dir_name, csv_path = get_result_files(gen_reports) + test_name = "sample_hold" + data_string = [] + + chn_str = "both_channels" if channel is None else f"CH{channel}" + sr_str = get_sample_rate_display_format(cfg.get("dac_sr")) + x_time, x_label = get_time_format(cfg.get("buffer_size"), cfg.get("adc_sr")) + + if gen_reports: + subdir_name = f"{dir_name}/last_sample_hold/{chn_str}" + os.makedirs(subdir_name, exist_ok=True) + + SLEEP = 0.15 + glitched = False + is_last_sample_hold_ok = True # Assume it is ok until proven otherwise + is_idle_ok = True + assert channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2, None], "Invalid channel ... None means use both channels" + trig_chn = libm2k.ANALOG_IN_CHANNEL_1 if channel is None else channel + + buffer_ramp_up = shape_gen(n=cfg["samples_per_period"], + amplitude=cfg["amplitude"], + offset=cfg["offset"])[Shape.RISING_RAMP.value] + buffer_ramp_down = shape_gen(n=cfg["samples_per_period"], + amplitude=cfg["amplitude"], + offset=cfg["offset"])[Shape.FALLING_RAMP.value] + + ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True) + ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True) + ain.setSampleRate(cfg.get("adc_sr")) + ain.setRange(0, libm2k.PLUS_MINUS_25V) + ain.setRange(1, libm2k.PLUS_MINUS_25V) + + aout.setSampleRate(0, cfg.get("dac_sr")) + aout.setSampleRate(1, cfg.get("dac_sr")) + aout.setKernelBuffersCount(0, 4) + aout.setKernelBuffersCount(1, 4) + aout.enableChannel(0, True) + aout.enableChannel(1, True) + aout.setCyclic(False) + + # Alternate between rising and falling ramps: rising, falling, rising, falling + # NOTE: we selected an arbitraty number of samples from both ends to validate sample hold and reset functionality + # 1: Rising + data = step_ramp_rising(channel, trig_chn, buffer_ramp_up) + if channel is None: + # Both channels should idle at 0V before push due to being reset + is_idle_ok = is_idle_ok and are_values_within_range(data[:, :2000], -0.20, 0.20, channel) + assert is_idle_ok, "STEP1: Both channels should idle low before push due to being reset" + elif channel == libm2k.ANALOG_IN_CHANNEL_1: + # CH2 should idle at 0V if we are testing CH1 + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2) + assert is_idle_ok, "STEP1: CH2 should idle at 0V if we are testing CH1" + elif channel == libm2k.ANALOG_IN_CHANNEL_2: + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1) + assert is_idle_ok, "STEP1: CH1 should idle at 0V if we are testing CH2" + # Shoud hold last sample from new buffer for current channel config + is_idle_ok = is_idle_ok and are_values_within_range(data[:, -2000:], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel) + + if gen_reports: + plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Rising Ramp", + data=data[0], + data1=data[1], + x_data=x_time, + xlabel = x_label, + dir_name=subdir_name, + y_lim=(-6, 6), + filename=f"last_sample_hold_{chn_str}_{sr_str}_step1.png") + time.sleep(SLEEP) # wait for the DAC output to settle with last sample + # 2: Falling + data = step_ramp_falling(channel, trig_chn, buffer_ramp_down) + # Shoud start with last sample from previous buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, :2000], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel) + # Shoud hold last sample from new buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, -2000:], -cfg["amplitude"] * 1.15, -cfg["amplitude"] * 0.85, channel) + if channel == libm2k.ANALOG_IN_CHANNEL_1: + # CH2 should idle at 0V if we are testing CH1 + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2) + assert is_idle_ok, "STEP2: CH2 should idle at 0V if we are testing CH1" + elif channel == libm2k.ANALOG_IN_CHANNEL_2: + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1) + assert is_idle_ok, "STEP2: CH1 should idle at 0V if we are testing CH2" + glitched = glitched or check_for_glitch(data) + if gen_reports: + plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Falling Ramp", + data=data[0], + data1=data[1], + x_data=x_time, + xlabel = x_label, + dir_name=subdir_name, + y_lim=(-6, 6), + filename=f"last_sample_hold_{chn_str}_{sr_str}_step2.png") + time.sleep(SLEEP) # wait for the DAC output to settle with last sample + # 3: Rising + data = step_ramp_rising(channel, trig_chn, buffer_ramp_up) + # Shoud start with last sample from previous buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, :2000], -cfg["amplitude"] * 1.15, -cfg["amplitude"] * 0.85, channel) + # Shoud hold last sample from new buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, -2000:], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel) + if channel == libm2k.ANALOG_IN_CHANNEL_1: + # CH2 should idle at 0V if we are testing CH1 + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2) + assert is_idle_ok, "STEP3: CH2 should idle at 0V if we are testing CH1" + elif channel == libm2k.ANALOG_IN_CHANNEL_2: + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1) + assert is_idle_ok, "STEP3: CH1 should idle at 0V if we are testing CH2" + glitched = glitched or check_for_glitch(data) + if gen_reports: + plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Rising Ramp", + data=data[0], + data1=data[1], + x_data=x_time, + xlabel = x_label, + dir_name=subdir_name, + y_lim=(-6, 6), + filename=f"last_sample_hold_{chn_str}_{sr_str}_step3.png") + time.sleep(SLEEP) # wait for the DAC output to settle with last sample + # 4: Falling + data = step_ramp_falling(channel, trig_chn, buffer_ramp_down) + # Shoud start with last sample from previous buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, :2000], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel) + # Shoud hold last sample from new buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, -2000:], -cfg["amplitude"] * 1.15, -cfg["amplitude"] * 0.85, channel) + if channel == libm2k.ANALOG_IN_CHANNEL_1: + # CH2 should idle at 0V if we are testing CH1 + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2) + assert is_idle_ok, "STEP4: CH2 should idle at 0V if we are testing CH1" + elif channel == libm2k.ANALOG_IN_CHANNEL_2: + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1) + assert is_idle_ok, "STEP4: CH1 should idle at 0V if we are testing CH2" + glitched = glitched or check_for_glitch(data) + if gen_reports: + plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Falling Ramp", + data=data[0], + data1=data[1], + x_data=x_time, + xlabel = x_label, + dir_name=subdir_name, + y_lim=(-6, 6), + filename=f"last_sample_hold_{chn_str}_{sr_str}_step4.png") + + aout.stop() + return glitched, is_last_sample_hold_ok, is_idle_ok + + +def test_aout_triggering( + ain: libm2k.M2kAnalogIn, + aout: libm2k.M2kAnalogOut, + dig: libm2k.M2kDigital, + trig: libm2k.M2kHardwareTrigger, + ctx: libm2k.M2k, + auto_rearm : bool, isCyclic : bool, status +): + def configure_trigger(trig: libm2k.M2kHardwareTrigger, + dig: libm2k.M2kDigital, + trig_pin, status, delay): + trig.setAnalogDelay(-delay) + trig.setDigitalDelay(-delay) + trig.setDigitalSource(libm2k.SRC_NONE) # DigitalIn conditioned by internal trigger structure + trig.setDigitalCondition(trig_pin, libm2k.RISING_EDGE_DIGITAL) + trig.setAnalogOutTriggerSource(libm2k.TRIGGER_LA) # aout conditioned by the LA trigger + trig.setAnalogOutTriggerStatus(status) + file_name, dir_name, csv_path = get_result_files(gen_reports) + test_name = "aout_triggering" + data_string = [] + + TRIG_PIN = libm2k.DIO_CHANNEL_0 + DELAY = 8_000 + BUFFER_SIZE = 16_000 + OVERSAMPLING = 1 + KB_COUNT = 40 + N_SAMPLES = 1024 + AMPLITUDE = 5 + OFFSET = 0 + TIMEOUT = 10_000 + + ADC_SR = 100_000_000 + DAC_SR = 75_000_000 + SR_IN_DIG = 100_000_000 + SR_OUT_DIG = 100_000_000 + + ctx.reset() + ctx.calibrateADC() + ctx.calibrateDAC() + ctx.setTimeout(TIMEOUT) + + ain.setSampleRate(ADC_SR) + ain.setOversamplingRatio(OVERSAMPLING) + ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True) + ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True) + ain.setRange(libm2k.ANALOG_IN_CHANNEL_1, -10, 10) + ain.setRange(libm2k.ANALOG_IN_CHANNEL_2, -10, 10) + assert ain.getSampleRate() == ADC_SR, "Failed to set the sample rate for AnalogIn" + + aout.setSampleRate(0, DAC_SR) + aout.setSampleRate(1, DAC_SR) + aout.enableChannel(0, True) + aout.enableChannel(1, True) + aout.setOversamplingRatio(0, 1) + aout.setOversamplingRatio(1, 1) + aout.setKernelBuffersCount(0, KB_COUNT) + aout.setKernelBuffersCount(1, KB_COUNT) + assert aout.getSampleRate(1) == DAC_SR, "Failed to set the sample rate for AnalogOut1" + + dig.setDirection(TRIG_PIN, libm2k.DIO_OUTPUT) + dig.setOutputMode(TRIG_PIN, libm2k.DIO_PUSHPULL) + dig.enableChannel(TRIG_PIN, True) + dig.setCyclic(False) + dig.setValueRaw(TRIG_PIN, libm2k.LOW) + dig.setSampleRateIn(SR_IN_DIG) + dig.setSampleRateOut(SR_OUT_DIG) + assert dig.getSampleRateIn() == SR_IN_DIG , "Failed to set the sample rate for DigitalIn" + assert dig.getSampleRateOut() == SR_OUT_DIG , "Failed to set the sample rate for DigitalOut" + + # LA trigger will determine an action for the aout based on the provided status + configure_trigger(trig, dig, TRIG_PIN, status, DELAY) + aout.setCyclic(isCyclic) + aout.setBufferRearmOnTrigger(auto_rearm) + + # Configure Aout Signal + buf = shape_gen(n=N_SAMPLES, amplitude=AMPLITUDE, offset=OFFSET)[Shape.FALLING_RAMP.value] + aout.push([buf, buf]) + + ctx.startMixedSignalAcquisition(BUFFER_SIZE) + + dig.setValueRaw(TRIG_PIN, libm2k.HIGH) # Trigger event -> should start the AOUT + analog_data = np.array(ain.getSamples(BUFFER_SIZE)) + digital_data = np.array(dig.getSamples(BUFFER_SIZE)) + mask = 0x0001 << TRIG_PIN + digital_data_chn = (digital_data & mask) >> TRIG_PIN + + ctx.stopMixedSignalAcquisition() + + # Validate test + peaks_CH0, _ = find_peaks(analog_data[0], prominence=1, height=1, distance = 100) + peaks_CH1, _ = find_peaks(analog_data[1], prominence=1, height=1, distance = 100) + + CH0_left = analog_data[0][:DELAY] + CH0_right = analog_data[0][DELAY:] + peaks_CH0_left, _ = find_peaks(CH0_left, prominence=1, height=1, distance = 100) + peaks_CH0_right, _ = find_peaks(CH0_right, prominence=1, height=1, distance = 100) + CH1_left = analog_data[1][:DELAY] + CH1_right = analog_data[1][DELAY:] + peaks_CH1_left, _ = find_peaks(CH1_left, prominence=1, height=1, distance = 100) + peaks_CH1_right, _ = find_peaks(CH1_right, prominence=1, height=1, distance = 100) + + status_str = "START" if status == libm2k.START else "STOP" + isCyclic_str = "Cyclic" if isCyclic else "Non-Cyclic" + rearm_str = "Ream" if auto_rearm else "No-Rearm" + data_string.append(f"Configuration: status={status_str} \t isCyclic={isCyclic_str} \t auto_rearm={rearm_str}") + data_string.append(f"\tPeaks before trigger: CH0={len(peaks_CH0_left)} CH1={len(peaks_CH1_left)}") + data_string.append(f"\tPeaks after trigger: CH0={len(peaks_CH0_right)} CH1={len(peaks_CH1_right)}") + + result = True + # NOTE: auto_rearm only has effect on START status + # Case 1, 2, 4 + if ((status == libm2k.START) and (not isCyclic) and (not auto_rearm)) or \ + ((status == libm2k.START) and (not isCyclic) and (auto_rearm)) or \ + ((status == libm2k.START) and (isCyclic) and (auto_rearm)): + # Should IDLE before trigger at 0V because the channel was reset + result = are_values_within_range(analog_data[:, :DELAY - 500], -0.2, 0.2) + # result = result and (len(peaks_CH0_left) == 0) and (len(peaks_CH1_left) == 0) + # Should output exactly 1 period after trigger + result = result and (len(peaks_CH0_right) == 1) and (len(peaks_CH1_right) == 1) + # Case 3 + if (status == libm2k.START) and (isCyclic) and (not auto_rearm): + # Should IDLE before trigger at 0V because the channel was reset + result = are_values_within_range(analog_data[:, :DELAY ], -0.2, 0.2) + # Should output multiple period after trigger + result = result and (len(peaks_CH0_right) > 1) and (len(peaks_CH1_right) > 1) + # Case 5 and 6 + if ((status == libm2k.STOP) and (not isCyclic) and (not auto_rearm)) or \ + ((status == libm2k.STOP) and (not isCyclic) and (auto_rearm)): + # The channels are in the last sample hold state and STOP is not available for non-cyclic buffers due to HDL limitations + # We expect both channels to hold last sample for the entire duration + result = result and are_values_within_range(analog_data, -AMPLITUDE * 1.2, -AMPLITUDE * 0.8) + result = result and (len(peaks_CH0_left) == 0) and (len(peaks_CH1_left) == 0) + result = result and (len(peaks_CH0_right) == 0) and (len(peaks_CH1_right) == 0) + # Case 7 and 8 + if ((status == libm2k.STOP) and (isCyclic) and (not auto_rearm)) or \ + ((status == libm2k.STOP) and (isCyclic) and (auto_rearm)): + # Should be generating cyclic signal before trigger + result = result and (len(peaks_CH0_left) > 1) and (len(peaks_CH1_left) > 1) + # Should stop generating signal after trigger + # TODO: might need aditional delay since the channel takes some time untill it stops from when the trigger event occurs + result = result and are_values_within_range(analog_data[:, -DELAY + 500:], -0.2, 0.2) + + if gen_reports: + write_file(file_name, test_name, "Both Channels", data_string) + filename_str = f"aout_triggering_{status_str}_{isCyclic_str}_{rearm_str}.png" + plot_to_file_multiline( + title="AOUT Triggering", + datasets=[ + (None, digital_data_chn, {"label":"Digital"}), + (None, analog_data[0], {"label" : "Analog CH0"}), + (peaks_CH0, analog_data[0], {"label" : "Peaks CH0", "marker" : "x"}), + (None, analog_data[1],{"label" : "Analog CH1"}), + (peaks_CH1, analog_data[1], {"label" : "Peaks CH1", "marker" : "x"}), + ], + dir_name=dir_name, + filename=filename_str, + ylim=(-6, 6), + ) + aout.stop() + return result \ No newline at end of file diff --git a/tests/bug_checks_functions.py b/tests/bug_checks_functions.py index 19e08ef4..1bf22180 100644 --- a/tests/bug_checks_functions.py +++ b/tests/bug_checks_functions.py @@ -2,6 +2,7 @@ import matplotlib.pyplot as plt import libm2k +from helpers import get_result_files, plot_to_file from open_context import ctx_timeout, ctx, ain, aout, trig gen_reports = True @@ -9,16 +10,8 @@ def test_dac_artifact(ain, aout, trig, chn): # check for DAC artifacts at slow sample rates - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] + file_name, dir_name, csv_path = get_result_files(gen_reports) + buffer_size = 4000 sampling_frequency_in = 1000000 sampling_frequency_out = 750000 @@ -63,33 +56,4 @@ def generate_clock_signal(): buffer.append(1) for i in range(256): buffer.append(0) - return buffer - - -def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None): - # Saves the plots in a separate folder - # Arguments: - # title -- Title of the plot - # data -- Data to be plotted - # filename -- Name of the file with the plot - # Keyword Arguments: - # xlabel -- Label of x-Axis (default: {None}) - # ylabel -- Label of y-Axis(default: {None}) - # data1 -- Data that should be plotted on the same plot(default: {None}) - # plot the signals in a separate folder - plt.title(title) - if xlabel is not None: # if xlabel and ylabel are not specified there will be default values - plt.xlabel(xlabel) - else: - plt.xlabel('Samples') - if ylabel is not None: - plt.ylabel(ylabel) - else: - plt.ylabel('Voltage [V]') - plt.grid(visible=True) - plt.plot(data) # if a second set of data must be printed (for ch0 and ch1 phase difference in this case) - if data1 is not None: - plt.plot(data1) - plt.savefig(dir_name + "/" + filename) - plt.close() - return + return buffer \ No newline at end of file diff --git a/tests/digital_functions.py b/tests/digital_functions.py index d1fcb81e..7e5e5ed4 100644 --- a/tests/digital_functions.py +++ b/tests/digital_functions.py @@ -3,48 +3,18 @@ import libm2k from multiprocessing.pool import ThreadPool import threading -from pandas import DataFrame import random import time from open_context import ctx +from helpers import get_result_files, save_data_to_csv, plot_to_file +import reset_def_values as reset + from scipy.signal import find_peaks sample = random.randint(0, 255) gen_reports = True -def result_files(gen_reports): - if gen_reports: - from create_files import results_file, results_dir, csv, open_files_and_dirs - if results_file is None: - file, dir_name, csv_path = open_files_and_dirs() - else: - file = results_file - dir_name = results_dir - csv_path = csv - else: - file = [] - dir_name = [] - csv_path = [] - - return file, dir_name, csv_path - - -def dig_reset(dig): - # Reset digital object - # Arguments: - # dig -- Digital object - - dig.setSampleRateIn(10000) - dig.setSampleRateOut(10000) - dig.setCyclic(True) - dig.setDirection(0b1111111111111111) - for i in range(16): - dig.setOutputMode(i, 1) - dig.enableAllOut(True) - return - - def set_digital_trigger(dig): # Set the digital trigger # Arguments: @@ -59,7 +29,6 @@ def set_digital_trigger(dig): def check_digital_channels_state(dig, channel): - dig.reset() # enable channel under test dig.setDirection(channel, libm2k.DIO_OUTPUT) @@ -142,8 +111,9 @@ def get_data_to_check_trig_condition(dig, channel, i, buff): def check_digital_trigger(channel, dig, d_trig): - file_name, dir_name, csv_path = result_files(gen_reports) - dig_reset(dig) + file_name, dir_name, csv_path = get_result_files(gen_reports) + + reset.digital(dig) delay = 1 condition = [libm2k.RISING_EDGE_DIGITAL, libm2k.FALLING_EDGE_DIGITAL, libm2k.LOW_LEVEL_DIGITAL, @@ -249,7 +219,8 @@ def task1(nb_samples, dig): def test_digital_cyclic_buffer(dig, d_trig, channel): - file, dir_name, csv_path = result_files(gen_reports) + file, dir_name, csv_path = get_result_files(gen_reports) + dig.setDirection(channel, libm2k.DIO_OUTPUT) dig.enableChannel(channel, True) d_trig.setDigitalCondition(channel, libm2k.LOW_LEVEL_DIGITAL) @@ -285,36 +256,6 @@ def test_digital_cyclic_buffer(dig, d_trig, channel): passed = False return passed - -def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None): - # Saves the plots in a separate folder - # Arguments: - # title -- Title of the plot\n - # data -- Data to be plotted\n - # filename -- Name of the file with the plot\n - # Keyword Arguments: - # xlabel -- Label of x-Axis (default: {None}) - # ylabel -- Label of y-Axis(default: {None}) - # data1 -- Data that should be plotted on the same plot(default: {None}) - - # plot the signals in a separate folder - plt.title(title) - if xlabel is not None: # if xlabel and ylabel are not specified there will be default values - plt.xlabel(xlabel) - else: - plt.xlabel('Samples') - if ylabel is not None: - plt.ylabel(ylabel) - else: - plt.ylabel('Voltage [V]') - plt.grid(visible=True) - plt.plot(data) # if a second set of data must be printed (for ch0 and ch1 phase difference in this case) - if data1 is not None: - plt.plot(data1) - plt.savefig(dir_name + "/" + filename) - plt.close() - return - def plot_to_file_all_channels(title, data, dir_name, filename, xlabel=None, ylabel=None): # Saves the plots in a separate folder # Arguments: @@ -344,11 +285,6 @@ def plot_to_file_all_channels(title, data, dir_name, filename, xlabel=None, ylab plt.close() return -def save_data_to_csv(csv_vals, csv_file): - df = DataFrame(csv_vals) - df.to_csv(csv_file) - return - def test_kernel_buffers(dig, nb_kernel_buffers): error = False dig.reset() @@ -405,7 +341,6 @@ def test_pattern_generator_pulse(dig, d_trig, channel): test_name = "pattern_generator_glitch" data_string = [] - file_name, dir_name, csv_path = result_files(gen_reports) dig.reset() ctx.setTimeout(timeout) diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 00000000..930f8652 --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,135 @@ + +import numpy as np +from pandas import DataFrame +import matplotlib.pyplot as plt + + + +def get_result_files(gen_reports): + if gen_reports: + from create_files import results_file, results_dir, csv, open_files_and_dirs + if results_file is None: + file, dir_name, csv_path = open_files_and_dirs() + else: + file = results_file + dir_name = results_dir + csv_path = csv + else: + file = [] + dir_name = [] + csv_path = [] + + return file, dir_name, csv_path + + +def save_data_to_csv(csv_vals, csv_file): + df = DataFrame(csv_vals) + df.to_csv(csv_file) + return + + +def plot_to_file(title, data, dir_name, filename, xlabel=None, x_lim = None, ylabel=None, y_lim = None, data1=None, x_data = None, data_marked=None): + # Saves the plots in a separate folder + # Arguments: + # title -- Title of the plot + # data -- Data to be plotted + # filename -- Name of the file with the plot + # Keyword Arguments: + # xlabel -- Label of x-Axis (default: {None}) + # ylabel -- Label of y-Axis(default: {None}) + # data1 -- Data that should be plotted on the same plot(default: {None}) + # data_marked -- Data that represents specific points on the plot(default: {None}) + # plot the signals in a separate folder + plt.title(title) + # if xlabel and ylabel are not specified there will be default values + if xlabel is not None: + plt.xlabel(xlabel) + else: + plt.xlabel('Samples') + if ylabel is not None: + plt.ylabel(ylabel) + else: + plt.ylabel('Voltage [V]') + plt.grid(visible=True) + # if x_data is not None, the plot will be displayed with the specified x_data + if x_data is not None: + plt.plot(x_data, data) + else: + plt.plot(data) + # if a second set of data must be printed (for ch0 and ch1 phase difference in this case) + if data1 is not None: + if x_data is not None: + plt.plot(x_data, data1) + else: + plt.plot(data1) + # Optional configurations + if x_lim is not None: + plt.xlim(*x_lim) + if y_lim is not None: + plt.ylim(*y_lim) + if data_marked is not None: + plt.plot(data_marked, data[data_marked], 'xr') + plt.savefig(dir_name + "/" + filename) + plt.close() + return + +def plot_to_file_multiline( + title, + datasets, + dir_name, + filename, + xlabel="Samples", ylabel="Voltage [V]", + xlim = None, ylim = None, +): + plt.title(title) + plt.xlabel(xlabel) + plt.ylabel(ylabel) + plt.grid(visible=True) + + for data in datasets: + xdata, ydata, fmt = data + if xdata is not None: + if "marker" in fmt: + # Mark scattered points + plt.plot(xdata, ydata[xdata], linestyle="None", **fmt) + else: + plt.plot(xdata, ydata, **fmt) + else: + plt.plot(ydata, **fmt) + + if xlim is not None: + plt.xlim(*xlim) + if ylim is not None: + plt.ylim(*ylim) + plt.legend() + + plt.savefig(f"{dir_name}/{filename}") + plt.close() + return + + +def get_time_format(samples, sample_rate): + x_time = np.linspace(0, samples/sample_rate, samples) + + if x_time[-1] < 1e-6: + x_time *= 1e9 + x_label = "Time [ns]" + elif x_time[-1] < 1e-3: + x_time *= 1e6 + x_label = "Time [us]" + elif x_time[-1] < 1: + x_time *= 1e3 + x_label = "Time [ms]" + else: + x_label = "Time [s]" + return x_time, x_label + + +def get_sample_rate_display_format(sample_rate): + if sample_rate < 1e3: + return f"{sample_rate:.2f} Hz" + if sample_rate < 1e6: + return f"{sample_rate/1e3:.2f} KHz" + if sample_rate < 1e9: + return f"{sample_rate/1e6:.2f} MHz" + return f"{sample_rate/1e9:.2f} GHz" diff --git a/tests/m2k_analog_test.py b/tests/m2k_analog_test.py index 79bb8203..5abfc5fa 100644 --- a/tests/m2k_analog_test.py +++ b/tests/m2k_analog_test.py @@ -1,20 +1,23 @@ +import itertools import sys import unittest import libm2k from shapefile import shape_gen, ref_shape_gen, shape_name -from analog_functions import test_amplitude, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \ +from analog_functions import get_experiment_config_for_sample_hold, test_amplitude, test_aout_triggering, test_last_sample_hold, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \ test_voltmeter_functionality, test_kernel_buffers, test_buffer_transition_glitch from analog_functions import noncyclic_buffer_test, set_samplerates_for_shapetest, set_trig_for_cyclicbuffer_test, \ test_calibration from analog_functions import compare_in_out_frequency, test_oversampling_ratio, channels_diff_in_samples, test_timeout, \ cyclic_buffer_test import reset_def_values as reset -from open_context import ctx, ain, aout, trig, create_dir +from open_context import ctx, ain, aout, dig, trig, create_dir from create_files import results_dir, csv, results_file import logger from repeat_test import repeat +from helpers import get_sample_rate_display_format + class A_AnalogTests(unittest.TestCase): # Class Where are defined all test methods for AnalogIn, AnalogOut, AnalogTrigger @@ -251,4 +254,43 @@ def test_buffer_transition_glitch(self): num_glitches = test_buffer_transition_glitch(channel, ain, aout, trig, waveform) with self.subTest(msg='Test buffer transition glitch: ' + waveform + ' on ch' + str(channel)): - self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel)) \ No newline at end of file + self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel)) + + @unittest.skipIf(ctx.getFirmwareVersion() < 'v0.33', + 'The sample and hold feature is available starting with firmware v0.33. Note: v0.32 had a glitch that is handled in this test.') + def test_last_sample_hold(self): + # Tests the last sample hold functionality for different channels and DAC sample rates. + # This test iterates over different channels (each channel individually and both channels together) + # and then tests the last sample hold functionality. When testing both channels together, 'None' + # is used to denote this case. + # It verifies that the last sample is held correctly and that there are no glitches in the output signal in between the last sample and a new push. + + for channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2, None]: + for dac_sr in [75_000_000, 7_500_000, 750_000, 75_000, 7_500]: + cfg = get_experiment_config_for_sample_hold(dac_sr) + sr_format = get_sample_rate_display_format(cfg.get("dac_sr")) + chn_str = "both_channels" if channel is None else f"CH{channel}" + reset.analog_in(ain) + reset.analog_out(aout) + reset.trigger(trig) + has_glitch, is_last_sample_hold_ok, is_idle_ok = test_last_sample_hold(ain, aout, trig, ctx, cfg, channel) + with self.subTest(msg='Test last sample hold on ' + str(chn_str) + ' with DAC SR ' + str(cfg.get("dac_sr"))): + self.assertEqual(has_glitch, False, f'Found glitches on {chn_str} with DAC SR {sr_format}') + self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}') + self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}') + self.assertEqual(is_idle_ok, True, 'Test idle condition failed') + + @unittest.skipIf(ctx.getFirmwareVersion() < 'v0.33', 'DAC triggering is available starting with firmware v0.33') + def test_aout_triggering(self): + # Test the triggering functionality of the M2kAnalogOut. + # The test looks for patterns before and after the trigger event for 8 different combinations. + autorearm = [False, True] + isCyclic = [False, True] + status = [libm2k.START, libm2k.STOP] + combinations = list(itertools.product(autorearm, isCyclic, status)) + for combination in combinations: + autorearm, isCyclic, status = combination + test_result = test_aout_triggering(ain, aout, dig, trig, ctx, autorearm, isCyclic, status) + status_str = "START" if status == libm2k.START else "STOP" + with self.subTest(msg=f'Test aout start with trigger for: status={status_str}, isCyclic={isCyclic}, autorearm={autorearm} '): + self.assertEqual(test_result, True, msg=f'Specification not met') diff --git a/tests/m2k_digital_test.py b/tests/m2k_digital_test.py index 7c24d2dc..553d06dd 100644 --- a/tests/m2k_digital_test.py +++ b/tests/m2k_digital_test.py @@ -1,8 +1,9 @@ import unittest import libm2k -from digital_functions import dig_reset, set_digital_trigger, check_digital_channels_state, check_digital_output, \ - check_digital_trigger, check_open_drain_mode, test_kernel_buffers, test_last_sample_hold, test_pattern_generator_pulse +from digital_functions import set_digital_trigger, check_digital_channels_state, check_digital_output, \ + check_digital_trigger, check_open_drain_mode, test_kernel_buffers, test_pattern_generator_pulse from digital_functions import test_digital_cyclic_buffer +import reset_def_values as reset from open_context import ctx, dig, d_trig import logger from repeat_test import repeat diff --git a/tests/main.py b/tests/main.py index fdf264c1..ef11c5a2 100644 --- a/tests/main.py +++ b/tests/main.py @@ -74,7 +74,9 @@ def wait_(): "test_shapes_ch0\n" "test_shapes_ch1\n" "test_voltmeter\n" - "test_buffer_transition_glitch\n") + "test_buffer_transition_glitch\n" + "test_last_sample_hold\n" + "test_aout_triggering\n") print("\n ===== class B_TriggerTests ===== \n") print(" ===== tests ====== \n") print("test_1_trigger_object\n" diff --git a/tests/reset_def_values.py b/tests/reset_def_values.py index 28ceadce..d0a22bb0 100644 --- a/tests/reset_def_values.py +++ b/tests/reset_def_values.py @@ -56,3 +56,17 @@ def trigger(trig): trig.setAnalogDelay(0) return + + +def digital(dig: libm2k.M2kDigital): + # Sets default values for digital parameters + # Arguments: + # dig -- Digital object + + dig.setSampleRateIn(10000) + dig.setSampleRateOut(10000) + dig.setCyclic(True) + dig.setDirection(0b1111111111111111) + for i in range(16): + dig.setOutputMode(i, 1) + dig.enableAllOut(True) diff --git a/tests/shapefile.py b/tests/shapefile.py index ce5c2949..12b4f561 100644 --- a/tests/shapefile.py +++ b/tests/shapefile.py @@ -1,3 +1,4 @@ +from enum import Enum import numpy as np import math @@ -6,7 +7,7 @@ # signals that will be sent to output buffer -def shape_gen(n): +def shape_gen(n, amplitude: float = 1.0, offset: float = 0.0): # Generates different signal shapes that will be sent to the output # Arguments: # n -- Number of samples in the output buffer @@ -15,18 +16,18 @@ def shape_gen(n): shape = [[]] # generate sine wave - sine = np.sin(np.linspace(-np.pi, np.pi, n)) + sine = amplitude*(np.sin(np.linspace(-np.pi, np.pi, n))) + offset # generate square wave - square = np.append(np.linspace(-1, -1, int(n / 2)), np.linspace(1, 1, int(n / 2))) + square = amplitude * np.append(np.linspace(-1, -1, int(n / 2)), np.linspace(1, 1, int(n / 2))) + offset # generate triangle - triangle = np.append(np.linspace(-1, 1, int(n / 2)), np.linspace(1, -1, int(n / 2))) + triangle = amplitude * np.append(np.linspace(-1, 1, int(n / 2)), np.linspace(1, -1, int(n / 2))) + offset # generate rising ramp - rising_ramp = np.linspace(-1, 1, n) + rising_ramp = amplitude * np.linspace(-1, 1, n) + offset # generate falling ramp - falling_ramp = np.linspace(1, -1, n) + falling_ramp = amplitude * np.linspace(1, -1, n) + offset # shape and reference shape buffers shape = [sine, square, triangle, rising_ramp, falling_ramp] @@ -58,3 +59,11 @@ def shape_name(): shape_name = ['Sine', 'Square', 'Triangle', 'Rising_ramp', 'Falling_ramp'] return shape_name + + +class Shape(Enum): + SINE = 0 + SQUARE = 1 + TRIANGLE = 2 + RISING_RAMP = 3 + FALLING_RAMP = 4 \ No newline at end of file