diff --git a/VERSION b/VERSION index ae96cc731..2f4320f67 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.24.3 +1.24.4 diff --git a/src/eckit/CMakeLists.txt b/src/eckit/CMakeLists.txt index 3eaf8a10c..099080958 100644 --- a/src/eckit/CMakeLists.txt +++ b/src/eckit/CMakeLists.txt @@ -660,6 +660,8 @@ serialisation/Streamable.cc serialisation/Streamable.h ) + + list( APPEND eckit_persist_srcs persist/Bless.h persist/DumpLoad.cc @@ -965,6 +967,7 @@ if( HAVE_ECKIT_SQL ) add_subdirectory( sql ) endif() +add_subdirectory( distributed ) add_subdirectory( geometry ) add_subdirectory( linalg ) add_subdirectory( maths ) diff --git a/src/eckit/container/BTree.cc b/src/eckit/container/BTree.cc index e0ae1bd3e..33de294cd 100644 --- a/src/eckit/container/BTree.cc +++ b/src/eckit/container/BTree.cc @@ -652,6 +652,7 @@ size_t BTree::count(unsigned long page) const { return c; } + template void BTree::lockShared() { L::lockRange(file_.fileno(), 0, 0, F_SETLKW, F_RDLCK); diff --git a/src/eckit/distributed/Actor.cc b/src/eckit/distributed/Actor.cc new file mode 100644 index 000000000..62407e8c3 --- /dev/null +++ b/src/eckit/distributed/Actor.cc @@ -0,0 +1,92 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. 
+ */ + + +#include "eckit/distributed/Actor.h" +#include "eckit/distributed/Transport.h" + +namespace eckit::distributed { + +//---------------------------------------------------------------------------------------------------------------------- + +Actor::~Actor() { +} + +Actor::Actor(Transport &transport): + transport_(transport), + title_(transport.title()) { +} + +void Actor::sendMessageToNextWorker(const Message &msg) const { + transport_.sendMessageToNextWorker(msg); +} + +void Actor::sendStatisticsToProducer(const Message &msg) const { + transport_.sendStatisticsToProducer(msg); +} + +void Actor::getNextWorkMessage(Message &message) const { + transport_.getNextWorkMessage(message); +} + +void Actor::getNextWriteMessage(Message &message) const { + transport_.getNextWriteMessage(message); +} + +void Actor::sendToWriter(int writer, const Message &message) const { + transport_.sendToWriter(writer, message); +} + +void Actor::sendShutDownMessage() const { + transport_.sendShutDownMessage(*this); +} + +void Actor::messageFromWorker(Message&, int) const { +} + +void Actor::messageFromWriter(Message&, int) const { +} + +/// Return the symbolic name of a MessageTags value; any other value yields "UNKNOWN". +const char* Actor::tagName(int tag) { + switch (tag) { + + case READY: + return "READY"; + + case WORK: + return "WORK"; + + case SHUTDOWN: + return "SHUTDOWN"; + + case OPEN: + return "OPEN"; + + case WRITE: + return "WRITE"; + + case CLOSE: + return "CLOSE"; + + case STATISTICS: + return "STATISTICS"; + + case BYE: + return "BYE"; + + default: + return "UNKNOWN"; + } +} +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed + diff --git a/src/eckit/distributed/Actor.h b/src/eckit/distributed/Actor.h new file mode 100644 index 000000000..8ad744d6f --- /dev/null +++ b/src/eckit/distributed/Actor.h @@ -0,0 +1,75 @@ +/* + * (C) Copyright 1996- ECMWF. 
+ * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @file Actor.h +/// @author Baudouin Raoult +/// @author Tiago Quintino +/// @date May 2016 + +#pragma once + +#include <string> + +#include "eckit/memory/NonCopyable.h" + + +namespace eckit::distributed { + +class Transport; +class Message; + +//---------------------------------------------------------------------------------------------------------------------- +/// Abstract base (run() and finalise() are pure virtual) whose messaging methods forward to the Transport passed at construction. +class Actor : private eckit::NonCopyable { +public: + /// Tags identifying the kind of a message; tagName() maps a tag to its text form. + enum MessageTags { + READY, + WORK, + SHUTDOWN, + OPEN, + WRITE, + CLOSE, + STATISTICS, + BYE + }; + +public: // methods + + explicit Actor(Transport &transport); + virtual ~Actor(); + + virtual void run() = 0; + virtual void finalise() = 0; + + virtual void messageFromWorker(Message &message, int worker) const; + virtual void messageFromWriter(Message &message, int worker) const; + virtual void sendStatisticsToProducer(const Message &message) const; + + virtual void sendMessageToNextWorker(const Message &message) const; + virtual void getNextWorkMessage(Message &message) const; + virtual void getNextWriteMessage(Message &message) const; + virtual void sendToWriter(int writer, const Message &message) const; + + virtual void sendShutDownMessage() const; + + static const char* tagName(int); + +protected: // members + + Transport& transport_; + std::string title_; + +}; + + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed diff --git a/src/eckit/distributed/CMakeLists.txt b/src/eckit/distributed/CMakeLists.txt new file mode 100644 index 000000000..458502c18 --- /dev/null +++ 
b/src/eckit/distributed/CMakeLists.txt @@ -0,0 +1,41 @@ +list( APPEND eckit_distributed_srcs +Actor.cc +Actor.h +Consumer.cc +Consumer.h +Message.cc +Message.h +NoTransport.cc +NoTransport.h +Producer.cc +Producer.h +Transport.cc +Transport.h +TransportHandle.cc +TransportHandle.h +TransportStatistics.cc +TransportStatistics.h +tcp/TCPTransport.cc +tcp/TCPTransport.h +) + +if( HAVE_MPI ) + + list( APPEND eckit_distributed_srcs + mpi/MPITransport.cc + mpi/MPITransport.h + ) +endif() + +ecbuild_add_library( TARGET eckit_distributed TYPE SHARED + INSTALL_HEADERS ALL + HEADER_DESTINATION + ${INSTALL_INCLUDE_DIR}/eckit/distributed + SOURCES + ${eckit_distributed_srcs} + PUBLIC_LIBS + eckit ) + +if ( HAVE_MPI ) + target_link_libraries( eckit_distributed PUBLIC eckit_mpi ) +endif() diff --git a/src/eckit/distributed/Consumer.cc b/src/eckit/distributed/Consumer.cc new file mode 100644 index 000000000..9e8397db2 --- /dev/null +++ b/src/eckit/distributed/Consumer.cc @@ -0,0 +1,83 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. 
+ */ + +#include + +#include "eckit/log/Log.h" +#include "eckit/log/TimeStamp.h" +#include "eckit/log/ResourceUsage.h" + +#include "eckit/distributed/Consumer.h" +#include "eckit/distributed/Message.h" +#include "eckit/distributed/Transport.h" + +using eckit::Log; + +namespace eckit::distributed { + +//---------------------------------------------------------------------------------------------------------------------- + +Consumer::Consumer(Transport &transport): + Actor(transport) { +} + +void Consumer::run() { + // Fetch and consume() messages until a shutdown message arrives, then reply to the producer and synchronise. + // eckit::TraceResourceUsage usage("Message::write()"); + // eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " starting " << std::endl; + + Message msg; + + while (true) { + + msg.rewind(); + // eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " begin getNextMessage" << std::endl; + + getNextMessage(msg); + // eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " end getNextMessage" << std::endl; + + if (msg.shutdownRequested()) { + msg.rewind(); + shutdown(msg); + sendStatisticsToProducer(msg); + break; + } + + try { + // eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " begin consume" << std::endl; + + consume(msg); + // eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " end consume" << std::endl; + + } catch (const eckit::Exception &e) { + eckit::Log::info() << "Failure: " << e.what() << std::endl; + failure(msg); + // eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " end failure" << std::endl; + + } + } + + // eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " exiting " << std::endl; + transport_.synchronise(); + +} + +void Consumer::shutdown(Message &message) { + message << "OK"; +} + +void Consumer::failure(Message &message) { + // Default implementation: nothing to do. +} + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed + diff 
--git a/src/eckit/distributed/Consumer.h b/src/eckit/distributed/Consumer.h new file mode 100644 index 000000000..7f25f7108 --- /dev/null +++ b/src/eckit/distributed/Consumer.h @@ -0,0 +1,50 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @file Consumer.h +/// @author Baudouin Raoult +/// @author Tiago Quintino +/// @date May 2016 + +#ifndef eckit_Consumer_H +#define eckit_Consumer_H + +#include "eckit/distributed/Actor.h" + +namespace eckit::distributed { + +//---------------------------------------------------------------------------------------------------------------------- + +/// Actor that repeatedly fetches a message and passes it to consume() until a shutdown message arrives. +class Consumer : public Actor { + + // Main loop, defined in Consumer.cc. Declared in the private section, so callers reach it through Actor::run(). + void run() override; + + // Fetches the next message to process; must be provided by subclasses. + virtual void getNextMessage(Message& message) const = 0; + + +public: // methods + + explicit Consumer(Transport &transport); + + + virtual void consume(Message& message) = 0; + virtual void failure(Message& message); // called by run() when consume() throws an eckit::Exception + virtual void shutdown(Message& message); // called by run() on a shutdown message, before the reply is sent + +}; + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed + +#endif diff --git a/src/eckit/distributed/Message.cc b/src/eckit/distributed/Message.cc new file mode 100644 index 000000000..8a4e0eef0 --- /dev/null +++ b/src/eckit/distributed/Message.cc @@ -0,0 +1,172 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. 
+ * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +#include + +#include "eckit/log/Log.h" +#include "eckit/log/Bytes.h" +#include "eckit/log/ResourceUsage.h" +#include "eckit/maths/Functions.h" + +#include "eckit/distributed/Message.h" +#include "eckit/distributed/Actor.h" + +using eckit::Log; +using eckit::Bytes; + +namespace eckit::distributed { + +//---------------------------------------------------------------------------------------------------------------------- + +class ReadyMessage : public Message { +public: + ReadyMessage(): Message(Actor::READY) { + (*this) << "ready"; + } +}; + +const Message &Message::readyMessage() { + static ReadyMessage message; + return message; +} +//---------------------------------------------------------------------------------------------------------------------- + +class ShutdownMessage : public Message { +public: + ShutdownMessage(): Message(Actor::SHUTDOWN) { + (*this) << "shutdown"; + } +}; + +const Message &Message::shutdownMessage() { + static ShutdownMessage message; + return message; +} + +void Message::print(std::ostream& out) const +{ + out << "Message(tag=" << tag_ << ")" << std::endl; +} + +//---------------------------------------------------------------------------------------------------------------------- + +Message::Message(int tag, size_t size) : + tag_(tag), + source_(-1), + buffer_(eckit::round(size, 1024 * 1024)), + position_(0), + blob_(false) { +} + +Message::~Message() { +} + +void Message::rewind() { + position_ = 0; + blob_ = false; +} + +void *Message::messageData() { + return buffer_; +} + +const void *Message::messageData() const { + return buffer_; +} + +size_t Message::bufferSize() const { + return buffer_.size(); +} + +size_t Message::messageSize() const { + return position_; +} + +void Message::messageReceived(int tag, int source) { + 
position_ = 0; + tag_ = tag; + source_ = source; +} + +bool Message::shutdownRequested() const { + return tag_ == Actor::SHUTDOWN; +} + +int Message::tag() const { + return tag_; +} + +int Message::source() const { + ASSERT(source_ >= 0); + return source_; +} +/// Stream hook: copy up to 'length' bytes from the current position into 'buffer' and advance the position. +long Message::read(void *buffer, long length) { + ASSERT(length >= 0); // size_t(length) below would turn a negative length into a huge byte count + ASSERT(!blob_); // We should not decode once we access the blob directly + + size_t left = buffer_.size() - position_; + size_t size = std::min(left, size_t(length)); + ::memcpy(buffer, buffer_ + position_, size); + position_ += size; + return size; +} + +void Message::reserve(size_t size) { + if (buffer_.size() < size) { + buffer_.resize(eckit::round(size, 1024 * 1024)); + } +} + +/// Stream hook: append 'length' bytes at the current position, resizing the buffer first when it is too small. +long Message::write(const void *buffer, long length) { + ASSERT(length >= 0); // a negative length would wrap in the size_t arithmetic below + if (position_ + length > buffer_.size()) { + size_t newsize = eckit::round(position_ + length, 1024 * 1024); + buffer_.resize(newsize, true); + Log::debug() << "Message::write() resizing buffer to " << Bytes(newsize) << std::endl; + } + + // ASSERT(!command_.empty()); + + size_t left = buffer_.size() - position_; + size_t size = std::min(left, size_t(length)); + ::memcpy(buffer_ + position_, buffer, size); + + if (size != size_t(length)) { + std::ostringstream oss; + oss << "Attempt to write " + << length + << " bytes on message, could only write " + << size + << ", buffer is " + << buffer_.size(); + throw eckit::SeriousBug(oss.str()); + } + + + position_ += size; + return size; +} + + +const void* Message::getBlob(size_t& size) { + size = blobSize(); // After that we should not read + blob_ = true; + return buffer_ + position_; +} + +std::string Message::name() const { + return "Message"; +} +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed + diff --git a/src/eckit/distributed/Message.h b/src/eckit/distributed/Message.h new file mode 100644 index 000000000..551c2f207 --- /dev/null +++ b/src/eckit/distributed/Message.h @@ 
-0,0 +1,89 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @file Message.h +/// @author Baudouin Raoult +/// @author Tiago Quintino +/// @date May 2016 + +#ifndef eckit_Message_H +#define eckit_Message_H + +#include + +#include "eckit/serialisation/Stream.h" +#include "eckit/io/Buffer.h" + +#include "eckit/distributed/Actor.h" + +namespace eckit::distributed { + +//---------------------------------------------------------------------------------------------------------------------- + +/// eckit::Stream that encodes into / decodes from an in-memory eckit::Buffer and carries a tag and a source. +class Message : public eckit::Stream { + +public: // methods + + Message(int tag = Actor::WORK, size_t size = 1024 * 1024); + ~Message() override; + + void rewind(); + bool shutdownRequested() const; + + void *messageData(); + const void *messageData() const; + size_t bufferSize() const; + size_t messageSize() const; + void messageReceived(int, int); + + /// Returns the data at the current position and stores its size in 'size'; read() asserts afterwards until rewind(). + const void* getBlob(size_t& size); + + void reserve(size_t); + + int tag() const; + int source() const; + + static const Message &readyMessage(); + static const Message &shutdownMessage(); + + friend std::ostream& operator<<(std::ostream &s, const Message &x) { + x.print(s); + return s; + } + +private: // members + + int tag_; + int source_; + + eckit::Buffer buffer_; + + size_t position_; + + bool blob_; + +private: // methods + + // From Stream + std::string name() const override; + long write(const void *, long) override; + long read(void *, long) override; + + void print( std::ostream &out ) const; + +}; + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed + +#endif diff --git 
a/src/eckit/distributed/NoTransport.cc b/src/eckit/distributed/NoTransport.cc new file mode 100644 index 000000000..4e7020458 --- /dev/null +++ b/src/eckit/distributed/NoTransport.cc @@ -0,0 +1,96 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +#include "eckit/log/Log.h" +#include "eckit/log/Seconds.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/os/AutoAlarm.h" + +#include "eckit/distributed/NoTransport.h" +#include "eckit/distributed/Transport.h" +#include "eckit/distributed/Message.h" +#include "eckit/distributed/Actor.h" + +using eckit::Log; + +namespace eckit::distributed { + +//---------------------------------------------------------------------------------------------------------------------- + +NoTransport::~NoTransport() { +} + +NoTransport::NoTransport(const eckit::option::CmdArgs &args): + Transport(args) { + +} + +void NoTransport::initialise() { + +} + +void NoTransport::abort() { + eckit::Log::info() << "NoTransport::abort() called" << std::endl; + ::exit(1); +} + + +void NoTransport::synchronise() { +} + + +bool NoTransport::single() const { + return true; +} + +bool NoTransport::writer() const { + return false; +} + +void NoTransport::sendMessageToNextWorker(const Message &msg) { + NOTIMP; +} + +void NoTransport::getNextWorkMessage(Message &message) { + NOTIMP; +} + +bool NoTransport::producer() const { + NOTIMP; +} + +void NoTransport::print(std::ostream& out) const { + out << "NoTransport[]"; +} + +void NoTransport::sendToWriter(size_t writer, const Message &message) { + NOTIMP; +} + +void NoTransport::getNextWriteMessage(Message &message) { + NOTIMP; +} + + +void 
NoTransport::sendStatisticsToProducer(const Message &message) { + NOTIMP; +} + +void NoTransport::sendShutDownMessage(const Actor&) { + NOTIMP; +} + +//---------------------------------------------------------------------------------------------------------------------- +// Registers NoTransport with the TransportFactory under the name "none". +static TransportBuilder<NoTransport> builder("none"); + + +} // namespace eckit::distributed + diff --git a/src/eckit/distributed/NoTransport.h b/src/eckit/distributed/NoTransport.h new file mode 100644 index 000000000..88ccc051b --- /dev/null +++ b/src/eckit/distributed/NoTransport.h @@ -0,0 +1,70 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @file NoTransport.h +/// @author Baudouin Raoult +/// @author Tiago Quintino +/// @date May 2016 + +#ifndef eckit_NoTransport_H +#define eckit_NoTransport_H + + +#include "eckit/distributed/Transport.h" + + +namespace eckit::option { +class Option; +class CmdArgs; +} + +namespace eckit::distributed { + +class Message; + +//---------------------------------------------------------------------------------------------------------------------- + +class NoTransport : public Transport { +public: // methods + + NoTransport(const eckit::option::CmdArgs &args); + virtual ~NoTransport() override; + +protected: // methods + + + virtual void sendMessageToNextWorker(const Message &message) override; + virtual void getNextWorkMessage(Message &message) override; + virtual void sendStatisticsToProducer(const Message &message) override; + virtual void sendShutDownMessage(const Actor&) override; + + virtual bool producer() const override; + virtual bool single() const override; + virtual void initialise() override; + virtual void abort() 
override; + virtual void synchronise() override; + virtual bool writer() const override; + + virtual void sendToWriter(size_t writer, const Message &message) override; + virtual void getNextWriteMessage(Message &message) override; + + void print(std::ostream& out) const override; + + +protected: // members + + +}; + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed + +#endif diff --git a/src/eckit/distributed/Producer.cc b/src/eckit/distributed/Producer.cc new file mode 100644 index 000000000..785be3327 --- /dev/null +++ b/src/eckit/distributed/Producer.cc @@ -0,0 +1,69 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. 
+ */ + +#include "eckit/log/Log.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/log/TimeStamp.h" + +#include "eckit/distributed/Producer.h" +#include "eckit/distributed/Message.h" +#include "eckit/distributed/Transport.h" + +using eckit::Log; + +namespace eckit::distributed { + +//---------------------------------------------------------------------------------------------------------------------- + +Producer::Producer(Transport &transport): + Actor(transport) { +} + +void Producer::run() { + // Initialise the transport, send every message produce() yields to the next worker, then send the shutdown message and synchronise. + transport_.initialise(); + eckit::Log::info() << "starting " << std::endl; + + { + eckit::Timer timer("Producing messages"); + Message msg; + + while (produce(msg)) { + // eckit::Log::info() + // << " send message " + // << " tag " << msg.tag() + // << " size " << msg.messageSize() + // << std::endl; + + sendMessageToNextWorker(msg); + // eckit::Log::info() << " done message " << std::endl; + + msg.rewind(); + } + } + + eckit::Log::info() << "all messages sent" << std::endl; + + // send finishing messages + { + eckit::Timer timer("Shutting down"); + sendShutDownMessage(); + } + + eckit::Log::info() << "exiting " << std::endl; + + transport_.synchronise(); + +} + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed + diff --git a/src/eckit/distributed/Producer.h b/src/eckit/distributed/Producer.h new file mode 100644 index 000000000..a153e7e5c --- /dev/null +++ b/src/eckit/distributed/Producer.h @@ -0,0 +1,42 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. 
+ */ + +/// @file Producer.h +/// @author Baudouin Raoult +/// @author Tiago Quintino +/// @date May 2016 + +#ifndef eckit_Producer_H +#define eckit_Producer_H + +#include "eckit/distributed/Actor.h" + +namespace eckit::distributed { + +//---------------------------------------------------------------------------------------------------------------------- + +class Producer : public Actor { + // Actor whose run() sends every message produce() yields to the next worker, then sends the shutdown message. +public: // methods + + explicit Producer(Transport &transport); + + void run() override; + + virtual bool produce(Message &message) = 0; // fills 'message'; run() stops sending once this returns false + + +}; + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed + +#endif diff --git a/src/eckit/distributed/Transport.cc b/src/eckit/distributed/Transport.cc new file mode 100644 index 000000000..077041383 --- /dev/null +++ b/src/eckit/distributed/Transport.cc @@ -0,0 +1,132 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. 
+ */ + +#include "eckit/distributed/Transport.h" + +#include + +#include "eckit/exception/Exceptions.h" +#include "eckit/thread/AutoLock.h" +#include "eckit/thread/Mutex.h" +#include "eckit/option/CmdArgs.h" + + +namespace eckit::distributed { + + + +namespace { + + +static eckit::Mutex *local_mutex = 0; +static std::map *m = 0; + +static pthread_once_t once = PTHREAD_ONCE_INIT; + +static void init() { + local_mutex = new eckit::Mutex(); + m = new std::map(); +} + + +} // (anonymous namespace) + + + +//---------------------------------------------------------------------------------------------------------------------- + +Transport::Transport(const eckit::option::CmdArgs &args): + title_("eckit-run"), + id_("no-id") { + +} + +Transport::~Transport() { + +} + +const std::string& Transport::title() const { + return title_; +} + +const std::string& Transport::id() const { + return id_; +} + +const TransportStatistics& Transport::statistics() const { + return statistics_; +} + +//---------------------------------------------------------------------------------------------------------------------- + + + +TransportFactory::TransportFactory(const std::string &name): + name_(name) { + + pthread_once(&once, init); + + eckit::AutoLock lock(local_mutex); + + ASSERT(m->find(name) == m->end()); + (*m)[name] = this; +} + + +TransportFactory::~TransportFactory() { + eckit::AutoLock lock(local_mutex); + m->erase(name_); + +} + +Transport *TransportFactory::build( const eckit::option::CmdArgs &args) { + + pthread_once(&once, init); + + + std::string name = "none"; + args.get("transport", name); + + // if (!params.get("gridType", name)) { + // throw eckit::SeriousBug("TransportFactory cannot get gridType"); + // } + + eckit::AutoLock lock(local_mutex); + std::map::const_iterator j = m->find(name); + + if (j == m->end()) { + eckit::Log::error() << "No TransportFactory for [" << name << "]" << std::endl; + eckit::Log::error() << "TransportFactories are:" << std::endl; + for (j = 
m->begin() ; j != m->end() ; ++j) + eckit::Log::error() << " " << (*j).first << std::endl; + throw eckit::SeriousBug(std::string("No TransportFactory called ") + name); + } + + return (*j).second->make(args); +} + + +void TransportFactory::list(std::ostream& out) { + pthread_once(&once, init); + + eckit::AutoLock lock(local_mutex); + + const char* sep = ""; + for (std::map::const_iterator j = m->begin() ; j != m->end() ; ++j) { + out << sep << (*j).first; + sep = ", "; + } +} + + + + +} // namespace eckit + diff --git a/src/eckit/distributed/Transport.h b/src/eckit/distributed/Transport.h new file mode 100644 index 000000000..1d2ebdbe3 --- /dev/null +++ b/src/eckit/distributed/Transport.h @@ -0,0 +1,113 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. 
+ */ + +/// @file Transport.h +/// @author Baudouin Raoult +/// @author Tiago Quintino +/// @date May 2016 + +#pragma once + +#include +#include + +#include "eckit/memory/NonCopyable.h" +#include "eckit/distributed/TransportStatistics.h" + + +namespace eckit::option { +class Option; +class CmdArgs; +} + +namespace eckit::distributed { + +class Actor; +class Message; + +//---------------------------------------------------------------------------------------------------------------------- + +class Transport : private eckit::NonCopyable { +public: // methods + + Transport(const eckit::option::CmdArgs &args); + virtual ~Transport(); + + virtual void sendMessageToNextWorker(const Message &message) = 0; + virtual void getNextWorkMessage(Message &message) = 0; + virtual void sendStatisticsToProducer(const Message &message) = 0; + + virtual void sendToWriter(size_t writer, const Message &message) = 0; + virtual void getNextWriteMessage(Message &message) = 0; + + virtual void sendShutDownMessage(const Actor&) = 0; + + virtual void initialise() = 0; + virtual void abort() = 0; + virtual void synchronise() = 0; + + virtual bool producer() const = 0; + virtual bool single() const = 0; + virtual bool writer() const = 0; + + virtual const std::string& title() const; + virtual const std::string& id() const; + + const TransportStatistics& statistics() const; + +protected: + + std::string title_; + std::string id_; + + TransportStatistics statistics_; + +private: // methods + + virtual void print(std::ostream &out) const = 0; + + friend std::ostream &operator<<(std::ostream &s, const Transport &x) { + x.print(s); + return s; + } + +}; + + +class TransportFactory { + std::string name_; + virtual Transport* make(const eckit::option::CmdArgs &args) = 0 ; + +protected: + + TransportFactory(const std::string&); + virtual ~TransportFactory(); + +public: + + static Transport* build(const eckit::option::CmdArgs &args); + static void list(std::ostream &); + +}; + +/// Factory that registers itself under 'name' and whose make() returns a new T built from the command-line arguments. +template <class T> +class 
TransportBuilder : public TransportFactory { + Transport* make(const eckit::option::CmdArgs &args) override { + return new T(args); + } +public: + explicit TransportBuilder(const std::string& name) : TransportFactory(name) {} +}; + + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit::distributed diff --git a/src/eckit/distributed/TransportHandle.cc b/src/eckit/distributed/TransportHandle.cc new file mode 100644 index 000000000..c1d4ceac3 --- /dev/null +++ b/src/eckit/distributed/TransportHandle.cc @@ -0,0 +1,148 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. 
+ */ + +#include "TransportHandle.h" + +#include "eckit/maths/Functions.h" +#include "eckit/exception/Exceptions.h" + +#include "eckit/distributed/Transport.h" +#include "eckit/distributed/Message.h" + +namespace eckit::distributed { + +//---------------------------------------------------------------------------------------------------------------------- + +TransportHandle::TransportHandle(Transport& transport, + const std::string& path, + size_t writers, + bool exclusive): + transport_(transport), + path_(path), + writer_(0), + opened_(false), + append_(false), + position_(0) { + ASSERT(writers > 0); // 'writers' is the modulus used below; zero would be a division by zero + // Hash the path to select a writer; writer_ ends up in the range [1, writers] + for (auto j = path.begin(); j != path.end(); ++j) { + writer_ += (*j - 'A') + (writer_ << 5); + } + + writer_ %= writers; + writer_++; + + // eckit::Log::info() << "New " << *this << std::endl; +} + +TransportHandle::~TransportHandle() { + // eckit::Log::info() << "Delete " << *this << std::endl; + + close(); +} + +eckit::Length TransportHandle::openForRead() { + NOTIMP; +} + +void TransportHandle::openForWrite(const eckit::Length& length) { + append_ = false; + + Message message(Actor::OPEN, eckit::round(path_.length() + 64, 1024)); + message << path_; + message << append_; + + transport_.sendToWriter(writer_, message); + opened_ = true; +} + +void TransportHandle::openForAppend(const eckit::Length& length) { + append_ = true; + + Message message(Actor::OPEN, eckit::round(path_.length() + 64, 1024)); + message << path_; + message << append_; + + transport_.sendToWriter(writer_, message); + opened_ = true; +} + +long TransportHandle::read(void* buffer, long length) { + NOTIMP; +} + +long TransportHandle::write(const void* buffer, long length) { + + ASSERT(opened_); + + position_ += length; + send(buffer, length); + return length; +} + +void TransportHandle::send(const void *buffer, size_t length) { + + if (!length) { + return; + } + + Message message(Actor::WRITE, eckit::round(length + path_.length() + 64, 1024 * 1024)); + message << path_; + message 
<< append_; + message.writeBlob(buffer, length); + + transport_.sendToWriter(writer_, message); + +} + +void TransportHandle::close() { + if (opened_) { + flush(); + + + Message message(Actor::CLOSE, eckit::round(path_.length() + 64, 1024)); + message << path_; + message << append_; + + transport_.sendToWriter(writer_, message); + } + opened_ = false; +} + +void TransportHandle::flush() { + +} + +void TransportHandle::rewind() { + NOTIMP; +} + +void TransportHandle::print(std::ostream& s) const { + s << "TransportHandle[transport=" << transport_; + s << ",path=" << path_; + s << ",write=" << writer_; + s << ']'; +} + +eckit::Length TransportHandle::estimate() { + return 0; +} + +eckit::Offset TransportHandle::position() { + return position_; +} + +std::string TransportHandle::title() const { + return std::string("TransportHandle[") + eckit::PathName::shorten(path_) + "]"; +} + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit diff --git a/src/eckit/distributed/TransportHandle.h b/src/eckit/distributed/TransportHandle.h new file mode 100644 index 000000000..e382be8da --- /dev/null +++ b/src/eckit/distributed/TransportHandle.h @@ -0,0 +1,81 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. 
/// Write-only DataHandle that ships the data of one path to a remote writer.
///
/// Every open, write and close is turned into a Message (Actor::OPEN, Actor::WRITE,
/// Actor::CLOSE) and handed to Transport::sendToWriter(). The reading side of the
/// DataHandle interface (openForRead(), read(), rewind()) is not implemented.
class TransportHandle : public eckit::DataHandle {

public: // methods


