Skip to content

Commit

Permalink
[ros_bridge] Fix tork-a/rtmros_nextage#308
Browse files Browse the repository at this point in the history
  • Loading branch information
130s committed Apr 27, 2017
1 parent 0541637 commit 080e79c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 30 deletions.
12 changes: 2 additions & 10 deletions hironx_ros_bridge/scripts/hironx.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,15 @@
' this script, but can use RTM. To use ROS, do not forget' \
' to run rosbridge. How to do so? --> http://wiki.ros.org/rtmros_nextage/Tutorials/Operating%20Hiro%2C%20NEXTAGE%20OPEN'

RTC_LIST = [
['seq', "SequencePlayer"],
['sh', "StateHolder"],
['fk', "ForwardKinematics"],
['ic', "ImpedanceController"],
['el', "SoftErrorLimiter"],
# ['co', "CollisionDetector"],
['sc', "ServoController"],
['log', "DataLogger"],]
RTC_LIST = 'seq, sh, fk, ic, el, sc, log'

if __name__ == '__main__':
parser = argparse.ArgumentParser(description='hiro command line interpreters')
parser.add_argument('--host', help='corba name server hostname')
parser.add_argument('--port', help='corba name server port number')
parser.add_argument('--modelfile', help='robot model file nmae')
parser.add_argument('--robot', help='robot modlule name (RobotHardware0 for real robot, Robot()')
parser.add_argument('--rtcs', help='RT components to activate. If nothing passed then default value will be used.')
parser.add_argument('--rtcs', help="RT components to activate. If nothing passed then default value will be used. Example: '{}'".format(RTC_LIST))
args, unknown = parser.parse_known_args()
unknown = [u for u in unknown if u[:2] != '__'] # filter out ros arguments

Expand Down
62 changes: 49 additions & 13 deletions hironx_ros_bridge/src/hironx_ros_bridge/hironx_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class via the link above; nicely formatted api doc web page

HandGroups = {'rhand': [2, 3, 4, 5], 'lhand': [6, 7, 8, 9]}

_RTClist = [
_RTC_list = [
['seq', "SequencePlayer"],
['sh', "StateHolder"],
['fk', "ForwardKinematics"],
Expand All @@ -318,6 +318,9 @@ class via the link above; nicely formatted api doc web page
['log', "DataLogger"],
]

# List of the name of RT Components that hrpsys requires at minimum.
_RTC_NAME_MINREQ = ['seq', 'sh', 'fk']

# servo controller (grasper)
sc = None
sc_svc = None
Expand All @@ -328,7 +331,7 @@ class via the link above; nicely formatted api doc web page
"the function call was successful, since not " +
"all methods internally called return status")

def init(self, robotname="HiroNX(Robot)0", url="", rtcs=_RTClist):
def init(self, robotname="HiroNX(Robot)0", url="", rtcs=None):
'''
Calls init from its superclass, which tries to connect RTCManager,
looks for ModelLoader, and starts necessary RTC components. Also runs
Expand All @@ -337,11 +340,10 @@ def init(self, robotname="HiroNX(Robot)0", url="", rtcs=_RTClist):
@type robotname: str
@type url: str
@type rtcs: [[str, str]]
@param rtcs: List of list of RTC names. Each inner list consists of
'SHORTENED' name and the 'FULLNAME'.
@type rtcs: [str]
@param rtcs: List of abbreviated RTC names.
example: [['seq', "SequencePlayer"], ['sh', "StateHolder"],,,]
example: ['seq', 'sh',,,]
'''
# reload for hrpsys 315.1.8
print(self.configurator_name + "waiting ModelLoader")
Expand All @@ -367,7 +369,10 @@ def init(self, robotname="HiroNX(Robot)0", url="", rtcs=_RTClist):
self.sensors = self.getSensors(url)

if rtcs:
self._RTClist = rtcs
# convert the list of abbreviated RTC names to the one that
# getRTCList wants.
self.getRTCList(rtcs)

# all([rtm.findRTC(rn[0], rtm.rootnc) for rn in self.getRTCList()]) # not working somehow...
if set([rn[0] for rn in self.getRTCList()]).issubset(set([x.name() for x in self.ms.get_components()])) :
print(self.configurator_name + "hrpsys components are already created and running")
Expand Down Expand Up @@ -450,24 +455,55 @@ def goInitial(self, tm=7, wait=True, init_pose_type=0):
self.seq_svc.waitInterpolationOfGroup(self.Groups[i][0])
return ret

