From ba3dd3e6618a64405c1410bd3b7a5e9bad639d51 Mon Sep 17 00:00:00 2001 From: Stefan Glaser Date: Wed, 3 Apr 2019 14:26:31 +0200 Subject: [PATCH 1/3] Added option to specify transport hints. * Added TransportHints class, currently only containing a TCP-NO-DELAY option. * Added option to specify transport hints when subscribing to a topic. * Added transport hints to TopicDeclaration class. --- .../org/ros/internal/node/DefaultNode.java | 17 ++++++-- .../node/response/TopicListResultFactory.java | 2 +- .../internal/node/topic/TopicDeclaration.java | 18 ++++++-- .../main/java/org/ros/node/ConnectedNode.java | 19 ++++++++ .../org/ros/node/topic/TransportHints.java | 43 +++++++++++++++++++ 5 files changed, 91 insertions(+), 8 deletions(-) create mode 100644 rosjava/src/main/java/org/ros/node/topic/TransportHints.java diff --git a/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java b/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java index 9f1a1bdc1..ca3d1fc11 100644 --- a/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java +++ b/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java @@ -65,6 +65,7 @@ import org.ros.node.topic.DefaultSubscriberListener; import org.ros.node.topic.Publisher; import org.ros.node.topic.Subscriber; +import org.ros.node.topic.TransportHints; import org.ros.time.ClockTopicTimeProvider; import org.ros.time.TimeProvider; @@ -283,7 +284,7 @@ public Publisher newPublisher(GraphName topicName, String messageType) { TopicDescription topicDescription = nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType); TopicDeclaration topicDeclaration = - TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription); + TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, null); org.ros.message.MessageSerializer serializer = newMessageSerializer(messageType); return publisherFactory.newOrExisting(topicDeclaration, serializer); } @@ -295,11 +296,16 @@ public Publisher newPublisher(String topicName, String messageType) { @Override public Subscriber newSubscriber(GraphName topicName, String messageType) { + return newSubscriber(topicName, messageType, null); + } + + @Override + public Subscriber newSubscriber(GraphName topicName, String messageType, TransportHints transportHints) { GraphName resolvedTopicName = resolveName(topicName); TopicDescription topicDescription = nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType); TopicDeclaration topicDeclaration = - TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription); + TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, transportHints); MessageDeserializer deserializer = newMessageDeserializer(messageType); Subscriber subscriber = subscriberFactory.newOrExisting(topicDeclaration, deserializer); return subscriber; @@ -307,7 +313,12 @@ public Subscriber newSubscriber(GraphName topicName, String messageType) @Override public Subscriber newSubscriber(String topicName, String messageType) { - return newSubscriber(GraphName.of(topicName), messageType); + return newSubscriber(GraphName.of(topicName), messageType, null); + } + + @Override + public Subscriber newSubscriber(String topicName, String messageType, TransportHints transportHints) { + return newSubscriber(GraphName.of(topicName), messageType, transportHints); } @Override diff --git a/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java b/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java index 6422ce0e9..eb0ede229 100644 --- a/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java @@ -43,7 +43,7 @@ public List newFromValue(Object value) { String name = (String) ((Object[]) topic)[0]; String type = (String) ((Object[]) topic)[1]; descriptions.add(TopicDeclaration.newFromTopicName(GraphName.of(name), new TopicDescription(type, null, - null))); + null), null)); } return descriptions; } diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java b/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java index c55603fb3..4908c6110 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java @@ -23,6 +23,7 @@ import org.ros.internal.transport.ConnectionHeader; import org.ros.internal.transport.ConnectionHeaderFields; import org.ros.namespace.GraphName; +import org.ros.node.topic.TransportHints; import java.util.List; import java.util.Map; @@ -36,6 +37,7 @@ public class TopicDeclaration { private final TopicIdentifier topicIdentifier; private final TopicDescription topicDescription; + private final TransportHints transportHints; /** * @param header @@ -49,19 +51,26 @@ public static TopicDeclaration newFromHeader(Map header) { String definition = header.get(ConnectionHeaderFields.MESSAGE_DEFINITION); String md5Checksum = header.get(ConnectionHeaderFields.MD5_CHECKSUM); TopicDescription topicDescription = new TopicDescription(type, definition, md5Checksum); - return new TopicDeclaration(new TopicIdentifier(name), topicDescription); + boolean tcpNoDelay = "1".equals(header.get(ConnectionHeaderFields.TCP_NODELAY)); + return new TopicDeclaration(new TopicIdentifier(name), topicDescription, new TransportHints(tcpNoDelay)); } public static TopicDeclaration newFromTopicName(GraphName topicName, - TopicDescription topicDescription) { - return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription); + TopicDescription topicDescription, TransportHints transportHints) { + return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription, transportHints); } - public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription) { + public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription, TransportHints transportHints) { Preconditions.checkNotNull(topicIdentifier); Preconditions.checkNotNull(topicDescription); this.topicIdentifier = topicIdentifier; this.topicDescription = topicDescription; + + if (transportHints != null) { + this.transportHints = transportHints; + } else { + this.transportHints = new TransportHints(); + } } public TopicIdentifier getIdentifier() { @@ -84,6 +93,7 @@ public ConnectionHeader toConnectionHeader() { topicDescription.getDefinition()); connectionHeader.addField(ConnectionHeaderFields.MD5_CHECKSUM, topicDescription.getMd5Checksum()); + connectionHeader.addField(ConnectionHeaderFields.TCP_NODELAY, transportHints.getTcpNoDelay() ? "1" : "0"); return connectionHeader; } diff --git a/rosjava/src/main/java/org/ros/node/ConnectedNode.java b/rosjava/src/main/java/org/ros/node/ConnectedNode.java index 34d6994c1..4dfa6606d 100644 --- a/rosjava/src/main/java/org/ros/node/ConnectedNode.java +++ b/rosjava/src/main/java/org/ros/node/ConnectedNode.java @@ -26,6 +26,7 @@ import org.ros.node.service.ServiceServer; import org.ros.node.topic.Publisher; import org.ros.node.topic.Subscriber; +import org.ros.node.topic.TransportHints; import java.net.URI; @@ -82,11 +83,29 @@ public interface ConnectedNode extends Node { */ Subscriber newSubscriber(GraphName topicName, String messageType); + /** + * @param + * the message type to create the {@link Subscriber} for + * @param topicName + * the topic name to be subscribed to, this will be auto resolved + * @param messageType + * the message data type (e.g. "std_msgs/String") + * @param transportHints + * the transport hints + * @return a {@link Subscriber} for the specified topic + */ + Subscriber newSubscriber(GraphName topicName, String messageType, TransportHints transportHints); + /** * @see #newSubscriber(GraphName, String) */ Subscriber newSubscriber(String topicName, String messageType); + /** + * @see #newSubscriber(GraphName, String, TransportHints) + */ + Subscriber newSubscriber(String topicName, String messageType, TransportHints transportHints); + /** * Create a new {@link ServiceServer}. * diff --git a/rosjava/src/main/java/org/ros/node/topic/TransportHints.java b/rosjava/src/main/java/org/ros/node/topic/TransportHints.java new file mode 100644 index 000000000..a05b0e9c6 --- /dev/null +++ b/rosjava/src/main/java/org/ros/node/topic/TransportHints.java @@ -0,0 +1,43 @@ +package org.ros.node.topic; + +import java.util.Map; + +import org.ros.internal.transport.ConnectionHeaderFields; +import org.ros.node.ConnectedNode; + +import com.google.common.collect.Maps; + + +/** + * Provides a way of specifying network transport hints to + * {@link ConnectedNode#newSubscriber(String, String)} and + * {@link ConnectedNode#newSubscriber(org.ros.namespace.GraphName, String)}. + * + * @author stefan.glaser@hs-offenburg.de (Stefan Glaser) + */ +public class TransportHints { + + Map options; + + public TransportHints() { + this.options = Maps.newConcurrentMap(); + } + + public TransportHints(boolean tcpNoDelay) { + tcpNoDelay(tcpNoDelay); + } + + public TransportHints tcpNoDelay(boolean nodelay) { + options.put(ConnectionHeaderFields.TCP_NODELAY, nodelay ? "1" : "0"); + + return this; + } + + public boolean getTcpNoDelay() { + return "1".equals(options.get(ConnectionHeaderFields.TCP_NODELAY)); + } + + public Map getOptions() { + return options; + } +} From 7ae94f608a70a757b743ad777c9e62fe9fa699dc Mon Sep 17 00:00:00 2001 From: Stefan Glaser Date: Wed, 3 Apr 2019 14:51:31 +0200 Subject: [PATCH 2/3] Process TCP-NO-DELAY option for incoming subscriber connections. * Check incoming subscriber connection requests for the TCP-NO-DELAY header field and set the TCP-NO-DELAY option on the corresponding channel accordingly. --- .../ros/internal/transport/tcp/TcpServerHandshakeHandler.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java index f5f1a149e..6f534d585 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java +++ b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java @@ -96,6 +96,10 @@ private void handleSubscriberHandshake(ChannelHandlerContext ctx, MessageEvent e DefaultPublisher publisher = topicParticipantManager.getPublisher(topicName); ChannelBuffer outgoingBuffer = publisher.finishHandshake(incomingConnectionHeader); Channel channel = ctx.getChannel(); + if (incomingConnectionHeader.hasField(ConnectionHeaderFields.TCP_NODELAY)) { + boolean tcpNoDelay = "1".equals(incomingConnectionHeader.getField(ConnectionHeaderFields.TCP_NODELAY)); + channel.getConfig().setOption("tcpNoDelay", tcpNoDelay); + } ChannelFuture future = channel.write(outgoingBuffer).await(); if (!future.isSuccess()) { throw new RosRuntimeException(future.getCause()); From 391a0a922164b8721f0c3893c97e10eab8bc920f Mon Sep 17 00:00:00 2001 From: Stefan Glaser Date: Sun, 7 Apr 2019 13:34:24 +0200 Subject: [PATCH 3/3] Simplified TransportHints class. * Replaced internal map with simple boolean. * Fixed initialization bug. * Fixed code style. --- .../org/ros/node/topic/TransportHints.java | 28 ++++++------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/rosjava/src/main/java/org/ros/node/topic/TransportHints.java b/rosjava/src/main/java/org/ros/node/topic/TransportHints.java index a05b0e9c6..97ac02ede 100644 --- a/rosjava/src/main/java/org/ros/node/topic/TransportHints.java +++ b/rosjava/src/main/java/org/ros/node/topic/TransportHints.java @@ -1,43 +1,33 @@ package org.ros.node.topic; -import java.util.Map; - -import org.ros.internal.transport.ConnectionHeaderFields; import org.ros.node.ConnectedNode; -import com.google.common.collect.Maps; - /** * Provides a way of specifying network transport hints to - * {@link ConnectedNode#newSubscriber(String, String)} and - * {@link ConnectedNode#newSubscriber(org.ros.namespace.GraphName, String)}. + * {@link ConnectedNode#newSubscriber(org.ros.namespace.GraphName, String, TransportHints)} and + * {@link ConnectedNode#newSubscriber(String, String, TransportHints)}. * * @author stefan.glaser@hs-offenburg.de (Stefan Glaser) */ public class TransportHints { - Map options; + private boolean tcpNoDelay; - public TransportHints() { - this.options = Maps.newConcurrentMap(); + public TransportHints() { + this(false); } public TransportHints(boolean tcpNoDelay) { - tcpNoDelay(tcpNoDelay); + this.tcpNoDelay = tcpNoDelay; } - public TransportHints tcpNoDelay(boolean nodelay) { - options.put(ConnectionHeaderFields.TCP_NODELAY, nodelay ? "1" : "0"); - + public TransportHints tcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; return this; } public boolean getTcpNoDelay() { - return "1".equals(options.get(ConnectionHeaderFields.TCP_NODELAY)); - } - - public Map getOptions() { - return options; + return tcpNoDelay; } }