From 642289bcae65ca3067232991dcf7e795d5fa3ffd Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Tue, 23 Jul 2024 21:56:00 +0200 Subject: [PATCH] DPL: provide ability to customise consumption order --- Framework/Core/include/Framework/CompletionPolicy.h | 9 +++++++++ Framework/Core/src/DataProcessingDevice.cxx | 11 +++++++++++ 2 files changed, 20 insertions(+) diff --git a/Framework/Core/include/Framework/CompletionPolicy.h b/Framework/Core/include/Framework/CompletionPolicy.h index 55d3014166956..987adfac90e7a 100644 --- a/Framework/Core/include/Framework/CompletionPolicy.h +++ b/Framework/Core/include/Framework/CompletionPolicy.h @@ -62,6 +62,13 @@ struct CompletionPolicy { Retry, }; + /// Order in which the completed slots must be consumed + enum struct CompletionOrder { + Any, + Timeslice, + Slot + }; + using Matcher = std::function; using InputSetElement = DataRef; using CallbackFull = std::function const&, ServiceRegistryRef&)>; @@ -91,6 +98,8 @@ struct CompletionPolicy { /// data. bool balanceChannels = true; + CompletionOrder order = CompletionOrder::Any; + /// Helper to create the default configuration. static std::vector createDefaultPolicies(); }; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index e6bf8f3330597..36f12156bab85 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -2307,6 +2307,17 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v using namespace o2::framework; stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast(relayer.getParallelTimeslices() - completed.size())}); stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0}); + switch (spec.completionPolicy.order) { + case CompletionPolicy::CompletionOrder::Timeslice: + std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; }); + break; + case CompletionPolicy::CompletionOrder::Slot: + std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; }); + break; + case CompletionPolicy::CompletionOrder::Any: + default: + break; + } for (auto action : completed) { O2_SIGNPOST_ID_GENERATE(aid, device);