    /// Constructor
    /// @param transport  transport used to reach the writers
    /// @param path       path of the data, also used to select the writer
    /// @param writers    number of writers the path is hashed onto
    /// @param exclusive  not used by the implementation

    TransportHandle(Transport& transport,
                    const std::string& path,
                    size_t writers,
                    bool exclusive);

    /// Destructor, closes the handle if it is still open

    virtual ~TransportHandle() override;

    // From DataHandle

    virtual eckit::Length openForRead() override;                 ///< not implemented
    virtual void openForWrite(const eckit::Length&) override;     ///< sends OPEN with append = false
    virtual void openForAppend(const eckit::Length&) override;    ///< sends OPEN with append = true

    virtual long read(void*, long) override;                      ///< not implemented
    virtual long write(const void*, long) override;               ///< sends one WRITE message, returns the length
    virtual void close() override;                                ///< sends CLOSE if the handle is open
    virtual void flush() override;                                ///< no-op
    virtual void rewind() override;                               ///< not implemented
    virtual void print(std::ostream&) const override;

    virtual eckit::Length estimate() override;                    ///< always 0
    virtual eckit::Offset position() override;                    ///< number of bytes passed to write() so far

private: // members

    Transport& transport_;   // transport the messages are sent through
    std::string path_;       // path sent in the header of every message
    size_t writer_;          // index of the selected writer, in 1..writers

