Skip to content

Commit

Permalink
test: ros2 action tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maciejmajek committed Jan 15, 2025
1 parent c4061e8 commit 5ddb661
Showing 1 changed file with 179 additions and 2 deletions.
181 changes: 179 additions & 2 deletions tests/communication/ros2/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@

import pytest
import rclpy
from rclpy.executors import SingleThreadedExecutor
from action_msgs.msg import GoalStatus
from action_msgs.srv import CancelGoal
from nav2_msgs.action import NavigateToPose
from rclpy.action import ActionServer
from rclpy.action.client import ClientGoalHandle
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.node import Node
from std_msgs.msg import String
from std_srvs.srv import SetBool

from rai.communication.ros2.api import ROS2ServiceAPI, ROS2TopicAPI
from rai.communication.ros2.api import ROS2ActionAPI, ROS2ServiceAPI, ROS2TopicAPI


class ServiceServer(Node):
Expand Down Expand Up @@ -51,6 +56,32 @@ def handle_test_message(self, msg: String) -> None:
self.received_messages.append(msg)


class ActionServer_(Node):
def __init__(self, action_name: str):
super().__init__("test_action_server")
self.action_server = ActionServer(
self,
action_type=NavigateToPose,
action_name=action_name,
execute_callback=self.handle_test_action,
)

def handle_test_action(
self, goal_handle: ClientGoalHandle
) -> NavigateToPose.Result:

for i in range(1, 11):
feedback_msg = NavigateToPose.Feedback(distance_remaining=10.0 / i)
goal_handle.publish_feedback(feedback_msg)
time.sleep(0.01)

goal_handle.succeed()

result = NavigateToPose.Result()
result.error_code = NavigateToPose.Result.NONE
return result


class MessagePublisher(Node):
def __init__(self, topic: str):
super().__init__("test_message_publisher")
Expand Down Expand Up @@ -332,3 +363,149 @@ def test_ros2_service_single_call_wrong_service_name(
)
finally:
shutdown_executors_and_threads(executors, threads)


def test_ros2_action_send_goal(ros_setup: None, request: pytest.FixtureRequest) -> None:
action_name = f"{request.node.originalname}_action" # type: ignore
node_name = f"{request.node.originalname}_node" # type: ignore
action_server = ActionServer_(action_name)
node = Node(node_name)
executors, threads = single_threaded_spinner([action_server, node])

try:
action_api = ROS2ActionAPI(node)
accepted, handle = action_api.send_goal(
action_name, "nav2_msgs/action/NavigateToPose", {}
)

assert accepted
assert handle != ""
finally:
shutdown_executors_and_threads(executors, threads)


def test_ros2_action_send_goal_get_result(
ros_setup: None, request: pytest.FixtureRequest
) -> None:
action_name = f"{request.node.originalname}_action" # type: ignore
node_name = f"{request.node.originalname}_node" # type: ignore
action_server = ActionServer_(action_name)
node = Node(node_name)
executors, threads = single_threaded_spinner([action_server, node])

try:
action_api = ROS2ActionAPI(node)
accepted, handle = action_api.send_goal(
action_name, "nav2_msgs/action/NavigateToPose", {}
)
import time

wait_time = 1.0
start_time = time.perf_counter()
while not action_api.is_goal_done(handle):
time.sleep(0.01)
if time.perf_counter() - start_time > wait_time:
raise TimeoutError("Goal not done")
result = action_api.get_result(handle)

assert result.status == GoalStatus.STATUS_SUCCEEDED
assert accepted
assert handle != ""
finally:
shutdown_executors_and_threads(executors, threads)


def test_ros2_action_send_goal_wrong_action_type(
ros_setup: None, request: pytest.FixtureRequest
) -> None:
action_name = f"{request.node.originalname}_action" # type: ignore
node_name = f"{request.node.originalname}_node" # type: ignore
action_server = ActionServer_(action_name)
node = Node(node_name)
executors, threads = single_threaded_spinner([action_server, node])

try:
action_api = ROS2ActionAPI(node)
with pytest.raises(Exception):
action_api.send_goal(
action_name, "nav2_msgs/action/NavigateToPose/WrongActionType", {}
)
finally:
shutdown_executors_and_threads(executors, threads)


def test_ros2_action_send_goal_wrong_action_name(
ros_setup: None, request: pytest.FixtureRequest
) -> None:
action_name = f"{request.node.originalname}_action" # type: ignore
node_name = f"{request.node.originalname}_node" # type: ignore
action_server = ActionServer_(action_name)
node = Node(node_name)
executors, threads = single_threaded_spinner([action_server, node])

try:
action_api = ROS2ActionAPI(node)
accepted, handle = action_api.send_goal(
"WrongActionName", "nav2_msgs/action/NavigateToPose", {}
)
assert not accepted
assert handle == ""
finally:
shutdown_executors_and_threads(executors, threads)


def test_ros2_action_send_goal_get_feedback(
ros_setup: None, request: pytest.FixtureRequest
) -> None:
action_name = f"{request.node.originalname}_action" # type: ignore
node_name = f"{request.node.originalname}_node" # type: ignore
action_server = ActionServer_(action_name)
node = Node(node_name)
executors, threads = single_threaded_spinner([action_server])
executor = MultiThreadedExecutor()
executor.add_node(node)
thread = threading.Thread(target=executor.spin)
thread.start()
action_api = ROS2ActionAPI(node)
accepted, handle = action_api.send_goal(
action_name, "nav2_msgs/action/NavigateToPose", {}
)
assert accepted
assert handle != ""
time.sleep(0.2)
try:
feedback = action_api.get_feedback(handle)
distances = [x.distance_remaining for x in feedback]
assert sorted(distances, reverse=True) == distances, "Wrong message order"
assert len(distances) == 10, "Wrong number of messages"
finally:
shutdown_executors_and_threads(executors, threads)


def test_ros2_action_send_goal_terminate_goal(
ros_setup: None, request: pytest.FixtureRequest
) -> None:
action_name = f"{request.node.originalname}_action" # type: ignore
node_name = f"{request.node.originalname}_node" # type: ignore
action_server = ActionServer_(action_name)
node = Node(node_name)
executors, threads = single_threaded_spinner([action_server])
executor = MultiThreadedExecutor()
executor.add_node(node)
thread = threading.Thread(target=executor.spin)
thread.start()
action_api = ROS2ActionAPI(node)
try:
accepted, handle = action_api.send_goal(
action_name, "nav2_msgs/action/NavigateToPose", {}
)
assert accepted
assert handle != ""
feedbacks_before = action_api.get_feedback(handle)
response = action_api.terminate_goal(handle)
assert response.return_code == CancelGoal.Response.ERROR_GOAL_TERMINATED # type: ignore
assert action_api.is_goal_done(handle)
feedbacks_after = action_api.get_feedback(handle)
assert len(feedbacks_before) == len(feedbacks_after)
finally:
shutdown_executors_and_threads(executors, threads)

0 comments on commit 5ddb661

Please sign in to comment.