Skip to content

Commit

Permalink
DPL: do not update the new run flag unless we do process data
Browse files Browse the repository at this point in the history
Data might be dropped by the oldest possible timeframe mechanism for the
first timeslice which arrives, at which point we lose the information about
the new run.

Moving it down ensures that the check is done for the first timeframe which arrives.
  • Loading branch information
ktf authored and martenole committed Jan 28, 2024
1 parent 1ad00d0 commit 9b34ac3
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2134,7 +2134,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
// create messages) because the messages need to have the timeslice id into
// it.
auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
auto& dataProcessorContext = ref.get<DataProcessorContext>();
auto& relayer = ref.get<DataRelayer>();
auto& timingInfo = ref.get<TimingInfo>();
ZoneScopedN("DataProcessingDevice::prepareForCurrentTimeslice");
Expand All @@ -2145,6 +2144,12 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
timingInfo.runNumber = relayer.getRunNumberForSlot(i);
timingInfo.creation = relayer.getCreationTimeForSlot(i);
};
auto updateRunInformation = [ref](TimesliceSlot i) -> void {
auto& dataProcessorContext = ref.get<DataProcessorContext>();
auto& relayer = ref.get<DataRelayer>();
auto& timingInfo = ref.get<TimingInfo>();
auto timeslice = relayer.getTimesliceForSlot(i);
timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
// A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
Expand Down Expand Up @@ -2277,9 +2282,14 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
continue;
}

prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
action.op == CompletionPolicy::CompletionOp::Discard;
prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
if (action.op != CompletionPolicy::CompletionOp::Discard &&
action.op != CompletionPolicy::CompletionOp::Wait &&
action.op != CompletionPolicy::CompletionOp::Retry) {
updateRunInformation(TimesliceSlot{action.slot});
}
InputSpan span = getInputSpan(action.slot, shouldConsume);
auto& spec = ref.get<DeviceSpec const>();
InputRecord record{spec.inputs,
Expand Down

0 comments on commit 9b34ac3

Please sign in to comment.