Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control module pipeline fixes & added control module UTs #388

Merged
merged 8 commits into from
Oct 3, 2024
1 change: 1 addition & 0 deletions base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ SET(UT_FILES
test/overlaymodule_tests.cpp
test/testSignalGeneratorSrc_tests.cpp
test/audioToTextXform_tests.cpp
test/simpleControlModuleTests.cpp
${ARM64_UT_FILES}
${CUDA_UT_FILES}
)
Expand Down
20 changes: 14 additions & 6 deletions base/include/AbsControlModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,35 @@ class AbsControlModule : public Module {
bool term();
bool enrollModule(std::string role, boost::shared_ptr<Module> module);
boost::shared_ptr<Module> getModuleofRole(std::string role);
std::string printStatus();
virtual void handleMp4MissingVideotrack(std::string previousVideoFile, std::string nextVideoFile) {}
virtual void handleMMQExport(Command cmd, bool priority = false) {}
virtual void handleMMQExportView(uint64_t startTS, uint64_t endTS = 9999999999999, bool playabckDirection = true, bool Mp4ReaderExport = false, bool priority = false) {}
virtual void handleSendMMQTSCmd(uint64_t mmqBeginTS, uint64_t mmqEndTS, bool priority = false) {}
virtual void handleLastGtkGLRenderTS(uint64_t latestGtkGlRenderTS, bool priority) {}
virtual void handleGoLive(bool goLive, bool priority) {}
virtual void handleDecoderSpeed(DecoderPlaybackSpeed cmd, bool priority) {}
boost::container::deque<boost::shared_ptr<Module>> pipelineModules;
std::map<std::string, boost::shared_ptr<Module>> moduleRoles;
// Note: weak pointers to avoid cyclic dependency and mem leaks
std::map<std::string, boost::weak_ptr<Module>> moduleRoles;
virtual void handleError(const APErrorObject &error) {}
virtual void handleHealthCallback(const APHealthObject &healthObj) {}


virtual void handleHealthCallback(const APHealthObject& healthObj);
/**
* @brief Register external function to be triggered on every health callBack that control modules recieves from the modules.
* For eg. In SimpleControlModule, this extention is called at the end of handleHealthCallback function.
* @param function with signature void f(const APHealthObject*, unsigned short)
* @return nothing.
*/
void registerHealthCallbackExtention(
boost::function<void(const APHealthObject*, unsigned short)> callbackFunction);
protected:
bool process(frame_container& frames);
bool handleCommand(Command::CommandType type, frame_sp& frame);
bool handlePropsChange(frame_sp& frame);
virtual void sendEOS() {}
virtual void sendEOS(frame_sp& frame) {}
virtual void sendEOPFrame() {}

std::vector<std::string> serializeControlModule();
boost::function<void(const APHealthObject*, unsigned short)> healthCallbackExtention;
private:
class Detail;
boost::shared_ptr<Detail> mDetail;
Expand Down
2 changes: 2 additions & 0 deletions base/include/Module.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "BufferMaker.h"
#include "APCallback.h"

#define DUMMY_CTRL_EOP_PIN "dummy_ctrl_eop_pin"

using namespace std;

class FrameContainerQueue;
Expand Down
7 changes: 4 additions & 3 deletions base/include/PipeLine.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class PipeLine {
Status myStatus;
typedef boost::shared_ptr<Module> item_type;
typedef boost::container::deque< item_type > container_type;

boost::shared_ptr<Module> controlModule = nullptr;

std::string mName;
container_type modules;
bool validate();
Expand All @@ -47,8 +48,8 @@ class PipeLine {
void stop();
void term();
void wait_for_all(bool ignoreStatus = false);
void interrup_wait_for_all();
void flushAllQueues();
void interrupt_wait_for_all();
void flushAllQueues(bool flushControlModuleQ=false);
const char* getStatus();
};

9 changes: 3 additions & 6 deletions base/include/SimpleControlModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@ class SimpleControlModule : public AbsControlModule

~SimpleControlModule()
{

}

void handleError(const APErrorObject &error);
void handleHealthCallback(const APHealthObject &healthObj);

// ErrorCallbacks
std::string printStatus();
void handleError(const APErrorObject& error) override;
void handleHealthCallback(const APHealthObject& healthObj) override;
protected:
void sendEOS();
void sendEOS(frame_sp& frame);
Expand Down
53 changes: 52 additions & 1 deletion base/src/AbsControlModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "Module.h"
#include "Command.h"
#include "PipeLine.h"
#include "boost/algorithm/string/join.hpp"

class AbsControlModule::Detail
{
Expand All @@ -24,6 +25,7 @@ AbsControlModule::AbsControlModule(AbsControlModuleProps _props)
{
mDetail.reset(new Detail(_props));
}

AbsControlModule::~AbsControlModule() {}

bool AbsControlModule::handleCommand(Command::CommandType type, frame_sp& frame)
Expand Down Expand Up @@ -56,6 +58,12 @@ bool AbsControlModule::process(frame_container& frames)
return true;
}

/**
* @brief Enroll your module to use healthcallback, errorcallback and other control module functions
* @param boost::shared_ptr<Module> the module to be registered
* @param role unique string for role of the module
* @return bool.
*/
bool AbsControlModule::enrollModule(std::string role, boost::shared_ptr<Module> module)
{
if (moduleRoles.find(role) != moduleRoles.end())
Expand Down Expand Up @@ -84,11 +92,54 @@ boost::shared_ptr<Module> AbsControlModule::getModuleofRole(std::string role)
boost::shared_ptr<Module> moduleWithRole = nullptr;
try
{
moduleWithRole = moduleRoles[role];
moduleWithRole = moduleRoles[role].lock();
mraduldubey marked this conversation as resolved.
Show resolved Hide resolved
}
catch (std::out_of_range)
{
LOG_ERROR << "no module with the role <" << role << "> registered with the control module.";
}
return moduleWithRole;
}

void AbsControlModule::registerHealthCallbackExtention(
boost::function<void(const APHealthObject*, unsigned short)> callbackFunction)
{
healthCallbackExtention = callbackFunction;
};

void AbsControlModule::handleHealthCallback(const APHealthObject& healthObj)
{
LOG_INFO << "Health Callback from module " << healthObj.getModuleId();
if (!healthCallbackExtention.empty())
{
LOG_INFO << "Calling the registered Health Callback Extention...";
healthCallbackExtention(&healthObj, 1);
}
}

std::vector<std::string> AbsControlModule::serializeControlModule()
{
std::string spacedLineFmt = "\t-->";
std::vector<std::string> status;
status.push_back("Module <" + this->getId() + "> \n");
status.push_back("Enrolled Modules \n");
for (auto it : moduleRoles)
{
status.push_back("module <" + it.second.lock()->getId() + "> role <" + it.first + ">\n");
std::string cbStatus = "registered for...\n";
if (it.second.lock()->getProps().enableHealthCallBack)
{
cbStatus += spacedLineFmt + "health callbacks \n";
}
cbStatus += spacedLineFmt + "error callbacks \n";
status.push_back(spacedLineFmt + cbStatus);
}
return status;
}

std::string AbsControlModule::printStatus()
{
auto ser = boost::algorithm::join(serializeControlModule(), "|");
LOG_INFO << ser;
return ser;
}
30 changes: 29 additions & 1 deletion base/src/Module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ Module::Module(Kind nature, string name, ModuleProps _props)
mPropsChangeMetadata.reset(
new FrameMetadata(FrameMetadata::FrameType::PROPS_CHANGE));
}
Module::~Module() {}
Module::~Module()
{
LOG_INFO << "Module destructor <" << myId << ">";
mraduldubey marked this conversation as resolved.
Show resolved Hide resolved
}

bool Module::term()
{
Expand Down Expand Up @@ -1295,6 +1298,16 @@ bool Module::step()
{
throw AIPException(CTRL_MODULE_INVALID_STATE, "Unexpected: " + std::to_string(frames.size()) + " frames remain unprocessed in control module.");
}
if (mPlay)
{
mProfiler->startProcessingLap();
ret = stepNonSource(frames);
mraduldubey marked this conversation as resolved.
Show resolved Hide resolved
mProfiler->endLap(mQue->size());
}
else
{
ret = true;
}
}
else
{
Expand Down Expand Up @@ -1563,6 +1576,14 @@ bool Module::addEoPFrame(frame_container &frames)
frames.insert(make_pair(me.first, frame));
}

if (myNature == CONTROL)
{
auto frame = frame_sp(new EoPFrame());
auto metadata = framemetadata_sp(new FrameMetadata(FrameMetadata::GENERAL));
frame->setMetadata((metadata));
frames.insert(make_pair(DUMMY_CTRL_EOP_PIN, frame));
}

// if sieve is disabled for atleast one connection - send additional EOP
// frames - extra EOP frames downstream shouldn't matter
if (mIsSieveDisabledForAny)
Expand All @@ -1586,6 +1607,13 @@ bool Module::handleStop()
{
return true;
}
if (myNature == CONTROL)
{
mRunning = false;
term();
return true;
}
// handle SOURCE, TRANSFORM, SINK below
mStopCount++;
if (myNature != SOURCE && mStopCount != mForwardPins)
{
Expand Down
47 changes: 39 additions & 8 deletions base/src/PipeLine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ bool PipeLine::addControlModule(boost::shared_ptr<AbsControlModule> cModule)
for (int i = 0; i < modules.size(); i++)
{
modules[i]->addControlModule(cModule);
cModule->pipelineModules.push_back(modules[i]);
}
controlModule = cModule;
return true;
}

Expand Down Expand Up @@ -133,6 +133,10 @@ bool PipeLine::init()
return false;
}
}
if (controlModule != nullptr)
{
controlModule->init();
}
myStatus = PL_INITED;
LOG_TRACE << " Pipeline initialized";
return true;
Expand All @@ -159,10 +163,11 @@ void PipeLine::run_all_threaded()
m.myThread = boost::thread(ref(m));
Utils::setModuleThreadName(m.myThread, m.getId());
}
if ((modules[0]->controlModule) != nullptr)
if (controlModule != nullptr)
{
Module& m = *(modules[0]->controlModule);
Module& m = *(controlModule);
m.myThread = boost::thread(ref(m));
Utils::setModuleThreadName(m.myThread, m.getId());
}
mPlay = true;
}
Expand All @@ -183,7 +188,7 @@ void PipeLine::pause()
i->get()->play(false);
}
}

