diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 8033936b9a..3be9c2bddf 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -74,6 +74,8 @@ set(SRCS src/SpyDevice.cxx src/SpyMainFrame.cxx src/CcdbDatabase.cxx + src/InformationService.cxx + src/InformationServiceDump.cxx ) set(HEADERS # needed for the dictionary generation @@ -134,6 +136,20 @@ O2_GENERATE_EXECUTABLE( BUCKET_NAME ${BUCKET_NAME} ) +O2_GENERATE_EXECUTABLE( + EXE_NAME qcInfoService + SOURCES src/runInformationService.cxx + MODULE_LIBRARY_NAME ${LIBRARY_NAME} + BUCKET_NAME ${BUCKET_NAME} +) + +O2_GENERATE_EXECUTABLE( + EXE_NAME qcInfoServiceDump + SOURCES src/runInformationServiceDump.cxx + MODULE_LIBRARY_NAME ${LIBRARY_NAME} + BUCKET_NAME ${BUCKET_NAME} +) + if (FAIRROOT_FOUND) O2_GENERATE_EXECUTABLE( EXE_NAME alfaTestReceiver diff --git a/Framework/alfa.json b/Framework/alfa.json index 6c6658a291..5af6373d30 100644 --- a/Framework/alfa.json +++ b/Framework/alfa.json @@ -16,6 +16,19 @@ "rateLogging": 0 } ] + }, + { + "name": "information-service-out", + "sockets": [ + { + "type": "pub", + "method": "connect", + "address": "tcp://localhost:5560", + "sndBufSize": 10, + "rcvBufSize": 10, + "rateLogging": 0 + } + ] } ] }, @@ -28,9 +41,22 @@ { "type": "pub", "method": "bind", - "address": "tcp://*:5556", - "sndBufSize": 100, - "rcvBufSize": 100, + "address": "tcp://*:5557", + "sndBufSize": 1, + "rcvBufSize": 1, + "rateLogging": 0 + } + ] + }, + { + "name": "information-service-out", + "sockets": [ + { + "type": "pub", + "method": "connect", + "address": "tcp://localhost:5560", + "sndBufSize": 1, + "rcvBufSize": 1, "rateLogging": 0 } ] diff --git a/Framework/include/QualityControl/ObjectsManager.h b/Framework/include/QualityControl/ObjectsManager.h index cb5d91c415..5f0f098f26 100644 --- a/Framework/include/QualityControl/ObjectsManager.h +++ b/Framework/include/QualityControl/ObjectsManager.h @@ -29,15 +29,10 @@ class ObjectsManager public: ObjectsManager(TaskConfig &taskConfig); - - /// Destructor virtual ~ObjectsManager(); - void startPublishing(TObject *obj, std::string objectName = ""); - + // todo stoppublishing void setQuality(std::string objectName, Quality quality); - // todo stop publishing - Quality getQuality(std::string objectName); /// \brief Add a check to the object defined by objectName. @@ -71,6 +66,9 @@ class ObjectsManager iterator end() { return mMonitorObjects.end(); } + std::string getObjectsListString() + { return mObjectsList.GetString().Data(); } + private: void UpdateIndex(const std::string &nonEmptyName) ; diff --git a/Framework/include/QualityControl/TaskDevice.h b/Framework/include/QualityControl/TaskDevice.h index 60dafa9cdd..86f310d526 100644 --- a/Framework/include/QualityControl/TaskDevice.h +++ b/Framework/include/QualityControl/TaskDevice.h @@ -56,6 +56,7 @@ class TaskDevice : public FairMQDevice void monitorCycle(); unsigned long publish(); static void CustomCleanupTMessage(void *data, void *object); + void sendToInformationService(std::string objectsListString); private: std::string mTaskName; @@ -65,6 +66,8 @@ class TaskDevice : public FairMQDevice std::unique_ptr mSampler; o2::quality_control::core::TaskInterface *mTask; std::shared_ptr mObjectsManager; +// InformationServiceSender infoServiceSender; + std::string lastListSent; // stats int mTotalNumberObjectsPublished; diff --git a/Framework/src/InformationService.cxx b/Framework/src/InformationService.cxx new file mode 100644 index 0000000000..a2232191ff --- /dev/null +++ b/Framework/src/InformationService.cxx @@ -0,0 +1,217 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationService.cxx +/// + +#include "InformationService.h" +#include "QualityControl/QcInfoLogger.h" +#include + +using namespace std; +typedef boost::tokenizer > t_tokenizer; +using namespace o2::quality_control::core; + +int timeOutIntervals = 5; // in seconds + +InformationService::InformationService() : th(nullptr), mFakeDataIndex(0) +{ + OnData("tasks_input", &InformationService::handleTaskInputData); + OnData("request_data", &InformationService::handleRequestData); +} + +void InformationService::Init() +{ + string fakeDataFile = fConfig->GetValue("fake-data-file"); + + // todo put this in a method + if (fakeDataFile != "") { + readFakeDataFile(fakeDataFile); + } +} + +InformationService::~InformationService() +{ +} + +void InformationService::checkTimedOut() +{ + string line = mFakeData[mFakeDataIndex % mFakeData.size()]; + handleTaskInputData(line); + mFakeDataIndex++; + + // restart timer + mTimer->expires_at(mTimer->expires_at() + boost::posix_time::seconds(timeOutIntervals)); + mTimer->async_wait(boost::bind(&InformationService::checkTimedOut, this)); +} + +bool InformationService::handleRequestData(FairMQMessagePtr &request, int /*index*/) +{ + string requestParam = string(static_cast(request->GetData()), request->GetSize()); + LOG(INFO) << "Received request from client: \"" << requestParam << "\""; + + string *result = nullptr; + if (requestParam == "all") { + result = new string(produceJsonAll()); + } else { + if (mCacheTasksData.count(requestParam) > 0) { + result = new string(produceJson(requestParam)); + } else { + result = new string("{\"error\": \"no such task\"}"); + } + } + + LOG(INFO) << "Sending reply to client."; + FairMQMessagePtr reply(NewMessage(const_cast(result->c_str()), // data + result->length(), // size + [](void * /*data*/, + void *object) { delete static_cast(object); }, // deletion callback + result)); // object that manages the data + if (Send(reply, "request_data") <= 0) { + LOG(ERROR) << "error sending reply"; + } + return true; // keep running +} + +bool InformationService::handleTaskInputData(FairMQMessagePtr &msg, int /*index*/) +{ + string *receivedData = new std::string(static_cast(msg->GetData()), msg->GetSize()); + LOG(INFO) << "Received data, processing..."; + LOG(INFO) << " " << *receivedData; + + handleTaskInputData(*receivedData); + + return true; // keep running +} + +bool InformationService::handleTaskInputData(std::string receivedData) +{ + std::string taskName = getTaskName(&receivedData); + LOG(DEBUG) << "task : " << taskName; + + // check if new data + boost::hash string_hash; + size_t hash = string_hash(receivedData); + if (mCacheTasksObjectsHash.count(taskName) > 0) { + if (mCacheTasksObjectsHash.count(taskName) > 0 && hash == mCacheTasksObjectsHash[taskName]) { + LOG(INFO) << "Data already known, we skip it"; + return true; + } + } + mCacheTasksObjectsHash[taskName] = hash; + + // parse + vector objects = getObjects(&receivedData); + + // store + mCacheTasksData[taskName] = objects; + + // json + string *json = new std::string(produceJson(taskName)); + + // publish + sendJson(json); +} + +void InformationService::readFakeDataFile(std::string fakeDataFile) +{ + std::string line; + std::ifstream myfile(fakeDataFile); + if (!myfile) //Always test the file open. + { + LOG(ERROR) << "Error opening fake data file"; + return; + } + mFakeData.clear(); + while (std::getline(myfile, line)) { + mFakeData.push_back(line); + } + + // start a timer to use the fake data + mTimer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(timeOutIntervals)); + mTimer->async_wait(boost::bind(&InformationService::checkTimedOut, this)); + th = new thread([&] { io.run(); }); +} + +vector InformationService::getObjects(string *receivedData) +{ + vector objects; + std::string objectsString = receivedData->substr(receivedData->find(":") + 1, receivedData->length()); + LOG(DEBUG) << "objects : " << objectsString; + boost::char_separator sep(","); + t_tokenizer tok(objectsString, sep); + for (t_tokenizer::iterator beg = tok.begin(); beg != tok.end(); ++beg) { + objects.push_back(*beg); + } + return objects; +} + +std::string InformationService::getTaskName(std::string *receivedData) +{ + return receivedData->substr(0, receivedData->find(":")); +} + +pt::ptree InformationService::buildTaskNode(std::string taskName) +{ + pt::ptree task_node; + task_node.put("name", taskName); + pt::ptree objects_node; + for (auto &object : mCacheTasksData[taskName]) { + pt::ptree object_node; + object_node.put("id", object); + objects_node.push_back(std::make_pair("", object_node)); + } + task_node.add_child("objects", objects_node); + return task_node; +} + +std::string InformationService::produceJson(std::string taskName) +{ + pt::ptree taskNode = buildTaskNode(taskName); + + std::stringstream ss; + pt::json_parser::write_json(ss, taskNode); + LOG(DEBUG) << "json : " << endl << ss.str(); +// QcInfoLogger::GetInstance() << infologger::Debug << "json : \n" << *json << infologger::endm; + return ss.str(); +} + +std::string InformationService::produceJsonAll() +{ + string result; + pt::ptree main_node; + + pt::ptree tasksListNode; + for (const auto &taskTuple : mCacheTasksData) { + pt::ptree taskNode = buildTaskNode(taskTuple.first); + tasksListNode.push_back(std::make_pair("", taskNode)); + } + main_node.add_child("tasks", tasksListNode); + + std::stringstream ss; + pt::json_parser::write_json(ss, main_node); + LOG(DEBUG) << "json : " << endl << ss.str(); + return ss.str(); +} + +void InformationService::sendJson(std::string *json) +{ + FairMQMessagePtr msg2(NewMessage(const_cast(json->c_str()), + json->length(), + [](void * /*data*/, void *object) { delete static_cast(object); }, + json)); + int ret = Send(msg2, "updates_output"); + if (ret < 0) { + LOG(ERROR) << "Error sending update"; + } +} diff --git a/Framework/src/InformationService.h b/Framework/src/InformationService.h new file mode 100644 index 0000000000..ef05d16adb --- /dev/null +++ b/Framework/src/InformationService.h @@ -0,0 +1,98 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationService.h +/// + + +#ifndef PROJECT_INFORMATIONSERVICE_H +#define PROJECT_INFORMATIONSERVICE_H + +#include +#include "FairMQDevice.h" + +#include +#include +#include + +namespace pt = boost::property_tree; + +/// \brief Collect the list of objects published by all the tasks and make it available to clients. +/// +/// The InformationService receives the list of objects published by each task. +/// It keeps a list of all tasks and objects and send it upon request to clients. It also publishes updates when +/// new information comes from the tasks. +/// +/// See InformationService.json to know the port where updates are published. +/// See InformationService.json to know the port where to request information for all tasks (param "all") or +/// for a specific task (param ""). +/// See runInformationService.cxx for the steering code. +/// +/// Example usage : +/// qcInfoService -c /absolute/path/to/InformationService.json -n information_service \\ +/// --id information_service --mq-config /absolute/path/to/InformationService.json +/// +/// Format of the string coming from the tasks : +/// `task_id:obj0,obj1,obj2` +/// Format of the JSON output for one task or all tasks : +/// See README +/// +/// \todo Handle tasks dying and their removal from the cache and the publication of an update (heartbeat ?). +/// \todo Handle tasks sending information that they are disappearing. + +class InformationService : public FairMQDevice +{ + public: + InformationService(); + virtual ~InformationService(); + + protected: + /// Callback for data coming from qcTasks + bool handleTaskInputData(FairMQMessagePtr&, int); + /// Callback for the requests coming from clients + bool handleRequestData(FairMQMessagePtr&, int); + void Init(); + + private: + /// Extract the list of objects from the string received from the tasks + std::vector getObjects(std::string *receivedData); + /// Extract the task name from the string received from the tasks + std::string getTaskName(std::string *receivedData); + /// Produce the JSON string for the specified task + std::string produceJson(std::string taskName); + /// Produce the JSON string for all tasks and objects + std::string produceJsonAll(); + /// Send the JSON string to all clients (subscribers) + void sendJson(std::string *json); + pt::ptree buildTaskNode(std::string taskName); + void checkTimedOut(); + /// Compute and send the JSON using the inputString from a task + bool handleTaskInputData(std::string inputString); + /// Reads a file containing data in format as received from the tasks. + /// Store the items and use them at regular intervals to simulate tasks inputs. + /// Calling again this method will delete the former fake data cache. + void readFakeDataFile(std::string filePath); + + private: + std::map> mCacheTasksData; /// the list of objects names for each task + std::map mCacheTasksObjectsHash; /// used to check whether we already have received this list of objects + boost::asio::deadline_timer *mTimer; /// the asynchronous timer to check if agents have timed out + std::vector mFakeData; /// container for the fake data (if any). Each line is in a string and used in turn. + int mFakeDataIndex; + // variables for the timer + boost::asio::io_service io; + std::thread *th; + +}; + +#endif //PROJECT_INFORMATIONSERVICE_H diff --git a/Framework/src/InformationServiceDump.cxx b/Framework/src/InformationServiceDump.cxx new file mode 100644 index 0000000000..ff79e82518 --- /dev/null +++ b/Framework/src/InformationServiceDump.cxx @@ -0,0 +1,64 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationServiceDump.cxx +/// + +#include "InformationServiceDump.h" + +#include +#include +#include + +#include "FairMQLogger.h" +#include + +using namespace std; +namespace pt = boost::property_tree; + +InformationServiceDump::InformationServiceDump() +{ + OnData("info_service_input", &InformationServiceDump::HandleData); +} + +InformationServiceDump::~InformationServiceDump() +{ +} + +bool InformationServiceDump::HandleData(FairMQMessagePtr &msg, int /*index*/) +{ + string *receivedData = new std::string(static_cast(msg->GetData()), msg->GetSize()); + LOG(INFO) << "Received data : "; + LOG(INFO) << " " << *receivedData; + + string* text = new string( fConfig->GetValue("request-task")); + LOG(INFO) << "Preparing request for \"" << *text << "\""; + FairMQMessagePtr request(NewMessage(const_cast(text->c_str()), // data + text->length(), // size + [](void* /*data*/, void* object) { delete static_cast(object); }, // deletion callback + text)); // object that manages the data + LOG(INFO) << "Sending request "; + if (Send(request, "send_request") > 0) { + FairMQMessagePtr reply(NewMessage()); + if (Receive(reply, "send_request") >= 0) + { + LOG(INFO) << "Received reply from server: \"" << string(static_cast(reply->GetData()), reply->GetSize()) << "\""; + } else { + LOG(ERROR) << "Problem receiving reply"; + } + } else { + LOG(ERROR) << "problem sending request"; + } + + return true; // keep running +} diff --git a/Framework/src/InformationServiceDump.h b/Framework/src/InformationServiceDump.h new file mode 100644 index 0000000000..b8d84ba1a0 --- /dev/null +++ b/Framework/src/InformationServiceDump.h @@ -0,0 +1,48 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationServiceDump.h +/// + + +#ifndef PROJECT_INFORMATIONSERVICEDUMP_H +#define PROJECT_INFORMATIONSERVICEDUMP_H + +#include "FairMQDevice.h" +#include + +/// \brief Dump the publications received from the InformationService. +/// +/// Useful for checking the InformationService. +/// It will receive the updates from the tasks. Upon reception , it dumps it and send a request for all or a single +/// task data and displays the reply. +/// To decide which task the request should target, use parameter "request-task". By default it asks for all. +/// +/// See runInformationServiceDump.cxx for the steering code. +/// +/// Example usage : +/// qcInfoServiceDump -c /absolute/path/to/InformationService.json -n information_service_dump +/// --id information_service_dump --mq-config /absolute/path/to/InformationService.json +/// --request-task myTask1 +class InformationServiceDump : public FairMQDevice +{ + public: + InformationServiceDump(); + virtual ~InformationServiceDump(); + + protected: + /// Callback for data coming from InformationService + bool HandleData(FairMQMessagePtr &, int); +}; + +#endif //PROJECT_InformationServiceDump_H diff --git a/Framework/src/ObjectsManager.cxx b/Framework/src/ObjectsManager.cxx index 890dbdf689..d6454402f2 100644 --- a/Framework/src/ObjectsManager.cxx +++ b/Framework/src/ObjectsManager.cxx @@ -36,7 +36,9 @@ void ObjectsManager::startPublishing(TObject *object, std::string objectName) mMonitorObjects[nonEmptyName] = newObject; //update index - UpdateIndex(nonEmptyName); + if(objectName != MonitorObject::SYSTEM_OBJECT_PUBLICATION_LIST) { + UpdateIndex(nonEmptyName); + } } void ObjectsManager::UpdateIndex(const string &nonEmptyName) diff --git a/Framework/src/TaskDevice.cxx b/Framework/src/TaskDevice.cxx index b67e268bf3..5c09e8bad6 100644 --- a/Framework/src/TaskDevice.cxx +++ b/Framework/src/TaskDevice.cxx @@ -199,6 +199,8 @@ unsigned long TaskDevice::publish() sentMessages++; } + sendToInformationService(mObjectsManager->getObjectsListString()); + return sentMessages; } @@ -231,6 +233,34 @@ void TaskDevice::endOfActivity() mCollector->send(ba::mean(pmems), "QC_task_Mean_pmem_whole_run"); } +void TaskDevice::sendToInformationService(string objectsListString) +{ + string* text = new std::string(mTaskName); + *text += ":" + objectsListString; + // todo escape names with a comma or a colon + + // create message object with a pointer to the data buffer, + // its size, + // custom deletion function (called when transfer is done), + // and pointer to the object managing the data buffer + FairMQMessagePtr msg(NewMessage(const_cast(text->c_str()), + text->length(), + [](void* /*data*/, void* object) { delete static_cast(object); }, + text)); + + LOG(info) << "Sending \"" << *text << "\""; + LOG(info) << " llength : " << text->length(); + + // in case of error or transfer interruption, return false to go to IDLE state + // successfull transfer will return number of bytes transfered (can be 0 if sending an empty message). + int ret = Send(msg, "information-service-out"); + if(ret < 0) + { + LOG(error) << "Error sending" << endl; + } + +} + } } } diff --git a/Framework/src/runInformationService.cxx b/Framework/src/runInformationService.cxx new file mode 100644 index 0000000000..00ace33bcd --- /dev/null +++ b/Framework/src/runInformationService.cxx @@ -0,0 +1,35 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file runInformationService.cxx +/// + +#include "runFairMQDevice.h" +#include "InformationService.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description &options) +{ + options.add_options() + ("fake-data-file", bpo::value()->default_value(""), + "File containing JSON to use as input (useful for tests if no tasks is running). It is used to reply to requests. " + "It is reloaded every 10 seconds and if it changed it is published to the clients."); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions & /*config*/) +{ + InformationService *is = new InformationService(); + + return is; +} \ No newline at end of file diff --git a/Framework/src/runInformationServiceDump.cxx b/Framework/src/runInformationServiceDump.cxx new file mode 100644 index 0000000000..7beea76ce8 --- /dev/null +++ b/Framework/src/runInformationServiceDump.cxx @@ -0,0 +1,32 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationServiceDump.cxx +/// + +#include "runFairMQDevice.h" +#include "InformationServiceDump.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description &options) +{ + options.add_options() + ("request-task", bpo::value()->default_value("all"), + "The name of the task it will request (default: all)"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions & /*config*/) +{ + return new InformationServiceDump(); +} \ No newline at end of file diff --git a/InformationService.json b/InformationService.json new file mode 100644 index 0000000000..71b245c071 --- /dev/null +++ b/InformationService.json @@ -0,0 +1,81 @@ +{ + "fairMQOptions": { + "devices": [ + { + "key": "information_service", + "channels": [ + { + "name": "tasks_input", + "sockets": [ + { + "type": "sub", + "method": "bind", + "address": "tcp://*:5560", + "sndBufSize": 1, + "rcvBufSize": 100, + "rateLogging": 0 + } + ] + }, + { + "name": "updates_output", + "sockets": [ + { + "type": "pub", + "method": "bind", + "address": "tcp://*:5561", + "sndBufSize": 1000, + "rcvBufSize": 1000, + "rateLogging": 0 + } + ] + }, + { + "name": "request_data", + "sockets": [ + { + "type": "rep", + "method": "bind", + "address": "tcp://*:5562", + "sndBufSize": 1000, + "rcvBufSize": 1000, + "rateLogging": 0 + } + ] + } + ] + }, + { + "key": "information_service_dump", + "channels": [ + { + "name": "info_service_input", + "sockets": [ + { + "type": "sub", + "method": "connect", + "address": "tcp://localhost:5561", + "sndBufSize": 1000, + "rcvBufSize": 1000, + "rateLogging": 0 + } + ] + }, + { + "name": "send_request", + "sockets": [ + { + "type": "req", + "method": "connect", + "address": "tcp://localhost:5562", + "sndBufSize": 1000, + "rcvBufSize": 1000, + "rateLogging": 0 + } + ] + } + ] + } + ] + } +} \ No newline at end of file diff --git a/Modules/Example/include/Example/ExampleTask.h b/Modules/Example/include/Example/ExampleTask.h index 4c645219eb..75fc3f5d3d 100644 --- a/Modules/Example/include/Example/ExampleTask.h +++ b/Modules/Example/include/Example/ExampleTask.h @@ -47,8 +47,9 @@ class ExampleTask /*final*/: public TaskInterface // todo add back the "final" w } private: - + int mNumberCycles; TH1F *mHistos[25]; + void publishHisto(int i); }; } diff --git a/Modules/Example/src/ExampleTask.cxx b/Modules/Example/src/ExampleTask.cxx index 6120e54ba4..e0855476b2 100644 --- a/Modules/Example/src/ExampleTask.cxx +++ b/Modules/Example/src/ExampleTask.cxx @@ -21,6 +21,7 @@ ExampleTask::ExampleTask() for (auto &mHisto : mHistos) { mHisto = nullptr; } + mNumberCycles = 0; } ExampleTask::~ExampleTask() @@ -36,11 +37,8 @@ void ExampleTask::initialize() { QcInfoLogger::GetInstance() << "initialize ExampleTask" << AliceO2::InfoLogger::InfoLogger::endm; - for (int i = 0; i < 25; i++) { - stringstream name; - name << "array-" << i; - mHistos[i] = new TH1F(name.str().c_str(), name.str().c_str(), 100, 0, 99); - getObjectsManager()->startPublishing(mHistos[i], name.str()); + for (int i = 0; i < 24; i++) { + publishHisto(i); } // Extendable axis @@ -55,11 +53,21 @@ void ExampleTask::initialize() "QcExample"); } +void ExampleTask::publishHisto(int i) +{ + stringstream name; + name << "array-" << i; + mHistos[i] = new TH1F(name.str().c_str(), name.str().c_str(), 100, 0, 99); + getObjectsManager()->startPublishing(mHistos[i], name.str()); +} + void ExampleTask::startOfActivity(Activity &activity) { QcInfoLogger::GetInstance() << "startOfActivity" << AliceO2::InfoLogger::InfoLogger::endm; for (auto &mHisto : mHistos) { - mHisto->Reset(); + if (mHisto) { + mHisto->Reset(); + } } } @@ -71,14 +79,22 @@ void ExampleTask::startOfCycle() void ExampleTask::monitorDataBlock(DataSetReference dataSet) { mHistos[0]->Fill(dataSet->at(0)->getData()->header.dataSize); - for (int i = 1; i < 25; i++) { - mHistos[i]->FillRandom("gaus", 1); + for (auto &mHisto : mHistos) { + if (mHisto) { + mHisto->FillRandom("gaus", 1); + } } } void ExampleTask::endOfCycle() { QcInfoLogger::GetInstance() << "endOfCycle" << AliceO2::InfoLogger::InfoLogger::endm; + mNumberCycles++; + + // Add one more object just to show that we can do it + if(mNumberCycles == 3) { + publishHisto(24); + } } void ExampleTask::endOfActivity(Activity &activity) diff --git a/README.md b/README.md index 8cc2899c74..e1d3167211 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,90 @@ One can also check what is stored in the database by clicking `Stop` and then switching to `Database` source. This will only work if a config file was passed to the `qcSpy` utility. +### Information Service + +The information service publishes information about the tasks currently +running and the objects they publish. It is needed by some GUIs, or +other clients. + +By default it will publish on port 5561 the json description of a task +when it is updated. A client can also request on port 5562 the information +about a specific task or about all the tasks, by passing the name of the +task as a parameter or "all" respectively. + +The JSON for a task looks like : +``` +{ + "name": "myTask_1", + "objects": [ + { + "id": "array-0" + }, + { + "id": "array-1" + }, + { + "id": "array-2" + }, + { + "id": "array-3" + }, + { + "id": "array-4" + } + ] +} +``` + +The JSON for all tasks looks like : +``` +{ + "tasks": [ + { + "name": "myTask_1", + "objects": [ + { + "id": "array-0" + }, + { + "id": "array-1" + } + ] + }, + { + "name": "myTask_2", + "objects": [ + { + "id": "array-0" + }, + { + "id": "array-1" + } + ] + } + ] +} +``` +#### Usage +``` +qcInfoService -c /absolute/path/to/InformationService.json -n information_service \ + --id information_service --mq-config /absolute/path/to/InformationService.json +``` + +The `qcInfoService` can provide fake data from a file. This is useful +to test the clients. Use the option `--fake-data-file` and provide the +absolute path to the file. The file `infoServiceFake.json` is provided +as an example. + +To check what is being output by the Information Service, one can +run the InformationServiceDump : +``` +qcInfoServiceDump -c /absolute/path/to/InformationService.json -n information_service_dump \ + --id information_service_dump --mq-config /absolute/path/to/InformationService.json + --request-task myTask1 +``` +The last parameter can be omitted to receive information about all tasks. + ## Modules development Steps to create a new module Abc @@ -184,4 +268,4 @@ From here, fill in the methods in AbcTask.cxx, and AbcCheck.cxx if needed. In case special additional dependencies are needed, create a new bucket in QualityControlModules/cmake/QualityControlModulesDependencies.cmake. - + diff --git a/infoServiceFake.json b/infoServiceFake.json new file mode 100644 index 0000000000..1f64744a8b --- /dev/null +++ b/infoServiceFake.json @@ -0,0 +1,5 @@ +myTask_1:array-0,array-1,array-2,array-3,array-4, +myTask_1:array-0,array-1,array-2,array-3,array-4,array-5, +myTask_1:array-0,array-1,array-2,array-3,array-4,array-5, +myTask_1:array-0,array-1,array-2,array-3,array-4, +myTask_2:array-0,array-1,array-2,array-3,array-4, \ No newline at end of file