From 3333d46a6b8afde58c728935eceb7f6acccdd14f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Barth=C3=A9l=C3=A9my=20von=20Haller?= Date: Mon, 17 Jun 2019 17:01:55 +0200 Subject: [PATCH] Move service discovery to objectsmanager and some clean up (QC-193, QC-189) (#184) * Also some fixes for the ServiceDiscovery --- Framework/advanced.json | 6 +++++ Framework/basic-no-sampling.json | 6 +++++ Framework/basic.json | 6 +++++ Framework/example-default.json | 6 +++++ .../include/QualityControl/ObjectsManager.h | 6 ++--- Framework/include/QualityControl/TaskConfig.h | 3 +-- Framework/include/QualityControl/TaskRunner.h | 1 - Framework/readout-no-sampling.json | 6 +++++ Framework/readout.json | 5 +++- Framework/src/ObjectsManager.cxx | 7 ++++-- Framework/src/ServiceDiscovery.cxx | 23 +++++++++++++++---- Framework/src/TaskRunner.cxx | 8 ++----- Framework/test/testDbFactory.cxx | 14 +++++------ .../test/testInfrastructureGenerator.cxx | 1 + Framework/test/testObjectsManager.cxx | 9 ++++++++ 15 files changed, 80 insertions(+), 27 deletions(-) diff --git a/Framework/advanced.json b/Framework/advanced.json index 283656d5de..6daad6da86 100644 --- a/Framework/advanced.json +++ b/Framework/advanced.json @@ -11,6 +11,12 @@ "Activity": { "number": "42", "type": "2" + }, + "monitoring": { + "url": "infologger:///debug?qc" + }, + "consul": { + "url": "http://consul-test.cern.ch:8500" } }, "tasks": { diff --git a/Framework/basic-no-sampling.json b/Framework/basic-no-sampling.json index 1147fb581d..371c5088ec 100644 --- a/Framework/basic-no-sampling.json +++ b/Framework/basic-no-sampling.json @@ -11,6 +11,12 @@ "Activity": { "number": "42", "type": "2" + }, + "monitoring": { + "url": "infologger:///debug?qc" + }, + "consul": { + "url": "http://consul-test.cern.ch:8500" } }, "tasks": { diff --git a/Framework/basic.json b/Framework/basic.json index 38689eb5df..0fa5616605 100644 --- a/Framework/basic.json +++ b/Framework/basic.json @@ -11,6 +11,12 @@ "Activity": { "number": "42", "type": "2" + }, + "monitoring": { + "url": "infologger:///debug?qc" + }, + "consul": { + "url": "http://consul-test.cern.ch:8500" } }, "tasks": { diff --git a/Framework/example-default.json b/Framework/example-default.json index 22ea380c87..bf76fde3fa 100644 --- a/Framework/example-default.json +++ b/Framework/example-default.json @@ -11,6 +11,12 @@ "Activity": { "number": "42", "type": "2" + }, + "monitoring": { + "url": "infologger:///debug?qc" + }, + "consul": { + "url": "http://consul-test.cern.ch:8500" } }, "tasks": { diff --git a/Framework/include/QualityControl/ObjectsManager.h b/Framework/include/QualityControl/ObjectsManager.h index d1c69e9ba3..13af1fcef0 100644 --- a/Framework/include/QualityControl/ObjectsManager.h +++ b/Framework/include/QualityControl/ObjectsManager.h @@ -43,7 +43,7 @@ class ObjectsManager friend class TaskControl; // TaskControl must be able to call "publish()" whenever needed. Nobody else can. public: - ObjectsManager(TaskConfig& taskConfig, std::shared_ptr serviceDiscovery = nullptr); + ObjectsManager(TaskConfig& taskConfig); virtual ~ObjectsManager(); /** @@ -120,8 +120,8 @@ class ObjectsManager private: TObjArray mMonitorObjects; - std::string mTaskName; - std::shared_ptr mServiceDiscovery; + TaskConfig& mTaskConfig; + std::unique_ptr mServiceDiscovery; bool mUpdateServiceDiscovery; }; diff --git a/Framework/include/QualityControl/TaskConfig.h b/Framework/include/QualityControl/TaskConfig.h index 6828f46f3a..a77c5d8c7a 100644 --- a/Framework/include/QualityControl/TaskConfig.h +++ b/Framework/include/QualityControl/TaskConfig.h @@ -22,14 +22,13 @@ namespace o2::quality_control::core { /// \brief Container for the configuration of a Task -/// -/// \author Barthelemy von Haller struct TaskConfig { std::string taskName; std::string moduleName; std::string className; int cycleDurationSeconds; int maxNumberCycles; + std::string consulUrl; }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index b3c99eefbf..fffcddad2f 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -117,7 +117,6 @@ class TaskRunner : public framework::Task std::shared_ptr mTask; bool mResetAfterPublish; std::shared_ptr mObjectsManager; - std::shared_ptr mServiceDiscovery; // consider moving these to TaskConfig framework::Inputs mInputSpecs; diff --git a/Framework/readout-no-sampling.json b/Framework/readout-no-sampling.json index 91bdce967b..eaee957f7a 100644 --- a/Framework/readout-no-sampling.json +++ b/Framework/readout-no-sampling.json @@ -11,6 +11,12 @@ "Activity": { "number": "42", "type": "2" + }, + "monitoring": { + "url": "infologger:///debug?qc" + }, + "consul": { + "url": "http://consul-test.cern.ch:8500" } }, "tasks": { diff --git a/Framework/readout.json b/Framework/readout.json index 59bec23672..d47d5de167 100644 --- a/Framework/readout.json +++ b/Framework/readout.json @@ -13,7 +13,10 @@ "type": "2" }, "monitoring": { - "url": "infologger:///debug?qc" + "url": "infologger:///debug?qc" + }, + "consul": { + "url": "http://consul-test.cern.ch:8500" } }, "tasks": { diff --git a/Framework/src/ObjectsManager.cxx b/Framework/src/ObjectsManager.cxx index 1e8f8a1c57..70a0da7221 100644 --- a/Framework/src/ObjectsManager.cxx +++ b/Framework/src/ObjectsManager.cxx @@ -24,9 +24,12 @@ using namespace std; namespace o2::quality_control::core { -ObjectsManager::ObjectsManager(TaskConfig& taskConfig, std::shared_ptr serviceDiscovery) : mTaskName(taskConfig.taskName), mServiceDiscovery(serviceDiscovery), mUpdateServiceDiscovery(false) +ObjectsManager::ObjectsManager(TaskConfig& taskConfig) : mTaskConfig(taskConfig), mUpdateServiceDiscovery(false) { mMonitorObjects.SetOwner(true); + + // register with the discovery service + mServiceDiscovery = std::make_unique(taskConfig.consulUrl, taskConfig.taskName); } ObjectsManager::~ObjectsManager() @@ -40,7 +43,7 @@ void ObjectsManager::startPublishing(TObject* object) << infologger::endm; BOOST_THROW_EXCEPTION(DuplicateObjectError() << errinfo_object_name(object->GetName())); } - auto* newObject = new MonitorObject(object, mTaskName); + auto* newObject = new MonitorObject(object, mTaskConfig.taskName); newObject->setIsOwner(false); mMonitorObjects.Add(newObject); mUpdateServiceDiscovery = true; diff --git a/Framework/src/ServiceDiscovery.cxx b/Framework/src/ServiceDiscovery.cxx index c4ab7cd3cf..4c2c87f48b 100644 --- a/Framework/src/ServiceDiscovery.cxx +++ b/Framework/src/ServiceDiscovery.cxx @@ -38,7 +38,9 @@ ServiceDiscovery::ServiceDiscovery(const std::string& url, const std::string& id ServiceDiscovery::~ServiceDiscovery() { mThreadRunning = false; - mHealthThread.join(); + if (mHealthThread.joinable()) { + mHealthThread.join(); + } deregister(); } @@ -102,12 +104,23 @@ void ServiceDiscovery::runHealthServer(unsigned int port) try { boost::asio::io_service io_service; tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port)); + boost::asio::deadline_timer timer(io_service); while (mThreadRunning) { + io_service.reset(); + timer.expires_from_now(boost::posix_time::seconds(1)); + timer.async_wait([&](boost::system::error_code ec) { + if (!ec) + acceptor.cancel(); + }); tcp::socket socket(io_service); - acceptor.accept(socket); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + acceptor.async_accept(socket, [&](boost::system::error_code ec) { + if (!ec) + timer.cancel(); + }); + std::this_thread::sleep_for(std::chrono::seconds(1)); } } catch (std::exception& e) { + mThreadRunning = false; std::cerr << e.what() << std::endl; } } @@ -129,10 +142,10 @@ void ServiceDiscovery::send(const std::string& path, std::string&& post) response = curl_easy_perform(curl); curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &responseCode); if (response != CURLE_OK) { - std::cerr << curl_easy_strerror(response) << std::endl; + std::cerr << "ServiceDiscovery: " << curl_easy_strerror(response) << ": " << uri << std::endl; } if (responseCode < 200 || responseCode > 206) { - std::cerr << "Response code : " << responseCode << std::endl; + std::cerr << "ServiceDiscovery: Response code: " << responseCode << std::endl; } } } // namespace o2::quality_control::core diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 8d4012a868..bd57b469d5 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -46,7 +46,6 @@ TaskRunner::TaskRunner(const std::string& taskName, const std::string& configura : mDeviceName(createTaskRunnerIdString() + "-" + taskName), mTask(nullptr), mResetAfterPublish(false), - mServiceDiscovery(std::make_shared("http://consul-test.cern.ch:8500", taskName, "")), mMonitorObjectsSpec({ "mo" }, createTaskDataOrigin(), createTaskDataDescription(taskName), id), mNumberBlocks(0), mLastNumberObjects(0), @@ -57,10 +56,6 @@ TaskRunner::TaskRunner(const std::string& taskName, const std::string& configura // setup configuration mConfigFile = ConfigurationFactory::getConfiguration(configurationSource); populateConfig(taskName); - - // register with the discovery service - mServiceDiscovery = std::make_unique("http://consul-test.cern.ch:8500", mTaskConfig.taskName, ""); - mServiceDiscovery->_register("obj1,obj2,obj3"); } TaskRunner::~TaskRunner() = default; @@ -80,7 +75,7 @@ void TaskRunner::init(InitContext& iCtx) mCollector->enableProcessMonitoring(); // setup publisher - mObjectsManager = std::make_shared(mTaskConfig, mServiceDiscovery); + mObjectsManager = std::make_shared(mTaskConfig); // setup user's task TaskFactory f; @@ -257,6 +252,7 @@ void TaskRunner::populateConfig(std::string taskName) mTaskConfig.className = taskConfigTree->second.get("className"); mTaskConfig.cycleDurationSeconds = taskConfigTree->second.get("cycleDurationSeconds", 10); mTaskConfig.maxNumberCycles = taskConfigTree->second.get("maxNumberCycles", -1); + mTaskConfig.consulUrl = mConfigFile->get("qc.config.consul.url", "http://consul-test.cern.ch:8500"); auto policiesFilePath = mConfigFile->get("dataSamplingPolicyFile", ""); ConfigurationInterface* config = policiesFilePath.empty() ? mConfigFile.get() : ConfigurationFactory::getConfiguration(policiesFilePath).get(); diff --git a/Framework/test/testDbFactory.cxx b/Framework/test/testDbFactory.cxx index a0d5f86028..b4502278d7 100644 --- a/Framework/test/testDbFactory.cxx +++ b/Framework/test/testDbFactory.cxx @@ -92,13 +92,13 @@ BOOST_AUTO_TEST_CASE(db_ccdb_listing) // test getting objects list from task auto objectNames = ccdb->getPublishedObjectNames("functional_test"); - // cout << "objects in task functional_test" << endl; - // for (auto name : objectNames) { - // cout << " - object : " << name << endl; - // } - BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "object1") != objectNames.end()); - BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "object2") != objectNames.end()); - BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "path/to/object3") != objectNames.end()); + // cout << "objects in task functional_test" << endl; + // for (auto name : objectNames) { + // cout << " - object : " << name << endl; + // } + BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "/object1") != objectNames.end()); + BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "/object2") != objectNames.end()); + BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "/path\\/to\\/object3") != objectNames.end()); // store list of streamer infos // ccdb->storeStreamerInfosToFile("streamerinfos.root"); diff --git a/Framework/test/testInfrastructureGenerator.cxx b/Framework/test/testInfrastructureGenerator.cxx index 8c79fec0ca..c98dd2b64d 100644 --- a/Framework/test/testInfrastructureGenerator.cxx +++ b/Framework/test/testInfrastructureGenerator.cxx @@ -29,6 +29,7 @@ using namespace o2::framework; BOOST_AUTO_TEST_CASE(qc_factory_local_test) { + BOOST_REQUIRE_NE(getenv("QUALITYCONTROL_ROOT"), nullptr); std::string configFilePath = std::string("json:/") + getenv("QUALITYCONTROL_ROOT") + "/tests/testQCFactory.json"; { diff --git a/Framework/test/testObjectsManager.cxx b/Framework/test/testObjectsManager.cxx index 5c7fac2f6f..a8ab0faa7b 100644 --- a/Framework/test/testObjectsManager.cxx +++ b/Framework/test/testObjectsManager.cxx @@ -30,10 +30,19 @@ using namespace AliceO2::Common; namespace o2::quality_control::core { +BOOST_AUTO_TEST_CASE(invalid_url_test) +{ + TaskConfig config; + config.taskName = "test"; + config.consulUrl = "bad-url:1234"; + ObjectsManager objectsManager(config); +} + BOOST_AUTO_TEST_CASE(duplicate_object_test) { TaskConfig config; config.taskName = "test"; + config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config); TObjString s("content"); objectsManager.startPublishing(&s);