Skip to content

Commit

Permalink
Merge pull request #66 from kroshu/fix/unblock_stream
Browse files Browse the repository at this point in the history
Fix observe thread block
  • Loading branch information
altex111 authored Feb 27, 2023
2 parents 089f2fd + 5f6759a commit 6fab420
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ class KukaRoXHardwareInterface : public hardware_interface::SystemInterface
#endif

std::thread observe_thread_;
std::atomic<bool> terminate_{false};
std::mutex observe_mutex_;

std::unique_ptr<os::core::udp::communication::UDPReplier> udp_replier_;
std::chrono::milliseconds receive_timeout_ {100};
Expand Down
67 changes: 37 additions & 30 deletions kuka_rox_hardware_interface/src/robot_manager_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ RobotManagerNode::RobotManagerNode()

RobotManagerNode::~RobotManagerNode()
{
#ifdef NON_MOCK_SETUP
if (context_ != nullptr) {
context_->TryCancel();
}
#endif
if (observe_thread_.joinable()) {
observe_thread_.join();
}
Expand Down Expand Up @@ -161,33 +166,28 @@ RobotManagerNode::on_cleanup(const rclcpp_lifecycle::State &)
void RobotManagerNode::ObserveControl()
{
#ifdef NON_MOCK_SETUP
while (!terminate_) {
context_ = std::make_unique<::grpc::ClientContext>();
kuka::ecs::v1::ObserveControlStateRequest observe_request;
std::unique_ptr<grpc::ClientReader<kuka::ecs::v1::CommandState>> reader(
stub_->ObserveControlState(context_.get(), observe_request));

kuka::ecs::v1::CommandState response;

if (reader->Read(&response)) {
switch (static_cast<int>(response.event())) {
case kuka::ecs::v1::CommandEvent::STOPPED:
case kuka::ecs::v1::CommandEvent::ERROR:
RCLCPP_INFO(get_logger(), "External control stopped");
terminate_ = true;
if (this->get_current_state().id() == State::PRIMARY_STATE_ACTIVE) {
this->deactivate();
} else if (this->get_current_state().id() == State::TRANSITION_STATE_ACTIVATING) {
// TODO(Svastits): this can be removed if rollback is implemented properly
this->on_deactivate(get_current_state());
}
break;
default:
break;
}
} else {
// WORKAROUND: Ec is starting later so we have some errors before stable work.
std::this_thread::sleep_for(std::chrono::milliseconds(300));
context_ = std::make_unique<::grpc::ClientContext>();
kuka::ecs::v1::ObserveControlStateRequest observe_request;
std::unique_ptr<grpc::ClientReader<kuka::ecs::v1::CommandState>> reader(
stub_->ObserveControlState(context_.get(), observe_request));

kuka::ecs::v1::CommandState response;

while (reader->Read(&response)) {
switch (static_cast<int>(response.event())) {
case kuka::ecs::v1::CommandEvent::STOPPED:
case kuka::ecs::v1::CommandEvent::ERROR:
RCLCPP_INFO(get_logger(), "External control stopped");
terminate_ = true;
if (this->get_current_state().id() == State::PRIMARY_STATE_ACTIVE) {
this->deactivate();
} else if (this->get_current_state().id() == State::TRANSITION_STATE_ACTIVATING) {
// TODO(Svastits): this can be removed if rollback is implemented properly
this->on_deactivate(get_current_state());
}
break;
default:
break;
}
}
#endif
Expand All @@ -197,6 +197,11 @@ void RobotManagerNode::ObserveControl()
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
RobotManagerNode::on_activate(const rclcpp_lifecycle::State &)
{
#ifdef NON_MOCK_SETUP
if (context_ != nullptr) {
context_->TryCancel();
}
#endif
// Join observe thread, necessary if previous activation failed
if (observe_thread_.joinable()) {
observe_thread_.join();
Expand All @@ -215,8 +220,6 @@ RobotManagerNode::on_activate(const rclcpp_lifecycle::State &)
change_hardware_state_client_, hw_request, 0, 2000);
if (!hw_response || !hw_response->ok) {
RCLCPP_ERROR(get_logger(), "Could not activate hardware interface");
// 'unset config' does not exist, safe to return
terminate_ = true;
return FAILURE;
}

Expand All @@ -231,6 +234,8 @@ RobotManagerNode::on_activate(const rclcpp_lifecycle::State &)
);
if (!controller_response || !controller_response->ok) {
RCLCPP_ERROR(get_logger(), "Could not start joint state broadcaster");
// TODO(Svastits): this can be removed if rollback is implemented properly
this->on_deactivate(get_current_state());
return FAILURE;
}

Expand All @@ -248,6 +253,8 @@ RobotManagerNode::on_activate(const rclcpp_lifecycle::State &)
);
if (!controller_response || !controller_response->ok) {
RCLCPP_ERROR(get_logger(), "Could not activate controller");
// TODO(Svastits): this can be removed if rollback is implemented properly
this->on_deactivate(get_current_state());
return FAILURE;
}
RCLCPP_INFO(get_logger(), "Successfully activated controllers");
Expand All @@ -272,7 +279,7 @@ RobotManagerNode::on_deactivate(const rclcpp_lifecycle::State &)
hw_request->target_state.id = State::PRIMARY_STATE_INACTIVE;
auto hw_response =
kroshu_ros2_core::sendRequest<SetHardwareComponentState::Response>(
change_hardware_state_client_, hw_request, 0, 2000);
change_hardware_state_client_, hw_request, 0, 3000); // was not stable with 2000 ms timeout
if (!hw_response || !hw_response->ok) {
RCLCPP_ERROR(get_logger(), "Could not deactivate hardware interface");
return ERROR;
Expand Down
103 changes: 43 additions & 60 deletions kuka_rox_hardware_interface/src/rox_hardware_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,6 @@ CallbackReturn KukaRoXHardwareInterface::on_init(const hardware_interface::Hardw
}
#endif

// if (mlockall(MCL_CURRENT | MCL_FUTURE) == -1) {
// RCLCPP_ERROR(rclcpp::get_logger("KukaRoXHardwareInterface"), "mlockall error");
// RCLCPP_ERROR(rclcpp::get_logger("KukaRoXHardwareInterface"), strerror(errno));
// return CallbackReturn::ERROR;
// }

RCLCPP_INFO(rclcpp::get_logger("KukaRoXHardwareInterface"), "Init successful");

return CallbackReturn::SUCCESS;
Expand Down Expand Up @@ -133,8 +127,6 @@ export_command_interfaces()

CallbackReturn KukaRoXHardwareInterface::on_configure(const rclcpp_lifecycle::State &)
{
// TODO(Svastits): Set QoS profile here
// should be using another configuration file read from xacro
#ifdef NON_MOCK_SETUP
SetQoSProfileRequest request;
SetQoSProfileResponse response;
Expand Down Expand Up @@ -175,9 +167,12 @@ CallbackReturn KukaRoXHardwareInterface::on_activate(const rclcpp_lifecycle::Sta
RCLCPP_INFO(rclcpp::get_logger("KukaRoXHardwareInterface"), "Connecting to robot . . .");
// Reset timeout to catch first tick message
receive_timeout_ = std::chrono::milliseconds(100);
terminate_ = false;
control_signal_ext_.control_signal.stop_ipo = false;

#ifdef NON_MOCK_SETUP
if (context_ != nullptr) {
context_->TryCancel();
}
#endif
if (observe_thread_.joinable()) {
observe_thread_.join();
}
Expand Down Expand Up @@ -208,7 +203,6 @@ CallbackReturn KukaRoXHardwareInterface::on_activate(const rclcpp_lifecycle::Sta
.error_code() != grpc::StatusCode::OK)
{
RCLCPP_ERROR(rclcpp::get_logger("KukaRoXHardwareInterface"), "OpenControlChannel failed");
terminate_ = true;
return CallbackReturn::FAILURE;
}
#endif
Expand All @@ -226,13 +220,11 @@ CallbackReturn KukaRoXHardwareInterface::on_deactivate(
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

terminate_ = true;
#ifdef NON_MOCK_SETUP
if (context_ != nullptr) {
context_->TryCancel();
}
#endif

if (observe_thread_.joinable()) {
observe_thread_.join();
}
Expand Down Expand Up @@ -345,53 +337,44 @@ bool KukaRoXHardwareInterface::isActive() const
void KukaRoXHardwareInterface::ObserveControl()
{
#ifdef NON_MOCK_SETUP
RCLCPP_INFO(
rclcpp::get_logger(
"KukaRoXHardwareInterface"),
"Observe control");
while (!terminate_) {
context_ = std::make_unique<::grpc::ClientContext>();
ObserveControlStateRequest obs_control;
std::unique_ptr<grpc::ClientReader<CommandState>> reader(
stub_->ObserveControlState(context_.get(), obs_control));

CommandState response;

if (reader->Read(&response)) {
std::unique_lock<std::mutex> lck(observe_mutex_); // TODO(Svastits): is this necessary?
command_state_ = response;
RCLCPP_INFO(
rclcpp::get_logger(
"KukaRoXHardwareInterface"),
"Event streamed from external control service: %s", CommandEvent_Name(
response.event()).c_str());

switch (static_cast<int>(response.event())) {
case CommandEvent::COMMAND_READY:
RCLCPP_INFO(rclcpp::get_logger("KukaRoXHardwareInterface"), "Command accepted");
break;
case CommandEvent::SAMPLING:
RCLCPP_INFO(rclcpp::get_logger("KukaRoXHardwareInterface"), "External control is active");
is_active_ = true;
break;
case CommandEvent::STOPPED:
RCLCPP_INFO(rclcpp::get_logger("KukaRoXHardwareInterface"), "External control finished");
is_active_ = false;
break;
case CommandEvent::ERROR:
RCLCPP_ERROR(
rclcpp::get_logger(
"KukaRoXHardwareInterface"),
"External control stopped by an error");
RCLCPP_ERROR(rclcpp::get_logger("KukaRoXHardwareInterface"), response.message().c_str());
is_active_ = false;
break;
default:
break;
}
} else {
// WORKAROUND: Ec is starting later so we have some errors before stable work.
std::this_thread::sleep_for(std::chrono::milliseconds(300));
RCLCPP_INFO(rclcpp::get_logger("KukaRoXHardwareInterface"), "Observe control");
context_ = std::make_unique<::grpc::ClientContext>();
ObserveControlStateRequest obs_control;
std::unique_ptr<grpc::ClientReader<CommandState>> reader(
stub_->ObserveControlState(context_.get(), obs_control));

CommandState response;

while (reader->Read(&response)) {
command_state_ = response;
RCLCPP_INFO(
rclcpp::get_logger(
"KukaRoXHardwareInterface"),
"Event streamed from external control service: %s", CommandEvent_Name(
response.event()).c_str());

switch (static_cast<int>(response.event())) {
case CommandEvent::COMMAND_READY:
RCLCPP_INFO(rclcpp::get_logger("KukaRoXHardwareInterface"), "Command accepted");
break;
case CommandEvent::SAMPLING:
RCLCPP_INFO(rclcpp::get_logger("KukaRoXHardwareInterface"), "External control is active");
is_active_ = true;
break;
case CommandEvent::STOPPED:
RCLCPP_INFO(rclcpp::get_logger("KukaRoXHardwareInterface"), "External control finished");
is_active_ = false;
break;
case CommandEvent::ERROR:
RCLCPP_ERROR(
rclcpp::get_logger(
"KukaRoXHardwareInterface"),
"External control stopped by an error");
RCLCPP_ERROR(rclcpp::get_logger("KukaRoXHardwareInterface"), response.message().c_str());
is_active_ = false;
break;
default:
break;
}
}
#endif
Expand Down

0 comments on commit 6fab420

Please sign in to comment.