From 4b2a6c3d98e9b9e370df869ee8d80a93088958d6 Mon Sep 17 00:00:00 2001 From: mradul Date: Wed, 25 Sep 2024 19:17:05 +0530 Subject: [PATCH 1/8] 1. added simple control module test 2. added healthcb ext 3. minor refactor --- base/CMakeLists.txt | 1 + base/include/AbsControlModule.h | 10 +- base/include/SimpleControlModule.h | 5 +- base/src/AbsControlModule.cpp | 16 ++ base/src/SimpleControlModule.cpp | 5 + base/test/simpleControlModuleTests.cpp | 273 +++++++++++++++++++++++++ 6 files changed, 303 insertions(+), 7 deletions(-) create mode 100644 base/test/simpleControlModuleTests.cpp diff --git a/base/CMakeLists.txt b/base/CMakeLists.txt index 864ad0c80..e16b7821a 100755 --- a/base/CMakeLists.txt +++ b/base/CMakeLists.txt @@ -627,6 +627,7 @@ SET(UT_FILES test/overlaymodule_tests.cpp test/testSignalGeneratorSrc_tests.cpp test/audioToTextXform_tests.cpp + test/simpleControlModuleTests.cpp ${ARM64_UT_FILES} ${CUDA_UT_FILES} ) diff --git a/base/include/AbsControlModule.h b/base/include/AbsControlModule.h index e0d04aea3..622e14093 100644 --- a/base/include/AbsControlModule.h +++ b/base/include/AbsControlModule.h @@ -18,6 +18,7 @@ class AbsControlModule : public Module { bool term(); bool enrollModule(std::string role, boost::shared_ptr module); boost::shared_ptr getModuleofRole(std::string role); + virtual std::string getStatus() { return ""; }; virtual void handleMp4MissingVideotrack(std::string previousVideoFile, std::string nextVideoFile) {} virtual void handleMMQExport(Command cmd, bool priority = false) {} virtual void handleMMQExportView(uint64_t startTS, uint64_t endTS = 9999999999999, bool playabckDirection = true, bool Mp4ReaderExport = false, bool priority = false) {} @@ -28,9 +29,10 @@ class AbsControlModule : public Module { boost::container::deque> pipelineModules; std::map> moduleRoles; virtual void handleError(const APErrorObject &error) {} - virtual void handleHealthCallback(const APHealthObject &healthObj) {} - - + virtual void handleHealthCallback(const APHealthObject& healthObj); + // this will be called + void register_healthCallback_extention( + boost::function callbackFunction); protected: bool process(frame_container& frames); bool handleCommand(Command::CommandType type, frame_sp& frame); @@ -38,7 +40,7 @@ class AbsControlModule : public Module { virtual void sendEOS() {} virtual void sendEOS(frame_sp& frame) {} virtual void sendEOPFrame() {} - + boost::function healthCallbackExtention; private: class Detail; boost::shared_ptr mDetail; diff --git a/base/include/SimpleControlModule.h b/base/include/SimpleControlModule.h index be151463b..0dc907a0f 100644 --- a/base/include/SimpleControlModule.h +++ b/base/include/SimpleControlModule.h @@ -18,9 +18,8 @@ class SimpleControlModule : public AbsControlModule } - void handleError(const APErrorObject &error); - void handleHealthCallback(const APHealthObject &healthObj); - + void handleError(const APErrorObject &error) override; + void handleHealthCallback(const APHealthObject &healthObj) override; // ErrorCallbacks protected: void sendEOS(); diff --git a/base/src/AbsControlModule.cpp b/base/src/AbsControlModule.cpp index 387397d6f..42f2b975a 100644 --- a/base/src/AbsControlModule.cpp +++ b/base/src/AbsControlModule.cpp @@ -91,4 +91,20 @@ boost::shared_ptr AbsControlModule::getModuleofRole(std::string role) LOG_ERROR << "no module with the role <" << role << "> registered with the control module."; } return moduleWithRole; +} + +void AbsControlModule::register_healthCallback_extention( + boost::function callbackFunction) +{ + healthCallbackExtention = callbackFunction; +}; + +void AbsControlModule::handleHealthCallback(const APHealthObject& healthObj) +{ + LOG_INFO << "Health Callback from module " << healthObj.getModuleId(); + if (!healthCallbackExtention.empty()) + { + LOG_INFO << "Calling the registered Health Callback Extention..."; + healthCallbackExtention(&healthObj, 1); + } } \ No newline at end of file diff --git a/base/src/SimpleControlModule.cpp b/base/src/SimpleControlModule.cpp index 0960c75c6..4cf851961 100644 --- a/base/src/SimpleControlModule.cpp +++ b/base/src/SimpleControlModule.cpp @@ -26,4 +26,9 @@ void SimpleControlModule::handleError(const APErrorObject &error) void SimpleControlModule::handleHealthCallback(const APHealthObject &healthObj) { LOG_ERROR << "Health Callback from module " << healthObj.getModuleId(); + if (!healthCallbackExtention.empty()) + { + LOG_INFO << "Calling Health Callback Extention..."; + healthCallbackExtention(&healthObj,1); + } } diff --git a/base/test/simpleControlModuleTests.cpp b/base/test/simpleControlModuleTests.cpp new file mode 100644 index 000000000..4b001ec67 --- /dev/null +++ b/base/test/simpleControlModuleTests.cpp @@ -0,0 +1,273 @@ +#include +#include +#include + +#include "PipeLine.h" +#include "Module.h" +#include "SimpleControlModule.h" +#include "FrameMetadata.h" +#include "Frame.h" +#include "FrameContainerQueue.h" +#include "AIPExceptions.h" +#include "ExternalSourceModule.h" +#include "ExternalSinkModule.h" + +BOOST_AUTO_TEST_SUITE(simpleControlModule_tests) + +class TestModuleSrcProps :public ModuleProps +{ +public: + TestModuleSrcProps() : ModuleProps() + { + } +}; + +class TestModuleSrc : public Module +{ +public: + TestModuleSrc(TestModuleSrcProps props = TestModuleSrcProps()) : Module(SOURCE, "TestModuleSrc", props) + { + addOutputPin(); + } + + std::string addOutputPin() + { + outMetadata = framemetadata_sp(new FrameMetadata(FrameMetadata::FrameType::GENERAL)); + return Module::addOutputPin(outMetadata); + } + + bool produce() + { + auto outPin = getOutputPinIdByType(FrameMetadata::FrameType::GENERAL); + auto frame = makeFrame(10); + frames.insert(make_pair(outPin, frame)); + + send(frames); + return true; + } + + bool validateInputPins() + { + return true; + } + + bool validateOutputPins() + { + return true; + } + + bool validateInputOutputPins() + { + return true; + } + + frame_container frames; + framemetadata_sp outMetadata; +}; + +class TestModuleTransformProps : public ModuleProps +{ +public: + TestModuleTransformProps() : ModuleProps() + {} + ~TestModuleTransformProps() + {} +}; + +class TestModuleTransform : public Module +{ +public: + TestModuleTransform(TestModuleTransformProps props) : Module(TRANSFORM, "TestTransform", props) + { + addOutputPin(); + } + + bool init() + { + Module::init(); + return true; + } + + std::string addOutputPin() + { + outMetadata = framemetadata_sp(new FrameMetadata(FrameMetadata::FrameType::GENERAL)); + return Module::addOutputPin(outMetadata); + } + +protected: + + bool process(frame_container &frames) + { + auto outPin = getOutputPinIdByType(FrameMetadata::FrameType::GENERAL); + auto frame = makeFrame(10, outPin); + frames.insert(make_pair(outPin, frame)); + send(frames); + return true; + } + + bool validateOutputPins() + { + return true; + } + + bool validateInputPins() + { + return true; + } + + bool validateInputOutputPins() + { + return true; + } +private: + std::string outPin; + framemetadata_sp outMetadata; +}; + +class TestSink : public Module +{ +public: + TestSink() : Module(SINK, "TestSink", ModuleProps()) + { + + } + + virtual ~TestSink() {} + +protected: + bool validateInputPins() + { + return true; + } + + bool process(frame_container& frames) + { + return true; + } +}; + +struct SimpleControlModuleTests +{ + SimpleControlModuleTests(bool enableHealthCallback = true, int intervalInSecs = 1) + { + LoggerProps loggerProps; + loggerProps.logLevel = boost::log::trivial::severity_level::info; + Logger::setLogLevel(boost::log::trivial::severity_level::info); + Logger::initLogger(loggerProps); + + auto metadata = framemetadata_sp(new FrameMetadata(FrameMetadata::GENERAL)); + sourceMod = boost::shared_ptr(new TestModuleSrc); + //auto source_pin_1 = sourceMod->addOutputPin(metadata); + + /* set transform module health callbacks */ + TestModuleTransformProps props; + props.logHealth = true; + props.enableHealthCallBack = enableHealthCallback; + props.healthUpdateIntervalInSec = intervalInSecs; + transformMod1 = boost::shared_ptr(new TestModuleTransform(props)); + + sinkMod = boost::shared_ptr(new TestSink); + + auto simpleCtrlProps = SimpleControlModuleProps(); + simpleCtrl = boost::shared_ptr(new SimpleControlModule(simpleCtrlProps)); + + p = boost::shared_ptr(new PipeLine("test")); + } + ~SimpleControlModuleTests() {} + + void initModules() + { + BOOST_TEST(sourceMod->init()); + BOOST_TEST(transformMod1->init()); + BOOST_TEST(sinkMod->init()); + } + + void createPipeline() + { + sourceMod->setNext(transformMod1); + transformMod1->setNext(sinkMod); + } + + void enrollModules() + { + simpleCtrl->enrollModule("transform_test_module", transformMod1); + } + + void attachModulesToPipeline() + { + p->appendModule(sourceMod); + p->addControlModule(simpleCtrl); + } + + void initPipeline() + { + p->init(); + simpleCtrl->init(); + } + + void runPipeline() + { + p->run_all_threaded(); + } + + void startPipeline() + { + createPipeline(); + enrollModules(); + + attachModulesToPipeline(); + + initPipeline(); + + runPipeline(); + + return; + } + + bool termModules() + { + sourceMod->term(); + transformMod1->term(); + sinkMod->term(); + return true; + } + + bool stopPipeline() + { + p->stop(); + p->term(); + p->wait_for_all(); + p.reset(); + return true; + } + + void addControlModule() + { + p->addControlModule(simpleCtrl); + } + + boost::shared_ptr sourceMod; + boost::shared_ptr transformMod1; + boost::shared_ptr sinkMod; + boost::shared_ptr simpleCtrl; + boost::shared_ptr p; +}; + +void TestCallackExtention(const APHealthObject* healthObj, unsigned int eventId) +{ + auto moduleId = healthObj->getModuleId(); + BOOST_TEST(moduleId.find("TestTransform") != std::string::npos); +} + +BOOST_AUTO_TEST_CASE(simpleControlModule_healthCallback) +{ + SimpleControlModuleTests t; + t.simpleCtrl->register_healthCallback_extention(TestCallackExtention); + + t.startPipeline(); + t.addControlModule(); + boost::this_thread::sleep_for(boost::chrono::milliseconds(5000)); + t.stopPipeline(); +} + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file From b53e4cd2f0e2956f7b35b6abe51a93b333c83fc3 Mon Sep 17 00:00:00 2001 From: mradul Date: Wed, 25 Sep 2024 19:17:22 +0530 Subject: [PATCH 2/8] setting thread name for control module --- base/src/PipeLine.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/base/src/PipeLine.cpp b/base/src/PipeLine.cpp index 0bc0f9d79..c58b19acd 100755 --- a/base/src/PipeLine.cpp +++ b/base/src/PipeLine.cpp @@ -163,6 +163,7 @@ void PipeLine::run_all_threaded() { Module& m = *(modules[0]->controlModule); m.myThread = boost::thread(ref(m)); + Utils::setModuleThreadName(m.myThread, m.getId()); } mPlay = true; } From 557cab2c73df9f434b698db8fcf4762692adc207 Mon Sep 17 00:00:00 2001 From: mradul Date: Thu, 26 Sep 2024 14:20:53 +0530 Subject: [PATCH 3/8] removed bug: have the list of modules in ctrl is not reqd, may create issues --- base/include/AbsControlModule.h | 8 ++++++-- base/src/PipeLine.cpp | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/base/include/AbsControlModule.h b/base/include/AbsControlModule.h index 622e14093..580e1471b 100644 --- a/base/include/AbsControlModule.h +++ b/base/include/AbsControlModule.h @@ -26,11 +26,15 @@ class AbsControlModule : public Module { virtual void handleLastGtkGLRenderTS(uint64_t latestGtkGlRenderTS, bool priority) {} virtual void handleGoLive(bool goLive, bool priority) {} virtual void handleDecoderSpeed(DecoderPlaybackSpeed cmd, bool priority) {} - boost::container::deque> pipelineModules; std::map> moduleRoles; virtual void handleError(const APErrorObject &error) {} virtual void handleHealthCallback(const APHealthObject& healthObj); - // this will be called + /** + * @brief Register external function to be triggered on every health callBack that control modules recieves from the modules. + * In SimpleControlModule, this extention is called at the end of handleHealthCallback function. + * @param function with signature void f(const APHealthObject*, unsigned short) + * @return nothing. + */ void register_healthCallback_extention( boost::function callbackFunction); protected: diff --git a/base/src/PipeLine.cpp b/base/src/PipeLine.cpp index c58b19acd..a1697ee5e 100755 --- a/base/src/PipeLine.cpp +++ b/base/src/PipeLine.cpp @@ -37,7 +37,6 @@ bool PipeLine::addControlModule(boost::shared_ptr cModule) for (int i = 0; i < modules.size(); i++) { modules[i]->addControlModule(cModule); - cModule->pipelineModules.push_back(modules[i]); } return true; } @@ -106,6 +105,7 @@ bool PipeLine::init() { myStatus = PL_INITFAILED; return false; + return false; } LOG_TRACE << " Initializing pipeline"; From 366acb35985c846ff553764d6f46069569c55bfd Mon Sep 17 00:00:00 2001 From: mradul Date: Mon, 30 Sep 2024 14:48:40 +0530 Subject: [PATCH 4/8] 1. fixed control module thread join bug 2. added interface to flushQ --- base/include/PipeLine.h | 5 +-- base/src/Module.cpp | 25 +++++++++++++++ base/src/PipeLine.cpp | 43 ++++++++++++++++++++++---- base/test/simpleControlModuleTests.cpp | 17 +--------- 4 files changed, 66 insertions(+), 24 deletions(-) diff --git a/base/include/PipeLine.h b/base/include/PipeLine.h index b47b0d035..1d6c607a5 100755 --- a/base/include/PipeLine.h +++ b/base/include/PipeLine.h @@ -27,7 +27,8 @@ class PipeLine { Status myStatus; typedef boost::shared_ptr item_type; typedef boost::container::deque< item_type > container_type; - + boost::shared_ptr controlModule = nullptr; + std::string mName; container_type modules; bool validate(); @@ -48,7 +49,7 @@ class PipeLine { void term(); void wait_for_all(bool ignoreStatus = false); void interrup_wait_for_all(); - void flushAllQueues(); + void flushAllQueues(bool flushControlModuleQ=false); const char* getStatus(); }; diff --git a/base/src/Module.cpp b/base/src/Module.cpp index 661b7ee15..99984235e 100644 --- a/base/src/Module.cpp +++ b/base/src/Module.cpp @@ -1295,6 +1295,16 @@ bool Module::step() { throw AIPException(CTRL_MODULE_INVALID_STATE, "Unexpected: " + std::to_string(frames.size()) + " frames remain unprocessed in control module."); } + if (mPlay) + { + mProfiler->startProcessingLap(); + ret = stepNonSource(frames); + mProfiler->endLap(mQue->size()); + } + else + { + ret = true; + } } else { @@ -1563,6 +1573,14 @@ bool Module::addEoPFrame(frame_container &frames) frames.insert(make_pair(me.first, frame)); } + if (myNature == CONTROL) + { + auto frame = frame_sp(new EoPFrame()); + auto metadata = framemetadata_sp(new FrameMetadata(FrameMetadata::GENERAL)); + frame->setMetadata((metadata)); + frames.insert(make_pair("dummy_eop_ctrl_pin", frame)); + } + // if sieve is disabled for atleast one connection - send additional EOP // frames - extra EOP frames downstream shouldn't matter if (mIsSieveDisabledForAny) @@ -1586,6 +1604,13 @@ bool Module::handleStop() { return true; } + if (myNature == CONTROL) + { + mRunning = false; + term(); + return true; + } + // handle SOURCE, TRANSFORM, SINK below mStopCount++; if (myNature != SOURCE && mStopCount != mForwardPins) { diff --git a/base/src/PipeLine.cpp b/base/src/PipeLine.cpp index a1697ee5e..934e8618a 100755 --- a/base/src/PipeLine.cpp +++ b/base/src/PipeLine.cpp @@ -38,6 +38,7 @@ bool PipeLine::addControlModule(boost::shared_ptr cModule) { modules[i]->addControlModule(cModule); } + controlModule = cModule; return true; } @@ -133,6 +134,10 @@ bool PipeLine::init() return false; } } + if (controlModule != nullptr) + { + controlModule->init(); + } myStatus = PL_INITED; LOG_TRACE << " Pipeline initialized"; return true; @@ -159,9 +164,9 @@ void PipeLine::run_all_threaded() m.myThread = boost::thread(ref(m)); Utils::setModuleThreadName(m.myThread, m.getId()); } - if ((modules[0]->controlModule) != nullptr) + if (controlModule != nullptr) { - Module& m = *(modules[0]->controlModule); + Module& m = *(controlModule); m.myThread = boost::thread(ref(m)); Utils::setModuleThreadName(m.myThread, m.getId()); } @@ -184,7 +189,7 @@ void PipeLine::pause() i->get()->play(false); } } - + //Note: controlModule should not be paused mPlay = false; } @@ -197,7 +202,11 @@ void PipeLine::play() i->get()->play(true); } } - + // Control module should continue running anyways + if (controlModule != nullptr) + { + controlModule->play(true); + } mPlay = true; } @@ -216,6 +225,7 @@ void PipeLine::step() i->get()->queueStep(); } } + // should controlModule step ? } void PipeLine::stop() @@ -233,6 +243,10 @@ void PipeLine::stop() i->get()->stop(); } } + if (controlModule != nullptr) + { + controlModule->stop(); + } } void PipeLine::wait_for_all(bool ignoreStatus) @@ -248,6 +262,11 @@ void PipeLine::wait_for_all(bool ignoreStatus) Module& m = *(i->get()); m.myThread.join(); } + + if (controlModule != nullptr) + { + controlModule->myThread.join(); + } } @@ -270,6 +289,12 @@ void PipeLine::interrup_wait_for_all() Module& m = *(i->get()); m.myThread.join(); } + + if (controlModule != nullptr) + { + controlModule->myThread.interrupt(); + controlModule->myThread.join(); + } myStatus = PL_STOPPED; } @@ -281,7 +306,12 @@ const char * PipeLine::getStatus() return StatusNames[myStatus]; } -void PipeLine::flushAllQueues() { +void PipeLine::flushAllQueues(bool flushControlModuleQ) +{ + if (flushControlModuleQ && controlModule != nullptr) + { + controlModule->flushQue(); + } for (auto& m : modules) { if (m->myNature == Module::Kind::SOURCE) @@ -289,4 +319,5 @@ void PipeLine::flushAllQueues() { m->flushQueRecursive(); } } -} \ No newline at end of file +} + diff --git a/base/test/simpleControlModuleTests.cpp b/base/test/simpleControlModuleTests.cpp index 4b001ec67..1f24c4d85 100644 --- a/base/test/simpleControlModuleTests.cpp +++ b/base/test/simpleControlModuleTests.cpp @@ -175,13 +175,6 @@ struct SimpleControlModuleTests } ~SimpleControlModuleTests() {} - void initModules() - { - BOOST_TEST(sourceMod->init()); - BOOST_TEST(transformMod1->init()); - BOOST_TEST(sinkMod->init()); - } - void createPipeline() { sourceMod->setNext(transformMod1); @@ -202,7 +195,6 @@ struct SimpleControlModuleTests void initPipeline() { p->init(); - simpleCtrl->init(); } void runPipeline() @@ -224,14 +216,6 @@ struct SimpleControlModuleTests return; } - bool termModules() - { - sourceMod->term(); - transformMod1->term(); - sinkMod->term(); - return true; - } - bool stopPipeline() { p->stop(); @@ -268,6 +252,7 @@ BOOST_AUTO_TEST_CASE(simpleControlModule_healthCallback) t.addControlModule(); boost::this_thread::sleep_for(boost::chrono::milliseconds(5000)); t.stopPipeline(); + boost::this_thread::sleep_for(boost::chrono::milliseconds(3000)); } BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file From 9fa858f3dc5c1ac09d5a45331744d864a5e12323 Mon Sep 17 00:00:00 2001 From: mradul Date: Mon, 30 Sep 2024 17:04:09 +0530 Subject: [PATCH 5/8] resolved cyclic dependency bug with control module --- base/include/AbsControlModule.h | 3 ++- base/include/SimpleControlModule.h | 1 - base/src/AbsControlModule.cpp | 3 ++- base/src/Module.cpp | 5 ++++- base/test/simpleControlModuleTests.cpp | 4 +++- 5 files changed, 11 insertions(+), 5 deletions(-) diff --git a/base/include/AbsControlModule.h b/base/include/AbsControlModule.h index 580e1471b..bfed60cfe 100644 --- a/base/include/AbsControlModule.h +++ b/base/include/AbsControlModule.h @@ -26,7 +26,8 @@ class AbsControlModule : public Module { virtual void handleLastGtkGLRenderTS(uint64_t latestGtkGlRenderTS, bool priority) {} virtual void handleGoLive(bool goLive, bool priority) {} virtual void handleDecoderSpeed(DecoderPlaybackSpeed cmd, bool priority) {} - std::map> moduleRoles; + // Note: weak pointers to avoid cyclic dependency and mem leaks + std::map> moduleRoles; virtual void handleError(const APErrorObject &error) {} virtual void handleHealthCallback(const APHealthObject& healthObj); /** diff --git a/base/include/SimpleControlModule.h b/base/include/SimpleControlModule.h index 0dc907a0f..b059c725c 100644 --- a/base/include/SimpleControlModule.h +++ b/base/include/SimpleControlModule.h @@ -15,7 +15,6 @@ class SimpleControlModule : public AbsControlModule ~SimpleControlModule() { - } void handleError(const APErrorObject &error) override; diff --git a/base/src/AbsControlModule.cpp b/base/src/AbsControlModule.cpp index 42f2b975a..bd8088841 100644 --- a/base/src/AbsControlModule.cpp +++ b/base/src/AbsControlModule.cpp @@ -24,6 +24,7 @@ AbsControlModule::AbsControlModule(AbsControlModuleProps _props) { mDetail.reset(new Detail(_props)); } + AbsControlModule::~AbsControlModule() {} bool AbsControlModule::handleCommand(Command::CommandType type, frame_sp& frame) @@ -84,7 +85,7 @@ boost::shared_ptr AbsControlModule::getModuleofRole(std::string role) boost::shared_ptr moduleWithRole = nullptr; try { - moduleWithRole = moduleRoles[role]; + moduleWithRole = moduleRoles[role].lock(); } catch (std::out_of_range) { diff --git a/base/src/Module.cpp b/base/src/Module.cpp index 99984235e..92788a7d1 100644 --- a/base/src/Module.cpp +++ b/base/src/Module.cpp @@ -178,7 +178,10 @@ Module::Module(Kind nature, string name, ModuleProps _props) mPropsChangeMetadata.reset( new FrameMetadata(FrameMetadata::FrameType::PROPS_CHANGE)); } -Module::~Module() {} +Module::~Module() +{ + LOG_INFO << "Module destructor <" << myId << ">"; +} bool Module::term() { diff --git a/base/test/simpleControlModuleTests.cpp b/base/test/simpleControlModuleTests.cpp index 1f24c4d85..27488cb5b 100644 --- a/base/test/simpleControlModuleTests.cpp +++ b/base/test/simpleControlModuleTests.cpp @@ -173,7 +173,9 @@ struct SimpleControlModuleTests p = boost::shared_ptr(new PipeLine("test")); } - ~SimpleControlModuleTests() {} + ~SimpleControlModuleTests() + { + } void createPipeline() { From bdb977644bb77751d8340f25346fb2feadaade84 Mon Sep 17 00:00:00 2001 From: mradul Date: Mon, 30 Sep 2024 19:17:22 +0530 Subject: [PATCH 6/8] added serializing control module info --- base/include/AbsControlModule.h | 3 ++- base/include/SimpleControlModule.h | 2 +- base/src/AbsControlModule.cpp | 34 ++++++++++++++++++++++++++ base/src/SimpleControlModule.cpp | 5 ++++ base/test/simpleControlModuleTests.cpp | 1 + 5 files changed, 43 insertions(+), 2 deletions(-) diff --git a/base/include/AbsControlModule.h b/base/include/AbsControlModule.h index bfed60cfe..06477f4a0 100644 --- a/base/include/AbsControlModule.h +++ b/base/include/AbsControlModule.h @@ -18,7 +18,7 @@ class AbsControlModule : public Module { bool term(); bool enrollModule(std::string role, boost::shared_ptr module); boost::shared_ptr getModuleofRole(std::string role); - virtual std::string getStatus() { return ""; }; + std::string printStatus(); virtual void handleMp4MissingVideotrack(std::string previousVideoFile, std::string nextVideoFile) {} virtual void handleMMQExport(Command cmd, bool priority = false) {} virtual void handleMMQExportView(uint64_t startTS, uint64_t endTS = 9999999999999, bool playabckDirection = true, bool Mp4ReaderExport = false, bool priority = false) {} @@ -45,6 +45,7 @@ class AbsControlModule : public Module { virtual void sendEOS() {} virtual void sendEOS(frame_sp& frame) {} virtual void sendEOPFrame() {} + std::vector serializeControlModule(); boost::function healthCallbackExtention; private: class Detail; diff --git a/base/include/SimpleControlModule.h b/base/include/SimpleControlModule.h index b059c725c..13746deec 100644 --- a/base/include/SimpleControlModule.h +++ b/base/include/SimpleControlModule.h @@ -16,7 +16,7 @@ class SimpleControlModule : public AbsControlModule ~SimpleControlModule() { } - + std::string printStatus(); void handleError(const APErrorObject &error) override; void handleHealthCallback(const APHealthObject &healthObj) override; // ErrorCallbacks diff --git a/base/src/AbsControlModule.cpp b/base/src/AbsControlModule.cpp index bd8088841..7c47ff973 100644 --- a/base/src/AbsControlModule.cpp +++ b/base/src/AbsControlModule.cpp @@ -4,6 +4,7 @@ #include "Module.h" #include "Command.h" #include "PipeLine.h" +#include "boost/algorithm/string/join.hpp" class AbsControlModule::Detail { @@ -57,6 +58,12 @@ bool AbsControlModule::process(frame_container& frames) return true; } +/** + * @brief Enroll your module to use healthcallback, errorcallback and other control module functions + * @param boost::shared_ptr the module to be registered + * @param role unique string for role of the module + * @return bool. + */ bool AbsControlModule::enrollModule(std::string role, boost::shared_ptr module) { if (moduleRoles.find(role) != moduleRoles.end()) @@ -108,4 +115,31 @@ void AbsControlModule::handleHealthCallback(const APHealthObject& healthObj) LOG_INFO << "Calling the registered Health Callback Extention..."; healthCallbackExtention(&healthObj, 1); } +} + +std::vector AbsControlModule::serializeControlModule() +{ + std::string spacedLineFmt = "\t-->"; + std::vector status; + status.push_back("Module <" + this->getId() + "> \n"); + status.push_back("Enrolled Modules \n"); + for (auto it : moduleRoles) + { + status.push_back("module <" + it.second.lock()->getId() + "> role <" + it.first + ">\n"); + std::string cbStatus = "registered for...\n"; + if (it.second.lock()->getProps().enableHealthCallBack) + { + cbStatus += spacedLineFmt + " \n"; + } + cbStatus += spacedLineFmt + ""; + status.push_back(spacedLineFmt + cbStatus); + } + return status; +} + +std::string AbsControlModule::printStatus() +{ + auto ser = boost::algorithm::join(serializeControlModule(), "|"); + LOG_INFO << ser; + return ser; } \ No newline at end of file diff --git a/base/src/SimpleControlModule.cpp b/base/src/SimpleControlModule.cpp index 4cf851961..36ea575c0 100644 --- a/base/src/SimpleControlModule.cpp +++ b/base/src/SimpleControlModule.cpp @@ -15,6 +15,11 @@ void SimpleControlModule::sendEOPFrame() return Module::sendEoPFrame(); } +std::string SimpleControlModule::printStatus() +{ + return AbsControlModule::printStatus(); +} + // Right Now, Just Logging But Can be used to Do bunch of other things void SimpleControlModule::handleError(const APErrorObject &error) { diff --git a/base/test/simpleControlModuleTests.cpp b/base/test/simpleControlModuleTests.cpp index 27488cb5b..f0ce866a7 100644 --- a/base/test/simpleControlModuleTests.cpp +++ b/base/test/simpleControlModuleTests.cpp @@ -252,6 +252,7 @@ BOOST_AUTO_TEST_CASE(simpleControlModule_healthCallback) t.startPipeline(); t.addControlModule(); + t.simpleCtrl->printStatus(); boost::this_thread::sleep_for(boost::chrono::milliseconds(5000)); t.stopPipeline(); boost::this_thread::sleep_for(boost::chrono::milliseconds(3000)); From b4f29a47af0fd91e6bb1431ff7b424e3eefc3468 Mon Sep 17 00:00:00 2001 From: mradul Date: Tue, 1 Oct 2024 16:47:20 +0530 Subject: [PATCH 7/8] review feedback, minor refactoring, added a disabled test in simplectrl --- base/include/AbsControlModule.h | 4 +- base/include/PipeLine.h | 2 +- base/include/SimpleControlModule.h | 1 - base/src/AbsControlModule.cpp | 6 +-- base/src/PipeLine.cpp | 3 +- base/test/simpleControlModuleTests.cpp | 63 +++++++++++++++++++++++++- 6 files changed, 68 insertions(+), 11 deletions(-) diff --git a/base/include/AbsControlModule.h b/base/include/AbsControlModule.h index 06477f4a0..83e6d2f6a 100644 --- a/base/include/AbsControlModule.h +++ b/base/include/AbsControlModule.h @@ -32,11 +32,11 @@ class AbsControlModule : public Module { virtual void handleHealthCallback(const APHealthObject& healthObj); /** * @brief Register external function to be triggered on every health callBack that control modules recieves from the modules. - * In SimpleControlModule, this extention is called at the end of handleHealthCallback function. + * For eg. In SimpleControlModule, this extention is called at the end of handleHealthCallback function. * @param function with signature void f(const APHealthObject*, unsigned short) * @return nothing. */ - void register_healthCallback_extention( + void registerHealthCallbackExtention( boost::function callbackFunction); protected: bool process(frame_container& frames); diff --git a/base/include/PipeLine.h b/base/include/PipeLine.h index 1d6c607a5..7e134dad4 100755 --- a/base/include/PipeLine.h +++ b/base/include/PipeLine.h @@ -48,7 +48,7 @@ class PipeLine { void stop(); void term(); void wait_for_all(bool ignoreStatus = false); - void interrup_wait_for_all(); + void interrupt_wait_for_all(); void flushAllQueues(bool flushControlModuleQ=false); const char* getStatus(); }; diff --git a/base/include/SimpleControlModule.h b/base/include/SimpleControlModule.h index 13746deec..681c867da 100644 --- a/base/include/SimpleControlModule.h +++ b/base/include/SimpleControlModule.h @@ -19,7 +19,6 @@ class SimpleControlModule : public AbsControlModule std::string printStatus(); void handleError(const APErrorObject &error) override; void handleHealthCallback(const APHealthObject &healthObj) override; - // ErrorCallbacks protected: void sendEOS(); void sendEOS(frame_sp& frame); diff --git a/base/src/AbsControlModule.cpp b/base/src/AbsControlModule.cpp index 7c47ff973..f8cdc0f04 100644 --- a/base/src/AbsControlModule.cpp +++ b/base/src/AbsControlModule.cpp @@ -101,7 +101,7 @@ boost::shared_ptr AbsControlModule::getModuleofRole(std::string role) return moduleWithRole; } -void AbsControlModule::register_healthCallback_extention( +void AbsControlModule::registerHealthCallbackExtention( boost::function callbackFunction) { healthCallbackExtention = callbackFunction; @@ -129,9 +129,9 @@ std::vector AbsControlModule::serializeControlModule() std::string cbStatus = "registered for...\n"; if (it.second.lock()->getProps().enableHealthCallBack) { - cbStatus += spacedLineFmt + " \n"; + cbStatus += spacedLineFmt + "health callbacks \n"; } - cbStatus += spacedLineFmt + ""; + cbStatus += spacedLineFmt + "error callbacks \n"; status.push_back(spacedLineFmt + cbStatus); } return status; diff --git a/base/src/PipeLine.cpp b/base/src/PipeLine.cpp index 934e8618a..83998c154 100755 --- a/base/src/PipeLine.cpp +++ b/base/src/PipeLine.cpp @@ -106,7 +106,6 @@ bool PipeLine::init() { myStatus = PL_INITFAILED; return false; - return false; } LOG_TRACE << " Initializing pipeline"; @@ -270,7 +269,7 @@ void PipeLine::wait_for_all(bool ignoreStatus) } -void PipeLine::interrup_wait_for_all() +void PipeLine::interrupt_wait_for_all() { if (myStatus > PL_STOPPING) { diff --git a/base/test/simpleControlModuleTests.cpp b/base/test/simpleControlModuleTests.cpp index f0ce866a7..5853032b7 100644 --- a/base/test/simpleControlModuleTests.cpp +++ b/base/test/simpleControlModuleTests.cpp @@ -157,7 +157,6 @@ struct SimpleControlModuleTests auto metadata = framemetadata_sp(new FrameMetadata(FrameMetadata::GENERAL)); sourceMod = boost::shared_ptr(new TestModuleSrc); - //auto source_pin_1 = sourceMod->addOutputPin(metadata); /* set transform module health callbacks */ TestModuleTransformProps props; @@ -168,6 +167,10 @@ struct SimpleControlModuleTests sinkMod = boost::shared_ptr(new TestSink); + // pins connection + sourceMod->setNext(transformMod1); + transformMod1->setNext(sinkMod); + auto simpleCtrlProps = SimpleControlModuleProps(); simpleCtrl = boost::shared_ptr(new SimpleControlModule(simpleCtrlProps)); @@ -248,7 +251,7 @@ void TestCallackExtention(const APHealthObject* healthObj, unsigned int eventId) BOOST_AUTO_TEST_CASE(simpleControlModule_healthCallback) { SimpleControlModuleTests t; - t.simpleCtrl->register_healthCallback_extention(TestCallackExtention); + t.simpleCtrl->registerHealthCallbackExtention(TestCallackExtention); t.startPipeline(); t.addControlModule(); @@ -258,4 +261,60 @@ BOOST_AUTO_TEST_CASE(simpleControlModule_healthCallback) boost::this_thread::sleep_for(boost::chrono::milliseconds(3000)); } +BOOST_AUTO_TEST_CASE(simpleControlModule_enroll_ctrlMod_step_test, *boost::unit_test::disabled()) +{ + SimpleControlModuleTests t; + + t.simpleCtrl->registerHealthCallbackExtention(TestCallackExtention); + t.addControlModule(); + + t.sourceMod->init(); + t.transformMod1->init(); + t.sinkMod->init(); + t.simpleCtrl->init(); + + t.sourceMod->step(); + t.transformMod1->step(); + t.sinkMod->step(); + + t.simpleCtrl->enrollModule("transform_test_module", t.transformMod1); + t.simpleCtrl->enrollModule("source_test_module", t.sourceMod); + + // BOOSTASSERT the printStatus for enrollment + auto status = t.simpleCtrl->printStatus(); + BOOST_ASSERT(status.find("transform_test_module") != std::string::npos); + BOOST_ASSERT(status.find("source_test_module") != std::string::npos); + + // since we are queueing any command in control module, the step should remain blocked at mQue->pop inside step() + // the following code tests exactly that. + auto future = std::async(std::launch::async, &SimpleControlModule::step, t.simpleCtrl.get()); + if (future.wait_for(std::chrono::seconds(2)) == std::future_status::ready) + { + try + { + bool result = future.get(); + LOG_ERROR << "Simple control module step() unexpectedly returned a value <" << result << ">"; + } + catch (const std::exception& e) + { + std::cout << "Task threw an exception: " << e.what() << std::endl; + } + BOOST_ASSERT(false); + } + BOOST_ASSERT(true); + + t.sourceMod->stop(); + t.transformMod1->stop(); + t.sinkMod->stop(); + t.simpleCtrl->stop(); + + t.sourceMod->term(); + t.transformMod1->term(); + t.sinkMod->term(); + t.simpleCtrl->term(); + + LOG_INFO << "SUCCESS: do not wait for step() to finish..."; // future.get() + return; +} + BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file From 6f90a5d5c2fc2ceb2e115e373ac1579e8969a57e Mon Sep 17 00:00:00 2001 From: mradul Date: Thu, 3 Oct 2024 14:40:59 +0530 Subject: [PATCH 8/8] code review feedback --- base/include/Module.h | 2 ++ base/include/SimpleControlModule.h | 4 ++-- base/src/Module.cpp | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/base/include/Module.h b/base/include/Module.h index 842dd2df7..53955d780 100644 --- a/base/include/Module.h +++ b/base/include/Module.h @@ -21,6 +21,8 @@ #include "BufferMaker.h" #include "APCallback.h" +#define DUMMY_CTRL_EOP_PIN "dummy_ctrl_eop_pin" + using namespace std; class FrameContainerQueue; diff --git a/base/include/SimpleControlModule.h b/base/include/SimpleControlModule.h index 681c867da..bb31d358a 100644 --- a/base/include/SimpleControlModule.h +++ b/base/include/SimpleControlModule.h @@ -17,8 +17,8 @@ class SimpleControlModule : public AbsControlModule { } std::string printStatus(); - void handleError(const APErrorObject &error) override; - void handleHealthCallback(const APHealthObject &healthObj) override; + void handleError(const APErrorObject& error) override; + void handleHealthCallback(const APHealthObject& healthObj) override; protected: void sendEOS(); void sendEOS(frame_sp& frame); diff --git a/base/src/Module.cpp b/base/src/Module.cpp index 92788a7d1..0e6a72f06 100644 --- a/base/src/Module.cpp +++ b/base/src/Module.cpp @@ -1581,7 +1581,7 @@ bool Module::addEoPFrame(frame_container &frames) auto frame = frame_sp(new EoPFrame()); auto metadata = framemetadata_sp(new FrameMetadata(FrameMetadata::GENERAL)); frame->setMetadata((metadata)); - frames.insert(make_pair("dummy_eop_ctrl_pin", frame)); + frames.insert(make_pair(DUMMY_CTRL_EOP_PIN, frame)); } // if sieve is disabled for atleast one connection - send additional EOP