From 24e05f9d9f82045330f8a64c941b089c18441797 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Mon, 9 Dec 2024 23:04:34 +0100 Subject: [PATCH] QC-1253 Mergers: Shorter latency with multiple layers (#13782) If we run multiple layers of Mergers, the merged object arrival time can be described as: merger cycle duration * number of layers (it can be shorter due to randomized timer shifts at startup). As a consequence, adding each new layer adds the latency to the merger topology. Assuming that the deployed Mergers are not expendable, we can rely on expecting the right number of input messages to know that each Merger in the lower layer produced an update, so we can publish the merged object. As an effect, we get lower latency. --- .../Mergers/include/Mergers/FullHistoryMerger.h | 1 + .../Mergers/include/Mergers/IntegratingMerger.h | 1 + .../Mergers/include/Mergers/MergerConfig.h | 8 +++++--- Utilities/Mergers/src/FullHistoryMerger.cxx | 17 ++++++++++++++++- Utilities/Mergers/src/IntegratingMerger.cxx | 13 ++++++++++++- .../Mergers/src/MergerInfrastructureBuilder.cxx | 15 +++++++++++---- .../Mergers/test/mergersBenchmarkTopology.cxx | 6 +++--- 7 files changed, 49 insertions(+), 12 deletions(-) diff --git a/Utilities/Mergers/include/Mergers/FullHistoryMerger.h b/Utilities/Mergers/include/Mergers/FullHistoryMerger.h index a62f5acf685fc..03e62d040a8b2 100644 --- a/Utilities/Mergers/include/Mergers/FullHistoryMerger.h +++ b/Utilities/Mergers/include/Mergers/FullHistoryMerger.h @@ -71,6 +71,7 @@ class FullHistoryMerger : public framework::Task void mergeCache(); void publish(framework::DataAllocator& allocator); void clear(); + bool shouldFinishCycle(const framework::InputRecord& inputs) const; }; } // namespace o2::mergers diff --git a/Utilities/Mergers/include/Mergers/IntegratingMerger.h b/Utilities/Mergers/include/Mergers/IntegratingMerger.h index 8a6cecc437d1e..7a8da4a3f0d29 100644 --- a/Utilities/Mergers/include/Mergers/IntegratingMerger.h +++ b/Utilities/Mergers/include/Mergers/IntegratingMerger.h @@ -60,6 +60,7 @@ class IntegratingMerger : public framework::Task void publishMovingWindow(framework::DataAllocator& allocator); static void merge(ObjectStore& mMergedDelta, ObjectStore&& other); void clear(); + bool shouldFinishCycle(const framework::InputRecord&) const; private: header::DataHeader::SubSpecificationType mSubSpec; diff --git a/Utilities/Mergers/include/Mergers/MergerConfig.h b/Utilities/Mergers/include/Mergers/MergerConfig.h index 1f626cd4345c5..dcaf2682eaa05 100644 --- a/Utilities/Mergers/include/Mergers/MergerConfig.h +++ b/Utilities/Mergers/include/Mergers/MergerConfig.h @@ -41,7 +41,7 @@ enum class MergedObjectTimespan { // when InputObjectsTimespan::FullHistory is set. LastDifference, // Generalisation of the two above. Resets all objects in Mergers after n cycles (0 - infinite). - // The the above will be removed once we switch to NCycles in QC. + // The above will be removed once we switch to NCycles in QC. NCycles }; @@ -52,7 +52,8 @@ enum class PublishMovingWindow { }; enum class PublicationDecision { - EachNSeconds, // Merged object is published each N seconds. This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2... + EachNSeconds, // Merged object is published each N seconds. This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2... + EachNArrivals, // Merged object is published whenever we receive N new input objects. }; enum class TopologySize { @@ -66,6 +67,7 @@ enum class ParallelismType { RoundRobin // Mergers receive their input messages in round robin order. Useful when there is one InputSpec with a wildcard. }; +// fixme: this way of configuring mergers should be refactored, it does not make sense that we share `param`s across for different enum values. template struct ConfigEntry { V value; @@ -82,7 +84,7 @@ class PublicationDecisionParameter PublicationDecisionParameter(size_t param) : decision({{param, 1}}) {} PublicationDecisionParameter(const std::vector>& decision) : decision(decision) {} - std::vector> decision; + std::vector> decision; }; // todo rework configuration in a way that user cannot create an invalid configuration diff --git a/Utilities/Mergers/src/FullHistoryMerger.cxx b/Utilities/Mergers/src/FullHistoryMerger.cxx index 079f557ec83d5..1ae566d7953db 100644 --- a/Utilities/Mergers/src/FullHistoryMerger.cxx +++ b/Utilities/Mergers/src/FullHistoryMerger.cxx @@ -76,7 +76,7 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx) } } - if (ctx.inputs().isValid("timer-publish") && !mFirstObjectSerialized.first.empty()) { + if (shouldFinishCycle(ctx.inputs())) { mCyclesSinceReset++; mergeCache(); publish(ctx.outputs()); @@ -88,6 +88,21 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx) } } +bool FullHistoryMerger::shouldFinishCycle(const framework::InputRecord& inputs) const +{ + if (mFirstObjectSerialized.first.empty()) { + return false; + } + + if (mConfig.publicationDecision.value == PublicationDecision::EachNSeconds) { + return inputs.isValid("timer-publish"); + } else if (mConfig.publicationDecision.value == PublicationDecision::EachNArrivals) { + return mUpdatesReceived > 0 && mUpdatesReceived % mConfig.publicationDecision.param.decision.begin()->first == 0; + } else { + throw std::runtime_error("unsupported publication decision parameter"); + } +} + void FullHistoryMerger::endOfStream(framework::EndOfStreamContext& eosContext) { mergeCache(); diff --git a/Utilities/Mergers/src/IntegratingMerger.cxx b/Utilities/Mergers/src/IntegratingMerger.cxx index 749becd463a5d..e9cfec1cfe8cc 100644 --- a/Utilities/Mergers/src/IntegratingMerger.cxx +++ b/Utilities/Mergers/src/IntegratingMerger.cxx @@ -68,11 +68,22 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx) } } - if (ctx.inputs().isValid("timer-publish")) { + if (shouldFinishCycle(ctx.inputs())) { finishCycle(ctx.outputs()); } } +bool IntegratingMerger::shouldFinishCycle(const framework::InputRecord& inputs) const +{ + if (mConfig.publicationDecision.value == PublicationDecision::EachNSeconds) { + return inputs.isValid("timer-publish"); + } else if (mConfig.publicationDecision.value == PublicationDecision::EachNArrivals) { + return mDeltasMerged > 0 && mDeltasMerged % mConfig.publicationDecision.param.decision.begin()->first == 0; + } else { + throw std::runtime_error("unsupported publication decision parameter"); + } +} + void IntegratingMerger::finishCycle(DataAllocator& outputs) { mCyclesSinceReset++; diff --git a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx index 233631f0fe80b..9fcb6aaa482dd 100644 --- a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx +++ b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx @@ -128,7 +128,8 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() auto layerInputs = mInputs; // preparing some numbers - auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size()); + const auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size()); + const bool expendable = std::ranges::any_of(mConfig.labels, [](const auto& label) { return label.value == "expendable"; }); // topology generation MergerBuilder mergerBuilder; @@ -150,7 +151,6 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() // we also expect moving windows to be published only by the last layer layerConfig.publishMovingWindow = {PublishMovingWindow::No}; } - mergerBuilder.setConfig(layerConfig); framework::Inputs nextLayerInputs; auto inputsRangeBegin = layerInputs.begin(); @@ -162,13 +162,19 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() auto inputsRangeEnd = inputsRangeBegin + inputsPerMerger + (m < inputsPerMergerRemainder); mergerBuilder.setInputSpecs(framework::Inputs(inputsRangeBegin, inputsRangeEnd)); - inputsRangeBegin = inputsRangeEnd; + if (layer > 1 && !expendable) { + // we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs. + // we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes. + const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd); + assert(inputNumber != 0); + layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber}; + } if (layer == mergersPerLayer.size() - 1) { // the last layer => use the specified external OutputSpec mergerBuilder.setOutputSpec(mOutputSpecIntegral); } - + mergerBuilder.setConfig(layerConfig); auto merger = mergerBuilder.buildSpec(); auto input = DataSpecUtils::matchingInput(merger.outputs.at(0)); @@ -176,6 +182,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() nextLayerInputs.push_back(input); workflow.emplace_back(std::move(merger)); + inputsRangeBegin = inputsRangeEnd; } layerInputs = nextLayerInputs; // todo: could be optimised with pointers } diff --git a/Utilities/Mergers/test/mergersBenchmarkTopology.cxx b/Utilities/Mergers/test/mergersBenchmarkTopology.cxx index 4cf8e84a37e63..beb9a572b6685 100644 --- a/Utilities/Mergers/test/mergersBenchmarkTopology.cxx +++ b/Utilities/Mergers/test/mergersBenchmarkTopology.cxx @@ -71,14 +71,14 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config) for (size_t p = 0; p < objectsProducers; p++) { mergersInputs.push_back({ "mo", "TST", "HISTO", static_cast(p + 1), - Lifetime::Timeframe }); + Lifetime::Sporadic }); DataProcessorSpec producer{ "producer-histo" + std::to_string(p), Inputs{}, Outputs{ { { "mo" }, "TST", "HISTO", static_cast(p + 1), - Lifetime::Timeframe } }, + Lifetime::Sporadic } }, AlgorithmSpec{ (AlgorithmSpec::ProcessCallback)[ p, periodus = int(1000000 / objectsRate), objectsBins, objectsProducers ]( ProcessingContext& processingContext) mutable { static auto lastTime = steady_clock::now(); @@ -115,7 +115,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config) DataProcessorSpec printer{ "printer-bins", Inputs{ - { "histo", "TST", "HISTO", 0 } + { "histo", "TST", "HISTO", 0, Lifetime::Sporadic } }, Outputs{}, AlgorithmSpec{