    bool opened_;            // true between a successful open and close
    bool append_;            // last open mode, sent in the header of every message
    size_t position_;        // bytes written since construction

    virtual std::string title() const override;

    /// Wraps @p length bytes of @p buffer in a WRITE message; does nothing for an empty buffer
    void send(const void *buffer, size_t length);

};
/// Replaces @p x by the quotient x / n rounded to the nearest integer (halves round up).
///
/// Pure integer arithmetic is used: the previous form, (x + 0.5) / n, added the
/// rounding offset before the division and therefore always truncated.
///
/// @param x  value to average, updated in place
/// @param n  divisor; when zero, @p x is left untouched
inline static void divide(size_t& x, size_t n) {
    if (n == 0) {
        return;  // nothing to average over
    }

    const size_t quotient  = x / n;
    const size_t remainder = x % n;

    // remainder >= n - remainder  <=>  2 * remainder >= n, written so it cannot overflow
    x = quotient + (remainder >= n - remainder ? 1 : 0);
}
reportBytes(out, "Transport: byte sent", sendSize_, indent); + reportTime(out, "Transport: time in send", sendTiming_, indent); + if (sendTiming_.elapsed_) { + reportRate(out, "Transport: send rate", sendSize_ / sendTiming_.elapsed_, indent); + } + + reportCount(out, "Transport: messages received", receiveCount_, indent); + reportBytes(out, "Transport: byte received", receiveSize_, indent); + reportTime(out, "Transport: time in receive", receiveTiming_, indent); + if (receiveTiming_.elapsed_) { + reportRate(out, "Transport: receive rate", receiveSize_ / receiveTiming_.elapsed_, indent); + } + + reportTime(out, "Transport: barrier", barrierTiming_, indent); + reportTime(out, "Transport: shutdown", shutdownTiming_, indent); + +} + +void TransportStatistics::csvHeader(std::ostream& out) const { + out << "sends,sendSize,send,receives,receiveSize,receive,barrier,shutdown"; +} + +void TransportStatistics::csvRow(std::ostream& out) const { + out << sendCount_ << "," + << sendSize_ << "," + << sendTiming_ << "," + << receiveCount_ << "," + << receiveSize_ << "," + << receiveTiming_ << "," + << barrierTiming_ << "," + << shutdownTiming_; +} + +//---------------------------------------------------------------------------------------------------------------------- + +//---------------------------------------------------------------------------------------------------------------------- + +} // namespace eckit diff --git a/src/eckit/distributed/TransportStatistics.h b/src/eckit/distributed/TransportStatistics.h new file mode 100644 index 000000000..4bb168c42 --- /dev/null +++ b/src/eckit/distributed/TransportStatistics.h @@ -0,0 +1,72 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. 
/// Message counters, byte counts and timings collected by a transport.
///
/// Instances can be sent over an eckit::Stream (see encode() and the Stream
/// constructor), accumulated with operator+= and averaged with operator/=.
class TransportStatistics : public eckit::Statistics {
public:
    /// Creates statistics with all counters and sizes set to zero
    TransportStatistics() ;
    /// Reads the statistics from a stream, in the field order written by encode()
    TransportStatistics(eckit::Stream &) ;


