Skip to content

Commit

Permalink
Mover selection
Browse files Browse the repository at this point in the history
  • Loading branch information
b8raoult committed Nov 14, 2023
1 parent 1504655 commit bc7478d
Show file tree
Hide file tree
Showing 22 changed files with 307 additions and 153 deletions.
2 changes: 2 additions & 0 deletions src/eckit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ io/MemoryHandle.cc
io/MemoryHandle.h
io/MoverTransfer.cc
io/MoverTransfer.h
io/MoverTransferSelection.cc
io/MoverTransferSelection.h
io/MultiHandle.cc
io/MultiHandle.h
io/Offset.cc
Expand Down
9 changes: 3 additions & 6 deletions src/eckit/io/DataHandle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,6 @@ void DataHandle::restartWriteFrom(const Offset& offset) {
throw NotImplemented(os.str(), Here());
}

// Default implementation: returns a reference to a function-local static set
// that this function never populates.
const std::set<std::string>& DataHandle::requiredMoverAttributes() const {
    static std::set<std::string> noAttributes;
    return noAttributes;
}

// Default implementation: writes this handle to the stream `s` via operator<<.
void DataHandle::toLocal(Stream& s) const {
s << *this;
}
Expand All @@ -417,7 +412,9 @@ void DataHandle::toRemote(Stream& s) const {
s << *this;
}

// Default implementation: adds nothing to the cost table; both arguments are ignored.
void DataHandle::cost(std::map<std::string, Length>&, bool) const {}
// Default implementation: leaves the mover selection untouched; both arguments are ignored.
void DataHandle::selectMover(MoverTransferSelection&, bool) const {}

