Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor spawner to be able to reuse code for ros2controlcli #1661

Merged
8 changes: 8 additions & 0 deletions controller_manager/controller_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
set_hardware_component_state,
switch_controllers,
unload_controller,
get_parameter_from_param_file,
set_controller_parameters,
set_controller_parameters_from_param_file,
bcolors,
)

__all__ = [
Expand All @@ -36,4 +40,8 @@
"set_hardware_component_state",
"switch_controllers",
"unload_controller",
"get_parameter_from_param_file",
"set_controller_parameters",
"set_controller_parameters_from_param_file",
"bcolors",
]
117 changes: 117 additions & 0 deletions controller_manager/controller_manager/controller_manager_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,29 @@
)

import rclpy
import yaml
from rcl_interfaces.msg import Parameter

# @note: The versions conditioning is added here to support the source-compatibility with Humble
# The `get_parameter_value` function is moved to `rclpy.parameter` module from `ros2param.api` module from version 3.6.0
try:
from rclpy.parameter import get_parameter_value
except ImportError:
from ros2param.api import get_parameter_value
from ros2param.api import call_set_parameters


# from https://stackoverflow.com/a/287944
class bcolors:
MAGENTA = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"


class ServiceNotFoundError(Exception):
Expand Down Expand Up @@ -220,3 +243,97 @@ def unload_controller(node, controller_manager_name, controller_name, service_ti
request,
service_timeout,
)


def get_parameter_from_param_file(controller_name, namespace, parameter_file, parameter_name):
with open(parameter_file) as f:
namespaced_controller = (
controller_name if namespace == "/" else f"{namespace}/{controller_name}"
)
parameters = yaml.safe_load(f)
if namespaced_controller in parameters:
value = parameters[namespaced_controller]
if not isinstance(value, dict) or "ros__parameters" not in value:
raise RuntimeError(
f"YAML file : {parameter_file} is not a valid ROS parameter file for controller : {namespaced_controller}"
)
if parameter_name in parameters[namespaced_controller]["ros__parameters"]:
return parameters[namespaced_controller]["ros__parameters"][parameter_name]
else:
return None
else:
return None


def set_controller_parameters(
node, controller_manager_name, controller_name, parameter_name, parameter_value
):
parameter = Parameter()
parameter.name = controller_name + "." + parameter_name
parameter_string = str(parameter_value)
parameter.value = get_parameter_value(string_value=parameter_string)