    size_t sendCount_;       // number of messages sent
    size_t receiveCount_;    // number of messages received

    unsigned long long sendSize_;      // bytes sent
    unsigned long long receiveSize_;   // bytes received

    eckit::Timing sendTiming_;       // time spent sending
    eckit::Timing receiveTiming_;    // time spent receiving
    eckit::Timing barrierTiming_;    // time spent in barriers
    eckit::Timing shutdownTiming_;   // time spent shutting down


    /// Adds every field of @p other to this object
    TransportStatistics &operator+=(const TransportStatistics &other) ;
    /// Divides every field by the given number of samples
    TransportStatistics &operator/=(size_t) ;

    /// Writes a human readable report, each line prefixed with @p indent
    void report(std::ostream &out, const char *indent = "") const;

    /// Writes the comma-separated column names matching csvRow()
    void csvHeader(std::ostream& out) const;

    /// Writes the fields as one comma-separated record
    void csvRow(std::ostream& out) const;

    /// Writes all fields to the stream
    void encode(eckit::Stream &) const;

    friend eckit::Stream &operator<<(eckit::Stream &s, const TransportStatistics &x) {
        x.encode(s);
        return s;
    }
};
/// Sets up the transport on the "world" communicator.
///
/// The constructor records the rank and the number of ranks, assigns the writer
/// ranks requested with the "writers" argument, derives the title and identifier
/// of this process from its rank, and redirects the logs when "split-logs" is given.
///
/// @param args  command line arguments; "writers" and "split-logs" are read here
MPITransport::MPITransport(const eckit::option::CmdArgs& args) :
    Transport(args),
    comm_(eckit::mpi::comm("world")) {


    // eckit::mpi::createCo5mm("atlas", eckit::mpi::comm().self());
    // LibAtlas::setComm("atlas");

    eckit::mpi::setCommDefault("self"); // for Atlas

    rank_ = comm_.rank();
    totalRanks_ = comm_.size();

    eckit::Main::instance().taskID(rank_);

    hostname_ = comm_.processorName();

    // Number of writer ranks, zero (no writers) unless given on the command line
    size_t writers = 0;
    args.get("writers", writers);

    // Assuming aprun is run with --cc cpu, the tasks are distributed evenly across the nodes,
    // starting from 0 to totalRanks_ - 1

    if (writers) {
        // Writers are placed every n ranks, starting from the last rank and going down
        int n = totalRanks_ / writers;
        int r = totalRanks_ - 1;
        ASSERT(n > 1);

        // Writer indices start at 1
        int w = 1;
        for (size_t i = 0; i < writers; ++i) {

            // Never place a writer on rank 0, which is the producer
            if (r <= 0) {
                r = 1;
            }
            ASSERT(ranksToWriters_.find(r) == ranksToWriters_.end());
            ranksToWriters_[r] = w;

            ASSERT(writersToRanks_.find(w) == writersToRanks_.end());
            writersToRanks_[w] = r;

            w++;
            r -= n;
        }

        // eckit::Log::info() << "Writers are: " << ranksToWriters_ << " " << writersToRanks_ << std::endl;
    }
    // eckit::Log::info() << " starting " << std::endl;


    // Title: rank 0 is the producer, writer ranks are writers, all others are workers
    std::ostringstream oss;
    if (rank_) {
        if (ranksToWriters_.find(rank_) != ranksToWriters_.end()) {
            oss << "Writer-" << rank_ << "@" << hostname_;
        }
        else {
            oss << "Worker-" << rank_ << "@" << hostname_;
        }
    }
    else {
        oss << "Producer-0@" << hostname_;
    }


    title_ = oss.str();

    // The identifier of the process is its rank
    std::ostringstream oid;
    oid << rank_;
    id_ = oid.str();


    std::string base;
    if (args.get("split-logs", base)) {


        // One log file per rank, named <base>.<rank>
        std::ostringstream fname;
        fname << base << '.' << rank_;

        if (rank_ == 0) {
            eckit::Log::addFile(fname.str());
        }
        else {
            eckit::Log::setFile(fname.str());
        }
    }

    std::string hostname = eckit::Main::hostname();

    eckit::Log::info() << "Start of "
                       << title_
                       << " host: " << hostname
                       << " pid: " << ::getpid()
                       << std::endl;
}
/// Receives the next message addressed to this writer, from any rank and with any tag.
///
/// @param message  filled with the received data, rewound and stamped with the tag and
///                 the rank of the sender
///
/// Fails an assertion when the sender is itself a writer rank, or when the tag is not
/// one of WRITE, OPEN, CLOSE or SHUTDOWN.
void MPITransport::getNextWriteMessage(Message& message) {
    // eckit::Log::info()
    //         << " getNextWriteMessage"
    //         << std::endl;

    // Wildcards on input, replaced by the actual sender and tag by receive()
    int source = comm_.anySource();
    int tag = comm_.anyTag();

    receive(message, source, tag);

    // The sender must not be a writer rank
    ASSERT(ranksToWriters_.find(source) == ranksToWriters_.end());


    message.rewind();
    message.messageReceived(tag, source);

    ASSERT(tag == Actor::WRITE
           || tag == Actor::OPEN
           || tag == Actor::CLOSE
           || tag == Actor::SHUTDOWN);
}
eckit::AutoTiming timing(statistics_.receiveTiming_); + + eckit::mpi::Status status = comm_.probe(source, tag); + + size_t size = comm_.getCount(status); + + message.reserve(size); + + statistics_.receiveCount_++; + statistics_.receiveSize_ += size; + + source = status.source(); + tag = status.tag(); + + status = comm_.receive(message.messageData(), message.bufferSize(), source, tag); +} + +void MPITransport::sendShutDownMessage(const Actor& actor) { + eckit::AutoTiming timing(statistics_.shutdownTiming_); + + // Shutdown workers + size_t count = totalRanks_ - ranksToWriters_.size() - 1; + + eckit::Log::info() + << " shutdown workers count=" + << count + << std::endl; + + std::set r; + + for (int i = 1; i < totalRanks_; i++) { + r.insert(i); + } + + while (count > 0) { + int worker = comm_.anySource(); + int tag = comm_.anyTag(); + + Message message; + + receive(message, worker, tag); + + ASSERT(ranksToWriters_.find(worker) == ranksToWriters_.end()); + + switch (tag) { + case Actor::READY: + eckit::Log::info() + << " shutdown worker=" + << worker + << " left=" << count + << std::endl; + send(Message::shutdownMessage(), worker, Actor::SHUTDOWN); + break; + + case Actor::STATISTICS: + actor.messageFromWorker(message, worker); + r.erase(worker); + count--; + eckit::Log::info() + << " stats from worker=" + << worker + << " left=" << count + << " " << r + << std::endl; + break; + + default: + ASSERT(tag == Actor::READY || tag == Actor::STATISTICS); + break; + } + } + + count = ranksToWriters_.size(); + + eckit::Log::info() + << " shutdown workers count=" + << count + << std::endl; + + for (auto j = writersToRanks_.begin(); j != writersToRanks_.end(); ++j) { + send(Message::shutdownMessage(), (*j).second, Actor::SHUTDOWN); + } + + while (count > 0) { + int writer = comm_.anySource(); + int tag = comm_.anyTag(); + + Message message; + + receive(message, writer, tag); + + ASSERT(ranksToWriters_.find(writer) != ranksToWriters_.end()); + + switch (tag) { + + case 
Actor::STATISTICS: + actor.messageFromWriter(message, writer); + r.erase(writer); + count--; + eckit::Log::info() + << " stats from writer=" + << writer + << " left=" << count + << r + << std::endl; + break; + + default: + ASSERT(tag == Actor::STATISTICS); + break; + } + } +} + + +//---------------------------------------------------------------------------------------------------------------------- + +static TransportBuilder builder("mpi"); + +} // namespace eckit diff --git a/src/eckit/distributed/mpi/MPITransport.h b/src/eckit/distributed/mpi/MPITransport.h new file mode 100644 index 000000000..629d72a05 --- /dev/null +++ b/src/eckit/distributed/mpi/MPITransport.h @@ -0,0 +1,79 @@ +/* + * (C) Copyright 1996- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. 
/// Transport implemented on top of the MPI "world" communicator.
///
/// Rank 0 is the producer, the ranks listed in ranksToWriters_ are writers and all
/// remaining ranks are workers.
class MPITransport : public Transport {
public: // methods
    /// Reads the "writers" and "split-logs" arguments and assigns the writer ranks
    MPITransport(const eckit::option::CmdArgs& args);
    virtual ~MPITransport() override;

private: // methods
    /// Waits for a READY request from any worker and answers it with @p message
    virtual void sendMessageToNextWorker(const Message& message) override;
    /// Sends READY to the producer and waits for its WORK or SHUTDOWN answer
    virtual void getNextWorkMessage(Message& message) override;
    /// Sends @p message to the producer with the STATISTICS tag
    virtual void sendStatisticsToProducer(const Message& message) override;
    /// Shuts down the workers, then the writers, collecting their statistics
    virtual void sendShutDownMessage(const Actor&) override;