//Note: controlModule should not be paused
mPlay = false;
}

Expand All @@ -196,7 +201,11 @@ void PipeLine::play()
i->get()->play(true);
}
}

// Control module should continue running anyways
if (controlModule != nullptr)
{
controlModule->play(true);
}
mPlay = true;
}

Expand All @@ -215,6 +224,7 @@ void PipeLine::step()
i->get()->queueStep();
}
}
// should controlModule step ?
mraduldubey marked this conversation as resolved.
Show resolved Hide resolved
}

void PipeLine::stop()
Expand All @@ -232,6 +242,10 @@ void PipeLine::stop()
i->get()->stop();
}
}
if (controlModule != nullptr)
{
controlModule->stop();
}
}

void PipeLine::wait_for_all(bool ignoreStatus)
Expand All @@ -247,10 +261,15 @@ void PipeLine::wait_for_all(bool ignoreStatus)
Module& m = *(i->get());
m.myThread.join();
}

if (controlModule != nullptr)
{
controlModule->myThread.join();
}
}


void PipeLine::interrup_wait_for_all()
void PipeLine::interrupt_wait_for_all()
{
if (myStatus > PL_STOPPING)
{
Expand All @@ -269,6 +288,12 @@ void PipeLine::interrup_wait_for_all()
Module& m = *(i->get());
m.myThread.join();
}

if (controlModule != nullptr)
{
controlModule->myThread.interrupt();
controlModule->myThread.join();
}
myStatus = PL_STOPPED;
}

