Skip to content

Commit

Permalink
better subscriber handling in camera throttle
Browse files Browse the repository at this point in the history
  • Loading branch information
klaxalk committed Oct 23, 2024
1 parent 394590e commit 0dffb22
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 23 deletions.
2 changes: 2 additions & 0 deletions mrs_camera_republisher/launch/throttle.launch
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

<param name="rate" value="$(arg rate)" />

<param name="camera_name" value="$(arg camera_name)" />

<remap from="~image_in" to="$(arg original_topic)/image_raw" />
<remap from="~camera_info_in" to="$(arg original_topic)/camera_info" />

Expand Down
101 changes: 78 additions & 23 deletions mrs_camera_republisher/src/throttle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ class Throttle : public nodelet::Nodelet {

ros::NodeHandle nh_;

std::unique_ptr<image_transport::ImageTransport> it_;

double _dt_;

std::string _camera_name_;

mrs_lib::PublisherHandler<sensor_msgs::CameraInfo> ph_camera_info;

image_transport::Publisher publisher_image_;
Expand All @@ -45,6 +49,13 @@ class Throttle : public nodelet::Nodelet {

mrs_lib::SubscribeHandler<sensor_msgs::CameraInfo> sh_camera_info_;

ros::Timer timer_main_;

std::atomic<bool> sh_image_on_ = true;
std::atomic<bool> sh_cam_info_on_ = true;

void timerMain(const ros::TimerEvent& event);

ros::Time last_time_published_image_;
ros::Time last_time_published_cam_info_;
};
Expand All @@ -57,7 +68,7 @@ void Throttle::onInit() {

ros::NodeHandle nh = nodelet::Nodelet::getMTPrivateNodeHandle();

ROS_INFO("[Throttle]: initializing");
ROS_INFO("[CameraThrottle_%s]: initializing", _camera_name_.c_str());

ros::Time::waitForValid();

Expand All @@ -69,26 +80,27 @@ void Throttle::onInit() {
double rate;

param_loader.loadParam("rate", rate);
param_loader.loadParam("camera_name", _camera_name_);

if (!param_loader.loadedSuccessfully()) {
ROS_ERROR("[Throttle]: failed to load non-optional parameters!");
ROS_ERROR("[CameraThrottle_%s]: failed to load non-optional parameters!", _camera_name_.c_str());
ros::shutdown();
}

if (rate < 1e-3) {
ROS_ERROR("[Throttle]: provided rate is too low");
ROS_ERROR("[CameraThrottle_%s]: provided rate is too low", _camera_name_.c_str());
ros::shutdown();
}

_dt_ = 1.0 / rate;

// | --------------------- image transport -------------------- |

image_transport::ImageTransport it(nh);
it_ = std::make_unique<image_transport::ImageTransport>(nh);

// | ----------------------- subscribers ---------------------- |

subscriber_image_ = it.subscribe("image_in", 1, &Throttle::callbackImage, this);
subscriber_image_ = it_->subscribe("image_in", 1, &Throttle::callbackImage, this);

mrs_lib::SubscribeHandlerOptions shopts;
shopts.nh = nh;
Expand All @@ -105,42 +117,91 @@ void Throttle::onInit() {

ph_camera_info = mrs_lib::PublisherHandler<sensor_msgs::CameraInfo>(nh, "camera_info_out", 1);

publisher_image_ = it.advertise("image_out", 1);
publisher_image_ = it_->advertise("image_out", 1);

// | ------------------------- timers ------------------------- |

timer_main_ = nh_.createTimer(ros::Duration(1.0), &Throttle::timerMain, this);

// | --------------------- finish the init -------------------- |

is_initialized_ = true;

ROS_INFO_ONCE("[Throttle]: initialized");
ROS_INFO_ONCE("[CameraThrottle_%s]: initialized", _camera_name_.c_str());
}

//}

// | ------------------------ callbacks ----------------------- |
// | ------------------------- timers ------------------------- |

/* callbackImage() //{ */
/* timerMain() //{ */

void Throttle::callbackImage(const sensor_msgs::Image::ConstPtr& msg) {
void Throttle::timerMain([[maybe_unused]] const ros::TimerEvent& event) {

if (!is_initialized_) {
return;
}

if (publisher_image_.getNumSubscribers() == 0) {
if (sh_image_on_) {

if (publisher_image_.getNumSubscribers() == 0) {

sh_image_on_ = false;
subscriber_image_.shutdown();
ROS_INFO("[CameraThrottle_%s]: no subscribers on images, shutting down image subscriber", _camera_name_.c_str());
}

} else {

if (publisher_image_.getNumSubscribers() > 0) {

sh_image_on_ = true;
subscriber_image_ = it_->subscribe("image_in", 1, &Throttle::callbackImage, this);
ROS_INFO("[CameraThrottle_%s]: re-enabling image subscriber", _camera_name_.c_str());
}
}

if (sh_cam_info_on_) {

if (ph_camera_info.getNumSubscribers() == 0) {

sh_cam_info_on_ = false;
sh_camera_info_.stop();
ROS_INFO("[CameraThrottle_%s]: no subscribers on camera info, shutting down camera info subscriber", _camera_name_.c_str());
}

} else {

if (ph_camera_info.getNumSubscribers() > 0) {

sh_cam_info_on_ = true;
sh_camera_info_.start();
ROS_INFO("[CameraThrottle_%s]: re-enabling camera info subscriber", _camera_name_.c_str());
}
}
}

//}

// | ------------------------ callbacks ----------------------- |

/* callbackImage() //{ */

void Throttle::callbackImage(const sensor_msgs::Image::ConstPtr& msg) {

if (!is_initialized_) {
return;
}

if ((ros::Time::now() - last_time_published_image_).toSec() < _dt_) {
return;
}

sensor_msgs::Image image = *msg;

publisher_image_.publish(image);
publisher_image_.publish(msg);

last_time_published_image_ = ros::Time::now();

ROS_INFO_ONCE("[Throttle]: republishing throttled images");
ROS_INFO_ONCE("[CameraThrottle_%s]: republishing throttled images", _camera_name_.c_str());
}

//}
Expand All @@ -153,21 +214,15 @@ void Throttle::callbackCameraInfo(const sensor_msgs::CameraInfo::ConstPtr msg) {
return;
}

if (ph_camera_info.getNumSubscribers() == 0) {
return;
}

if ((ros::Time::now() - last_time_published_image_).toSec() < _dt_) {
return;
}

sensor_msgs::CameraInfo info = *msg;

ph_camera_info.publish(info);
ph_camera_info.publish(msg);

last_time_published_cam_info_ = ros::Time::now();

ROS_INFO_ONCE("[Throttle]: republishing throttled camera info");
ROS_INFO_ONCE("[CameraThrottle_%s]: republishing throttled camera info", _camera_name_.c_str());
}

//}
Expand Down

0 comments on commit 0dffb22

Please sign in to comment.