    virtual bool producer() const override;   ///< true on rank 0
    virtual bool single() const override;     ///< true when there is only one rank
    virtual void initialise() override;       ///< no-op
    virtual void abort() override;            ///< aborts the communicator
    virtual void synchronise() override;      ///< timed barrier on the communicator
    virtual bool writer() const override;     ///< true when this rank is a writer
    /// Sends @p message to the rank of writer number @p writer, under mutex_
    virtual void sendToWriter(size_t writer, const Message& message) override;
    /// Receives the next OPEN, WRITE, CLOSE or SHUTDOWN message from any rank
    virtual void getNextWriteMessage(Message& message) override;

    void print(std::ostream& out) const override;

    // Low level operations, all of them update the transport statistics
    void send(const Message& message, int target, int tag);
    void receive(Message& message, int& source, int& tag);   // source and tag are updated with the actual values
    void synchronisedSend(const Message& message, int target, int tag);

private: // members
    int totalRanks_;   // size of the communicator
    int rank_;         // rank of this process in the communicator

    // NOTE(review): the template arguments of the two maps are missing from this copy
    // of the file — restore them from the original source before building
    std::map writersToRanks_;   // writer number (starting at 1) -> rank
    std::map ranksToWriters_;   // rank -> writer number

    std::string hostname_;   // processor name reported by the communicator

