Skip to content

Commit

Permalink
Mover selection
Browse files Browse the repository at this point in the history
  • Loading branch information
b8raoult committed Nov 15, 2023
1 parent c871e6a commit 47d3df0
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 3 deletions.
5 changes: 3 additions & 2 deletions src/eckit/io/MoverTransferSelection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ void MoverTransferSelection::updateCost(const std::string& name, const eckit::Le


void MoverTransferSelection::updateCost(const eckit::NodeInfo& node, const eckit::Length& length) {
updateCost(node.name(), length);
updateCost(node.node(), length);
}

void MoverTransferSelection::requiredMoverAttributes(const std::set<std::string>& attrs) {
Expand Down Expand Up @@ -120,10 +120,11 @@ NodeInfo MoverTransferSelection::selectedMover() {
}

void MoverTransferSelection::preferredMover(const NodeInfo& node) {
preferredMover(node.name());
preferredMover(node.node());
}

void MoverTransferSelection::preferredMover(const std::string& name) {
Log::info() << "MoverTransferSelection::preferredMover " << name << std::endl;
preferredMover_ = name;
}

Expand Down
12 changes: 12 additions & 0 deletions src/eckit/io/TCPHandle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/

#include "eckit/io/TCPHandle.h"
#include "eckit/io/MoverTransferSelection.h"
#include "eckit/io/cluster/ClusterNodes.h"

//----------------------------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -84,6 +86,16 @@ std::string TCPHandle::title() const {
return os.str();
}

void TCPHandle::selectMover(eckit::MoverTransferSelection& selection, bool read) const {
// If we use a callback server that is collocated on a mover node
// we want to use that mover
NodeInfo node;
if (ClusterNodes::lookUpHost("mover", host_, node)) {
selection.preferredMover(node);
}
}


//----------------------------------------------------------------------------------------------------------------------

} // namespace eckit
2 changes: 2 additions & 0 deletions src/eckit/io/TCPHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class TCPHandle : public DataHandle {

bool canSeek() const override { return false; }

virtual void selectMover(eckit::MoverTransferSelection&, bool) const override;

// From Streamable

void encode(Stream&) const override;
Expand Down
5 changes: 4 additions & 1 deletion src/eckit/io/cluster/ClusterNodes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "eckit/memory/Zero.h"
#include "eckit/thread/AutoLock.h"
#include "eckit/utils/Clock.h"
#include "eckit/net/IPAddress.h"

namespace eckit {

Expand Down Expand Up @@ -488,8 +489,10 @@ bool ClusterNodes::lookUpHost(const std::string& type, const std::string& host,
pthread_once(&once, init);
AutoLock<NodeArray> lock(*nodeArray);

auto ip = eckit::net::IPAddress::hostAddress(host);

for (const auto& k : *nodeArray) {
if (k.active() && type == k.type() && host == k.host()) {
if (k.active() && type == k.type() && ip == eckit::net::IPAddress::hostAddress(k.host())) {
result = k.asNodeInfo();
return true;
}
Expand Down
6 changes: 6 additions & 0 deletions src/eckit/net/IPAddress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <sys/socket.h>
#include <unistd.h>
#include <ostream>
#include <string.h>

namespace eckit::net {

Expand Down Expand Up @@ -45,6 +46,11 @@ std::string IPAddress::asString() const {
return inet_ntoa(address_);
}

bool IPAddress::operator==(const IPAddress& other) const {
return ::memcmp(&address_, &other.address_, sizeof(address_)) == 0;
}


IPAddress IPAddress::hostAddress(const std::string& hostname) {
struct hostent* hostEntry = gethostbyname(hostname.c_str());
ASSERT(hostEntry);
Expand Down
2 changes: 2 additions & 0 deletions src/eckit/net/IPAddress.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class IPAddress {
static IPAddress myIPAddress();
static IPAddress hostAddress(const std::string& hostname);

bool operator==(const IPAddress& other) const;

private:
// Members

Expand Down

0 comments on commit 47d3df0

Please sign in to comment.