def getRTCList(self):
def getRTCList(self, rtcs_str=None):
'''
@see: HrpsysConfigurator.getRTCList
@type rtcs_str: str
@param rtcs_str: A single str for a set of abbreviated names of RTCs,
each of which is comma-separated.
example: "seq, sh, fk, ic, el, sc, log"
@rtype [[str]]
@rerutrn List of available components. Each element consists of a list
@return List of available components. Each element consists of a list
of abbreviated and full names of the component.
@raise TypeError: When rtcs_str isn't a string.
@raise ValueError: When rtcs_str does not contain minimum
required RTCs.
'''
if hasattr(self, 'rmfo'):
if rtcs_str:
if not isinstance(rtcs_str, basestring):
raise TypeError('rtcs_str needs to be string.')
# Set a new list of RTCs
new_rtcs = []
# Separate by comma and remove whitespace.
rtcs_req_list = [x.strip() for x in rtcs_str.split(",")]
# Check if minimum required RTCs are passed.
if not all(x in rtcs_req_list for x in self._RTC_NAME_MINREQ):
raise ValueError('{} are required at minimum'.format(
self._RTC_NAME_MINREQ))
for rtc_requested in rtcs_req_list:
for elem in self._RTC_list:
if elem[0] == rtc_requested:
new_rtcs.append(elem)
break
self._RTC_list = new_rtcs

is_rmfo_initiated = False
# For some reason using built-in "any" method yields
# `TypeError: 'module' object is not callable`, so do the iteration.
for rtc_list in self._RTC_list:
if 'rmfo' in rtc_list:
is_rmfo_initiated = True
if hasattr(self, 'rmfo') and not is_rmfo_initiated:
self.ms.load("RemoveForceSensorLinkOffset")
self.ms.load("AbsoluteForceSensor")
if "RemoveForceSensorLinkOffset" in self.ms.get_factory_names():
self._RTClist.append(['rmfo', "RemoveForceSensorLinkOffset"])
self._RTC_list.append(['rmfo', "RemoveForceSensorLinkOffset"])
elif "AbsoluteForceSensor" in self.ms.get_factory_names():
self._RTClist.append(['rmfo', "AbsoluteForceSensor"])
self._RTC_list.append(['rmfo', "AbsoluteForceSensor"])
else:
print "Component rmfo is not loadable."
return self._RTClist
return self._RTC_list

# hand interface
# effort: 1~100[%]
Expand Down
34 changes: 27 additions & 7 deletions hironx_ros_bridge/test/test_hironx_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,23 @@ class TestHiroClient(TestHiro):
['fk', "ForwardKinematics"],
['ic', "ImpedanceController"],
['el', "SoftErrorLimiter"],
# ['co', "CollisionDetector"],
['sc', "ServoController"],
['log', "DataLogger"],
# rmfo will be automatically added in getRTCList.
['rmfo', 'RemoveForceSensorLinkOffset']
]

_RTC_LIST_CUSTOM = [
['seq', "SequencePlayer"],
['sh', "StateHolder"],
['fk', "ForwardKinematics"],
['el', "SoftErrorLimiter"],
['co', "CollisionDetector"],
['log', "DataLogger"],
['rmfo', 'RemoveForceSensorLinkOffset']
]

def test_getRTCList(self):
self.assertListEqual(self.robot.getRTCList(), self._RTC_LIST)

def test_getRTCList_customrtcs(self):
def test_getRTCList_customrtcs_args_correct(self):
'''
Test when the RTC list was passed from the client.
Expand All @@ -73,10 +72,31 @@ def test_getRTCList_customrtcs(self):
which is not elegant but as of now I can't think of a better way.
'''
self.robot = HIRONX()
self.robot.init(rtcs=self._RTC_LIST_CUSTOM)

# Passing 1st elems from _RTC_LIST_CUSTOM, to init method that calls
# internally getRTCList.
self.robot.init(rtcs='seq, sh, fk')
self.assertListEqual(self.robot.getRTCList(), self._RTC_LIST_CUSTOM)

def test_getRTCList_customrtcs_args_wrong(self):
'''
Test when the RTC list was passed from the client, in wrong format.
'''
# Passing the list of RTCs falling short of requirement.
self.assertRaises(
ValueError, self.robot.getRTCList, rtcs_str='seq, sh')

# Passing 1st elems from _RTC_LIST_CUSTOM,
# but list is not the right type of arg.
#
## http://stackoverflow.com/questions/6103825/how-to-properly-use-unit-testings-assertraises-with-nonetype-objects
## <-- didn't work, TypeError still raised before assertion.
# with self.assertRaises(TypeError):
# self.robot.getRTCList(rtcs_str=['seq', 'sh', 'fk', 'el', 'co', 'log'])
## http://stackoverflow.com/a/6103930/577001
self.assertRaises(
TypeError, lambda: self.robot.getRTCList,
rtcs_str=['seq', 'sh', 'fk', 'el', 'co', 'log'])

if __name__ == '__main__':
import rostest
rostest.rosrun(PKG, 'test_hronx_client', TestHiroClient)

0 comments on commit 080e79c

Please sign in to comment.