    mutable eckit::Mutex mutex_;   // locked by sendToWriter()

    eckit::mpi::Comm& comm_;   // the "world" communicator
};
/// Stream over one TCP socket that registers itself with a Select on construction
/// and removes itself again when it is disconnected.
class Connection : public InstantTCPStream {
    Select& select_;     // select set the socket is registered with
    TCPSocket socket_;   // copy of the socket given to the constructor
    size_t id_;          // identifier given by the owner, 0 by default
    bool active_;        // false once disconnect() has been called

private:

    TCPSocket& socket() { return socket_; }

public:
    /// @param select  select set to register the socket with
    /// @param socket  connected socket
    /// @param id      identifier reported by id()
    Connection(Select& select, TCPSocket &socket, size_t id = 0):
        InstantTCPStream(socket),
        select_(select),
        socket_(socket),
        id_(id),
        active_(true) {
        select_.add(socket_);
    }

    /// Disconnects unless disconnect() was already called
    ~Connection() {
        if (active_) {
            disconnect();
        }
    }

    size_t id() const { return id_; }

    bool active() const { return active_; }

    /// Marks the connection inactive, removes the socket from the select set and closes it
    void disconnect() {
        active_ = false;
        select_.remove(socket_);
        socket_.close();
    }

    /// True when the connection is active and the select set reports its socket
    bool ready() {
        return active_ && select_.set(socket_);
    }

