From bf10d957676827ed96a66360eea2add43fbf4ab9 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 2 Feb 2024 11:42:03 +0100 Subject: [PATCH] DPL: fix circular dependency between expendable / resilient tasks and output proxy The output proxy was always sorted last, however this clearly conflicts in the case there is expendable tasks or resilient one, where the behavior than depends on the resiliency of the output proxy itself and the data dependencies. This address the issue and hopefully fixes the problem for good. --- Framework/Core/src/TopologyPolicy.cxx | 58 +++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/Framework/Core/src/TopologyPolicy.cxx b/Framework/Core/src/TopologyPolicy.cxx index fb96eff5af2fc..ebeb70c7950a3 100644 --- a/Framework/Core/src/TopologyPolicy.cxx +++ b/Framework/Core/src/TopologyPolicy.cxx @@ -10,9 +10,12 @@ // or submit itself to any jurisdiction. #include "Framework/DataProcessorSpec.h" #include "Framework/TopologyPolicy.h" +#include "Framework/Signpost.h" #include #include +O2_DECLARE_DYNAMIC_LOG(topology); + namespace o2::framework { @@ -68,15 +71,20 @@ bool dataDeps(DataProcessorSpec const& a, DataProcessorSpec const& b) bool expendableDataDeps(DataProcessorSpec const& a, DataProcessorSpec const& b) { + O2_SIGNPOST_ID_GENERATE(sid, topology); + O2_SIGNPOST_START(topology, sid, "expendableDataDeps", "Checking if %s depends on %s", a.name.c_str(), b.name.c_str()); // We never put anything behind the dummy sink. if (b.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) { + O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. %s is dummy sink and it nothing can depend on it.", b.name.c_str()); return false; } if (a.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) { + O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "true. %s is dummy sink and it nothing can depend on it.", a.name.c_str()); return true; } /// If there is an actual dependency between a and b, we return true. if (dataDeps(a, b)) { + O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "true. %s has a data dependency on %s", a.name.c_str(), b.name.c_str()); return true; } // If we are here we do not have any data dependency, @@ -101,27 +109,35 @@ bool expendableDataDeps(DataProcessorSpec const& a, DataProcessorSpec const& b) // If none is expendable. We simply return false and sort as usual. if (!isAExpendable && !isBExpendable) { - LOGP(debug, "Neither {} nor {} are expendable. No dependency beyond data deps.", a.name, b.name); + O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. Neither %s nor %s are expendable. No dependency beyond data deps.", + a.name.c_str(), b.name.c_str()); return false; } // If both are expendable. We return false and sort as usual. if (isAExpendable && isBExpendable) { - LOGP(debug, "Both {} and {} are expendable. No dependency.", a.name, b.name); + O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. Both %s and %s are expendable. No dependency.", + a.name.c_str(), b.name.c_str()); return false; } // If b is expendable but b is resilient, we can keep the same order. if (isAExpendable && bResilient) { - LOGP(debug, "{} is expendable but b is resilient, no need to add an unneeded dependency", a.name, a.name, b.name); + O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. %s is expendable but %s is resilient, no need to add an unneeded dependency", + a.name.c_str(), b.name.c_str()); return false; } // If a is expendable we consider it as if there was a dependency from a to b, // however we still need to check if there is not one already from b to a. if (isAExpendable) { - LOGP(debug, "{} is expendable. Checking if there is a dependency from {} to {}.", a.name, b.name, a.name); - return !dataDeps(b, a); + bool hasDependency = dataDeps(b, a); + O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "%s is expendable. %s from %s to %s => %s.", + a.name.c_str(), hasDependency ? "There is however an inverse dependency" : "No inverse dependency", b.name.c_str(), a.name.c_str(), + !hasDependency ? "true" : "false"); + return !hasDependency; } // b is expendable and a is not. We are fine with no dependency. + O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. %s is expendable but %s is not. No need to add an unneeded dependency.", + b.name.c_str(), a.name.c_str()); return false; }; @@ -135,18 +151,46 @@ TopologyPolicy::DependencyChecker TopologyPolicyHelpers::dataDependency() TopologyPolicy::DependencyChecker TopologyPolicyHelpers::alwaysDependent() { return [](DataProcessorSpec const& dependent, DataProcessorSpec const& ancestor) { + O2_SIGNPOST_ID_GENERATE(sid, topology); + O2_SIGNPOST_START(topology, sid, "alwaysDependent", "Checking if %s depends on %s", dependent.name.c_str(), ancestor.name.c_str()); if (dependent.name == ancestor.name) { + O2_SIGNPOST_END(topology, sid, "alwaysDependent", "false. %s and %s are the same.", dependent.name.c_str(), ancestor.name.c_str()); return false; } if (ancestor.name == "internal-dpl-injected-dummy-sink") { + O2_SIGNPOST_END(topology, sid, "alwaysDependent", "false. %s is a dummy sink.", ancestor.name.c_str()); return false; } const std::regex matcher(".*output-proxy.*"); // Check if regex applies std::cmatch m; - if (std::regex_match(ancestor.name.data(), m, matcher) && std::regex_match(ancestor.name.data(), m, matcher)) { - return dataDeps(dependent, ancestor); + bool isAncestorOutputProxy = std::regex_match(ancestor.name.data(), m, matcher); + // For now dependent is always an output proxy. + assert(std::regex_match(dependent.name.data(), m, matcher)); + bool isAncestorExpendable = std::find_if(ancestor.labels.begin(), ancestor.labels.end(), [](DataProcessorLabel const& label) { + return label.value == "expendable"; + }) != ancestor.labels.end(); + + bool isDependentResilient = std::find_if(dependent.labels.begin(), dependent.labels.end(), [](DataProcessorLabel const& label) { + return label.value == "resilient"; + }) != dependent.labels.end(); + bool isAncestorResilient = std::find_if(ancestor.labels.begin(), ancestor.labels.end(), [](DataProcessorLabel const& label) { + return label.value == "resilient"; + }) != ancestor.labels.end(); + + if (!isDependentResilient && isAncestorExpendable) { + O2_SIGNPOST_END(topology, sid, "alwaysDependent", "false. Ancestor %s is expendable while %s is non-resilient output proxy (dependent).", + ancestor.name.c_str(), dependent.name.c_str()); + return false; + } + + if (isAncestorOutputProxy || (!isDependentResilient && isAncestorResilient)) { + bool hasDependency = dataDeps(dependent, ancestor); + O2_SIGNPOST_END(topology, sid, "alwaysDependent", "%s. Dependent %s %s a dependency on ancestor %s.", + hasDependency ? "true" : "false", dependent.name.c_str(), hasDependency ? "has" : "has not", ancestor.name.c_str()); + return hasDependency; } + O2_SIGNPOST_END(topology, sid, "alwaysDependent", "true by default. Ancestor %s is not an output proxy.", ancestor.name.c_str()); return true; }; }