response = call_set_parameters(
node=node, node_name=controller_manager_name, parameters=[parameter]
)
assert len(response.results) == 1
result = response.results[0]
if result.successful:
node.get_logger().info(
bcolors.OKCYAN
+ 'Setting controller param "'
+ parameter_name
+ '" to "'
+ parameter_string
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
else:
node.get_logger().fatal(
bcolors.FAIL
+ 'Could not set controller param "'
+ parameter_name
+ '" to "'
+ parameter_string
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
return False
return True


def set_controller_parameters_from_param_file(
node, controller_manager_name, controller_name, parameter_file, namespace=None
):
if parameter_file:
spawner_namespace = namespace if namespace else node.get_namespace()
set_controller_parameters(
node, controller_manager_name, controller_name, "param_file", parameter_file
)

controller_type = get_parameter_from_param_file(
controller_name, spawner_namespace, parameter_file, "type"
)
if controller_type:
if not set_controller_parameters(
node, controller_manager_name, controller_name, "type", controller_type
):
return False

fallback_controllers = get_parameter_from_param_file(
controller_name, spawner_namespace, parameter_file, "fallback_controllers"
)
if fallback_controllers:
if not set_controller_parameters(
node,
controller_manager_name,
controller_name,
"fallback_controllers",
fallback_controllers,
):
return False
return True
14 changes: 1 addition & 13 deletions controller_manager/controller_manager/hardware_spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from controller_manager import (
list_hardware_components,
set_hardware_component_state,
bcolors,
)
from controller_manager.controller_manager_services import ServiceNotFoundError

Expand All @@ -28,19 +29,6 @@
from rclpy.signals import SignalHandlerOptions


# from https://stackoverflow.com/a/287944
class bcolors:
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"


def first_match(iterable, predicate):
return next((n for n in iterable if predicate(n)), None)

Expand Down
165 changes: 9 additions & 156 deletions controller_manager/controller_manager/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,21 @@
import sys
import time
import warnings
import yaml

from controller_manager import (
configure_controller,
list_controllers,
load_controller,
switch_controllers,
unload_controller,
set_controller_parameters_from_param_file,
bcolors,
)
from controller_manager.controller_manager_services import ServiceNotFoundError

import rclpy
from rcl_interfaces.msg import Parameter
from rclpy.node import Node

# @note: The versions conditioning is added here to support the source-compatibility with Humble
# The `get_parameter_value` function is moved to `rclpy.parameter` module from `ros2param.api` module from version 3.6.0
try:
from rclpy.parameter import get_parameter_value
except ImportError:
from ros2param.api import get_parameter_value
from rclpy.signals import SignalHandlerOptions
from ros2param.api import call_set_parameters

# from https://stackoverflow.com/a/287944


class bcolors:
MAGENTA = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"


def first_match(iterable, predicate):
Expand Down Expand Up @@ -87,24 +65,6 @@ def is_controller_loaded(node, controller_manager, controller_name, service_time
return any(c.name == controller_name for c in controllers)


def get_parameter_from_param_file(controller_name, namespace, parameter_file, parameter_name):
with open(parameter_file) as f:
namespaced_controller = (
controller_name if namespace == "/" else f"{namespace}/{controller_name}"
)
parameters = yaml.safe_load(f)
if namespaced_controller in parameters:
value = parameters[namespaced_controller]
if not isinstance(value, dict) or "ros__parameters" not in value:
raise RuntimeError(
f"YAML file : {parameter_file} is not a valid ROS parameter file for controller : {namespaced_controller}"
)
if parameter_name in parameters[namespaced_controller]["ros__parameters"]:
return parameters[namespaced_controller]["ros__parameters"][parameter_name]
else:
return None


def main(args=None):
rclpy.init(args=args, signal_handler_options=SignalHandlerOptions.NO)
parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -162,14 +122,6 @@ def main(args=None):
action="store_true",
required=False,
)
parser.add_argument(
"--fallback_controllers",
help="Fallback controllers list are activated as a fallback strategy when the"
" spawned controllers fail. When the argument is provided, it takes precedence over"
" the fallback_controllers list in the param file",
default=None,
nargs="+",
)
bmagyar marked this conversation as resolved.
Show resolved Hide resolved

command_line_args = rclpy.utilities.remove_ros_args(args=sys.argv)[1:]
args = parser.parse_args(command_line_args)
Expand Down Expand Up @@ -210,7 +162,6 @@ def main(args=None):

try:
for controller_name in controller_names:
fallback_controllers = args.fallback_controllers

if is_controller_loaded(
node, controller_manager_name, controller_name, controller_manager_timeout
Expand All @@ -221,112 +172,14 @@ def main(args=None):
+ bcolors.ENDC
)
else:
controller_type = (
None
if param_file is None
else get_parameter_from_param_file(
controller_name, spawner_namespace, param_file, "type"
)
)
if controller_type:
parameter = Parameter()
parameter.name = controller_name + ".type"
parameter.value = get_parameter_value(string_value=controller_type)

response = call_set_parameters(
node=node, node_name=controller_manager_name, parameters=[parameter]
)
assert len(response.results) == 1
result = response.results[0]
if result.successful:
node.get_logger().info(
bcolors.OKCYAN
+ 'Set controller type to "'
+ controller_type
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
else:
node.get_logger().fatal(
bcolors.FAIL
+ 'Could not set controller type to "'
+ controller_type
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
return 1

if param_file:
parameter = Parameter()
parameter.name = controller_name + ".params_file"
parameter.value = get_parameter_value(string_value=param_file)

response = call_set_parameters(
node=node, node_name=controller_manager_name, parameters=[parameter]
)
assert len(response.results) == 1
result = response.results[0]
if result.successful:
node.get_logger().info(
bcolors.OKCYAN
+ 'Set controller params file to "'
+ param_file
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
else:
node.get_logger().fatal(
bcolors.FAIL
+ 'Could not set controller params file to "'
+ param_file
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
return 1

if not fallback_controllers and param_file:
fallback_controllers = get_parameter_from_param_file(
controller_name, spawner_namespace, param_file, "fallback_controllers"
)

if fallback_controllers:
parameter = Parameter()
parameter.name = controller_name + ".fallback_controllers"
parameter.value = get_parameter_value(string_value=str(fallback_controllers))

response = call_set_parameters(
node=node, node_name=controller_manager_name, parameters=[parameter]
)
assert len(response.results) == 1
result = response.results[0]
if result.successful:
node.get_logger().info(
bcolors.OKCYAN
+ 'Setting fallback_controllers to ["'
+ ",".join(fallback_controllers)
+ '"] for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
else:
node.get_logger().fatal(
bcolors.FAIL
+ 'Could not set fallback_controllers to ["'
+ ",".join(fallback_controllers)
+ '"] for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
if not set_controller_parameters_from_param_file(
node,
controller_manager_name,
controller_name,
param_file,
spawner_namespace,
):
return 1

ret = load_controller(node, controller_manager_name, controller_name)
Expand Down
Loading
Loading