Skip to content

Commit

Permalink
DPL: fix circular dependency between expendable / resilient tasks and…
Browse files Browse the repository at this point in the history
… output proxy

The output proxy was always sorted last, however this clearly conflicts in the case
there is expendable tasks or resilient one, where the behavior than depends on the
resiliency of the output proxy itself and the data dependencies.

This address the issue and hopefully fixes the problem for good.
  • Loading branch information
ktf authored and davidrohr committed Feb 2, 2024
1 parent 47f7d02 commit bf10d95
Showing 1 changed file with 51 additions and 7 deletions.
58 changes: 51 additions & 7 deletions Framework/Core/src/TopologyPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
// or submit itself to any jurisdiction.
#include "Framework/DataProcessorSpec.h"
#include "Framework/TopologyPolicy.h"
#include "Framework/Signpost.h"
#include <string>
#include <regex>

O2_DECLARE_DYNAMIC_LOG(topology);

namespace o2::framework
{

Expand Down Expand Up @@ -68,15 +71,20 @@ bool dataDeps(DataProcessorSpec const& a, DataProcessorSpec const& b)

bool expendableDataDeps(DataProcessorSpec const& a, DataProcessorSpec const& b)
{
O2_SIGNPOST_ID_GENERATE(sid, topology);
O2_SIGNPOST_START(topology, sid, "expendableDataDeps", "Checking if %s depends on %s", a.name.c_str(), b.name.c_str());
// We never put anything behind the dummy sink.
if (b.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. %s is dummy sink and it nothing can depend on it.", b.name.c_str());
return false;
}
if (a.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "true. %s is dummy sink and it nothing can depend on it.", a.name.c_str());
return true;
}
/// If there is an actual dependency between a and b, we return true.
if (dataDeps(a, b)) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "true. %s has a data dependency on %s", a.name.c_str(), b.name.c_str());
return true;
}
// If we are here we do not have any data dependency,
Expand All @@ -101,27 +109,35 @@ bool expendableDataDeps(DataProcessorSpec const& a, DataProcessorSpec const& b)

// If none is expendable. We simply return false and sort as usual.
if (!isAExpendable && !isBExpendable) {
LOGP(debug, "Neither {} nor {} are expendable. No dependency beyond data deps.", a.name, b.name);
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. Neither %s nor %s are expendable. No dependency beyond data deps.",
a.name.c_str(), b.name.c_str());
return false;
}
// If both are expendable. We return false and sort as usual.
if (isAExpendable && isBExpendable) {
LOGP(debug, "Both {} and {} are expendable. No dependency.", a.name, b.name);
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. Both %s and %s are expendable. No dependency.",
a.name.c_str(), b.name.c_str());
return false;
}

// If b is expendable but b is resilient, we can keep the same order.
if (isAExpendable && bResilient) {
LOGP(debug, "{} is expendable but b is resilient, no need to add an unneeded dependency", a.name, a.name, b.name);
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. %s is expendable but %s is resilient, no need to add an unneeded dependency",
a.name.c_str(), b.name.c_str());
return false;
}
// If a is expendable we consider it as if there was a dependency from a to b,
// however we still need to check if there is not one already from b to a.
if (isAExpendable) {
LOGP(debug, "{} is expendable. Checking if there is a dependency from {} to {}.", a.name, b.name, a.name);
return !dataDeps(b, a);
bool hasDependency = dataDeps(b, a);
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "%s is expendable. %s from %s to %s => %s.",
a.name.c_str(), hasDependency ? "There is however an inverse dependency" : "No inverse dependency", b.name.c_str(), a.name.c_str(),
!hasDependency ? "true" : "false");
return !hasDependency;
}
// b is expendable and a is not. We are fine with no dependency.
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. %s is expendable but %s is not. No need to add an unneeded dependency.",
b.name.c_str(), a.name.c_str());
return false;
};

Expand All @@ -135,18 +151,46 @@ TopologyPolicy::DependencyChecker TopologyPolicyHelpers::dataDependency()
TopologyPolicy::DependencyChecker TopologyPolicyHelpers::alwaysDependent()
{
return [](DataProcessorSpec const& dependent, DataProcessorSpec const& ancestor) {
O2_SIGNPOST_ID_GENERATE(sid, topology);
O2_SIGNPOST_START(topology, sid, "alwaysDependent", "Checking if %s depends on %s", dependent.name.c_str(), ancestor.name.c_str());
if (dependent.name == ancestor.name) {
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "false. %s and %s are the same.", dependent.name.c_str(), ancestor.name.c_str());
return false;
}
if (ancestor.name == "internal-dpl-injected-dummy-sink") {
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "false. %s is a dummy sink.", ancestor.name.c_str());
return false;
}
const std::regex matcher(".*output-proxy.*");
// Check if regex applies
std::cmatch m;
if (std::regex_match(ancestor.name.data(), m, matcher) && std::regex_match(ancestor.name.data(), m, matcher)) {
return dataDeps(dependent, ancestor);
bool isAncestorOutputProxy = std::regex_match(ancestor.name.data(), m, matcher);
// For now dependent is always an output proxy.
assert(std::regex_match(dependent.name.data(), m, matcher));
bool isAncestorExpendable = std::find_if(ancestor.labels.begin(), ancestor.labels.end(), [](DataProcessorLabel const& label) {
return label.value == "expendable";
}) != ancestor.labels.end();

bool isDependentResilient = std::find_if(dependent.labels.begin(), dependent.labels.end(), [](DataProcessorLabel const& label) {
return label.value == "resilient";
}) != dependent.labels.end();
bool isAncestorResilient = std::find_if(ancestor.labels.begin(), ancestor.labels.end(), [](DataProcessorLabel const& label) {
return label.value == "resilient";
}) != ancestor.labels.end();

if (!isDependentResilient && isAncestorExpendable) {
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "false. Ancestor %s is expendable while %s is non-resilient output proxy (dependent).",
ancestor.name.c_str(), dependent.name.c_str());
return false;
}

if (isAncestorOutputProxy || (!isDependentResilient && isAncestorResilient)) {
bool hasDependency = dataDeps(dependent, ancestor);
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "%s. Dependent %s %s a dependency on ancestor %s.",
hasDependency ? "true" : "false", dependent.name.c_str(), hasDependency ? "has" : "has not", ancestor.name.c_str());
return hasDependency;
}
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "true by default. Ancestor %s is not an output proxy.", ancestor.name.c_str());
return true;
};
}
Expand Down

0 comments on commit bf10d95

Please sign in to comment.