Skip to content

Commit

Permalink
DPL: provide ability to customise consumption order
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Jul 24, 2024
1 parent c57a63a commit 642289b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
9 changes: 9 additions & 0 deletions Framework/Core/include/Framework/CompletionPolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ struct CompletionPolicy {
Retry,
};

/// Order in which the completed slots must be consumed
enum struct CompletionOrder {
Any,
Timeslice,
Slot
};

using Matcher = std::function<bool(DeviceSpec const& device)>;
using InputSetElement = DataRef;
using CallbackFull = std::function<CompletionOp(InputSpan const&, std::vector<InputSpec> const&, ServiceRegistryRef&)>;
Expand Down Expand Up @@ -91,6 +98,8 @@ struct CompletionPolicy {
/// data.
bool balanceChannels = true;

CompletionOrder order = CompletionOrder::Any;

/// Helper to create the default configuration.
static std::vector<CompletionPolicy> createDefaultPolicies();
};
Expand Down
11 changes: 11 additions & 0 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,17 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
using namespace o2::framework;
stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
switch (spec.completionPolicy.order) {
case CompletionPolicy::CompletionOrder::Timeslice:
std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
break;
case CompletionPolicy::CompletionOrder::Slot:
std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
break;
case CompletionPolicy::CompletionOrder::Any:
default:
break;
}

for (auto action : completed) {
O2_SIGNPOST_ID_GENERATE(aid, device);
Expand Down

0 comments on commit 642289b

Please sign in to comment.