diff --git a/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java b/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java index 9f1a1bdc1..ca3d1fc11 100644 --- a/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java +++ b/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java @@ -65,6 +65,7 @@ import org.ros.node.topic.DefaultSubscriberListener; import org.ros.node.topic.Publisher; import org.ros.node.topic.Subscriber; +import org.ros.node.topic.TransportHints; import org.ros.time.ClockTopicTimeProvider; import org.ros.time.TimeProvider; @@ -283,7 +284,7 @@ public Publisher newPublisher(GraphName topicName, String messageType) { TopicDescription topicDescription = nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType); TopicDeclaration topicDeclaration = - TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription); + TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, null); org.ros.message.MessageSerializer serializer = newMessageSerializer(messageType); return publisherFactory.newOrExisting(topicDeclaration, serializer); } @@ -295,11 +296,16 @@ public Publisher newPublisher(String topicName, String messageType) { @Override public Subscriber newSubscriber(GraphName topicName, String messageType) { + return newSubscriber(topicName, messageType, null); + } + + @Override + public Subscriber newSubscriber(GraphName topicName, String messageType, TransportHints transportHints) { GraphName resolvedTopicName = resolveName(topicName); TopicDescription topicDescription = nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType); TopicDeclaration topicDeclaration = - TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription); + TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, transportHints); MessageDeserializer deserializer = newMessageDeserializer(messageType); Subscriber subscriber = subscriberFactory.newOrExisting(topicDeclaration, deserializer); return subscriber; @@ -307,7 +313,12 @@ public Subscriber newSubscriber(GraphName topicName, String messageType) @Override public Subscriber newSubscriber(String topicName, String messageType) { - return newSubscriber(GraphName.of(topicName), messageType); + return newSubscriber(GraphName.of(topicName), messageType, null); + } + + @Override + public Subscriber newSubscriber(String topicName, String messageType, TransportHints transportHints) { + return newSubscriber(GraphName.of(topicName), messageType, transportHints); } @Override diff --git a/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java b/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java index 6422ce0e9..eb0ede229 100644 --- a/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java @@ -43,7 +43,7 @@ public List newFromValue(Object value) { String name = (String) ((Object[]) topic)[0]; String type = (String) ((Object[]) topic)[1]; descriptions.add(TopicDeclaration.newFromTopicName(GraphName.of(name), new TopicDescription(type, null, - null))); + null), null)); } return descriptions; } diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java b/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java index c55603fb3..4908c6110 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java @@ -23,6 +23,7 @@ import org.ros.internal.transport.ConnectionHeader; import org.ros.internal.transport.ConnectionHeaderFields; import org.ros.namespace.GraphName; +import org.ros.node.topic.TransportHints; import java.util.List; import java.util.Map; @@ -36,6 +37,7 @@ public class TopicDeclaration { private final TopicIdentifier topicIdentifier; private final TopicDescription topicDescription; + private final TransportHints transportHints; /** * @param header @@ -49,19 +51,26 @@ public static TopicDeclaration newFromHeader(Map header) { String definition = header.get(ConnectionHeaderFields.MESSAGE_DEFINITION); String md5Checksum = header.get(ConnectionHeaderFields.MD5_CHECKSUM); TopicDescription topicDescription = new TopicDescription(type, definition, md5Checksum); - return new TopicDeclaration(new TopicIdentifier(name), topicDescription); + boolean tcpNoDelay = "1".equals(header.get(ConnectionHeaderFields.TCP_NODELAY)); + return new TopicDeclaration(new TopicIdentifier(name), topicDescription, new TransportHints(tcpNoDelay)); } public static TopicDeclaration newFromTopicName(GraphName topicName, - TopicDescription topicDescription) { - return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription); + TopicDescription topicDescription, TransportHints transportHints) { + return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription, transportHints); } - public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription) { + public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription, TransportHints transportHints) { Preconditions.checkNotNull(topicIdentifier); Preconditions.checkNotNull(topicDescription); this.topicIdentifier = topicIdentifier; this.topicDescription = topicDescription; + + if (transportHints != null) { + this.transportHints = transportHints; + } else { + this.transportHints = new TransportHints(); + } } public TopicIdentifier getIdentifier() { @@ -84,6 +93,7 @@ public ConnectionHeader toConnectionHeader() { topicDescription.getDefinition()); connectionHeader.addField(ConnectionHeaderFields.MD5_CHECKSUM, topicDescription.getMd5Checksum()); + connectionHeader.addField(ConnectionHeaderFields.TCP_NODELAY, transportHints.getTcpNoDelay() ? "1" : "0"); return connectionHeader; } diff --git a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java index f5f1a149e..6f534d585 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java +++ b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java @@ -96,6 +96,10 @@ private void handleSubscriberHandshake(ChannelHandlerContext ctx, MessageEvent e DefaultPublisher publisher = topicParticipantManager.getPublisher(topicName); ChannelBuffer outgoingBuffer = publisher.finishHandshake(incomingConnectionHeader); Channel channel = ctx.getChannel(); + if (incomingConnectionHeader.hasField(ConnectionHeaderFields.TCP_NODELAY)) { + boolean tcpNoDelay = "1".equals(incomingConnectionHeader.getField(ConnectionHeaderFields.TCP_NODELAY)); + channel.getConfig().setOption("tcpNoDelay", tcpNoDelay); + } ChannelFuture future = channel.write(outgoingBuffer).await(); if (!future.isSuccess()) { throw new RosRuntimeException(future.getCause()); diff --git a/rosjava/src/main/java/org/ros/node/ConnectedNode.java b/rosjava/src/main/java/org/ros/node/ConnectedNode.java index 34d6994c1..4dfa6606d 100644 --- a/rosjava/src/main/java/org/ros/node/ConnectedNode.java +++ b/rosjava/src/main/java/org/ros/node/ConnectedNode.java @@ -26,6 +26,7 @@ import org.ros.node.service.ServiceServer; import org.ros.node.topic.Publisher; import org.ros.node.topic.Subscriber; +import org.ros.node.topic.TransportHints; import java.net.URI; @@ -82,11 +83,29 @@ public interface ConnectedNode extends Node { */ Subscriber newSubscriber(GraphName topicName, String messageType); + /** + * @param + * the message type to create the {@link Subscriber} for + * @param topicName + * the topic name to be subscribed to, this will be auto resolved + * @param messageType + * the message data type (e.g. "std_msgs/String") + * @param transportHints + * the transport hints + * @return a {@link Subscriber} for the specified topic + */ + Subscriber newSubscriber(GraphName topicName, String messageType, TransportHints transportHints); + /** * @see #newSubscriber(GraphName, String) */ Subscriber newSubscriber(String topicName, String messageType); + /** + * @see #newSubscriber(GraphName, String, TransportHints) + */ + Subscriber newSubscriber(String topicName, String messageType, TransportHints transportHints); + /** * Create a new {@link ServiceServer}. * diff --git a/rosjava/src/main/java/org/ros/node/topic/TransportHints.java b/rosjava/src/main/java/org/ros/node/topic/TransportHints.java new file mode 100644 index 000000000..97ac02ede --- /dev/null +++ b/rosjava/src/main/java/org/ros/node/topic/TransportHints.java @@ -0,0 +1,33 @@ +package org.ros.node.topic; + +import org.ros.node.ConnectedNode; + + +/** + * Provides a way of specifying network transport hints to + * {@link ConnectedNode#newSubscriber(org.ros.namespace.GraphName, String, TransportHints)} and + * {@link ConnectedNode#newSubscriber(String, String, TransportHints)}. + * + * @author stefan.glaser@hs-offenburg.de (Stefan Glaser) + */ +public class TransportHints { + + private boolean tcpNoDelay; + + public TransportHints() { + this(false); + } + + public TransportHints(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public TransportHints tcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + return this; + } + + public boolean getTcpNoDelay() { + return tcpNoDelay; + } +}