DataHandle* DataHandle::clone() const {
std::ostringstream os;
Expand Down
7 changes: 5 additions & 2 deletions src/eckit/io/DataHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace eckit {

class MD5;
class Metrics;
class MoverTransferSelection;

//----------------------------------------------------------------------------------------------------------------------

Expand All @@ -40,6 +41,7 @@ class RestartTransfer {
const Offset& from() const { return from_; }
};


//----------------------------------------------------------------------------------------------------------------------

class DataHandle : public Streamable {
Expand Down Expand Up @@ -125,13 +127,14 @@ class DataHandle : public Streamable {
// For remote data movers

virtual bool moveable() const { return false; }
virtual const std::set<std::string>& requiredMoverAttributes() const;
virtual void toLocal(Stream& s) const;

virtual DataHandle* toLocal();

virtual void toRemote(Stream& s) const;
virtual void cost(std::map<std::string, Length>&, bool) const;

virtual void selectMover(MoverTransferSelection&, bool read) const;

virtual std::string title() const;
virtual std::string metricsTag() const;
virtual void collectMetrics(const std::string& what) const; // Tag for metrics collection
Expand Down
7 changes: 4 additions & 3 deletions src/eckit/io/FileHandle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "eckit/log/Log.h"
#include "eckit/os/Stat.h"
#include "eckit/utils/MD5.h"
#include "eckit/io/MoverTransferSelection.h"


namespace eckit {
Expand Down Expand Up @@ -278,13 +279,13 @@ void FileHandle::toRemote(Stream& s) const {
s << *remote;
}

void FileHandle::cost(std::map<std::string, Length>& c, bool read) const {
void FileHandle::selectMover(MoverTransferSelection& c, bool read) const {
if (read) {
c[NodeInfo::thisNode().node()] += const_cast<FileHandle*>(this)->estimate();
c.updateCost(NodeInfo::thisNode(), const_cast<FileHandle*>(this)->estimate());
}
else {
// Just mark the node as being a candidate
c[NodeInfo::thisNode().node()] += 0;
c.updateCost(NodeInfo::thisNode(), 0);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/eckit/io/FileHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class FileHandle : public DataHandle {
void restartReadFrom(const Offset& from) override;
void restartWriteFrom(const Offset& from) override;
void toRemote(Stream&) const override;
void cost(std::map<std::string, Length>&, bool) const override;
void selectMover(MoverTransferSelection&, bool read) const override;

std::string title() const override;
std::string metricsTag() const override;

Expand Down
67 changes: 5 additions & 62 deletions src/eckit/io/MoverTransfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,13 @@
* does it submit to any jurisdiction.
*/


#include "eckit/io/MoverTransfer.h"
#include "eckit/io/cluster/ClusterNodes.h"
#include "eckit/io/cluster/NodeInfo.h"
#include "eckit/log/Bytes.h"
#include "eckit/log/Progress.h"
#include "eckit/net/Connector.h"
#include "eckit/net/TCPServer.h"
#include "eckit/net/TCPStream.h"
#include "eckit/runtime/Metrics.h"
#include "eckit/runtime/Monitor.h"
#include "eckit/thread/AutoLock.h"
#include "eckit/thread/Thread.h"
#include "eckit/thread/ThreadControler.h"
#include "eckit/value/Value.h"
#include "eckit/io/MoverTransferSelection.h"


namespace eckit {
Expand All @@ -45,55 +37,17 @@ Length MoverTransfer::transfer(DataHandle& from, DataHandle& to) {
throw SeriousBug(to.title() + " is not moveable");
}

// Attributes that are required from the mover

std::set<std::string> moverAttributes;
{
auto&& f = from.requiredMoverAttributes();
moverAttributes.insert(f.begin(), f.end());
auto&& t = to.requiredMoverAttributes();
moverAttributes.insert(t.begin(), t.end());
}

// Using node-specific info, determine beneficial nodes to use

std::map<std::string, Length> cost;
from.cost(cost, true);
to.cost(cost, false);

// Should any of the nodes be removed from the cost matrix, because they don't support
// the required attributes?
// Also remove any movers that are not up

for (auto it = cost.begin(); it != cost.end(); /* no increment */) {
if (ClusterNodes::available("mover", it->first)) {
if (!ClusterNodes::lookUp("mover", it->first).supportsAttributes(moverAttributes)) {
cost.erase(it++);
}
else {
++it;
}
}
else {
cost.erase(it++);
}
}

if (cost.empty()) {
NodeInfo info = ClusterNodes::any("mover", moverAttributes);
cost[info.node()] = 0;
send_costs = false;
// throw SeriousBug(std::string("No cost for ") + from.title() + " => " + to.title());
}
MoverTransferSelection cost;
from.selectMover(cost, true);
to.selectMover(cost, false);

Log::info() << "MoverTransfer::transfer(" << from << "," << to << ")" << std::endl;

Log::info() << "MoverTransfer::transfer cost:" << std::endl;
for (std::map<std::string, Length>::iterator j = cost.begin(); j != cost.end(); ++j) {
Log::info() << " " << (*j).first << " => " << Bytes((*j).second) << std::endl;
}

net::Connector& c(net::Connector::service("mover", cost));
net::Connector& c(net::Connector::service(cost.selectedMover()));
AutoLock<net::Connector> lock(c);
// This will close the connector on unlock
c.autoclose(true);
Expand Down Expand Up @@ -149,17 +103,6 @@ Length MoverTransfer::transfer(DataHandle& from, DataHandle& to) {
Metrics::receive(s);


Metrics::set("mover_node", c.node());
if (send_costs) {
for (auto j = cost.begin(); j != cost.end(); ++j) {
std::string h = (*j).first;
unsigned long long l = (*j).second;
Metrics::set("mover_costs." + h, l);
}
}
// Metrics::set("mover_metric", prefix_);
// // ASSERT(len == total);

Log::message() << " " << std::endl;


Expand Down
133 changes: 133 additions & 0 deletions src/eckit/io/MoverTransferSelection.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* (C) Copyright 1996- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/


#include "eckit/io/MoverTransferSelection.h"
#include "eckit/io/cluster/ClusterNodes.h"
#include "eckit/exception/Exceptions.h"

#include "eckit/log/Bytes.h"
#include "eckit/log/Log.h"

#include "eckit/runtime/Metrics.h"



namespace eckit {


// No explicit set-up or tear-down: construction and destruction are left to
// the compiler-generated behaviour.
MoverTransferSelection::MoverTransferSelection() = default;
MoverTransferSelection::~MoverTransferSelection() = default;


/// Adds `length` to the running cost recorded under the key `name`.
/// Costs accumulate across calls for the same key.
/// (Presumably cost_ is a std::map, so a missing key is created on first use
/// -- cost_ is declared in the header; confirm there.)
void MoverTransferSelection::updateCost(const std::string& name, const eckit::Length& length) {
    auto& total = cost_[name];
    total += length;
}


/// Adds `length` to the cost recorded for the cluster node described by `node`.
///
/// The cost table must be keyed by the node identifier (NodeInfo::node()), not
/// by NodeInfo::name(): the keys are later passed as the node argument to
/// ClusterNodes::available("mover", key) and ClusterNodes::lookUp("mover", key)
/// when the mover is selected.
void MoverTransferSelection::updateCost(const eckit::NodeInfo& node, const eckit::Length& length) {
    updateCost(node.node(), length);
}

/// Adds every attribute in `attrs` to the set of attributes the selected mover
/// must support. Attributes accumulate across calls; nothing is removed here.
void MoverTransferSelection::requiredMoverAttributes(const std::set<std::string>& attrs) {
    for (const auto& attribute : attrs) {
        moverAttributes_.insert(attribute);
    }
}

/// Chooses the mover node for a transfer and stores it in `result`.
///
/// Order of preference:
///   1. The preferred mover, when one has been set, is available and supports
///      all required attributes.
///   2. Among the nodes in the cost table that are available and support the
///      required attributes, the one with the largest cost. On a tie the entry
///      visited last wins, because the comparison is `>=`.
///   3. Otherwise, whatever ClusterNodes::any() returns for the required
///      attributes.
///
/// `metrics` is set to true only when the choice came from the cost table
/// (case 2); it is false in cases 1 and 3.
void MoverTransferSelection::selectedMover(NodeInfo& result, bool& metrics) {

    metrics = false;

    // 1. Try the explicitly preferred mover first; fall through with a warning
    //    when it cannot be used.
    if (!preferredMover_.empty()) {
        if (!ClusterNodes::available("mover", preferredMover_)) {
            Log::warning() << "Preferred mover " << preferredMover_
                           << " is not available" << std::endl;
        }
        else {
            NodeInfo preferred = ClusterNodes::lookUp("mover", preferredMover_);
            if (preferred.supportsAttributes(moverAttributes_)) {
                Log::info() << "Using preferred mover " << preferredMover_ << std::endl;
                result = preferred;
                return;
            }
            Log::warning() << "Preferred mover "
                           << preferredMover_
                           << " does not support mover attributes: "
                           << moverAttributes_ << std::endl;
        }
    }

    // 2. Keep only the cost entries whose node is up and supports the attributes.
    std::map<std::string, Length> candidates;
    for (const auto& entry : cost_) {
        const std::string& node = entry.first;
        if (!ClusterNodes::available("mover", node)) {
            continue;
        }
        if (!ClusterNodes::lookUp("mover", node).supportsAttributes(moverAttributes_)) {
            continue;
        }
        candidates.insert(entry);
    }

    // 3. Nothing usable in the cost table: let the cluster pick any suitable mover.
    if (candidates.empty()) {
        result = ClusterNodes::any("mover", moverAttributes_);
        return;
    }

    Log::info() << "MoverTransfer::transfer cost:" << std::endl;
    for (const auto& candidate : candidates) {
        Log::info() << " " << candidate.first << " => " << Bytes(candidate.second) << std::endl;
    }

    // Pick the candidate with the largest cost; `>=` lets later entries win ties
    // and guarantees a zero-cost candidate is still selected.
    std::string which;
    Length best = 0;
    for (const auto& candidate : candidates) {
        if (candidate.second >= best) {
            which = candidate.first;
            best  = candidate.second;
        }
    }

    ASSERT(which != "");

    metrics = true;
    result  = ClusterNodes::lookUp("mover", which);
}

/// Selects the mover node and records the choice in the metrics.
///
/// Always sets the "mover_node" metric to the selected node. When the
/// selection was driven by the cost table, also sets one "mover_costs.<key>"
/// metric per entry of the full cost table (including entries that were
/// filtered out during selection).
///
/// @return the selected mover node
NodeInfo MoverTransferSelection::selectedMover() {

    bool fromCosts = false;
    NodeInfo chosen;
    selectedMover(chosen, fromCosts);

    Metrics::set("mover_node", chosen.node());

    if (!fromCosts) {
        return chosen;
    }

    for (const auto& entry : cost_) {
        // Explicit conversion target, matching the type handed to Metrics::set.
        const unsigned long long bytes = entry.second;
        Metrics::set("mover_costs." + entry.first, bytes);
    }

    return chosen;
}

void MoverTransferSelection::preferredMover(const NodeInfo& node) {
preferredMover(node.name());
}

/// Records `name` as the preferred mover, replacing any previous preference.
/// An empty string means "no preference": selection skips the preferred-mover
/// check when the stored value is empty.
void MoverTransferSelection::preferredMover(const std::string& name) {
    preferredMover_.assign(name);
}


//----------------------------------------------------------------------------------------------------------------------

} // namespace eckit
Loading

0 comments on commit bc7478d

Please sign in to comment.