Skip to content

Commit

Permalink
Move service discovery to objectsmanager and some clean up (QC-193, Q…
Browse files Browse the repository at this point in the history
…C-189) (#184)

* Also some fixes for the ServiceDiscovery
  • Loading branch information
Barthelemy authored Jun 17, 2019
1 parent f1e9ca9 commit 3333d46
Show file tree
Hide file tree
Showing 15 changed files with 80 additions and 27 deletions.
6 changes: 6 additions & 0 deletions Framework/advanced.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
6 changes: 6 additions & 0 deletions Framework/basic-no-sampling.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
6 changes: 6 additions & 0 deletions Framework/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
6 changes: 6 additions & 0 deletions Framework/example-default.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
6 changes: 3 additions & 3 deletions Framework/include/QualityControl/ObjectsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ObjectsManager
friend class TaskControl; // TaskControl must be able to call "publish()" whenever needed. Nobody else can.

public:
ObjectsManager(TaskConfig& taskConfig, std::shared_ptr<ServiceDiscovery> serviceDiscovery = nullptr);
ObjectsManager(TaskConfig& taskConfig);
virtual ~ObjectsManager();

/**
Expand Down Expand Up @@ -120,8 +120,8 @@ class ObjectsManager

private:
TObjArray mMonitorObjects;
std::string mTaskName;
std::shared_ptr<ServiceDiscovery> mServiceDiscovery;
TaskConfig& mTaskConfig;
std::unique_ptr<ServiceDiscovery> mServiceDiscovery;
bool mUpdateServiceDiscovery;
};

Expand Down
3 changes: 1 addition & 2 deletions Framework/include/QualityControl/TaskConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ namespace o2::quality_control::core
{

/// \brief Container for the configuration of a Task
///
/// \author Barthelemy von Haller
struct TaskConfig {
std::string taskName;
std::string moduleName;
std::string className;
int cycleDurationSeconds;
int maxNumberCycles;
std::string consulUrl;
};

} // namespace o2::quality_control::core
Expand Down
1 change: 0 additions & 1 deletion Framework/include/QualityControl/TaskRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class TaskRunner : public framework::Task
std::shared_ptr<TaskInterface> mTask;
bool mResetAfterPublish;
std::shared_ptr<ObjectsManager> mObjectsManager;
std::shared_ptr<ServiceDiscovery> mServiceDiscovery;

// consider moving these to TaskConfig
framework::Inputs mInputSpecs;
Expand Down
6 changes: 6 additions & 0 deletions Framework/readout-no-sampling.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
5 changes: 4 additions & 1 deletion Framework/readout.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
7 changes: 5 additions & 2 deletions Framework/src/ObjectsManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ using namespace std;
namespace o2::quality_control::core
{

ObjectsManager::ObjectsManager(TaskConfig& taskConfig, std::shared_ptr<ServiceDiscovery> serviceDiscovery) : mTaskName(taskConfig.taskName), mServiceDiscovery(serviceDiscovery), mUpdateServiceDiscovery(false)
ObjectsManager::ObjectsManager(TaskConfig& taskConfig) : mTaskConfig(taskConfig), mUpdateServiceDiscovery(false)
{
mMonitorObjects.SetOwner(true);

// register with the discovery service
mServiceDiscovery = std::make_unique<ServiceDiscovery>(taskConfig.consulUrl, taskConfig.taskName);
}

ObjectsManager::~ObjectsManager()
Expand All @@ -40,7 +43,7 @@ void ObjectsManager::startPublishing(TObject* object)
<< infologger::endm;
BOOST_THROW_EXCEPTION(DuplicateObjectError() << errinfo_object_name(object->GetName()));
}
auto* newObject = new MonitorObject(object, mTaskName);
auto* newObject = new MonitorObject(object, mTaskConfig.taskName);
newObject->setIsOwner(false);
mMonitorObjects.Add(newObject);
mUpdateServiceDiscovery = true;
Expand Down
23 changes: 18 additions & 5 deletions Framework/src/ServiceDiscovery.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ ServiceDiscovery::ServiceDiscovery(const std::string& url, const std::string& id
ServiceDiscovery::~ServiceDiscovery()
{
mThreadRunning = false;
mHealthThread.join();
if (mHealthThread.joinable()) {
mHealthThread.join();
}
deregister();
}

Expand Down Expand Up @@ -102,12 +104,23 @@ void ServiceDiscovery::runHealthServer(unsigned int port)
try {
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port));
boost::asio::deadline_timer timer(io_service);
while (mThreadRunning) {
io_service.reset();
timer.expires_from_now(boost::posix_time::seconds(1));
timer.async_wait([&](boost::system::error_code ec) {
if (!ec)
acceptor.cancel();
});
tcp::socket socket(io_service);
acceptor.accept(socket);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
acceptor.async_accept(socket, [&](boost::system::error_code ec) {
if (!ec)
timer.cancel();
});
std::this_thread::sleep_for(std::chrono::seconds(1));
}
} catch (std::exception& e) {
mThreadRunning = false;
std::cerr << e.what() << std::endl;
}
}
Expand All @@ -129,10 +142,10 @@ void ServiceDiscovery::send(const std::string& path, std::string&& post)
response = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &responseCode);
if (response != CURLE_OK) {
std::cerr << curl_easy_strerror(response) << std::endl;
std::cerr << "ServiceDiscovery: " << curl_easy_strerror(response) << ": " << uri << std::endl;
}
if (responseCode < 200 || responseCode > 206) {
std::cerr << "Response code : " << responseCode << std::endl;
std::cerr << "ServiceDiscovery: Response code: " << responseCode << std::endl;
}
}
} // namespace o2::quality_control::core
8 changes: 2 additions & 6 deletions Framework/src/TaskRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ TaskRunner::TaskRunner(const std::string& taskName, const std::string& configura
: mDeviceName(createTaskRunnerIdString() + "-" + taskName),
mTask(nullptr),
mResetAfterPublish(false),
mServiceDiscovery(std::make_shared<ServiceDiscovery>("http://consul-test.cern.ch:8500", taskName, "")),
mMonitorObjectsSpec({ "mo" }, createTaskDataOrigin(), createTaskDataDescription(taskName), id),
mNumberBlocks(0),
mLastNumberObjects(0),
Expand All @@ -57,10 +56,6 @@ TaskRunner::TaskRunner(const std::string& taskName, const std::string& configura
// setup configuration
mConfigFile = ConfigurationFactory::getConfiguration(configurationSource);
populateConfig(taskName);

// register with the discovery service
mServiceDiscovery = std::make_unique<ServiceDiscovery>("http://consul-test.cern.ch:8500", mTaskConfig.taskName, "");
mServiceDiscovery->_register("obj1,obj2,obj3");
}

TaskRunner::~TaskRunner() = default;
Expand All @@ -80,7 +75,7 @@ void TaskRunner::init(InitContext& iCtx)
mCollector->enableProcessMonitoring();

// setup publisher
mObjectsManager = std::make_shared<ObjectsManager>(mTaskConfig, mServiceDiscovery);
mObjectsManager = std::make_shared<ObjectsManager>(mTaskConfig);

// setup user's task
TaskFactory f;
Expand Down Expand Up @@ -257,6 +252,7 @@ void TaskRunner::populateConfig(std::string taskName)
mTaskConfig.className = taskConfigTree->second.get<std::string>("className");
mTaskConfig.cycleDurationSeconds = taskConfigTree->second.get<int>("cycleDurationSeconds", 10);
mTaskConfig.maxNumberCycles = taskConfigTree->second.get<int>("maxNumberCycles", -1);
mTaskConfig.consulUrl = mConfigFile->get<std::string>("qc.config.consul.url", "http://consul-test.cern.ch:8500");

auto policiesFilePath = mConfigFile->get<std::string>("dataSamplingPolicyFile", "");
ConfigurationInterface* config = policiesFilePath.empty() ? mConfigFile.get() : ConfigurationFactory::getConfiguration(policiesFilePath).get();
Expand Down
14 changes: 7 additions & 7 deletions Framework/test/testDbFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ BOOST_AUTO_TEST_CASE(db_ccdb_listing)

// test getting objects list from task
auto objectNames = ccdb->getPublishedObjectNames("functional_test");
// cout << "objects in task functional_test" << endl;
// for (auto name : objectNames) {
// cout << " - object : " << name << endl;
// }
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "object1") != objectNames.end());
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "object2") != objectNames.end());
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "path/to/object3") != objectNames.end());
// cout << "objects in task functional_test" << endl;
// for (auto name : objectNames) {
// cout << " - object : " << name << endl;
// }
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "/object1") != objectNames.end());
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "/object2") != objectNames.end());
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "/path\\/to\\/object3") != objectNames.end());

// store list of streamer infos
// ccdb->storeStreamerInfosToFile("streamerinfos.root");
Expand Down
1 change: 1 addition & 0 deletions Framework/test/testInfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ using namespace o2::framework;

BOOST_AUTO_TEST_CASE(qc_factory_local_test)
{
BOOST_REQUIRE_NE(getenv("QUALITYCONTROL_ROOT"), nullptr);
std::string configFilePath = std::string("json:/") + getenv("QUALITYCONTROL_ROOT") + "/tests/testQCFactory.json";

{
Expand Down
9 changes: 9 additions & 0 deletions Framework/test/testObjectsManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@ using namespace AliceO2::Common;
namespace o2::quality_control::core
{

BOOST_AUTO_TEST_CASE(invalid_url_test)
{
TaskConfig config;
config.taskName = "test";
config.consulUrl = "bad-url:1234";
ObjectsManager objectsManager(config);
}

BOOST_AUTO_TEST_CASE(duplicate_object_test)
{
TaskConfig config;
config.taskName = "test";
config.consulUrl = "http://consul-test.cern.ch:8500";
ObjectsManager objectsManager(config);
TObjString s("content");
objectsManager.startPublishing(&s);
Expand Down

0 comments on commit 3333d46

Please sign in to comment.