Skip to content

Commit

Permalink
Add SkipList Scope Guards.
Browse files Browse the repository at this point in the history
  • Loading branch information
aeoranday committed Jul 3, 2024
1 parent 700ad95 commit e9bb205
Showing 1 changed file with 66 additions and 64 deletions.
130 changes: 66 additions & 64 deletions src/TPCTPRequestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,70 +118,72 @@ TPCTPRequestHandler::send_tp_sets() {
RequestResult rres(ResultCode::kUnknown, dr);
std::vector<std::pair<void*, size_t>> frag_pieces;

// Get the newest TP
SkipListAcc acc(inherited2::m_latency_buffer->get_skip_list());
auto tail = acc.last();
auto head = acc.first();

newest_ts = (*tail).get_first_timestamp();
oldest_ts = (*head).get_first_timestamp();

if (first_cycle) {
start_win_ts = oldest_ts;
first_cycle = false;
}
if (newest_ts - start_win_ts > m_ts_set_sender_offset_ticks) {
// Get TPs in this window.
end_win_ts = newest_ts - m_ts_set_sender_offset_ticks;
frag_pieces = get_fragment_pieces(start_win_ts, end_win_ts, rres);
auto num_tps = frag_pieces.size();

// Consistent TPSet info.
trigger::TPSet tpset;
tpset.run_number = m_run_number;
tpset.origin = m_sourceid;
tpset.start_time = start_win_ts; // provisory timestamp, will be filled with first TP
tpset.end_time = end_win_ts; // provisory timestamp, will be filled with last TP
tpset.seqno = m_next_tpset_seqno++; // NOLINT(runtime/increment_decrement)

// reserve the space for efficiency
if (num_tps > 0) {
tpset.type = trigger::TPSet::Type::kPayload;
tpset.objects.reserve(frag_pieces.size());
bool first_tp = true;
for (auto f : frag_pieces) {
trgdataformats::TriggerPrimitive tp = *(static_cast<trgdataformats::TriggerPrimitive*>(f.first));
if (first_tp) {
tpset.start_time = tp.time_start;
first_tp = false;
}
tpset.end_time = tp.time_start;
tpset.objects.emplace_back(std::move(tp));
}
}

// TPSet Heartbeats = Empty TPSet.
if (num_tps == 0) {
tpset.type = trigger::TPSet::Type::kHeartbeat;
m_new_heartbeats++;
}

m_cutoff_timestamp.store(tpset.end_time);
if(!m_tpset_sink->try_send(std::move(tpset), iomanager::Sender::s_no_block)) {
ers::warning(FailedToSendTPSet(ERS_HERE, start_win_ts, end_win_ts, m_run_number));
m_new_tps_in_tpsets_send_failed += num_tps;
++m_new_tpsets_send_failed;
}
else {
m_new_tps += num_tps;
++m_new_tpsets;
}

//remember what we sent for the next loop
start_win_ts = end_win_ts;

} // endif (newest_ts - start_ts > m_ts_set_sender_offset_ticks)
} // endif LB occupancy != 0
{ // SkipList Scope
// Get the newest TP
SkipListAcc acc(inherited2::m_latency_buffer->get_skip_list());
auto tail = acc.last();
auto head = acc.first();

newest_ts = (*tail).get_first_timestamp();
oldest_ts = (*head).get_first_timestamp();

if (first_cycle) {
start_win_ts = oldest_ts;
first_cycle = false;
}
if (newest_ts - start_win_ts > m_ts_set_sender_offset_ticks) {
// Get TPs in this window.
end_win_ts = newest_ts - m_ts_set_sender_offset_ticks;
frag_pieces = get_fragment_pieces(start_win_ts, end_win_ts, rres);
auto num_tps = frag_pieces.size();

// Consistent TPSet info.
trigger::TPSet tpset;
tpset.run_number = m_run_number;
tpset.origin = m_sourceid;
tpset.start_time = start_win_ts; // provisory timestamp, will be filled with first TP
tpset.end_time = end_win_ts; // provisory timestamp, will be filled with last TP
tpset.seqno = m_next_tpset_seqno++; // NOLINT(runtime/increment_decrement)

// reserve the space for efficiency
if (num_tps > 0) {
tpset.type = trigger::TPSet::Type::kPayload;
tpset.objects.reserve(frag_pieces.size());
bool first_tp = true;
for (auto f : frag_pieces) {
trgdataformats::TriggerPrimitive tp = *(static_cast<trgdataformats::TriggerPrimitive*>(f.first));
if (first_tp) {
tpset.start_time = tp.time_start;
first_tp = false;
}
tpset.end_time = tp.time_start;
tpset.objects.emplace_back(std::move(tp));
}
}

// TPSet Heartbeats = Empty TPSet.
if (num_tps == 0) {
tpset.type = trigger::TPSet::Type::kHeartbeat;
m_new_heartbeats++;
}

m_cutoff_timestamp.store(tpset.end_time);
if(!m_tpset_sink->try_send(std::move(tpset), iomanager::Sender::s_no_block)) {
ers::warning(FailedToSendTPSet(ERS_HERE, start_win_ts, end_win_ts, m_run_number));
m_new_tps_in_tpsets_send_failed += num_tps;
++m_new_tpsets_send_failed;
}
else {
m_new_tps += num_tps;
++m_new_tpsets;
}

//remember what we sent for the next loop
start_win_ts = end_win_ts;

} // endif (newest_ts - start_ts > m_ts_set_sender_offset_ticks)
} // End SkipList Scope
} // endif LB occupancy != 0

{
std::lock_guard<std::mutex> lock(m_cv_mutex);
Expand Down

0 comments on commit e9bb205

Please sign in to comment.