Skip to content

Commit

Permalink
Improve Readability.
Browse files Browse the repository at this point in the history
Removed cases of awkward spacing and clarifying some of the many end brackets.
  • Loading branch information
aeoranday committed Jul 3, 2024
1 parent 26a010f commit 700ad95
Showing 1 changed file with 81 additions and 80 deletions.
161 changes: 81 additions & 80 deletions src/TPCTPRequestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,98 +98,99 @@ TPCTPRequestHandler::report_tardy_packet(const types::TriggerPrimitiveTypeAdapte

void
TPCTPRequestHandler::send_tp_sets() {
timestamp_t oldest_ts=0;
timestamp_t newest_ts=0;
timestamp_t start_win_ts=0;
timestamp_t end_win_ts=0;
bool first_cycle = true;
dunedaq::dfmessages::DataRequest dr;

while (m_run_marker.load()) {
{
timestamp_t oldest_ts = 0;
timestamp_t newest_ts = 0;
timestamp_t start_win_ts = 0;
timestamp_t end_win_ts = 0;
bool first_cycle = true;
dunedaq::dfmessages::DataRequest dr;

while (m_run_marker.load()) {
{
std::unique_lock<std::mutex> lock(m_cv_mutex);
m_cv.wait(lock, [&] { return !m_cleanup_requested; });
m_requests_running++;
}

m_cv.notify_all();
if(m_latency_buffer->occupancy() != 0) {
// Prepare response
RequestResult rres(ResultCode::kUnknown, dr);
std::vector<std::pair<void*, size_t>> frag_pieces;

// Get the newest TP
SkipListAcc acc(inherited2::m_latency_buffer->get_skip_list());
auto tail = acc.last();
auto head = acc.first();
newest_ts = (*tail).get_first_timestamp();
oldest_ts = (*head).get_first_timestamp();

if (first_cycle) {
start_win_ts = oldest_ts;
first_cycle = false;
}
if (newest_ts - start_win_ts > m_ts_set_sender_offset_ticks) {
end_win_ts = newest_ts - m_ts_set_sender_offset_ticks;
frag_pieces = get_fragment_pieces(start_win_ts, end_win_ts, rres);
auto num_tps = frag_pieces.size();
/*
if (num_tps == 0) {
std::stringstream s;
s << "No TPs in time interval " << start_win_ts << " " << end_win_ts;
ers::info(TPHandlerMsg(ERS_HERE, s.str()));
}
else {
*/

trigger::TPSet tpset;
tpset.run_number = m_run_number;
tpset.type = num_tps>0 ? trigger::TPSet::Type::kPayload : trigger::TPSet::Type::kHeartbeat;
tpset.origin = m_sourceid;
tpset.start_time = start_win_ts; // provisory timestamp, will be filled with first TP
tpset.end_time = end_win_ts; // provisory timestamp, will be filled with last TP
tpset.seqno = m_next_tpset_seqno++; // NOLINT(runtime/increment_decrement)
// reserve the space for efficiency
if (num_tps > 0) {
tpset.objects.reserve(frag_pieces.size());
bool first_tp = true;
for( auto f : frag_pieces) {
trgdataformats::TriggerPrimitive tp = *(static_cast<trgdataformats::TriggerPrimitive*>(f.first));
if(first_tp) {
tpset.start_time = tp.time_start;
first_tp = false;
}
tpset.end_time = tp.time_start;
tpset.objects.emplace_back(std::move(tp));
// Prepare response
RequestResult rres(ResultCode::kUnknown, dr);
std::vector<std::pair<void*, size_t>> frag_pieces;

// Get the newest TP
SkipListAcc acc(inherited2::m_latency_buffer->get_skip_list());
auto tail = acc.last();
auto head = acc.first();

newest_ts = (*tail).get_first_timestamp();
oldest_ts = (*head).get_first_timestamp();

if (first_cycle) {
start_win_ts = oldest_ts;
first_cycle = false;
}
if (newest_ts - start_win_ts > m_ts_set_sender_offset_ticks) {
// Get TPs in this window.
end_win_ts = newest_ts - m_ts_set_sender_offset_ticks;
frag_pieces = get_fragment_pieces(start_win_ts, end_win_ts, rres);
auto num_tps = frag_pieces.size();

// Consistent TPSet info.
trigger::TPSet tpset;
tpset.run_number = m_run_number;
tpset.origin = m_sourceid;
tpset.start_time = start_win_ts; // provisory timestamp, will be filled with first TP
tpset.end_time = end_win_ts; // provisory timestamp, will be filled with last TP
tpset.seqno = m_next_tpset_seqno++; // NOLINT(runtime/increment_decrement)

// reserve the space for efficiency
if (num_tps > 0) {
tpset.type = trigger::TPSet::Type::kPayload;
tpset.objects.reserve(frag_pieces.size());
bool first_tp = true;
for (auto f : frag_pieces) {
trgdataformats::TriggerPrimitive tp = *(static_cast<trgdataformats::TriggerPrimitive*>(f.first));
if (first_tp) {
tpset.start_time = tp.time_start;
first_tp = false;
}
}
m_cutoff_timestamp.store(tpset.end_time);
if(!m_tpset_sink->try_send(std::move(tpset), iomanager::Sender::s_no_block)) {
ers::warning(FailedToSendTPSet(ERS_HERE, start_win_ts, end_win_ts, m_run_number));
m_new_tps_in_tpsets_send_failed += num_tps;
++m_new_tpsets_send_failed;
}
else {
m_new_tps += num_tps;
++m_new_tpsets;
}

if (num_tps == 0) {
m_new_heartbeats++;
}
//remember what we sent for the next loop
start_win_ts = end_win_ts;

}
}
tpset.end_time = tp.time_start;
tpset.objects.emplace_back(std::move(tp));
}
}

// TPSet Heartbeats = Empty TPSet.
if (num_tps == 0) {
tpset.type = trigger::TPSet::Type::kHeartbeat;
m_new_heartbeats++;
}

m_cutoff_timestamp.store(tpset.end_time);
if(!m_tpset_sink->try_send(std::move(tpset), iomanager::Sender::s_no_block)) {
ers::warning(FailedToSendTPSet(ERS_HERE, start_win_ts, end_win_ts, m_run_number));
m_new_tps_in_tpsets_send_failed += num_tps;
++m_new_tpsets_send_failed;
}
else {
m_new_tps += num_tps;
++m_new_tpsets;
}

//remember what we sent for the next loop
start_win_ts = end_win_ts;

} // endif (newest_ts - start_ts > m_ts_set_sender_offset_ticks)
} // endif LB occupancy != 0

{
std::lock_guard<std::mutex> lock(m_cv_mutex);
m_requests_running--;
}
m_cv.notify_all();
m_cv.notify_all();
std::this_thread::sleep_for(std::chrono::microseconds(m_tp_set_sender_sleep_us));
}
return;
}
return;
}

} // namespace fdreadoutlibs
Expand Down

0 comments on commit 700ad95

Please sign in to comment.