diff --git a/CMakeLists.txt b/CMakeLists.txt index 0910269..a1ac4ad 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.12) -project(ctbmodules VERSION 2.1.0) +project(ctbmodules VERSION 3.0.0) find_package(daq-cmake REQUIRED) diff --git a/plugins/CTBModule.cpp b/plugins/CTBModule.cpp index 8431a34..3b6d1ca 100644 --- a/plugins/CTBModule.cpp +++ b/plugins/CTBModule.cpp @@ -10,7 +10,6 @@ #include "CTBModule.hpp" #include "CTBModuleIssues.hpp" -#include "appfwk/DAQModuleHelper.hpp" #include "iomanager/IOManager.hpp" #include "logging/Logging.hpp" #include "rcif/cmd/Nljs.hpp" @@ -33,9 +32,13 @@ namespace ctbmodules { CTBModule::CTBModule(const std::string& name) : hsilibs::HSIEventSender(name) , m_is_running(false) + , m_stop_requested(false) , m_is_configured(false) - , m_n_TS_words(0) , m_error_state(false) + , m_total_hlt_counter(0) + , m_ts_word_counter(0) + , m_hlt_trigger_counter() + , m_llt_trigger_counter() , m_control_ios() , m_receiver_ios() , m_control_socket(m_control_ios) @@ -65,12 +68,38 @@ CTBModule::~CTBModule(){ } void -CTBModule::init(const nlohmann::json& init_data) +CTBModule::init(std::shared_ptr mcfg) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method"; - HSIEventSender::init(init_data); - m_llt_hsi_data_sender = get_iom_sender(appfwk::connection_uid(init_data, "llt_output")); - m_hlt_hsi_data_sender = get_iom_sender(appfwk::connection_uid(init_data, "hlt_output")); + HSIEventSender::init(mcfg); + + m_cfg = mcfg->module(get_name()); + + if (!m_cfg) { + throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object"); + } + + m_llt_hsi_data_sender.reset(); + m_hlt_hsi_data_sender.reset(); + + for (auto con : m_cfg->get_outputs()) { + if (con->get_data_type() == datatype_to_string()) { + if (con->UID().find("llt") != std::string::npos) { + m_llt_hsi_data_sender = get_iom_sender(con->UID()); + } + if (con->UID().find("hlt") != std::string::npos) { + m_hlt_hsi_data_sender = get_iom_sender(con->UID()); + } + } + } + + if ( ! m_llt_hsi_data_sender ) { + // throw that the sender is not onfigured + } + + if ( ! m_hlt_hsi_data_sender ) { + // throw that the sender is not onfigured + } TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method"; } @@ -90,6 +119,28 @@ CTBModule::do_configure(const data_t& args) // Initialise monitoring variables m_num_control_messages_sent = 0; m_num_control_responses_received = 0; + m_ts_word_counter = 0; + + std::map id_to_idx; + for(size_t i = 0; i < m_hlt_range; i++) id_to_idx["HLT_" + std::to_string(i)] = i; + for(size_t i = 0; i < m_llt_range; i++) id_to_idx["LLT_" + std::to_string(i)] = i; + + nlohmann::json random_triggers = m_cfg.board_config.ctb.misc; + + // HLTs + // 0th HLT is random trigger that's not in HLT array + if (random_triggers["randomtrigger_1"]["enable"]) m_hlt_trigger_counter[0] = 0; + nlohmann::json trigger_array = m_cfg.board_config.ctb.HLT.trigger; + for (const auto& trigger : trigger_array) { if (trigger["enable"]) m_hlt_trigger_counter[id_to_idx[trigger["id"]]] = 0; } + + // LLTs: Beam and CRT + // 0th LLT is random trigger that's not in HLT array + if (random_triggers["randomtrigger_2"]["enable"]) m_llt_trigger_counter[0] = 0; + trigger_array = m_cfg.board_config.ctb.subsystems.crt.triggers; + for (const auto& trigger : trigger_array) { if (trigger["enable"]) m_llt_trigger_counter[id_to_idx[trigger["id"]]] = 0; } + + trigger_array = m_cfg.board_config.ctb.subsystems.beam.triggers; + for (const auto& trigger : trigger_array) { if (trigger["enable"]) m_llt_trigger_counter[id_to_idx[trigger["id"]]] = 0; } // network connection to ctb hardware control boost::asio::ip::tcp::resolver resolver( m_control_ios ); @@ -128,9 +179,14 @@ CTBModule::do_start(const nlohmann::json& startobj) TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method"; + // Set this to false early so it doesn't interfere with the start + m_stop_requested.store(false); + auto start_params = startobj.get(); m_run_number.store(start_params.run); + m_total_hlt_counter.store(0); + TLOG_DEBUG(0) << get_name() << ": Sending start of run command"; m_thread_.start_working_thread(); @@ -140,8 +196,6 @@ CTBModule::do_start(const nlohmann::json& startobj) SetCalibrationStream(run.str()) ; } - - if ( send_message( "{\"command\":\"StartRun\"}" ) ) { m_is_running.store(true); TLOG_DEBUG(1) << get_name() << ": successfully started"; @@ -160,10 +214,14 @@ CTBModule::do_stop(const nlohmann::json& /*stopobj*/) TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method"; TLOG_DEBUG(0) << get_name() << ": Sending stop run command" << std::endl; + + // Give the do_work thread a chance to stop before stopping the CTB, + // otherwise we end up reading from an empty buffer + m_stop_requested.store(true); + std::this_thread::sleep_for(std::chrono::milliseconds(2)); if(send_message( "{\"command\":\"StopRun\"}" ) ){ TLOG_DEBUG(1) << get_name() << ": successfully stopped"; - m_is_running.store( false ) ; } else{ @@ -198,7 +256,7 @@ CTBModule::do_hsi_work(std::atomic& running_flag) std::future accepting = async( std::launch::async, [&]{ acceptor.accept(m_receiver_socket) ; } ) ; - while ( running_flag.load() ) { + while ( running_flag.load() && !m_stop_requested.load() ) { if ( accepting.wait_for( m_timeout ) == std::future_status::ready ){ break ; } @@ -216,7 +274,7 @@ CTBModule::do_hsi_work(std::atomic& running_flag) uint64_t prev_timestamp = 0; std::pair prev_channel, prev_prev_channel, prev_llt, prev_prev_llt; // pair - while (running_flag.load()) { + while (running_flag.load() && !m_stop_requested.load()) { update_calibration_file(); @@ -234,6 +292,11 @@ CTBModule::do_hsi_work(std::atomic& running_flag) update_buffer_counts(n_words); for ( unsigned int i = 0 ; i < n_words ; ++i ) { + + if (!running_flag.load() || m_stop_requested.load()) { + break; + } + //read a word if ( ! read( temp_word ) ) { connection_closed = true ; @@ -247,7 +310,7 @@ CTBModule::do_hsi_work(std::atomic& running_flag) //check if it is a TS word and increment the counter if ( IsTSWord( temp_word ) ) { - m_n_TS_words++ ; + ++m_ts_word_counter; TLOG_DEBUG(9) << "Received timestamp word! TS: "+temp_word.timestamp; prev_timestamp = temp_word.timestamp; } @@ -272,7 +335,8 @@ CTBModule::do_hsi_work(std::atomic& running_flag) m_last_readout_hlt_timestamp = hlt_word->timestamp; // Now find the associated LLT - llt_payload = MatchTriggerInput( hlt_word->timestamp, prev_llt, prev_prev_llt, true ); + // if HLT0, skip matching + llt_payload = MatchTriggerInput( hlt_word, prev_llt, prev_prev_llt, true ); // Send HSI data to a DLH std::array hsi_struct; @@ -300,6 +364,10 @@ CTBModule::do_hsi_work(std::atomic& running_flag) // TODO properly fill device id dfmessages::HSIEvent event = dfmessages::HSIEvent(0x1, hlt_word->trigger_word, hlt_word->timestamp, m_run_HLT_counter, m_run_number); send_hsi_event(event); + + // Count the total HLTs and each specific one + ++m_total_hlt_counter; + for (auto &hlt : m_hlt_trigger_counter) { if( (hlt_word->trigger_word >> hlt.first) & 0x1 ) ++hlt.second; } } else if (temp_word.word_type == content::word::t_lt) { @@ -308,7 +376,7 @@ CTBModule::do_hsi_work(std::atomic& running_flag) content::word::trigger_t * llt_word = reinterpret_cast( & temp_word ) ; // Find the matching channel status word - channel_payload = MatchTriggerInput( llt_word->timestamp, prev_channel, prev_prev_channel, false ); + channel_payload = MatchTriggerInput( llt_word, prev_channel, prev_prev_channel, false ); // Send HSI data to a DLH std::array hsi_struct; @@ -336,6 +404,8 @@ CTBModule::do_hsi_work(std::atomic& running_flag) // store the previous 2 LLTs so we can match to the HLT prev_prev_llt = prev_llt; prev_llt = { llt_word->timestamp, (llt_word->trigger_word & 0xFFFFFFFF) }; + + for (auto &llt : m_llt_trigger_counter) { if( (llt_word->trigger_word >> llt.first) & 0x1 ) ++llt.second; } } else if (temp_word.word_type == content::word::t_ch) { @@ -361,6 +431,11 @@ CTBModule::do_hsi_work(std::atomic& running_flag) } } + // Make sure CTB run stops before closing socket + while ( m_is_running.load() ) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + boost::system::error_code closing_error; if ( m_error_state.load() ) { @@ -416,11 +491,13 @@ bool CTBModule::read( T &obj) { return true ; } -uint64_t CTBModule::MatchTriggerInput( const uint64_t trigger_ts, const std::pair &prev_input, const std::pair &prev_prev_input, bool hlt_matching) noexcept { +uint64_t CTBModule::MatchTriggerInput(const content::word::trigger_t * trigger, const std::pair &prev_input, const std::pair &prev_prev_input, bool hlt_matching) noexcept { // The first condition should be true the majority of the time and the "else" should never happen. // Find the matching word whcih caused the LLT or HLT and return its payload - + uint64_t trigger_ts = trigger->timestamp; + uint64_t trigger_word = trigger->trigger_word; + if ( trigger_ts == prev_input.first + 1 ) { return prev_input.second; } @@ -430,7 +507,11 @@ uint64_t CTBModule::MatchTriggerInput( const uint64_t trigger_ts, const std::pai else { std::stringstream msg; if ( hlt_matching ) { - msg << "No LLT match found for HLT TS " << trigger_ts << " (LLT TS prev=" + if (trigger_word == 0x1 || trigger_word == (0x1<<16)) { + // we don't care if fake trigger (HLT0) or the pulse train (HLT 16) have no matching LLTs + return 0; + } + msg << "No LLT m/atch found for HLT TS " << trigger_ts << " (LLT TS prev=" << prev_input.first << " prev_prev=" << prev_prev_input.first << ")"; } else { @@ -603,7 +684,7 @@ bool CTBModule::send_message( const std::string & msg ) { ers::warning(CTBMessage(ERS_HERE, messages[i]["message"].dump())); } else if ( type.find("info") != std::string::npos || type.find("Info") != std::string::npos || type.find("INFO") != std::string::npos) { - ers::info(CTBMessage(ERS_HERE, messages[i]["message"].dump())); + TLOG() << "Message from the board: " << messages[i]["message"].dump(); } else { std::stringstream blob; @@ -654,14 +735,31 @@ void CTBModule::get_info(opmonlib::InfoCollector& ci, int /*level*/) module_info.num_control_responses_received = m_num_control_responses_received.load(); module_info.ctb_hardware_run_status = m_is_running; module_info.ctb_hardware_configuration_status = m_is_configured; - module_info.num_ts_words_received = m_n_TS_words; module_info.last_readout_timestamp = m_last_readout_hlt_timestamp.load(); - module_info.sent_hsi_events_counter = m_sent_counter.load(); module_info.failed_to_send_hsi_events_counter = m_failed_to_send_counter.load(); module_info.last_sent_timestamp = m_last_sent_timestamp.load(); module_info.average_buffer_occupancy = read_average_buffer_counts(); + module_info.total_hlt_count = m_total_hlt_counter.load(); + module_info.ts_word_count = m_ts_word_counter.exchange(0); + + for (auto &hlt : m_hlt_trigger_counter) { + opmonlib::InfoCollector tmp_ic; + dunedaq::ctbmodules::ctbmoduleinfo::LevelTriggerInfo ti; + ti.count = hlt.second.exchange(0); + tmp_ic.add(ti); + ci.add("hlt_" + std::to_string(hlt.first), tmp_ic); + } + + for (auto &llt : m_llt_trigger_counter) { + opmonlib::InfoCollector tmp_ic; + dunedaq::ctbmodules::ctbmoduleinfo::LevelTriggerInfo ti; + ti.count = llt.second.exchange(0); + tmp_ic.add(ti); + ci.add("llt_" + std::to_string(llt.first), tmp_ic); + } + ci.add(module_info); } diff --git a/plugins/CTBModule.hpp b/plugins/CTBModule.hpp index 23989c6..2f24ae4 100644 --- a/plugins/CTBModule.hpp +++ b/plugins/CTBModule.hpp @@ -23,14 +23,14 @@ #include "CTBPacketContent.hpp" -#include "ctbmodules/ctbmodule/Nljs.hpp" -#include "ctbmodules/ctbmoduleinfo/InfoNljs.hpp" +#include "appmodel/CTBModule.hpp" #include #include #include #include #include +#include #include #include @@ -56,9 +56,9 @@ class CTBModule : public dunedaq::hsilibs::HSIEventSender CTBModule(CTBModule&&) = delete; ///< CTBModule is not move-constructible CTBModule& operator=(CTBModule&&) = delete; ///< CTBModule is not move-assignable - void init(const nlohmann::json& iniobj) override; + void init(std::shared_ptr mcfg) override; - static uint64_t MatchTriggerInput(const uint64_t trigger_ts, const std::pair &prev_input, const std::pair &prev_prev_input, bool hlt_matching) noexcept; + static uint64_t MatchTriggerInput(const content::word::trigger_t * trigger, const std::pair &prev_input, const std::pair &prev_prev_input, bool hlt_matching) noexcept; static bool IsTSWord( const content::word::word_t &w ) noexcept; static bool IsFeedbackWord( const content::word::word_t &w ) noexcept; bool ErrorState() const { return m_error_state.load() ; } @@ -67,9 +67,10 @@ class CTBModule : public dunedaq::hsilibs::HSIEventSender private: - // control variables + // control and monitoring variables std::atomic m_is_running; + std::atomic m_stop_requested; std::atomic m_is_configured; /*const */unsigned int m_receiver_port; @@ -77,6 +78,14 @@ class CTBModule : public dunedaq::hsilibs::HSIEventSender std::atomic m_n_TS_words; std::atomic m_error_state; + std::atomic m_total_hlt_counter; + std::atomic m_ts_word_counter; + + size_t m_hlt_range = 20; + size_t m_llt_range = 25; + std::map> m_hlt_trigger_counter; + std::map> m_llt_trigger_counter; + boost::asio::io_service m_control_ios; boost::asio::io_service m_receiver_ios; boost::asio::ip::tcp::socket m_control_socket; @@ -98,7 +107,7 @@ class CTBModule : public dunedaq::hsilibs::HSIEventSender bool send_message(const std::string & msg); // Configuration - dunedaq::ctbmodules::ctbmodule::Conf m_cfg; + const appmodel::CTBModule * m_cfg = nullptr; std::atomic m_run_number; // Threading diff --git a/python/ctbmodules/apps/ctb_hsi_gen.py b/python/ctbmodules/apps/ctb_hsi_gen.py index bcc8012..b660dbe 100644 --- a/python/ctbmodules/apps/ctb_hsi_gen.py +++ b/python/ctbmodules/apps/ctb_hsi_gen.py @@ -157,7 +157,7 @@ def get_ctb_hsi_app( mgraph.add_endpoint(f"timesync_ctb_llt", f"ctb_llt_datahandler.timesync_output", "TimeSync", Direction.OUT, is_pubsub=True, toposort=False) mgraph.add_endpoint(f"timesync_ctb_hlt", f"ctb_hlt_datahandler.timesync_output", "TimeSync", Direction.OUT, is_pubsub=True, toposort=False) - mgraph.add_endpoint("hsievents", f"{nickname}.hsievents", "HSIEvent", Direction.OUT) + mgraph.add_endpoint("ctb_hsievents", f"{nickname}.hsievents", "HSIEvent", Direction.OUT) # dummy subscriber mgraph.add_endpoint(None, None, data_type="TimeSync", inout=Direction.IN, is_pubsub=True) diff --git a/schema/ctbmodules/ctbmodule.jsonnet b/schema/ctbmodules/ctbmodule.jsonnet index 96794ed..5ab8f14 100644 --- a/schema/ctbmodules/ctbmodule.jsonnet +++ b/schema/ctbmodules/ctbmodule.jsonnet @@ -65,7 +65,7 @@ local ctbmodule = { ], doc="Central Trigger Board Pulser Configuration"), timing: s.record("Timing", [ - s.field("address", self.string, "0xF0"), + s.field("address", self.string, "0xF"), s.field("group", self.string, "0x0"), s.field("triggers", self.boolean, true), s.field("lockout", self.string, "0x10"), diff --git a/schema/ctbmodules/ctbmoduleinfo.jsonnet b/schema/ctbmodules/ctbmoduleinfo.jsonnet index f6a38fa..ef84f78 100644 --- a/schema/ctbmodules/ctbmoduleinfo.jsonnet +++ b/schema/ctbmodules/ctbmoduleinfo.jsonnet @@ -6,30 +6,30 @@ local moo = import "moo.jsonnet"; local s = moo.oschema.schema("dunedaq.ctbmodules.ctbmoduleinfo"); local info = { - uint8 : s.number("uint8", "u8", - doc="An unsigned of 8 bytes"), - float8 : s.number("float8", "f8", - doc="A float of 8 bytes"), + uint8 : s.number("uint8", "u8", doc="An unsigned of 8 bytes"), + float8 : s.number("float8", "f8", doc="A float of 8 bytes"), choice : s.boolean("Choice"), - string : s.string("String", moo.re.ident, - doc="A string field"), - double_val: s.number("DoubleValue", "f8", - doc="A double"), + string : s.string("String", moo.re.ident, doc="A string field"), + double_val: s.number("DoubleValue", "f8", doc="A double"), info: s.record("CTBModuleInfo", [ - s.field("num_control_messages_sent", self.uint8, 0, doc="Number of control messages sent to CTB"), - s.field("num_control_responses_received", self.uint8, 0, doc="Number of control message responses received from CTB"), - s.field("ctb_hardware_run_status", self.choice, 0, doc="Run status of CTB hardware itself"), - s.field("ctb_hardware_configuration_status", self.choice, 0, doc="Configuration status of CTB hardware itself"), - s.field("num_ts_words_received", self.uint8, 0, doc="Number of ts words received from CTB"), + s.field("num_control_messages_sent", self.uint8, 0, doc="Number of control messages sent to CTB"), + s.field("num_control_responses_received", self.uint8, 0, doc="Number of control message responses received from CTB"), + s.field("ctb_hardware_run_status", self.choice, 0, doc="Run status of CTB hardware itself"), + s.field("ctb_hardware_configuration_status", self.choice, 0, doc="Configuration status of CTB hardware itself"), + s.field("sent_hsi_events_counter", self.uint8, 0, doc="Number of sent HSIEvents so far"), + s.field("failed_to_send_hsi_events_counter", self.uint8, 0, doc="Number of failed send attempts so far"), + s.field("last_sent_timestamp", self.uint8, 0, doc="Timestamp of the last sent HSIEvent"), + s.field("last_readout_timestamp", self.uint8, 0, doc="Timestamp of the last read HLT word"), + s.field("average_buffer_occupancy", self.double_val, 0, doc="Average (word) occupancy of buffer in CTB firmware."), + s.field("total_hlt_count", self.uint8, 0, doc="Total HLT count for a run."), + s.field("ts_word_count", self.uint8, 0, doc="Timestamp word count. Fixed frequency heartbeat."), + ], doc="Central Trigger Board Module Information"), - s.field("sent_hsi_events_counter", self.uint8, doc="Number of sent HSIEvents so far"), - s.field("failed_to_send_hsi_events_counter", self.uint8, doc="Number of failed send attempts so far"), - s.field("last_sent_timestamp", self.uint8, doc="Timestamp of the last sent HSIEvent"), - s.field("last_readout_timestamp", self.uint8, doc="Timestamp of the last read HLT word"), - s.field("average_buffer_occupancy", self.double_val, doc="Average (word) occupancy of buffer in CTB firmware."), + trigger: s.record("LevelTriggerInfo", [ + s.field("count", self.uint8, 0, doc="Count for a single level trigger"), + ], doc="Level Trigger information") - ], doc="Central Trigger Board Module Information"), }; -moo.oschema.sort_select(info) \ No newline at end of file +moo.oschema.sort_select(info)