    std::string remoteHost() {
        return socket_.remoteHost();
    }

    int remotePort() {
        return socket_.remotePort();
    }
};
" + << Plural(connections_.size(), "worker") + << " still active" + << std::endl; + } + + if (select_.set(*accept_)) { + accept(); + more = true; + } + } + + for (auto j = connections_.rbegin(); j != connections_.rend(); ++j) { + Connection &connection = **j; + if (connection.ready()) { + try { + + size_t tag; + connection >> tag; + + ASSERT(tag == Actor::READY); + + // Log::info() << TimeStamp() + // << " " + // << title() + // << " sending to worker " + // << connection.id() + // << " tag " << message.tag() + // << std::endl; + + connection << size_t(message.tag()); + connection << message.messageSize(); + connection.writeBlob(message.messageData(), message.messageSize()); + + // Next time, we consider other connections first + std::swap(*j, connections_[0]); + + return true; + + } catch (std::exception &e) { + disconnect(e, connection); + continue; + } + } + } + return false; +} + + +void TCPTransport::cleanup() { + bool more = true; + while (more) { + more = false; + for (auto j = connections_.begin(); j != connections_.end(); ++j) { + Connection *connection = *j; + if (!connection->active()) { + delete connection; + connections_.erase(j); + more = true; + break; + } + } + } +} + +Connection& TCPTransport::producerConnection() const { + ASSERT(producer_); + return *producer_; +} + + +void TCPTransport::disconnect() const { +// Close connection to producer + ASSERT(producer_); + producer_.reset(nullptr); +} + + +void TCPTransport::getNextWorkMessage(Message &message) { + + auto& connection = producerConnection(); + + Log::info() << TimeStamp() + << " " + << title() + << " TCPTransport::getNextWorkMessage -> send" + << std::endl; + + connection << size_t(Actor::READY); + + size_t tag; + connection >> tag; + Log::info() << TimeStamp() + << " " + << title() + << " TCPTransport::getNextWorkMessage got reply" + << std::endl; + + size_t size; + + switch (tag) { + + case Actor::WORK: + connection >> size; + ASSERT(size <= message.bufferSize()); + 
connection.readBlob(message.messageData(), size); + break; + + case Actor::SHUTDOWN: + break; + + default: + ASSERT(tag == Actor::WORK || tag == Actor::SHUTDOWN); + break; + } + + message.rewind(); + message.messageReceived(tag, connection.id()); + +} + +void TCPTransport::sendStatisticsToProducer(const Message &message) { + + auto& connection = producerConnection(); + + connection << size_t(Actor::STATISTICS); + connection << message.messageSize(); + connection.writeBlob(message.messageData(), message.messageSize()); + + + // Close connection to producer + // TODO: this is the wrong place + + disconnect(); + + +} + +bool TCPTransport::producer() const { + return accept_ != 0; +} + +void TCPTransport::accept() { + ASSERT(accept_); + Log::info() << TimeStamp() + << " " + << title() + << ", waiting for a connection" + << std::endl; + + TCPSocket incoming(accept_->accept()); + + Connection *connection = new Connection(select_, incoming, ++nextId_); + connections_.push_back(connection); + + Log::info() << TimeStamp() + << " " + << title() + << ", got connection from " + << connection->remoteHost() + << ":" + << connection->remotePort() + << ", worker: " + << connection->id() + << std::endl; +} + +void TCPTransport::print(std::ostream &out) const { + out << "TCPTransport[]"; +} + + +void TCPTransport::abort() { + cleanup(); + for (auto j = connections_.begin(); j != connections_.end(); ++j) { + disconnect(**j); + delete (*j); + } + connections_.clear(); +} + +bool TCPTransport::writer() const { + return false; +} + +void TCPTransport::sendToWriter(size_t writer, const Message &message) { + NOTIMP; +} + +void TCPTransport::getNextWriteMessage(Message &message) { + NOTIMP; +} + +void TCPTransport::disconnect(Connection& connection) const { + Log::error() + << TimeStamp() + << " " + << title() + << " disconnect " + << connection.id() + << std::endl; + connection.disconnect(); +} + +void TCPTransport::disconnect(std::exception& e, Connection& connection) const { + + 
Log::error() + << TimeStamp() + << " " + << title() + << " " + << e.what() + << std::endl; + + Log::error() + << TimeStamp() + << " " + << title() + << ", lost connection with worker " + << connection.id() + << std::endl; + + disconnect(connection); + +} + + +void TCPTransport::sendShutDownMessage(const Actor& actor) { + + select_.remove(*accept_); + + while (connections_.size()) { + + bool more = true; + while (more) { + more = false; + + while (!select_.ready(30)) { + Log::info() << TimeStamp() + << " " + << title() + << ", waiting... " + << Plural(connections_.size(), "worker") + << " still active" + << std::endl; + } + + // if (select_.set(*accept_)) { + // accept(); + // more = true; + // } + } + + Log::info() << TimeStamp() + << " " + << title() + << " " + << Plural(connections_.size(), "worker") + << " remaining" + << std::endl; + + + for (auto j = connections_.begin(); j != connections_.end(); ++j) { + Connection &connection = **j; + if (connection.ready()) { + try { + + Message message; + + size_t tag; + size_t size; + + connection >> tag; + + switch (tag) { + + case Actor::READY: + Log::info() << TimeStamp() + << " " + << title() + << " shutdown worker " + << connection.id() + << std::endl; + connection << size_t(Actor::SHUTDOWN); + break; + + case Actor::STATISTICS: + connection >> size; + + message.reserve(size); + connection.readBlob(message.messageData(), size); + actor.messageFromWorker(message, connection.id()); + disconnect(connection); + break; + + default: + ASSERT(tag == Actor::READY || tag == Actor::STATISTICS); + break; + } + + } catch (std::exception &e) { + disconnect(e, connection); + } + } + } + + cleanup(); + } +} +//---------------------------------------------------------------------------------------------------------------------- + +static TransportBuilder builder("tcp"); + + +} // namespace eckit + diff --git a/src/eckit/distributed/tcp/TCPTransport.h b/src/eckit/distributed/tcp/TCPTransport.h new file mode 100644 index 
/// Transport implemented on top of TCP sockets.
///
/// A process started with a "host" argument connects to the producer on that host;
/// otherwise the process is the producer and listens for connections from workers.
/// Writers are not supported by this transport.
class TCPTransport : public Transport {
public: // methods

    /// Reads the "port" (7777 by default) and "host" arguments
    TCPTransport(const eckit::option::CmdArgs &args);
    /// Deletes the remaining connections
    virtual ~TCPTransport() override;

protected: // methods


    /// Sends @p message to a worker that asked for work, retrying until one accepts it
    virtual void sendMessageToNextWorker(const Message &message) override;
    /// Sends READY to the producer and reads its WORK or SHUTDOWN answer
    virtual void getNextWorkMessage(Message &message) override;
    /// Sends the statistics to the producer, then closes the connection to it
    virtual void sendStatisticsToProducer(const Message &message) override;
    /// Shuts down all connected workers and collects their statistics
    virtual void sendShutDownMessage(const Actor&) override;

    virtual bool producer() const override;      ///< true when this process listens for workers
    virtual bool single() const override;        ///< always false
    virtual void initialise() override;          ///< accepts a first connection
    virtual void abort() override;               ///< disconnects and deletes all connections
    virtual void synchronise() override;         ///< no-op
    virtual bool writer() const override;        ///< always false
    virtual void sendToWriter(size_t writer, const Message &message) override;   ///< not implemented
    virtual void getNextWriteMessage(Message &message) override;                 ///< not implemented

    void print(std::ostream& out) const override;


private:

    // typedef bool (TCPTransport::*)(Connection&);

    // void all(bool (TCPTransport ::* const)(Connection&));

    void disconnect() const;                                 // closes the connection to the producer
    void disconnect(Connection&) const;                      // logs, then disconnects the connection
    void disconnect(std::exception& e, Connection&) const;   // logs the error, then disconnects
    Connection& producerConnection() const;                  // asserts that producer_ is set

private: // members

    void accept();    // accepts one connection and appends it to connections_
    // NOTE(review): no definition of connect() is visible in TCPTransport.cc — confirm it is still needed
    void connect();

    bool send(const Message &message);   // false when no ready worker took the message
    void cleanup();                      // deletes the connections that are no longer active

    // NOTE(review): the template arguments of the three members below are missing from
    // this copy of the file — restore them from the original source before building
    mutable std::unique_ptr producer_;   // connection to the producer, set on consumers only

    mutable std::unique_ptr accept_;     // listening server, set on the producer only

    mutable std::vector connections_;    // owned connections to the workers

    mutable eckit::Select select_;       // select set shared by the server and the connections

    mutable size_t nextId_;              // identifier given to the last accepted worker

    // Initialised to false by the constructor; no other use is visible in TCPTransport.cc
    bool master_;
    bool worker_;
    bool writer_;

};
std::string& name() const { return name_; } void error(const std::string&); + bool done(); void wait(); void resize(size_t);