Expand All @@ -280,12 +305,18 @@ const char * PipeLine::getStatus()
return StatusNames[myStatus];
}

void PipeLine::flushAllQueues() {
void PipeLine::flushAllQueues(bool flushControlModuleQ)
{
if (flushControlModuleQ && controlModule != nullptr)
{
controlModule->flushQue();
}
for (auto& m : modules)
{
if (m->myNature == Module::Kind::SOURCE)
{
m->flushQueRecursive();
}
}
}
}

10 changes: 10 additions & 0 deletions base/src/SimpleControlModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ void SimpleControlModule::sendEOPFrame()
return Module::sendEoPFrame();
}

std::string SimpleControlModule::printStatus()
{
return AbsControlModule::printStatus();
}

// Right Now, Just Logging But Can be used to Do bunch of other things
void SimpleControlModule::handleError(const APErrorObject &error)
{
Expand All @@ -26,4 +31,9 @@ void SimpleControlModule::handleError(const APErrorObject &error)
void SimpleControlModule::handleHealthCallback(const APHealthObject &healthObj)
{
LOG_ERROR << "Health Callback from module " << healthObj.getModuleId();
if (!healthCallbackExtention.empty())
{
LOG_INFO << "Calling Health Callback Extention...";
healthCallbackExtention(&healthObj,1);
}
}
Loading
Loading