From bc7478d038e8c3e5077b1ff9ef6646117e36081a Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Tue, 14 Nov 2023 13:16:57 +0000 Subject: [PATCH] Mover selection --- src/eckit/CMakeLists.txt | 2 + src/eckit/io/DataHandle.cc | 9 +- src/eckit/io/DataHandle.h | 7 +- src/eckit/io/FileHandle.cc | 7 +- src/eckit/io/FileHandle.h | 3 +- src/eckit/io/MoverTransfer.cc | 67 +------------ src/eckit/io/MoverTransferSelection.cc | 133 +++++++++++++++++++++++++ src/eckit/io/MoverTransferSelection.h | 112 +++++++++++++++++++++ src/eckit/io/MultiHandle.cc | 14 +-- src/eckit/io/MultiHandle.h | 4 +- src/eckit/io/PartFileHandle.cc | 5 +- src/eckit/io/PartFileHandle.h | 3 +- src/eckit/io/SharedHandle.cc | 2 +- src/eckit/io/SharedHandle.h | 3 +- src/eckit/io/StatsHandle.cc | 7 +- src/eckit/io/StatsHandle.h | 5 +- src/eckit/io/TeeHandle.cc | 15 +-- src/eckit/io/TeeHandle.h | 5 +- src/eckit/io/cluster/ClusterNodes.cc | 16 ++- src/eckit/io/cluster/ClusterNodes.h | 2 + src/eckit/net/Connector.cc | 36 +------ src/eckit/net/Connector.h | 3 +- 22 files changed, 307 insertions(+), 153 deletions(-) create mode 100644 src/eckit/io/MoverTransferSelection.cc create mode 100644 src/eckit/io/MoverTransferSelection.h diff --git a/src/eckit/CMakeLists.txt b/src/eckit/CMakeLists.txt index 099080958..35f82a795 100644 --- a/src/eckit/CMakeLists.txt +++ b/src/eckit/CMakeLists.txt @@ -189,6 +189,8 @@ io/MemoryHandle.cc io/MemoryHandle.h io/MoverTransfer.cc io/MoverTransfer.h +io/MoverTransferSelection.cc +io/MoverTransferSelection.h io/MultiHandle.cc io/MultiHandle.h io/Offset.cc diff --git a/src/eckit/io/DataHandle.cc b/src/eckit/io/DataHandle.cc index b7e9fe771..aee3889a7 100644 --- a/src/eckit/io/DataHandle.cc +++ b/src/eckit/io/DataHandle.cc @@ -400,11 +400,6 @@ void DataHandle::restartWriteFrom(const Offset& offset) { throw NotImplemented(os.str(), Here()); } -const std::set& DataHandle::requiredMoverAttributes() const { - static std::set nullSet; - return nullSet; -} - void DataHandle::toLocal(Stream& s) const { s << *this; } @@ -417,7 +412,9 @@ void DataHandle::toRemote(Stream& s) const { s << *this; } -void DataHandle::cost(std::map& c, bool read) const {} +void DataHandle::selectMover(MoverTransferSelection&, bool) const { + +} DataHandle* DataHandle::clone() const { std::ostringstream os; diff --git a/src/eckit/io/DataHandle.h b/src/eckit/io/DataHandle.h index 9e27c1a81..ae5416d58 100644 --- a/src/eckit/io/DataHandle.h +++ b/src/eckit/io/DataHandle.h @@ -28,6 +28,7 @@ namespace eckit { class MD5; class Metrics; +class MoverTransferSelection; //---------------------------------------------------------------------------------------------------------------------- @@ -40,6 +41,7 @@ class RestartTransfer { const Offset& from() const { return from_; } }; + //---------------------------------------------------------------------------------------------------------------------- class DataHandle : public Streamable { @@ -125,13 +127,14 @@ class DataHandle : public Streamable { // For remote data movers virtual bool moveable() const { return false; } - virtual const std::set& requiredMoverAttributes() const; virtual void toLocal(Stream& s) const; virtual DataHandle* toLocal(); virtual void toRemote(Stream& s) const; - virtual void cost(std::map&, bool) const; + + virtual void selectMover(MoverTransferSelection&, bool read) const; + virtual std::string title() const; virtual std::string metricsTag() const; virtual void collectMetrics(const std::string& what) const; // Tag for metrics collection diff --git a/src/eckit/io/FileHandle.cc b/src/eckit/io/FileHandle.cc index 6499ce4b3..259160a61 100644 --- a/src/eckit/io/FileHandle.cc +++ b/src/eckit/io/FileHandle.cc @@ -22,6 +22,7 @@ #include "eckit/log/Log.h" #include "eckit/os/Stat.h" #include "eckit/utils/MD5.h" +#include "eckit/io/MoverTransferSelection.h" namespace eckit { @@ -278,13 +279,13 @@ void FileHandle::toRemote(Stream& s) const { s << *remote; } -void FileHandle::cost(std::map& c, bool read) const { +void FileHandle::selectMover(MoverTransferSelection& c, bool read) const { if (read) { - c[NodeInfo::thisNode().node()] += const_cast(this)->estimate(); + c.updateCost(NodeInfo::thisNode(), const_cast(this)->estimate()); } else { // Just mark the node as being a candidate - c[NodeInfo::thisNode().node()] += 0; + c.updateCost(NodeInfo::thisNode(), 0); } } diff --git a/src/eckit/io/FileHandle.h b/src/eckit/io/FileHandle.h index 09534f0f4..650b010f1 100644 --- a/src/eckit/io/FileHandle.h +++ b/src/eckit/io/FileHandle.h @@ -56,7 +56,8 @@ class FileHandle : public DataHandle { void restartReadFrom(const Offset& from) override; void restartWriteFrom(const Offset& from) override; void toRemote(Stream&) const override; - void cost(std::map&, bool) const override; + void selectMover(MoverTransferSelection&, bool read) const override; + std::string title() const override; std::string metricsTag() const override; diff --git a/src/eckit/io/MoverTransfer.cc b/src/eckit/io/MoverTransfer.cc index f322a7e1c..ac9b08bcd 100644 --- a/src/eckit/io/MoverTransfer.cc +++ b/src/eckit/io/MoverTransfer.cc @@ -8,21 +8,13 @@ * does it submit to any jurisdiction. */ - #include "eckit/io/MoverTransfer.h" -#include "eckit/io/cluster/ClusterNodes.h" -#include "eckit/io/cluster/NodeInfo.h" #include "eckit/log/Bytes.h" #include "eckit/log/Progress.h" #include "eckit/net/Connector.h" -#include "eckit/net/TCPServer.h" -#include "eckit/net/TCPStream.h" #include "eckit/runtime/Metrics.h" #include "eckit/runtime/Monitor.h" -#include "eckit/thread/AutoLock.h" -#include "eckit/thread/Thread.h" -#include "eckit/thread/ThreadControler.h" -#include "eckit/value/Value.h" +#include "eckit/io/MoverTransferSelection.h" namespace eckit { @@ -45,55 +37,17 @@ Length MoverTransfer::transfer(DataHandle& from, DataHandle& to) { throw SeriousBug(to.title() + " is not moveable"); } - // Attributes that are required from the mover - - std::set moverAttributes; - { - auto&& f = from.requiredMoverAttributes(); - moverAttributes.insert(f.begin(), f.end()); - auto&& t = to.requiredMoverAttributes(); - moverAttributes.insert(t.begin(), t.end()); - } // Using node-specific info, determine beneficial nodes to use - std::map cost; - from.cost(cost, true); - to.cost(cost, false); - - // Should any of the nodes be removed from the cost matrix, because they don't support - // the required attributes? - // Also remove any movers that are not up - - for (auto it = cost.begin(); it != cost.end(); /* no increment */) { - if (ClusterNodes::available("mover", it->first)) { - if (!ClusterNodes::lookUp("mover", it->first).supportsAttributes(moverAttributes)) { - cost.erase(it++); - } - else { - ++it; - } - } - else { - cost.erase(it++); - } - } - - if (cost.empty()) { - NodeInfo info = ClusterNodes::any("mover", moverAttributes); - cost[info.node()] = 0; - send_costs = false; - // throw SeriousBug(std::string("No cost for ") + from.title() + " => " + to.title()); - } + MoverTransferSelection cost; + from.selectMover(cost, true); + to.selectMover(cost, false); Log::info() << "MoverTransfer::transfer(" << from << "," << to << ")" << std::endl; - Log::info() << "MoverTransfer::transfer cost:" << std::endl; - for (std::map::iterator j = cost.begin(); j != cost.end(); ++j) { - Log::info() << " " << (*j).first << " => " << Bytes((*j).second) << std::endl; - } - net::Connector& c(net::Connector::service("mover", cost)); + net::Connector& c(net::Connector::service(cost.selectedMover())); AutoLock lock(c); // This will close the connector on unlock c.autoclose(true); @@ -149,17 +103,6 @@ Length MoverTransfer::transfer(DataHandle& from, DataHandle& to) { Metrics::receive(s); - Metrics::set("mover_node", c.node()); - if (send_costs) { - for (auto j = cost.begin(); j != cost.end(); ++j) { - std::string h = (*j).first; - unsigned long long l = (*j).second; - Metrics::set("mover_costs." + h, l); - } - } - // Metrics::set("mover_metric", prefix_); - // // ASSERT(len == total); - Log::message() << " " << std::endl; diff --git a/src/eckit/io/MoverTransferSelection.cc b/src/eckit/io/MoverTransferSelection.cc new file mode 100644 index 000000000..798753097 --- /dev/null +++ b/src/eckit/io/MoverTransferSelection.cc @@ -0,0 +1,133 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + + +#include "eckit/io/MoverTransferSelection.h" +#include "eckit/io/cluster/ClusterNodes.h" +#include "eckit/exception/Exceptions.h" + +#include "eckit/log/Bytes.h" +#include "eckit/log/Log.h" + +#include "eckit/runtime/Metrics.h" + + + +namespace eckit { + + +MoverTransferSelection::MoverTransferSelection() {} +MoverTransferSelection::~MoverTransferSelection(){} + + +void MoverTransferSelection::updateCost(const std::string& name, const eckit::Length& length) { + cost_[name] += length; +} + + +void MoverTransferSelection::updateCost(const eckit::NodeInfo& node, const eckit::Length& length) { + updateCost(node.name(), length); +} + +void MoverTransferSelection::requiredMoverAttributes(const std::set& attrs) { + moverAttributes_.insert(attrs.begin(), attrs.end()); +} + +void MoverTransferSelection::selectedMover(NodeInfo& result, bool& metrics) { + + metrics = false; + + if (preferredMover_.length() ) { + if (ClusterNodes::available("mover", preferredMover_)) { + NodeInfo preferred = ClusterNodes::lookUp("mover", preferredMover_); + if (preferred.supportsAttributes(moverAttributes_)) { + Log::info() << "Using preferred mover " << preferredMover_ << std::endl; + result = preferred; + return; + } + Log::warning() << "Preferred mover " + << preferredMover_ + << " does not support mover attributes: " + << moverAttributes_ << std::endl; + } + else { + Log::warning() << "Preferred mover " << preferredMover_ + << " is not available" << std::endl; + } + } + + std::map cost; + + + for (const auto& c : cost_) { + if (ClusterNodes::available("mover", c.first) && + ClusterNodes::lookUp("mover", c.first).supportsAttributes(moverAttributes_)) { + cost.insert(c); + } + } + + if (cost.empty()) { + result = ClusterNodes::any("mover", moverAttributes_); + return; + } + + + Log::info() << "MoverTransfer::transfer cost:" << std::endl; + for (std::map::iterator j = cost.begin(); j != cost.end(); ++j) { + Log::info() << " " << (*j).first << " => " << Bytes((*j).second) << std::endl; + } + + std::string which; + Length best = 0; + + for (const auto& c : cost) { + if(c.second >= best) { + best = c.second; + which = c.first; + } + } + + ASSERT(which != ""); + + + metrics = true; + result = ClusterNodes::lookUp("mover", which); +} + +NodeInfo MoverTransferSelection::selectedMover() { + + NodeInfo result; + bool metrics = false; + selectedMover(result, metrics); + + Metrics::set("mover_node", result.node()); + if (metrics) { + for (auto j = cost_.begin(); j != cost_.end(); ++j) { + std::string h = (*j).first; + unsigned long long l = (*j).second; + Metrics::set("mover_costs." + h, l); + } + } + + return result; +} + +void MoverTransferSelection::preferredMover(const NodeInfo& node) { + preferredMover(node.name()); +} + +void MoverTransferSelection::preferredMover(const std::string& name) { + preferredMover_ = name; +} + + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit diff --git a/src/eckit/io/MoverTransferSelection.h b/src/eckit/io/MoverTransferSelection.h new file mode 100644 index 000000000..fc340b099 --- /dev/null +++ b/src/eckit/io/MoverTransferSelection.h @@ -0,0 +1,112 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +// File MoverTransferSelection.h +// Baudouin Raoult - (c) ECMWF Jun 23 + + +#ifndef eckit_MoverTransferSelection_h +#define eckit_MoverTransferSelection_h + +#include +#include +#include + + +#include "eckit/io/Length.h" + +//----------------------------------------------------------------------------- + +namespace eckit { + +class NodeInfo; + +//----------------------------------------------------------------------------- + + +class MoverTransferSelection { +public: + + MoverTransferSelection(); + ~MoverTransferSelection(); + + // -- Methods + + void updateCost(const NodeInfo&, const Length& length); + void updateCost(const std::string&, const Length& length); + + void requiredMoverAttributes(const std::set& attrs); + + void preferredMover(const NodeInfo&); + void preferredMover(const std::string&); + + NodeInfo selectedMover(); + + // -- Overridden methods + // None + + // -- Class members + // None + + // -- Class methods + // None + +protected: + // -- Members + // None + + // -- Methods + + // void print(std::ostream&) const; + + // -- Overridden methods + // None + + // -- Class members + // None + + // -- Class methods + // None + +private: + // No copy allowed + + MoverTransferSelection(const MoverTransferSelection&); + MoverTransferSelection& operator=(const MoverTransferSelection&); + + // -- Members + std::set moverAttributes_; + std::map cost_; + std::string preferredMover_; + + // -- Methods + void selectedMover(NodeInfo&, bool&); + + // -- Overridden methods + // None + + // -- Class members + // None + + // -- Class methods + // None + + // -- Friends + + // friend std::ostream& operator<<(std::ostream& s,const MoverTransfer& p) + // { p.print(s); return s; } +}; + + +//----------------------------------------------------------------------------- + +} // namespace eckit + +#endif diff --git a/src/eckit/io/MultiHandle.cc b/src/eckit/io/MultiHandle.cc index c194a2dbc..659fe5396 100644 --- a/src/eckit/io/MultiHandle.cc +++ b/src/eckit/io/MultiHandle.cc @@ -412,9 +412,9 @@ DataHandle* MultiHandle::toLocal() { return this; } -void MultiHandle::cost(std::map& c, bool read) const { +void MultiHandle::selectMover(MoverTransferSelection& c, bool read) const { for (size_t i = 0; i < datahandles_.size(); i++) { - datahandles_[i]->cost(c, read); + datahandles_[i]->selectMover(c, read); } } @@ -427,16 +427,6 @@ bool MultiHandle::moveable() const { return datahandles_.size() > 0; } -const std::set& MultiHandle::requiredMoverAttributes() const { - if (requiredAttributes_.empty()) { - for (const auto& dh : datahandles_) { - auto&& attrs = dh->requiredMoverAttributes(); - requiredAttributes_.insert(attrs.begin(), attrs.end()); - } - } - return requiredAttributes_; -} - std::string MultiHandle::title() const { std::ostringstream os; os << "["; diff --git a/src/eckit/io/MultiHandle.h b/src/eckit/io/MultiHandle.h index 09f935e98..7f17c2a88 100644 --- a/src/eckit/io/MultiHandle.h +++ b/src/eckit/io/MultiHandle.h @@ -72,10 +72,10 @@ class MultiHandle : public DataHandle { void toRemote(Stream&) const override; void toLocal(Stream&) const override; DataHandle* toLocal() override; - void cost(std::map&, bool) const override; + void selectMover(MoverTransferSelection&, bool read) const override; + std::string title() const override; bool moveable() const override; - const std::set& requiredMoverAttributes() const override; DataHandle* clone() const override; void collectMetrics(const std::string& what) const override; // From Streamable diff --git a/src/eckit/io/PartFileHandle.cc b/src/eckit/io/PartFileHandle.cc index 49ab7c2fb..5c29c02ae 100644 --- a/src/eckit/io/PartFileHandle.cc +++ b/src/eckit/io/PartFileHandle.cc @@ -18,6 +18,7 @@ #include "eckit/io/PartFileHandle.h" #include "eckit/io/PooledHandle.h" +#include "eckit/io/MoverTransferSelection.h" namespace eckit { @@ -268,9 +269,9 @@ Length PartFileHandle::estimate() { return std::accumulate(length_.begin(), length_.end(), Length(0)); } -void PartFileHandle::cost(std::map& c, bool read) const { +void PartFileHandle::selectMover(MoverTransferSelection& c, bool read) const { if (read) { - c[path_.node()] += const_cast(this)->estimate(); + c.updateCost(path_.node(), const_cast(this)->estimate()); } } diff --git a/src/eckit/io/PartFileHandle.h b/src/eckit/io/PartFileHandle.h index 7709d8a89..13f6becab 100644 --- a/src/eckit/io/PartFileHandle.h +++ b/src/eckit/io/PartFileHandle.h @@ -63,7 +63,8 @@ class PartFileHandle : public DataHandle { Offset seek(const Offset&) override; bool canSeek() const override; - void cost(std::map&, bool) const override; + void selectMover(MoverTransferSelection&, bool read) const override; + std::string title() const override; std::string metricsTag() const override; diff --git a/src/eckit/io/SharedHandle.cc b/src/eckit/io/SharedHandle.cc index 01864ddf0..e00af37f8 100644 --- a/src/eckit/io/SharedHandle.cc +++ b/src/eckit/io/SharedHandle.cc @@ -142,7 +142,7 @@ void SharedHandle::toRemote(Stream& s) const { NOTIMP; } -void SharedHandle::cost(std::map&, bool) const { +void SharedHandle::selectMover(MoverTransferSelection&, bool) const { NOTIMP; } diff --git a/src/eckit/io/SharedHandle.h b/src/eckit/io/SharedHandle.h index 083ac91b8..e21b0d759 100644 --- a/src/eckit/io/SharedHandle.h +++ b/src/eckit/io/SharedHandle.h @@ -82,7 +82,8 @@ class SharedHandle : public DataHandle { DataHandle* toLocal() override; void toRemote(Stream& s) const override; - void cost(std::map&, bool) const override; + void selectMover(MoverTransferSelection&, bool read) const override; + std::string title() const override; void collectMetrics(const std::string& what) const override; // Tag for metrics collection diff --git a/src/eckit/io/StatsHandle.cc b/src/eckit/io/StatsHandle.cc index 74012c8d6..124513020 100644 --- a/src/eckit/io/StatsHandle.cc +++ b/src/eckit/io/StatsHandle.cc @@ -189,11 +189,6 @@ bool StatsHandle::moveable() const { return false; } -const std::set& StatsHandle::requiredMoverAttributes() const { - // Requires moveable(); - NOTIMP; -} - void StatsHandle::toLocal(Stream& s) const { NOTIMP; } @@ -206,7 +201,7 @@ void StatsHandle::toRemote(Stream& s) const { NOTIMP; } -void StatsHandle::cost(std::map&, bool) const { +void StatsHandle::selectMover(MoverTransferSelection&, bool) const { NOTIMP; } diff --git a/src/eckit/io/StatsHandle.h b/src/eckit/io/StatsHandle.h index e9638a05b..435082d88 100644 --- a/src/eckit/io/StatsHandle.h +++ b/src/eckit/io/StatsHandle.h @@ -78,13 +78,14 @@ class StatsHandle : public DataHandle, public HandleHolder { bool moveable() const override; - const std::set& requiredMoverAttributes() const override; + void toLocal(Stream& s) const override; DataHandle* toLocal() override; void toRemote(Stream& s) const override; - void cost(std::map&, bool) const override; + void selectMover(MoverTransferSelection&, bool read) const override; + std::string title() const override; void collectMetrics(const std::string& what) const override; // Tag for metrics collection diff --git a/src/eckit/io/TeeHandle.cc b/src/eckit/io/TeeHandle.cc index f7851e8eb..c55ff3b20 100644 --- a/src/eckit/io/TeeHandle.cc +++ b/src/eckit/io/TeeHandle.cc @@ -163,9 +163,9 @@ DataHandle* TeeHandle::toLocal() { return this; } -void TeeHandle::cost(std::map& c, bool read) const { +void TeeHandle::selectMover(MoverTransferSelection& c, bool read) const { for (size_t i = 0; i < datahandles_.size(); i++) { - datahandles_[i]->cost(c, read); + datahandles_[i]->selectMover(c, read); } } @@ -178,17 +178,6 @@ bool TeeHandle::moveable() const { return true; } -const std::set& TeeHandle::requiredMoverAttributes() const { - if (requiredAttributes_.empty()) { - for (const auto& dh : datahandles_) { - auto&& attrs = dh->requiredMoverAttributes(); - requiredAttributes_.insert(attrs.begin(), attrs.end()); - } - } - return requiredAttributes_; -} - - //---------------------------------------------------------------------------------------------------------------------- } // namespace eckit diff --git a/src/eckit/io/TeeHandle.h b/src/eckit/io/TeeHandle.h index 991989674..2ba0c5c2c 100644 --- a/src/eckit/io/TeeHandle.h +++ b/src/eckit/io/TeeHandle.h @@ -61,9 +61,10 @@ class TeeHandle : public DataHandle { void toRemote(Stream&) const override; void toLocal(Stream&) const override; DataHandle* toLocal() override; - void cost(std::map&, bool) const override; + void selectMover(MoverTransferSelection&, bool read) const override; + bool moveable() const override; - const std::set& requiredMoverAttributes() const override; + // From Streamable diff --git a/src/eckit/io/cluster/ClusterNodes.cc b/src/eckit/io/cluster/ClusterNodes.cc index 73a6ea5be..ca188454e 100644 --- a/src/eckit/io/cluster/ClusterNodes.cc +++ b/src/eckit/io/cluster/ClusterNodes.cc @@ -58,7 +58,7 @@ class ClusterNodeEntry { public: ClusterNodeEntry(const NodeInfo& info) : - ClusterNodeEntry(info.node(), info.name(), info.host(), info.port(), info.attributes()) {} + ClusterNodeEntry(info.node(), info.name(), info.host(), info.port(), info.attributes()) {} NodeInfo asNodeInfo() const { NodeInfo info; @@ -484,6 +484,20 @@ void ClusterNodes::receive(Stream& s) { } +bool ClusterNodes::lookUpHost(const std::string& type, const std::string& host, NodeInfo& result) { + pthread_once(&once, init); + AutoLock lock(*nodeArray); + + for (const auto& k : *nodeArray) { + if (k.active() && type == k.type() && host == k.host()) { + result = k.asNodeInfo(); + return true; + } + } + + return false; +} + //---------------------------------------------------------------------------------------------------------------------- } // namespace eckit diff --git a/src/eckit/io/cluster/ClusterNodes.h b/src/eckit/io/cluster/ClusterNodes.h index 317b9d3ea..1d1655f20 100644 --- a/src/eckit/io/cluster/ClusterNodes.h +++ b/src/eckit/io/cluster/ClusterNodes.h @@ -55,6 +55,8 @@ class ClusterNodes { static void onLine(const std::string&, int); static std::vector all(); + + static bool lookUpHost(const std::string& type, const std::string& host, NodeInfo& ); }; diff --git a/src/eckit/net/Connector.cc b/src/eckit/net/Connector.cc index 2fa017597..bdfce12f6 100644 --- a/src/eckit/net/Connector.cc +++ b/src/eckit/net/Connector.cc @@ -214,40 +214,8 @@ Connector& Connector::service(const std::string& name, const std::string& node) return get(info.host(), info.port(), info.node()); } -Connector& Connector::service(const std::string& name, const std::map& cost, - const std::set& attributes) { - std::string host; - std::string node; - - int port = 0; - Length best = 0; - - for (std::map::const_iterator j = cost.begin(); j != cost.end(); ++j) { - if ((*j).second > best || port == 0) { - best = (*j).second; - if (ClusterNodes::available(name, (*j).first)) { - NodeInfo info = ClusterNodes::lookUp(name, (*j).first); - host = info.host(); - port = info.port(); - node = info.node(); - } - else { - Log::warning() << "Service not available: " << name << "@" << (*j).first << std::endl; - } - } - } - - if (!port) { - NodeInfo info = ClusterNodes::any(name, attributes); - host = info.host(); - port = info.port(); - node = info.node(); - Log::warning() << "Using node: " << info << std::endl; - } - - ASSERT(port); - - return get(host, port, node); +Connector& Connector::service(const NodeInfo& info) { + return get(info.host(), info.port(), info.node()); } void Connector::lock() { diff --git a/src/eckit/net/Connector.h b/src/eckit/net/Connector.h index 341718f56..c0da694c9 100644 --- a/src/eckit/net/Connector.h +++ b/src/eckit/net/Connector.h @@ -58,9 +58,8 @@ class Connector : public Stream { void memoize(bool on, unsigned long time); + static Connector& service(const NodeInfo& node); static Connector& service(const std::string& name, const std::string& node); - static Connector& service(const std::string& name, const std::map& cost, - const std::set& attributes = {}); static NodeInfo nodeInfo(const std::string& name, const std::string& node);