From 13e4e3ae4a37a980670eaa196567bb4e8279634b Mon Sep 17 00:00:00 2001 From: Konstantinos Parasyris Date: Wed, 25 Oct 2023 16:44:54 -0700 Subject: [PATCH] Allow multiple instantions of workflow which use the same model/db. (#13) --- examples/main.cpp | 30 ++++-- src/ml/hdcache.hpp | 240 ++++++++++++++++++++++++++++++++---------- src/ml/surrogate.hpp | 123 +++++++++++++++++----- src/wf/basedb.hpp | 161 +++++++++++++++++++++------- src/wf/utils.hpp | 6 +- src/wf/workflow.hpp | 134 ++++++++++++----------- tests/CMakeLists.txt | 3 +- tests/torch_model.cpp | 18 +++- 8 files changed, 515 insertions(+), 200 deletions(-) diff --git a/examples/main.cpp b/examples/main.cpp index f1b645c7..e7ae3e87 100644 --- a/examples/main.cpp +++ b/examples/main.cpp @@ -176,7 +176,8 @@ int main(int argc, char **argv) args.AddOption(&db_config, "-db", "--dbconfig", - "Path to directory where applications will store their data (or Path to JSON configuration if RabbitMQ is chosen)", + "Path to directory where applications will store their data " + "(or Path to JSON configuration if RabbitMQ is chosen)", reqDB); args.AddOption(&db_type, @@ -187,14 +188,19 @@ int main(int argc, char **argv) "\t 'hdf5': use hdf5 as a back end\n" "\t 'rmq': use RabbitMQ as a back end\n"); - args.AddOption(&k_nearest, "-knn", "--k-nearest-neighbors", "Number of closest neightbors we should look at"); + args.AddOption(&k_nearest, + "-knn", + "--k-nearest-neighbors", + "Number of closest neightbors we should look at"); args.AddOption(&uq_policy_opt, "-uq", "--uqtype", "Types of UQ to select from: \n" - "\t 'mean' Uncertainty is computed in comparison against the mean distance of k-nearest neighbors\n" - "\t 'max': Uncertainty is computed in comparison with the k'st cluster \n" + "\t 'mean' Uncertainty is computed in comparison against the " + "mean distance of k-nearest neighbors\n" + "\t 'max': Uncertainty is computed in comparison with the " + "k'st cluster \n" "\t 'deltauq': Uncertainty through DUQ (not supported)\n"); args.AddOption( @@ -266,12 +272,14 @@ int main(int argc, char **argv) dbType = AMSDBType::RMQ; } - AMSUQPolicy uq_policy = - (std::strcmp(uq_policy_opt, "max") == 0) ? AMSUQPolicy::FAISSMax: AMSUQPolicy::FAISSMean; + AMSUQPolicy uq_policy = (std::strcmp(uq_policy_opt, "max") == 0) + ? AMSUQPolicy::FAISSMax + : AMSUQPolicy::FAISSMean; - if ( uq_policy != AMSUQPolicy::FAISSMax ) + if (uq_policy != AMSUQPolicy::FAISSMax) uq_policy = ((std::strcmp(uq_policy_opt, "deltauq") == 0)) - ? AMSUQPolicy::DeltaUQ : AMSUQPolicy::FAISSMean; + ? AMSUQPolicy::DeltaUQ + : AMSUQPolicy::FAISSMean; // set up a randomization seed srand(seed + rId); @@ -423,7 +431,7 @@ int main(int argc, char **argv) AMSResourceType ams_device = AMSResourceType::HOST; if (use_device) ams_device = AMSResourceType::DEVICE; AMSExecPolicy ams_loadBalance = AMSExecPolicy::UBALANCED; - if ( lbalance ) ams_loadBalance = AMSExecPolicy::BALANCED; + if (lbalance) ams_loadBalance = AMSExecPolicy::BALANCED; AMSConfig amsConf = {ams_loadBalance, AMSDType::Double, @@ -437,10 +445,10 @@ int main(int argc, char **argv) uq_policy, k_nearest, rId, - wS }; - AMSExecutor wf = AMSCreateExecutor(amsConf); + wS}; for (int mat_idx = 0; mat_idx < num_mats; ++mat_idx) { + AMSExecutor wf = AMSCreateExecutor(amsConf); workflow[mat_idx] = wf; } #endif diff --git a/src/ml/hdcache.hpp b/src/ml/hdcache.hpp index bf72e579..a610fcff 100644 --- a/src/ml/hdcache.hpp +++ b/src/ml/hdcache.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -34,6 +35,7 @@ #include "AMS.h" #include "wf/data_handler.hpp" #include "wf/resource_manager.hpp" +#include "wf/utils.hpp" //! ---------------------------------------------------------------------------- //! An implementation of FAISS-based HDCache @@ -87,12 +89,15 @@ class HDCache // faiss::gpu::GpuIndexIVFPQ *index_gpu; #endif -public: +protected: + // A mechanism to keep track of all unique HDCaches + static std::unordered_map>> + instances; + //! ------------------------------------------------------------------------ //! constructors //! ------------------------------------------------------------------------ - HDCache(bool use_device, - TypeInValue threshold = 0.5) + HDCache(bool use_device, TypeInValue threshold = 0.5) : m_index(nullptr), m_dim(0), m_use_random(true), @@ -135,6 +140,7 @@ class HDCache HDCache(const std::string &cache_path, int knbrs, bool use_device, + const AMSUQPolicy uqPolicy, TypeInValue threshold = 0.5) : m_index(nullptr), m_dim(0), @@ -150,17 +156,114 @@ class HDCache } #endif +public: + static std::shared_ptr> find_cache( + const std::string &cache_path, + bool use_device, + const AMSUQPolicy uqPolicy, + int knbrs, + TypeInValue threshold = 0.5) + { + auto model = HDCache::instances.find(cache_path); + + if (model != instances.end()) { + // Model Found + auto cache = model->second; + if (use_device != cache->m_use_device) + throw std::runtime_error( + "Currently we do not support loading the same index on different " + "devices."); + + if (uqPolicy != cache->m_policy) + throw std::runtime_error( + "We do not support caches of different policies."); + + if (knbrs != cache->m_knbrs) + throw std::runtime_error( + "We do not support caches of different number of neighbors."); + + // FIXME: Here we need to cast both to float. FAISS index only works for + // single precision and we shoehorn FAISS inability to support arbitary real + // types by forcing TypeValue to be 'float'. In our case this results in having + // cases where input data are of type(TypeInValue) double. Thus here, threshold can + // be of different type than 'acceptable_error' and at compile time we cannot decide + // which overloaded function to pick. + if (!is_real_equal(static_cast(threshold), + static_cast(cache->acceptable_error))) + throw std::runtime_error( + "We do not support caches of different thresholds"); + + return cache; + } + return nullptr; + } + + static std::shared_ptr> getInstance( + const std::string &cache_path, + bool use_device, + const AMSUQPolicy uqPolicy, + int knbrs, + TypeInValue threshold = 0.5) + { + + // Cache does not exist. We need to create one + // + std::shared_ptr> cache = + find_cache(cache_path, use_device, uqPolicy, knbrs, threshold); + if (cache) { + DBG(UQModule, "Returning existing cache under (%s)", cache_path.c_str()) + return cache; + } + + DBG(UQModule, "Generating new cache under (%s)", cache_path.c_str()) + std::shared_ptr> new_cache = + std::shared_ptr>(new HDCache( + cache_path, use_device, uqPolicy, knbrs, threshold)); + + instances.insert(std::make_pair(cache_path, new_cache)); + return new_cache; + } + + static std::shared_ptr> getInstance( + bool use_device, + float threshold = 0.5) + { + static std::string random_path("random"); + std::shared_ptr> cache = find_cache( + random_path, use_device, AMSUQPolicy::FAISSMean, -1, threshold); + if (cache) { + DBG(UQModule, "Returning existing cache under (%s)", random_path.c_str()) + return cache; + } + + DBG(UQModule, + "Generating new cache under (%s, threshold:%f)", + random_path.c_str(), + threshold) + std::shared_ptr> new_cache = + std::shared_ptr>( + new HDCache(use_device, threshold)); + + instances.insert(std::make_pair(random_path, new_cache)); + return new_cache; + } + + ~HDCache() { DBG(Surrogate, "Destroying UQ-cache") } + //! ------------------------------------------------------------------------ //! simple queries //! ------------------------------------------------------------------------ inline void print() const { std::string info("index = null"); - if ( has_index() ) { - info = "npoints = " + std::to_string(count()); + if (has_index()) { + info = "npoints = " + std::to_string(count()); } - DBG(UQModule, "HDCache (on_device = %d random = %d %s)", - m_use_device, m_use_random, info.c_str()); + DBG(UQModule, + "HDCache (on_device = %d random = %d %s)", + m_use_device, + m_use_random, + info.c_str()); } inline bool has_index() const @@ -207,27 +310,33 @@ class HDCache //! add points to the faiss cache //! ----------------------------------------------------------------------- //! add the data that comes as linearized features -PERFFASPECT() + PERFFASPECT() void add(const size_t ndata, const size_t d, TypeInValue *data) { if (m_use_random) return; DBG(UQModule, "Add %ld %ld points to HDCache", ndata, d); CFATAL(UQModule, d != m_dim, "Mismatch in data dimensionality!") - CFATAL(UQModule, !has_index(), "HDCache does not have a valid and trained index!") + CFATAL(UQModule, + !has_index(), + "HDCache does not have a valid and trained index!") _add(ndata, data); } //! add the data that comes as separate features (a vector of pointers) -PERFFASPECT() + PERFFASPECT() void add(const size_t ndata, const std::vector &inputs) { if (m_use_random) return; if (inputs.size() != m_dim) - CFATAL(UQModule, inputs.size() != m_dim, "Mismatch in data dimensionality") - CFATAL(UQModule, !has_index(), "HDCache does not have a valid and trained index!") + CFATAL(UQModule, + inputs.size() != m_dim, + "Mismatch in data dimensionality") + CFATAL(UQModule, + !has_index(), + "HDCache does not have a valid and trained index!") TypeValue *lin_data = data_handler::linearize_features(ndata, inputs); _add(ndata, lin_data); @@ -238,20 +347,22 @@ PERFFASPECT() //! train a faiss cache //! ----------------------------------------------------------------------- //! train on data that comes as linearized features -PERFFASPECT() + PERFFASPECT() void train(const size_t ndata, const size_t d, TypeInValue *data) { if (m_use_random) return; DBG(UQModule, "Add %ld %ld points to HDCache", ndata, d); CFATAL(UQModule, d != m_dim, "Mismatch in data dimensionality!") - CFATAL(UQModule, !has_index(), "HDCache does not have a valid and trained index!") + CFATAL(UQModule, + !has_index(), + "HDCache does not have a valid and trained index!") _train(ndata, data); DBG(UQModule, "Successfully Trained HDCache"); } //! train on data that comes separate features (a vector of pointers) -PERFFASPECT() + PERFFASPECT() void train(const size_t ndata, const std::vector &inputs) { if (m_use_random) return; @@ -268,37 +379,50 @@ PERFFASPECT() //! https://github.com/facebookresearch/faiss/wiki/Faiss-on-the-GPU#passing-in-pytorch-tensors //! so, we should use Dino's code to linearize data into torch tensor and then //! pass it here -PERFFASPECT() + PERFFASPECT() void evaluate(const size_t ndata, const size_t d, TypeInValue *data, bool *is_acceptable) const { - CFATAL(UQModule, !has_index(), "HDCache does not have a valid and trained index!") + CFATAL(UQModule, + !has_index(), + "HDCache does not have a valid and trained index!") DBG(UQModule, "Evaluating %ld %ld points using HDCache", ndata, d); - CFATAL(UQModule, (!m_use_random) && (d != m_dim), "Mismatch in data dimensionality!") + CFATAL(UQModule, + (!m_use_random) && (d != m_dim), + "Mismatch in data dimensionality!") if (m_use_random) { _evaluate(ndata, is_acceptable); } else { _evaluate(ndata, data, is_acceptable); } - } //! train on data that comes separate features (a vector of pointers) -PERFFASPECT() + PERFFASPECT() void evaluate(const size_t ndata, const std::vector &inputs, bool *is_acceptable) const { - CFATAL(UQModule, !has_index(), "HDCache does not have a valid and trained index!") - DBG(UQModule, "Evaluating %ld %ld points using HDCache configured with %d neighbors, %f threshold, %d policy", - ndata, inputs.size(), m_knbrs, acceptable_error, m_policy); - CFATAL(UQModule, ((!m_use_random) && inputs.size() != m_dim), "Mismatch in data dimensionality!") + CFATAL(UQModule, + !has_index(), + "HDCache does not have a valid and trained index!") + DBG(UQModule, + "Evaluating %ld %ld points using HDCache configured with %d neighbors, " + "%f threshold, %d policy", + ndata, + inputs.size(), + m_knbrs, + acceptable_error, + m_policy); + CFATAL(UQModule, + ((!m_use_random) && inputs.size() != m_dim), + "Mismatch in data dimensionality!") if (m_use_random) { _evaluate(ndata, is_acceptable); @@ -320,7 +444,7 @@ PERFFASPECT() //! add points to index when (data type = TypeValue) template ::value> * = nullptr> -PERFFASPECT() + PERFFASPECT() inline void _add(const size_t ndata, const T *data) { m_index->add(ndata, data); @@ -329,7 +453,7 @@ PERFFASPECT() //! add points to index when (data type != TypeValue) template ::value> * = nullptr> -PERFFASPECT() + PERFFASPECT() inline void _add(const size_t ndata, const T *data) { TypeValue *vdata = data_handler::cast_to_typevalue(ndata, data); @@ -341,30 +465,27 @@ PERFFASPECT() //! train an index when (data type = TypeValue) template ::value> * = nullptr> -PERFFASPECT() + PERFFASPECT() inline void _train(const size_t ndata, const T *data) { if (m_index != nullptr && m_index->is_trained) - throw std::invalid_argument( - "!"); + throw std::invalid_argument("!"); CFATAL(UQModule, - (m_index != nullptr && m_index->is_trained), - "Trying to re-train an already trained index") + (m_index != nullptr && m_index->is_trained), + "Trying to re-train an already trained index") m_index = faiss::index_factory(m_dim, index_key); m_index->train(ndata, data); - CFATAL(UQModule, - ((!m_index->is_trained)), - "Failed to train index") + CFATAL(UQModule, ((!m_index->is_trained)), "Failed to train index") } //! train an index when (data type != TypeValue) template ::value> * = nullptr> -PERFFASPECT() + PERFFASPECT() inline void _train(const size_t ndata, const T *data) { TypeValue *vdata = data_handler::cast_to_typevalue(ndata, data); @@ -376,7 +497,7 @@ PERFFASPECT() //! evaluate cache uncertainty when (data type = TypeValue) template ::value> * = nullptr> -PERFFASPECT() + PERFFASPECT() void _evaluate(const size_t ndata, T *data, bool *is_acceptable) const { @@ -388,10 +509,12 @@ PERFFASPECT() ams::ResourceManager::is_on_device(is_acceptable); if (input_on_device != output_on_device) { - WARNING(UQModule, "Input is ( on_device: %d)" - " Output is ( on_device: %d)" - " on different devices", - input_on_device, output_on_device) + WARNING(UQModule, + "Input is ( on_device: %d)" + " Output is ( on_device: %d)" + " on different devices", + input_on_device, + output_on_device) } TypeValue *kdists = @@ -406,29 +529,35 @@ PERFFASPECT() for (int start = 0; start < ndata; start += MAGIC_NUMBER) { unsigned int nElems = ((ndata - start) < MAGIC_NUMBER) ? ndata - start : MAGIC_NUMBER; - m_index->search( - nElems, &data[start], knbrs, &kdists[start*knbrs], &kidxs[start*knbrs]); + m_index->search(nElems, + &data[start], + knbrs, + &kdists[start * knbrs], + &kidxs[start * knbrs]); } // compute means if (defaultRes == AMSResourceType::HOST) { TypeValue total_dist = 0; for (size_t i = 0; i < ndata; ++i) { - CFATAL(UQModule, m_policy==AMSUQPolicy::DeltaUQ, "DeltaUQ is not supported yet"); - if ( m_policy == AMSUQPolicy::FAISSMean ) { + CFATAL(UQModule, + m_policy == AMSUQPolicy::DeltaUQ, + "DeltaUQ is not supported yet"); + if (m_policy == AMSUQPolicy::FAISSMean) { total_dist = std::accumulate(kdists + i * knbrs, kdists + (i + 1) * knbrs, 0.); is_acceptable[i] = (ook * total_dist) < acceptable_error; - } - else if ( m_policy == AMSUQPolicy::FAISSMax ) { + } else if (m_policy == AMSUQPolicy::FAISSMax) { // Take the furtherst cluster as the distance metric - total_dist = kdists[i*knbrs + knbrs -1]; + total_dist = kdists[i * knbrs + knbrs - 1]; is_acceptable[i] = (total_dist) < acceptable_error; } } } else { - CFATAL(UQModule, (m_policy==AMSUQPolicy::DeltaUQ) || (m_policy==AMSUQPolicy::FAISSMax), - "DeltaUQ is not supported yet"); + CFATAL(UQModule, + (m_policy == AMSUQPolicy::DeltaUQ) || + (m_policy == AMSUQPolicy::FAISSMax), + "DeltaUQ is not supported yet"); ams::Device::computePredicate( kdists, is_acceptable, ndata, knbrs, acceptable_error); @@ -455,24 +584,24 @@ PERFFASPECT() inline uint8_t _dim() const { return 0; } template -PERFFASPECT() + PERFFASPECT() inline void _add(const size_t, const T *) { } template -PERFFASPECT() + PERFFASPECT() inline void _train(const size_t, const T *) { } template -PERFFASPECT() + PERFFASPECT() inline void _evaluate(const size_t, T *, bool *) const { } #endif -PERFFASPECT() + PERFFASPECT() inline void _evaluate(const size_t ndata, bool *is_acceptable) const { const bool data_on_device = @@ -488,4 +617,9 @@ PERFFASPECT() } // ------------------------------------------------------------------------- }; + +template +std::unordered_map>> + HDCache::instances; + #endif diff --git a/src/ml/surrogate.hpp b/src/ml/surrogate.hpp index babb5157..8f56d6c2 100644 --- a/src/ml/surrogate.hpp +++ b/src/ml/surrogate.hpp @@ -8,14 +8,16 @@ #ifndef __AMS_SURROGATE_HPP__ #define __AMS_SURROGATE_HPP__ +#include +#include #include +#include #ifdef __ENABLE_TORCH__ #include // One-stop header. #endif #include "wf/data_handler.hpp" - #include "wf/debug.h" //! ---------------------------------------------------------------------------- @@ -34,7 +36,7 @@ class SurrogateModel private: const std::string model_path; - const bool is_cpu; + const bool _is_cpu; #ifdef __ENABLE_TORCH__ @@ -48,7 +50,7 @@ class SurrogateModel // ------------------------------------------------------------------------- // conversion to and from torch // ------------------------------------------------------------------------- -PERFFASPECT() + PERFFASPECT() inline at::Tensor arrayToTensor(long numRows, long numCols, TypeInValue** array) @@ -63,7 +65,7 @@ PERFFASPECT() return tensor; } -PERFFASPECT() + PERFFASPECT() inline at::Tensor arrayToTensor(long numRows, long numCols, const TypeInValue** array) @@ -78,7 +80,7 @@ PERFFASPECT() return tensor; } -PERFFASPECT() + PERFFASPECT() inline void tensorToArray(at::Tensor tensor, long numRows, long numCols, @@ -87,7 +89,7 @@ PERFFASPECT() // Transpose to get continuous memory and // perform single memcpy. tensor = tensor.transpose(1, 0); - if (is_cpu) { + if (_is_cpu) { for (long j = 0; j < numCols; j++) { auto tmp = tensor[j].contiguous(); TypeInValue* ptr = tmp.data_ptr(); @@ -105,7 +107,7 @@ PERFFASPECT() // ------------------------------------------------------------------------- // loading a surrogate model! // ------------------------------------------------------------------------- -PERFFASPECT() + PERFFASPECT() void _load_torch(const std::string& model_path, c10::Device&& device, at::ScalarType dType) @@ -122,28 +124,28 @@ PERFFASPECT() template ::value>* = nullptr> -PERFFASPECT() + PERFFASPECT() inline void _load(const std::string& model_path, const std::string& device_name) { - DBG(Surrogate, "Using model at double precision") + DBG(Surrogate, "Using model at double precision"); _load_torch(model_path, torch::Device(device_name), torch::kFloat64); } template ::value>* = nullptr> -PERFFASPECT() + PERFFASPECT() inline void _load(const std::string& model_path, const std::string& device_name) { - DBG(Surrogate, "Using model at single precision") + DBG(Surrogate, "Using model at single precision"); _load_torch(model_path, torch::Device(device_name), torch::kFloat32); } // ------------------------------------------------------------------------- // evaluate a torch model // ------------------------------------------------------------------------- -PERFFASPECT() + PERFFASPECT() inline void _evaluate(long num_elements, size_t num_in, size_t num_out, @@ -156,20 +158,24 @@ PERFFASPECT() input.set_requires_grad(false); at::Tensor output = module.forward({input}).toTensor().detach(); - DBG(Surrogate, "Evaluate surrogate model (%ld, %ld) -> (%ld, %ld)", - num_elements, num_in, num_elements, num_out); + DBG(Surrogate, + "Evaluate surrogate model (%ld, %ld) -> (%ld, %ld)", + num_elements, + num_in, + num_elements, + num_out); tensorToArray(output, num_elements, num_out, outputs); } #else template -PERFFASPECT() + PERFFASPECT() inline void _load(const std::string& model_path, const std::string& device_name) { } -PERFFASPECT() + PERFFASPECT() inline void _evaluate(long num_elements, long num_in, size_t num_out, @@ -180,21 +186,80 @@ PERFFASPECT() #endif - // ------------------------------------------------------------------------- - // public interface - // ------------------------------------------------------------------------- -public: SurrogateModel(const char* model_path, bool is_cpu = true) - : model_path(model_path), is_cpu(is_cpu) + : model_path(model_path), _is_cpu(is_cpu) { - if (is_cpu) + if (_is_cpu) _load(model_path, "cpu"); else _load(model_path, "cuda"); } -PERFFASPECT() +protected: + template ::value>* = nullptr> + static bool same_type(bool is_double) + { + return !is_double; + } + + template ::value>* = nullptr> + static bool same_type(bool is_double) + { + return is_double; + } + + static std::unordered_map>> + instances; + +public: + // ------------------------------------------------------------------------- + // public interface + // ------------------------------------------------------------------------- + + static std::shared_ptr> getInstance( + const char* model_path, + bool is_cpu = true) + { + auto model = + SurrogateModel::instances.find(std::string(model_path)); + if (model != instances.end()) { + // Model Found + auto torch_model = model->second; + if (is_cpu != torch_model->is_cpu()) + throw std::runtime_error( + "Currently we are not supporting loading the same model file on " + "different devices."); + + if (!same_type(torch_model->is_double())) + throw std::runtime_error( + "Requesting model loading of different data types."); + + DBG(Surrogate, + "Returning existing model represented under (%s)", + model_path); + return torch_model; + } + + // Model does not exist. We need to create one + DBG(Surrogate, "Generating new model under (%s)", model_path); + std::shared_ptr> torch_model = + std::shared_ptr>( + new SurrogateModel(model_path, is_cpu)); + instances.insert(std::make_pair(std::string(model_path), torch_model)); + return torch_model; + }; + + ~SurrogateModel() + { + DBG(Surrogate, "Destroying surrogate model at %s", model_path.c_str()); + } + + + PERFFASPECT() inline void evaluate(long num_elements, size_t num_in, size_t num_out, @@ -204,7 +269,7 @@ PERFFASPECT() _evaluate(num_elements, num_in, num_out, inputs, outputs); } -PERFFASPECT() + PERFFASPECT() inline void evaluate(long num_elements, std::vector inputs, std::vector outputs) @@ -212,9 +277,17 @@ PERFFASPECT() _evaluate(num_elements, inputs.size(), outputs.size(), - static_cast(inputs.data()), + static_cast< const TypeInValue**>(inputs.data()), static_cast(outputs.data())); } + + bool is_double() { return (tensorOptions.dtype() == torch::kFloat64); } + + bool is_cpu() { return _is_cpu; } }; +template +std::unordered_map>> + SurrogateModel::instances; + #endif diff --git a/src/wf/basedb.hpp b/src/wf/basedb.hpp index 74bb2e3e..5147c7bd 100644 --- a/src/wf/basedb.hpp +++ b/src/wf/basedb.hpp @@ -12,7 +12,9 @@ #include #include #include +#include #include +#include #include #include "AMS.h" @@ -97,6 +99,8 @@ class BaseDB */ virtual std::string type() = 0; + virtual AMSDBType dbType() = 0; + /** * @brief Takes an input and an output vector each holding 1-D vectors data, and * store. them in persistent data storage. @@ -110,6 +114,8 @@ class BaseDB virtual void store(size_t num_elements, std::vector& inputs, std::vector& outputs) = 0; + + uint64_t getId() const { return id; } }; /** @@ -200,19 +206,28 @@ class csvDB final : public FileDB if (!fd.is_open()) { std::cerr << "Cannot open db file: " << this->fn << std::endl; } - DBG(DB, "DB Type: %s", type()) + DBG(DB, "DB Type: %s", type().c_str()) } /** * @brief deconstructs the class and closes the file */ - ~csvDB() { fd.close(); } + ~csvDB() + { + DBG(DB, "Closing File: %s %s", type().c_str(), this->fn.c_str()) + fd.close(); + } /** * @brief Define the type of the DB (File, Redis etc) */ std::string type() override { return "csv"; } + /** + * @brief Return the DB enumerationt type (File, Redis etc) + */ + AMSDBType dbType() { return AMSDBType::CSV; }; + /** * @brief Takes an input and an output vector each holding 1-D vectors data, and * store them into a csv file delimited by ':'. This should never be used for @@ -446,17 +461,26 @@ class hdf5DB final : public FileDB /** * @brief deconstructs the class and closes the file */ - ~hdf5DB() - { - // HDF5 Automatically closes all opened fds at exit of application. - // herr_t err = H5Fclose(HFile); - // HDF5_ERROR(err); + ~hdf5DB(){ + DBG(DB, "Closing File: %s %s", type().c_str(), this->fn.c_str()) + // HDF5 Automatically closes all opened fds at exit of application. + // herr_t err = H5Fclose(HFile); + // HDF5_ERROR(err); } /** * @brief Define the type of the DB */ - std::string type() override { return "hdf5"; } + std::string type() override + { + return "hdf5"; + } + + /** + * @brief Return the DB enumerationt type (File, Redis etc) + */ + AMSDBType dbType() { return AMSDBType::HDF5; }; + /** * @brief Takes an input and an output vector each holding 1-D vectors data, @@ -550,6 +574,12 @@ class RedisDB : public BaseDB inline std::string type() override { return "RedisDB"; } + /** + * @brief Return the DB enumerationt type (File, Redis etc) + */ + AMSDBType dbType() { return AMSDBType::REDIS; }; + + inline std::string info() { return _redis->info(); } // Return the number of keys in the DB @@ -939,18 +969,18 @@ class EventBuffer } evbuffer_unlock(buffer); - std::string result = - std::to_string(self->_rank) + ":"; + std::string result = std::to_string(self->_rank) + ":"; for (int i = 0; i < k - 1; i++) { result.append(std::to_string(data[i]) + ":"); } result.append(std::to_string(data[k - 1]) + "\n"); - + // For resiliency reasons we encode the result in base64 // Not that it increases the size (n) of messages by approx 4*(n/3) std::string result_b64 = self->encode64(result); DBG(RabbitMQDB, - "[rank=%d] #elements (float/double) = %d, stringify size = %d, size in base64 " + "[rank=%d] #elements (float/double) = %d, stringify size = %d, size " + "in base64 " "= %d", self->_rank, k, @@ -1023,7 +1053,8 @@ class EventBuffer : _rank(rank), _loop(loop), _buffer(nullptr), - _rchannel(std::make_shared>(*channel.get())), + _rchannel( + std::make_shared>(*channel.get())), _queue(std::move(queue)), _byte_to_send(0), _counter_ack(0), @@ -1236,19 +1267,20 @@ class RabbitMQDB final : public BaseDB size_t offset = 0; // Offset to have space to write off the dimensions at the begiginning of the array - if (add_dims) - offset = 2; + if (add_dims) offset = 2; - TypeValue* data = (TypeValue*) malloc((nvalues + offset) * sizeof(TypeValue)); + TypeValue* data = + (TypeValue*)malloc((nvalues + offset) * sizeof(TypeValue)); if (add_dims) { - data[0] = (TypeValue) n; - data[1] = (TypeValue) features.size(); + data[0] = (TypeValue)n; + data[1] = (TypeValue)features.size(); } for (size_t d = 0; d < nfeatures; d++) { for (size_t i = 0; i < n; i++) { - data[offset + (i * nfeatures) + d] = static_cast(features[d][i]); + data[offset + (i * nfeatures) + d] = + static_cast(features[d][i]); } } return data; @@ -1262,8 +1294,7 @@ class RabbitMQDB final : public BaseDB */ void start_sender(const std::string& queue) { - _channel_send = - std::make_shared(_connection); + _channel_send = std::make_shared(_connection); _channel_send->onError([&_rank = _rank](const char* message) { CFATAL(RabbitMQDB, false, @@ -1303,13 +1334,14 @@ class RabbitMQDB final : public BaseDB throw std::runtime_error(message); }); - _sender = std::make_shared(); + _sender = std::make_shared(); _sender->loop = _loop_sender; _evbuffer = new EventBuffer(_rank, _loop_sender, _channel_send, _queue_sender); - if (pthread_create(&_sender->id, NULL, start_worker_sender, _sender.get())) { + if (pthread_create( + &_sender->id, NULL, start_worker_sender, _sender.get())) { FATAL(RabbitMQDB, "error pthread_create for sender worker"); } } @@ -1322,8 +1354,7 @@ class RabbitMQDB final : public BaseDB */ void start_receiver(const std::string& queue) { - _channel_receive = - std::make_shared(_connection); + _channel_receive = std::make_shared(_connection); _channel_receive->onError([&_rank = _rank](const char* message) { CFATAL(RabbitMQDB, false, @@ -1363,7 +1394,7 @@ class RabbitMQDB final : public BaseDB throw std::runtime_error(message); }); - _receiver = std::make_shared(); + _receiver = std::make_shared(); _receiver->loop = _loop_receiver; _receiver->channel = _channel_receive; // Structure that will contain all messages received @@ -1433,10 +1464,14 @@ class RabbitMQDB final : public BaseDB #else OPENSSL_init_ssl(0, NULL); #endif - _handler_sender = - std::make_shared(_rank, _loop_sender, rmq_config["rabbitmq-cert"]); - _handler_receiver = - std::make_shared(_rank, _loop_receiver, rmq_config["rabbitmq-cert"]); + _handler_sender = std::make_shared(_rank, + _loop_sender, + rmq_config["rabbitmq-" + "cert"]); + _handler_receiver = std::make_shared(_rank, + _loop_receiver, + rmq_config["rabbitmq-" + "cert"]); AMQP::Login login(rmq_config["rabbitmq-user"], rmq_config["rabbitmq-password"]); @@ -1588,6 +1623,8 @@ class RabbitMQDB final : public BaseDB */ std::string type() override { return "rabbitmq"; } + AMSDBType dbType() { return AMSDBType::RMQ; }; + /** * @brief Return the number of messages that * has been push to the buffer @@ -1626,15 +1663,6 @@ class RabbitMQDB final : public BaseDB template BaseDB* createDB(char* dbPath, AMSDBType dbType, uint64_t rId = 0) { - DBG(DB, "Instantiating data base"); -#ifdef __ENABLE_DB__ - if (dbPath == nullptr) { - std::cerr << " [WARNING] Path of DB is NULL, Please provide a valid path " - "to enable db\n"; - std::cerr << " [WARNING] Continueing\n"; - return nullptr; - } - switch (dbType) { case AMSDBType::CSV: return new csvDB(dbPath, rId); @@ -1654,8 +1682,61 @@ BaseDB* createDB(char* dbPath, AMSDBType dbType, uint64_t rId = 0) return nullptr; } #else - return nullptr; +return nullptr; #endif } + +/** + * @brief get a data base object referred by this string. + * This should never be used for large scale simulations as txt/csv format will + * be extremely slow. + * @param[in] dbPath path to the directory storing the data + * @param[in] dbType Type of the database to create + * @param[in] rId a unique Id for each process taking part in a distributed + * execution (rank-id) + */ +template +std::shared_ptr> getDB(char* dbPath, + AMSDBType dbType, + uint64_t rId = 0) +{ + static std::unordered_map>> + instances; +#ifdef __ENABLE_DB__ + if (dbPath == nullptr) { + std::cerr << " [WARNING] Path of DB is NULL, Please provide a valid path " + "to enable db\n"; + std::cerr << " [WARNING] Continueing\n"; + return nullptr; + } + + auto db_iter = instances.find(std::string(dbPath)); + if (db_iter == instances.end()) { + DBG(DB, "Creating new Database writting to file: %s", dbPath); + std::shared_ptr> db = std::shared_ptr>( + createDB(dbPath, dbType, rId)); + instances.insert(std::make_pair(std::string(dbPath), db)); + return db; + } + + auto db = db_iter->second; + // Corner case where creation of the db failed and someone is requesting + // the same entry point + if (db == nullptr) { + return db; + } + + if (db->dbType() != dbType) { + throw std::runtime_error("Requesting databases of different types"); + } + + if (db->getId() != rId) { + throw std::runtime_error("Requesting databases from different ranks"); + } + DBG(DB, "Using existing Database writting to file: %s", dbPath); + + return db; +} + #endif // __AMS_BASE_DB__ diff --git a/src/wf/utils.hpp b/src/wf/utils.hpp index 3665fbe4..711a6b86 100644 --- a/src/wf/utils.hpp +++ b/src/wf/utils.hpp @@ -14,7 +14,6 @@ #include #include - // ----------------------------------------------------------------------------- // ----------------------------------------------------------------------------- @@ -56,6 +55,11 @@ void random_uq_host(bool *uq_flags, int ndata, double acceptable_error) } } +template +inline bool is_real_equal(T l, T r) +{ + return r == std::nextafter(l, r); +} // ----------------------------------------------------------------------------- #endif diff --git a/src/wf/workflow.hpp b/src/wf/workflow.hpp index 88c72902..02ec40e1 100644 --- a/src/wf/workflow.hpp +++ b/src/wf/workflow.hpp @@ -18,7 +18,6 @@ #include "AMS.h" #include "ml/hdcache.hpp" #include "ml/surrogate.hpp" - #include "wf/basedb.hpp" #ifdef __ENABLE_MPI__ @@ -50,21 +49,21 @@ class AMSWorkflow AMSPhysicFn AppCall; /** @brief The module that performs uncertainty quantification (UQ) */ - HDCache *hdcache; + std::shared_ptr> hdcache; /** The metric/type of UQ we will use to select between physics and ml computations **/ const AMSUQPolicy uqPolicy = AMSUQPolicy::FAISSMean; /** The Number of clusters we will use to compute FAISS UQ **/ - const int nClusters=10; + const int nClusters = 10; /** @brief The torch surrogate model to replace the original physics function */ - SurrogateModel *surrogate; + std::shared_ptr> surrogate; /** @brief The database to store data for which we cannot apply the current * model */ - BaseDB *DB; + std::shared_ptr> DB; /** @brief The type of the database we will use (HDF5, CSV, etc) */ AMSDBType dbType = AMSDBType::None; @@ -108,8 +107,7 @@ class AMSWorkflow const int numOut = outputs.size(); // No database, so just de-allocate and return - if (DB == nullptr) - return; + if (DB) return; std::vector hInputs, hOutputs; @@ -136,12 +134,16 @@ class AMSWorkflow size_t actualElems = std::min(elPerDim, num_elements - i); // Copy input data to host for (int k = 0; k < numIn; k++) { - ams::ResourceManager::copy(&inputs[k][i], hInputs[k], actualElems * sizeof(FPTypeValue)); + ams::ResourceManager::copy(&inputs[k][i], + hInputs[k], + actualElems * sizeof(FPTypeValue)); } // Copy output data to host for (int k = 0; k < numIn; k++) { - ams::ResourceManager::copy(&outputs[k][i], hOutputs[k], actualElems * sizeof(FPTypeValue)); + ams::ResourceManager::copy(&outputs[k][i], + hOutputs[k], + actualElems * sizeof(FPTypeValue)); } // Store to database @@ -163,7 +165,7 @@ class AMSWorkflow mLoc(AMSResourceType::DEVICE), ePolicy(AMSExecPolicy::UBALANCED) { - if (isCPU){ + if (isCPU) { mLoc = AMSResourceType::HOST; } @@ -184,7 +186,7 @@ class AMSWorkflow const int nClusters, int _pId = 0, int _wSize = 1, - AMSExecPolicy policy= AMSExecPolicy::UBALANCED) + AMSExecPolicy policy = AMSExecPolicy::UBALANCED) : AppCall(_AppCall), dbType(dbType), rId(_pId), @@ -193,27 +195,28 @@ class AMSWorkflow mLoc(AMSResourceType::DEVICE), ePolicy(policy) { - if (isCPU){ + if (isCPU) { mLoc = AMSResourceType::HOST; } surrogate = nullptr; - if (surrogate_path != nullptr) - surrogate = new SurrogateModel(surrogate_path, is_cpu); + if (surrogate_path) + surrogate = + SurrogateModel::getInstance(surrogate_path, is_cpu); // TODO: Fix magic number. 10 represents the number of neighbours I am // looking at. - if (uq_path != nullptr) - hdcache = new HDCache(uq_path, !is_cpu, - uqPolicy, nClusters, threshold); + if (uq_path) + hdcache = HDCache::getInstance( + uq_path, !is_cpu, uqPolicy, nClusters, threshold); else // This is a random hdcache returning true %threshold queries - hdcache = new HDCache(!is_cpu, threshold); + hdcache = HDCache::getInstance(!is_cpu, threshold); DB = nullptr; - if (db_path != nullptr) { + if (db_path) { DBG(Workflow, "Creating Database"); - DB = createDB(db_path, dbType, rId); + DB = getDB(db_path, dbType, rId); } } @@ -229,11 +232,6 @@ class AMSWorkflow ~AMSWorkflow() { DBG(Workflow, "Destroying Workflow Handler"); - if (hdcache) delete hdcache; - - if (surrogate) delete surrogate; - - if (DB) delete DB; } @@ -291,17 +289,22 @@ class AMSWorkflow int outputDim, MPI_Comm Comm = nullptr) { - CDEBUG(Workflow, rId==0, "Entering Evaluate " - "with problem dimensions [(%d, %d, %d, %d)]", - totalElements, inputDim, totalElements, outputDim); + CDEBUG(Workflow, + rId == 0, + "Entering Evaluate " + "with problem dimensions [(%d, %d, %d, %d)]", + totalElements, + inputDim, + totalElements, + outputDim); // To move around the inputs, outputs we bundle them as std::vectors std::vector origInputs(inputs, inputs + inputDim); std::vector origOutputs(outputs, outputs + outputDim); REPORT_MEM_USAGE(Workflow, "Start") - if ( surrogate == nullptr ){ - FPTypeValue **tmpInputs = const_cast(inputs); + if (!surrogate) { + FPTypeValue **tmpInputs = const_cast(inputs); std::vector tmpIn(tmpInputs, tmpInputs + inputDim); DBG(Workflow, "No-Model, I am calling Physics code (for all data)"); @@ -309,10 +312,9 @@ class AMSWorkflow totalElements, reinterpret_cast(origInputs.data()), reinterpret_cast(origOutputs.data())); - if (DB != nullptr) { + if (DB) { CALIPER(CALI_MARK_BEGIN("DBSTORE");) - Store(totalElements, tmpIn, - origOutputs); + Store(totalElements, tmpIn, origOutputs); CALIPER(CALI_MARK_END("DBSTORE");) } return; @@ -324,7 +326,7 @@ class AMSWorkflow // STEP 1: call the hdcache to look at input uncertainties // to decide if making a ML inference makes sense // ------------------------------------------------------------- - if (hdcache != nullptr) { + if (hdcache) { CALIPER(CALI_MARK_BEGIN("UQ_MODULE");) hdcache->evaluate(totalElements, origInputs, p_ml_acceptable); CALIPER(CALI_MARK_END("UQ_MODULE");) @@ -369,38 +371,36 @@ class AMSWorkflow } { - void** iPtr = reinterpret_cast(packedInputs.data()); - void** oPtr = reinterpret_cast(packedOutputs.data()); + void **iPtr = reinterpret_cast(packedInputs.data()); + void **oPtr = reinterpret_cast(packedOutputs.data()); long lbElements = packedElements; #ifdef __ENABLE_MPI__ - CALIPER(CALI_MARK_BEGIN("LOAD BALANCE MODULE");) - AMSLoadBalancer lBalancer(rId, wSize, packedElements, Comm, inputDim, outputDim, mLoc); - if (ePolicy == AMSExecPolicy::BALANCED && Comm) { - lBalancer.scatterInputs(packedInputs, mLoc); - iPtr = reinterpret_cast(lBalancer.inputs()); - oPtr = reinterpret_cast(lBalancer.outputs()); - lbElements = lBalancer.getBalancedSize(); - } - CALIPER(CALI_MARK_END("LOAD BALANCE MODULE");) + CALIPER(CALI_MARK_BEGIN("LOAD BALANCE MODULE");) + AMSLoadBalancer lBalancer( + rId, wSize, packedElements, Comm, inputDim, outputDim, mLoc); + if (ePolicy == AMSExecPolicy::BALANCED && Comm) { + lBalancer.scatterInputs(packedInputs, mLoc); + iPtr = reinterpret_cast(lBalancer.inputs()); + oPtr = reinterpret_cast(lBalancer.outputs()); + lbElements = lBalancer.getBalancedSize(); + } + CALIPER(CALI_MARK_END("LOAD BALANCE MODULE");) #endif - // ---- 3b: call the physics module and store in the data base - if (packedElements > 0 ) { - CALIPER(CALI_MARK_BEGIN("PHYSICS MODULE");) - AppCall(probDescr, - lbElements, - iPtr, - oPtr); - CALIPER(CALI_MARK_END("PHYSICS MODULE");) - } + // ---- 3b: call the physics module and store in the data base + if (packedElements > 0) { + CALIPER(CALI_MARK_BEGIN("PHYSICS MODULE");) + AppCall(probDescr, lbElements, iPtr, oPtr); + CALIPER(CALI_MARK_END("PHYSICS MODULE");) + } #ifdef __ENABLE_MPI__ - CALIPER(CALI_MARK_BEGIN("LOAD BALANCE MODULE");) - if (ePolicy == AMSExecPolicy::BALANCED && Comm) { - lBalancer.gatherOutputs(packedOutputs, mLoc); - } - CALIPER(CALI_MARK_END("LOAD BALANCE MODULE");) + CALIPER(CALI_MARK_BEGIN("LOAD BALANCE MODULE");) + if (ePolicy == AMSExecPolicy::BALANCED && Comm) { + lBalancer.gatherOutputs(packedOutputs, mLoc); + } + CALIPER(CALI_MARK_END("LOAD BALANCE MODULE");) #endif } @@ -409,9 +409,11 @@ class AMSWorkflow DBG(Workflow, "Finished physics evaluation") - if (DB != nullptr) { + if (DB) { CALIPER(CALI_MARK_BEGIN("DBSTORE");) - DBG(Workflow, "Storing data (#elements = %d) to database", packedElements); + DBG(Workflow, + "Storing data (#elements = %d) to database", + packedElements); Store(packedElements, packedInputs, packedOutputs); CALIPER(CALI_MARK_END("DBSTORE");) } @@ -427,9 +429,13 @@ class AMSWorkflow ams::ResourceManager::deallocate(p_ml_acceptable, mLoc); DBG(Workflow, "Finished AMSExecution") - CINFO(Workflow, rId == 0, "Computed %ld " - "using physics out of the %ld items (%.2f)", - packedElements, totalElements, (float) (packedElements) / float( totalElements)) + CINFO(Workflow, + rId == 0, + "Computed %ld " + "using physics out of the %ld items (%.2f)", + packedElements, + totalElements, + (float)(packedElements) / float(totalElements)) REPORT_MEM_USAGE(Workflow, "End") } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1b45e5c2..268ba56d 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -24,4 +24,5 @@ endfunction() ADDTEST(ams_allocator ams_allocate.cpp AMSAllocate) ADDTEST(ams_packing cpu_packing_test.cpp AMSPack) -ADDTEST(ams_inference torch_model.cpp AMSInfer ${CMAKE_CURRENT_SOURCE_DIR}/debug_model.pt) +ADDTEST(ams_inference_double torch_model.cpp AMSInferDouble ${CMAKE_CURRENT_SOURCE_DIR}/debug_model.pt "double") +ADDTEST(ams_inference_single torch_model.cpp AMSInferSingle ${CMAKE_CURRENT_SOURCE_DIR}/debug_model.pt "single") diff --git a/tests/torch_model.cpp b/tests/torch_model.cpp index 2be4dfef..074800dc 100644 --- a/tests/torch_model.cpp +++ b/tests/torch_model.cpp @@ -7,21 +7,20 @@ #include +#include #include #include #include #include #include -#include #include #define SIZE (32L * 1024L + 3L) template -void inference(char *path, int device, AMSResourceType resource) +void inference(SurrogateModel &model, AMSResourceType resource) { using namespace ams; - SurrogateModel model(path, !device); std::vector inputs; std::vector outputs; @@ -46,7 +45,6 @@ void inference(char *path, int device, AMSResourceType resource) int main(int argc, char *argv[]) { using namespace ams; - using data_handler = DataHandler; auto &rm = umpire::ResourceManager::getInstance(); int use_device = std::atoi(argv[1]); char *model_path = argv[2]; @@ -60,7 +58,17 @@ int main(int argc, char *argv[]) resource = AMSResourceType::DEVICE; } - inference(model_path, use_device, resource); + if (std::strcmp("double", data_type) == 0) { + std::shared_ptr> model = + SurrogateModel::getInstance(model_path, !use_device); + assert(model->is_double()); + inference(*model, resource); + } else if (std::strcmp("single", data_type) == 0) { + std::shared_ptr> model = + SurrogateModel::getInstance(model_path, !use_device); + assert(!model->is_double()); + inference(*model, resource); + } return 0; }