Skip to content

Commit

Permalink
DPL: add ability to disable inputs programmatically
Browse files Browse the repository at this point in the history
Disabled inputs will not result in an actual route for the data, however it
will be stored in the configuration, so that analysis workflows will not need
to have the configuration available at every step.
  • Loading branch information
ktf committed Feb 2, 2024
1 parent aab8635 commit 78f4a8d
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 22 deletions.
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/InputSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ struct InputSpec {

/// A set of configurables which can be used to customise the InputSpec.
std::vector<ConfigParamSpec> metadata;
/// Wether or not the input is to be considered enabled.
/// Useful to programmatically disable inputs e.g. for the ProcessorOptions.
bool enabled = true;

friend std::ostream& operator<<(std::ostream& stream, InputSpec const& arg);
bool operator==(InputSpec const& that) const;
Expand Down
4 changes: 4 additions & 0 deletions Framework/Core/include/Framework/OutputSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ struct OutputSpec {
/// A set of configurables which can be used to customise the InputSpec.
std::vector<ConfigParamSpec> metadata;

/// Wether or not this output is enabled. This is useful to decide programmatically
/// wether or not to produce a given output.
bool enabled = true;

friend std::ostream& operator<<(std::ostream& stream, OutputSpec const& arg);
};

Expand Down
15 changes: 13 additions & 2 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -679,19 +679,27 @@ void DeviceSpecHelpers::processOutEdgeActions(ConfigContext const& configContext
assert(policyPtr != nullptr);

if (edge.isForward == false) {
auto& matcher = outputsMatchers[edge.outputGlobalIndex];
if (matcher.enabled == false) {
throw runtime_error_f("Output %s is disabled but it was still used in topology", DataSpecUtils::describe(matcher).data());
}
OutputRoute route{
edge.timeIndex,
consumer.maxInputTimeslices,
outputsMatchers[edge.outputGlobalIndex],
matcher,
channel.name,
policyPtr,
};
device.outputs.emplace_back(route);
} else {
auto& matcher = workflow[edge.consumer].inputs[edge.consumerInputIndex];
if (matcher.enabled == false) {
throw runtime_error_f("Output %s is disabled but it was still used in topology", DataSpecUtils::describe(matcher).data());
}
ForwardRoute route{
edge.timeIndex,
consumer.maxInputTimeslices,
workflow[edge.consumer].inputs[edge.consumerInputIndex],
matcher,
channel.name};
device.forwards.emplace_back(route);
}
Expand Down Expand Up @@ -915,6 +923,9 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,

auto const& inputSpec = consumer.inputs[edge.consumerInputIndex];
auto const& sourceChannel = consumerDevice.inputChannels[ci].name;
if (inputSpec.enabled == false) {
throw runtime_error_f("Input %s is disabled but it was still used in topology", DataSpecUtils::describe(inputSpec).data());
}

InputRoute route{
inputSpec,
Expand Down
12 changes: 9 additions & 3 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -841,11 +841,13 @@ void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow,
for (size_t wi = 0; wi < workflow.size(); ++wi) {
auto& producer = workflow[wi];

for (size_t oi = 0; oi < producer.outputs.size(); ++oi) {
auto& out = producer.outputs[oi];
for (auto& output : producer.outputs) {
if (output.enabled == false) {
continue;
}
auto uniqueOutputId = outputs.size();
availableOutputsInfo.emplace_back(LogicalOutputInfo{wi, uniqueOutputId, false});
outputs.push_back(out);
outputs.push_back(output);
}
}
};
Expand Down Expand Up @@ -879,6 +881,10 @@ void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow,
std::vector<bool> matches(constOutputs.size());
for (size_t consumer = 0; consumer < workflow.size(); ++consumer) {
for (size_t input = 0; input < workflow[consumer].inputs.size(); ++input) {
// Skip disabled inputs.
if (workflow[consumer].inputs[input].enabled == false) {
continue;
}
forwards.clear();
for (size_t i = 0; i < constOutputs.size(); i++) {
matches[i] = DataSpecUtils::match(workflow[consumer].inputs[input], constOutputs[i]);
Expand Down
32 changes: 31 additions & 1 deletion Framework/Core/src/WorkflowSerializationHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
IN_INPUT_ORIGIN,
IN_INPUT_DESCRIPTION,
IN_INPUT_SUBSPEC,
IN_INPUT_ENABLED,
IN_INPUT_ORIGIN_REF,
IN_INPUT_DESCRIPTION_REF,
IN_INPUT_SUBSPEC_REF,
Expand All @@ -75,6 +76,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
IN_OUTPUT_BINDING,
IN_OUTPUT_ORIGIN,
IN_OUTPUT_DESCRIPTION,
IN_OUTPUT_ENABLED,
IN_OUTPUT_SUBSPEC,
IN_OUTPUT_LIFETIME,
IN_OUTPUT_OPTIONS,
Expand Down Expand Up @@ -167,6 +169,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
case State::IN_INPUT_SUBSPEC:
s << "IN_INPUT_SUBSPEC";
break;
case State::IN_INPUT_ENABLED:
s << "IN_INPUT_ENABLED";
break;
case State::IN_INPUT_ORIGIN_REF:
s << "IN_INPUT_ORIGIN_REF";
break;
Expand Down Expand Up @@ -218,6 +223,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
case State::IN_OUTPUT_OPTIONS:
s << "IN_OUTPUT_OPTIONS";
break;
case WorkflowImporter::State::IN_OUTPUT_ENABLED:
s << "IN_OUTPUT_ENABLED";
break;
case State::IN_OPTION:
s << "IN_OPTION";
break;
Expand Down Expand Up @@ -302,6 +310,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
dataProcessors.push_back(DataProcessorSpec{});
} else if (in(State::IN_INPUTS)) {
push(State::IN_INPUT);
enabled = true;
inputMatcherNodes.clear();
} else if (in(State::IN_INPUT_MATCHER)) {
// start a new embedded matcher
Expand All @@ -313,6 +322,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
// will be merged into the parent matcher
} else if (in(State::IN_OUTPUTS)) {
push(State::IN_OUTPUT);
enabled = true;
outputHasSubSpec = false;
} else if (in(State::IN_OPTIONS)) {
push(State::IN_OPTION);
Expand Down Expand Up @@ -370,9 +380,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
} else {
dataProcessors.back().inputs.push_back(InputSpec({binding}, std::move(*matcher), lifetime, inputOptions));
}
dataProcessors.back().inputs.back().enabled = enabled;
inputMatcherNodes.clear();
inputOptions.clear();

} else if (in(State::IN_INPUT_MATCHER) && inputMatcherNodes.size() > 1) {
data_matcher::Node child = std::move(inputMatcherNodes.back());
inputMatcherNodes.pop_back();
Expand Down Expand Up @@ -433,6 +443,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
} else {
dataProcessors.back().outputs.push_back(OutputSpec({binding}, {origin, description}, lifetime));
}
dataProcessors.back().outputs.back().enabled = enabled;
outputHasSubSpec = false;
} else if (in(State::IN_OPTION)) {
std::unique_ptr<ConfigParamSpec> opt{nullptr};
Expand Down Expand Up @@ -545,12 +556,14 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
push(State::IN_DATAPROCESSORS);
} else if (in(State::IN_INPUTS)) {
push(State::IN_INPUT);
enabled = true;
} else if (in(State::IN_INPUT_OPTIONS)) {
push(State::IN_OPTION);
} else if (in(State::IN_OUTPUT_OPTIONS)) {
push(State::IN_OPTION);
} else if (in(State::IN_OUTPUTS)) {
push(State::IN_OUTPUT);
enabled = true;
outputHasSubSpec = false;
} else if (in(State::IN_OPTIONS)) {
push(State::IN_OPTION);
Expand Down Expand Up @@ -602,6 +615,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
push(State::IN_INPUT_DESCRIPTION_REF);
} else if (in(State::IN_INPUT) && strncmp(str, "subspecRef", length) == 0) {
push(State::IN_INPUT_SUBSPEC_REF);
} else if (in(State::IN_INPUT) && strncmp(str, "enabled", length) == 0) {
push(State::IN_INPUT_ENABLED);
} else if (in(State::IN_INPUT) && strncmp(str, "matcher", length) == 0) {
// the outermost matcher is starting here
// we create a placeholder which is being updated later
Expand Down Expand Up @@ -664,6 +679,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
push(State::IN_OUTPUT_LIFETIME);
} else if (in(State::IN_OUTPUT) && strncmp(str, "metadata", length) == 0) {
push(State::IN_OUTPUT_OPTIONS);
} else if (in(State::IN_OUTPUT) && strncmp(str, "enabled", length) == 0) {
push(State::IN_OUTPUT_ENABLED);
} else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "name", length) == 0) {
push(State::IN_DATAPROCESSOR_NAME);
} else if (in(State::IN_DATAPROCESSOR) && strncmp(str, "ranks", length) == 0) {
Expand Down Expand Up @@ -734,6 +751,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
metadata.back().executable = s;
} else if (in(State::IN_INPUT_BINDING)) {
binding = s;
} else if (in(State::IN_INPUT_ENABLED)) {
enabled = (s == "true");
} else if (in(State::IN_INPUT_ORIGIN)) {
origin.runtimeInit(s.c_str(), std::min(s.size(), 4UL));
std::string v(s.c_str(), std::min(s.size(), 4UL));
Expand Down Expand Up @@ -841,6 +860,10 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
dataProcessors.back().inputTimeSliceId = i;
} else if (in(State::IN_DATAPROCESSOR_MAX_TIMESLICES)) {
dataProcessors.back().maxInputTimeslices = i;
} else if (in(State::IN_INPUT_ENABLED)) {
enabled = (i == 1);
} else if (in(State::IN_OUTPUT_ENABLED)) {
enabled = (i == 1);
}
pop();
return true;
Expand Down Expand Up @@ -915,6 +938,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
header::DataDescription description;
size_t subspec;
size_t ref;
// Keep track of the enabled state of the input/output
// Unless specified, inputs are enabled by default.
bool enabled = true;
Lifetime lifetime;
std::string metadatumKey;
std::string metadatumValue;
Expand Down Expand Up @@ -1103,6 +1129,8 @@ void WorkflowSerializationHelpers::dump(std::ostream& out,
}
w.Key("lifetime");
w.Uint((int)input.lifetime);
w.Key("enabled");
w.Uint((int)input.enabled);
if (input.metadata.empty() == false) {
w.Key("metadata");
w.StartArray();
Expand Down Expand Up @@ -1152,6 +1180,8 @@ void WorkflowSerializationHelpers::dump(std::ostream& out,
}
w.Key("lifetime");
w.Uint((int)output.lifetime);
w.Key("enabled");
w.Uint((int)output.enabled);
if (output.metadata.empty() == false) {
w.Key("metadata");
w.StartArray();
Expand Down
48 changes: 32 additions & 16 deletions Framework/Core/test/test_FrameworkDataFlowToDDS.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,16 @@ TEST_CASE("TestDDS")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;A1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
},
{
&quot;binding&quot;: &quot;TST/A2/0&quot;,
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;A2&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;options&quot;: [],
Expand All @@ -195,7 +197,8 @@ TEST_CASE("TestDDS")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;A1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;outputs&quot;: [
Expand All @@ -204,7 +207,8 @@ TEST_CASE("TestDDS")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;B1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;options&quot;: [],
Expand All @@ -223,7 +227,8 @@ TEST_CASE("TestDDS")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;A2&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;outputs&quot;: [
Expand All @@ -232,7 +237,8 @@ TEST_CASE("TestDDS")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;C1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;options&quot;: [],
Expand All @@ -251,14 +257,16 @@ TEST_CASE("TestDDS")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;B1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
},
{
&quot;binding&quot;: &quot;y&quot;,
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;C1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;outputs&quot;: [],
Expand Down Expand Up @@ -433,14 +441,16 @@ TEST_CASE("TestDDSExpendable")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;A1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
},
{
&quot;binding&quot;: &quot;TST/A2/0&quot;,
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;A2&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;options&quot;: [],
Expand All @@ -459,7 +469,8 @@ TEST_CASE("TestDDSExpendable")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;A1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;outputs&quot;: [
Expand All @@ -468,7 +479,8 @@ TEST_CASE("TestDDSExpendable")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;B1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;options&quot;: [],
Expand All @@ -487,7 +499,8 @@ TEST_CASE("TestDDSExpendable")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;A2&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;outputs&quot;: [
Expand All @@ -496,7 +509,8 @@ TEST_CASE("TestDDSExpendable")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;C1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;options&quot;: [],
Expand All @@ -515,14 +529,16 @@ TEST_CASE("TestDDSExpendable")
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;B1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
},
{
&quot;binding&quot;: &quot;y&quot;,
&quot;origin&quot;: &quot;TST&quot;,
&quot;description&quot;: &quot;C1&quot;,
&quot;subspec&quot;: 0,
&quot;lifetime&quot;: 0
&quot;lifetime&quot;: 0,
&quot;enabled&quot;: 1
}
],
&quot;outputs&quot;: [],
Expand Down
Loading

0 comments on commit 78f4a8d

Please sign in to comment.