diff --git a/Utilities/Mergers/include/Mergers/FullHistoryMerger.h b/Utilities/Mergers/include/Mergers/FullHistoryMerger.h index a62f5acf685fc..03e62d040a8b2 100644 --- a/Utilities/Mergers/include/Mergers/FullHistoryMerger.h +++ b/Utilities/Mergers/include/Mergers/FullHistoryMerger.h @@ -71,6 +71,7 @@ class FullHistoryMerger : public framework::Task void mergeCache(); void publish(framework::DataAllocator& allocator); void clear(); + bool shouldFinishCycle(const framework::InputRecord& inputs) const; }; } // namespace o2::mergers diff --git a/Utilities/Mergers/include/Mergers/IntegratingMerger.h b/Utilities/Mergers/include/Mergers/IntegratingMerger.h index 8a6cecc437d1e..7a8da4a3f0d29 100644 --- a/Utilities/Mergers/include/Mergers/IntegratingMerger.h +++ b/Utilities/Mergers/include/Mergers/IntegratingMerger.h @@ -60,6 +60,7 @@ class IntegratingMerger : public framework::Task void publishMovingWindow(framework::DataAllocator& allocator); static void merge(ObjectStore& mMergedDelta, ObjectStore&& other); void clear(); + bool shouldFinishCycle(const framework::InputRecord&) const; private: header::DataHeader::SubSpecificationType mSubSpec; diff --git a/Utilities/Mergers/include/Mergers/MergerConfig.h b/Utilities/Mergers/include/Mergers/MergerConfig.h index 1f626cd4345c5..dcaf2682eaa05 100644 --- a/Utilities/Mergers/include/Mergers/MergerConfig.h +++ b/Utilities/Mergers/include/Mergers/MergerConfig.h @@ -41,7 +41,7 @@ enum class MergedObjectTimespan { // when InputObjectsTimespan::FullHistory is set. LastDifference, // Generalisation of the two above. Resets all objects in Mergers after n cycles (0 - infinite). - // The the above will be removed once we switch to NCycles in QC. + // The above will be removed once we switch to NCycles in QC. NCycles }; @@ -52,7 +52,8 @@ enum class PublishMovingWindow { }; enum class PublicationDecision { - EachNSeconds, // Merged object is published each N seconds. 
This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2... + EachNSeconds, // Merged object is published each N seconds. This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2... + EachNArrivals, // Merged object is published whenever we receive N new input objects. }; enum class TopologySize { @@ -66,6 +67,7 @@ enum class ParallelismType { RoundRobin // Mergers receive their input messages in round robin order. Useful when there is one InputSpec with a wildcard. }; +// fixme: this way of configuring mergers should be refactored, it does not make sense that we share `param`s across different enum values. template struct ConfigEntry { V value; @@ -82,7 +84,7 @@ class PublicationDecisionParameter PublicationDecisionParameter(size_t param) : decision({{param, 1}}) {} PublicationDecisionParameter(const std::vector>& decision) : decision(decision) {} - std::vector> decision; + std::vector> decision; }; // todo rework configuration in a way that user cannot create an invalid configuration diff --git a/Utilities/Mergers/src/FullHistoryMerger.cxx b/Utilities/Mergers/src/FullHistoryMerger.cxx index 079f557ec83d5..1ae566d7953db 100644 --- a/Utilities/Mergers/src/FullHistoryMerger.cxx +++ b/Utilities/Mergers/src/FullHistoryMerger.cxx @@ -76,7 +76,7 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx) } } - if (ctx.inputs().isValid("timer-publish") && !mFirstObjectSerialized.first.empty()) { + if (shouldFinishCycle(ctx.inputs())) { mCyclesSinceReset++; mergeCache(); publish(ctx.outputs()); @@ -88,6 +88,21 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx) } } +bool FullHistoryMerger::shouldFinishCycle(const framework::InputRecord& inputs) const +{ + if (mFirstObjectSerialized.first.empty()) { + return false; + } + + if (mConfig.publicationDecision.value == PublicationDecision::EachNSeconds) { + return inputs.isValid("timer-publish"); + } else if (mConfig.publicationDecision.value == 
PublicationDecision::EachNArrivals) { + return mUpdatesReceived > 0 && mUpdatesReceived % mConfig.publicationDecision.param.decision.begin()->first == 0; + } else { + throw std::runtime_error("unsupported publication decision parameter"); + } +} + void FullHistoryMerger::endOfStream(framework::EndOfStreamContext& eosContext) { mergeCache(); diff --git a/Utilities/Mergers/src/IntegratingMerger.cxx b/Utilities/Mergers/src/IntegratingMerger.cxx index 749becd463a5d..e9cfec1cfe8cc 100644 --- a/Utilities/Mergers/src/IntegratingMerger.cxx +++ b/Utilities/Mergers/src/IntegratingMerger.cxx @@ -68,11 +68,22 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx) } } - if (ctx.inputs().isValid("timer-publish")) { + if (shouldFinishCycle(ctx.inputs())) { finishCycle(ctx.outputs()); } } +bool IntegratingMerger::shouldFinishCycle(const framework::InputRecord& inputs) const +{ + if (mConfig.publicationDecision.value == PublicationDecision::EachNSeconds) { + return inputs.isValid("timer-publish"); + } else if (mConfig.publicationDecision.value == PublicationDecision::EachNArrivals) { + return mDeltasMerged > 0 && mDeltasMerged % mConfig.publicationDecision.param.decision.begin()->first == 0; + } else { + throw std::runtime_error("unsupported publication decision parameter"); + } +} + void IntegratingMerger::finishCycle(DataAllocator& outputs) { mCyclesSinceReset++; diff --git a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx index 233631f0fe80b..9fcb6aaa482dd 100644 --- a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx +++ b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx @@ -128,7 +128,8 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() auto layerInputs = mInputs; // preparing some numbers - auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size()); + const auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size()); + const bool 
expendable = std::ranges::any_of(mConfig.labels, [](const auto& label) { return label.value == "expendable"; }); // topology generation MergerBuilder mergerBuilder; @@ -150,7 +151,6 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() // we also expect moving windows to be published only by the last layer layerConfig.publishMovingWindow = {PublishMovingWindow::No}; } - mergerBuilder.setConfig(layerConfig); framework::Inputs nextLayerInputs; auto inputsRangeBegin = layerInputs.begin(); @@ -162,13 +162,19 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() auto inputsRangeEnd = inputsRangeBegin + inputsPerMerger + (m < inputsPerMergerRemainder); mergerBuilder.setInputSpecs(framework::Inputs(inputsRangeBegin, inputsRangeEnd)); - inputsRangeBegin = inputsRangeEnd; + if (layer > 1 && !expendable) { + // we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs. + // we can do that safely only if tasks are not expendable, i.e. we are guaranteed that the workflow stops if a Merger crashes. 
+ const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd); + assert(inputNumber != 0); + layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber}; + } if (layer == mergersPerLayer.size() - 1) { // the last layer => use the specified external OutputSpec mergerBuilder.setOutputSpec(mOutputSpecIntegral); } - + mergerBuilder.setConfig(layerConfig); auto merger = mergerBuilder.buildSpec(); auto input = DataSpecUtils::matchingInput(merger.outputs.at(0)); @@ -176,6 +182,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() nextLayerInputs.push_back(input); workflow.emplace_back(std::move(merger)); + inputsRangeBegin = inputsRangeEnd; } layerInputs = nextLayerInputs; // todo: could be optimised with pointers } diff --git a/Utilities/Mergers/test/mergersBenchmarkTopology.cxx b/Utilities/Mergers/test/mergersBenchmarkTopology.cxx index 4cf8e84a37e63..beb9a572b6685 100644 --- a/Utilities/Mergers/test/mergersBenchmarkTopology.cxx +++ b/Utilities/Mergers/test/mergersBenchmarkTopology.cxx @@ -71,14 +71,14 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config) for (size_t p = 0; p < objectsProducers; p++) { mergersInputs.push_back({ "mo", "TST", "HISTO", static_cast(p + 1), - Lifetime::Timeframe }); + Lifetime::Sporadic }); DataProcessorSpec producer{ "producer-histo" + std::to_string(p), Inputs{}, Outputs{ { { "mo" }, "TST", "HISTO", static_cast(p + 1), - Lifetime::Timeframe } }, + Lifetime::Sporadic } }, AlgorithmSpec{ (AlgorithmSpec::ProcessCallback)[ p, periodus = int(1000000 / objectsRate), objectsBins, objectsProducers ]( ProcessingContext& processingContext) mutable { static auto lastTime = steady_clock::now(); @@ -115,7 +115,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config) DataProcessorSpec printer{ "printer-bins", Inputs{ - { "histo", "TST", "HISTO", 0 } + { "histo", "TST", "HISTO", 0, Lifetime::Sporadic } }, Outputs{}, AlgorithmSpec{