Skip to content
This repository has been archived by the owner on Aug 11, 2023. It is now read-only.

Adding TCP-NO-DELAY transport hint #295

Merged
merged 3 commits into from
Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions rosjava/src/main/java/org/ros/internal/node/DefaultNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.ros.node.topic.DefaultSubscriberListener;
import org.ros.node.topic.Publisher;
import org.ros.node.topic.Subscriber;
import org.ros.node.topic.TransportHints;
import org.ros.time.ClockTopicTimeProvider;
import org.ros.time.TimeProvider;

Expand Down Expand Up @@ -283,7 +284,7 @@ public <T> Publisher<T> newPublisher(GraphName topicName, String messageType) {
TopicDescription topicDescription =
nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
TopicDeclaration topicDeclaration =
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, null);
org.ros.message.MessageSerializer<T> serializer = newMessageSerializer(messageType);
return publisherFactory.newOrExisting(topicDeclaration, serializer);
}
Expand All @@ -295,19 +296,29 @@ public <T> Publisher<T> newPublisher(String topicName, String messageType) {

@Override
public <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType) {
return newSubscriber(topicName, messageType, null);
}

@Override
public <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType, TransportHints transportHints) {
GraphName resolvedTopicName = resolveName(topicName);
TopicDescription topicDescription =
nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
TopicDeclaration topicDeclaration =
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, transportHints);
MessageDeserializer<T> deserializer = newMessageDeserializer(messageType);
Subscriber<T> subscriber = subscriberFactory.newOrExisting(topicDeclaration, deserializer);
return subscriber;
}

@Override
public <T> Subscriber<T> newSubscriber(String topicName, String messageType) {
return newSubscriber(GraphName.of(topicName), messageType);
return newSubscriber(GraphName.of(topicName), messageType, null);
}

@Override
public <T> Subscriber<T> newSubscriber(String topicName, String messageType, TransportHints transportHints) {
return newSubscriber(GraphName.of(topicName), messageType, transportHints);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public List<TopicDeclaration> newFromValue(Object value) {
String name = (String) ((Object[]) topic)[0];
String type = (String) ((Object[]) topic)[1];
descriptions.add(TopicDeclaration.newFromTopicName(GraphName.of(name), new TopicDescription(type, null,
null)));
null), null));
}
return descriptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.ros.internal.transport.ConnectionHeader;
import org.ros.internal.transport.ConnectionHeaderFields;
import org.ros.namespace.GraphName;
import org.ros.node.topic.TransportHints;

import java.util.List;
import java.util.Map;
Expand All @@ -36,6 +37,7 @@ public class TopicDeclaration {

private final TopicIdentifier topicIdentifier;
private final TopicDescription topicDescription;
private final TransportHints transportHints;

/**
* @param header
Expand All @@ -49,19 +51,26 @@ public static TopicDeclaration newFromHeader(Map<String, String> header) {
String definition = header.get(ConnectionHeaderFields.MESSAGE_DEFINITION);
String md5Checksum = header.get(ConnectionHeaderFields.MD5_CHECKSUM);
TopicDescription topicDescription = new TopicDescription(type, definition, md5Checksum);
return new TopicDeclaration(new TopicIdentifier(name), topicDescription);
boolean tcpNoDelay = "1".equals(header.get(ConnectionHeaderFields.TCP_NODELAY));
return new TopicDeclaration(new TopicIdentifier(name), topicDescription, new TransportHints(tcpNoDelay));
}

public static TopicDeclaration newFromTopicName(GraphName topicName,
TopicDescription topicDescription) {
return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription);
TopicDescription topicDescription, TransportHints transportHints) {
return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription, transportHints);
}

public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription) {
public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription, TransportHints transportHints) {
Preconditions.checkNotNull(topicIdentifier);
Preconditions.checkNotNull(topicDescription);
this.topicIdentifier = topicIdentifier;
this.topicDescription = topicDescription;

if (transportHints != null) {
this.transportHints = transportHints;
} else {
this.transportHints = new TransportHints();
}
}

public TopicIdentifier getIdentifier() {
Expand All @@ -84,6 +93,7 @@ public ConnectionHeader toConnectionHeader() {
topicDescription.getDefinition());
connectionHeader.addField(ConnectionHeaderFields.MD5_CHECKSUM,
topicDescription.getMd5Checksum());
connectionHeader.addField(ConnectionHeaderFields.TCP_NODELAY, transportHints.getTcpNoDelay() ? "1" : "0");
return connectionHeader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ private void handleSubscriberHandshake(ChannelHandlerContext ctx, MessageEvent e
DefaultPublisher<?> publisher = topicParticipantManager.getPublisher(topicName);
ChannelBuffer outgoingBuffer = publisher.finishHandshake(incomingConnectionHeader);
Channel channel = ctx.getChannel();
if (incomingConnectionHeader.hasField(ConnectionHeaderFields.TCP_NODELAY)) {
boolean tcpNoDelay = "1".equals(incomingConnectionHeader.getField(ConnectionHeaderFields.TCP_NODELAY));
channel.getConfig().setOption("tcpNoDelay", tcpNoDelay);
}
ChannelFuture future = channel.write(outgoingBuffer).await();
if (!future.isSuccess()) {
throw new RosRuntimeException(future.getCause());
Expand Down
19 changes: 19 additions & 0 deletions rosjava/src/main/java/org/ros/node/ConnectedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.ros.node.service.ServiceServer;
import org.ros.node.topic.Publisher;
import org.ros.node.topic.Subscriber;
import org.ros.node.topic.TransportHints;

import java.net.URI;

Expand Down Expand Up @@ -82,11 +83,29 @@ public interface ConnectedNode extends Node {
*/
<T> Subscriber<T> newSubscriber(GraphName topicName, String messageType);

/**
* @param <T>
* the message type to create the {@link Subscriber} for
* @param topicName
* the topic name to be subscribed to, this will be auto resolved
* @param messageType
* the message data type (e.g. "std_msgs/String")
* @param transportHints
* the transport hints
* @return a {@link Subscriber} for the specified topic
*/
<T> Subscriber<T> newSubscriber(GraphName topicName, String messageType, TransportHints transportHints);

/**
* @see #newSubscriber(GraphName, String)
*/
<T> Subscriber<T> newSubscriber(String topicName, String messageType);

/**
* @see #newSubscriber(GraphName, String, TransportHints)
*/
<T> Subscriber<T> newSubscriber(String topicName, String messageType, TransportHints transportHints);

/**
* Create a new {@link ServiceServer}.
*
Expand Down
33 changes: 33 additions & 0 deletions rosjava/src/main/java/org/ros/node/topic/TransportHints.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.ros.node.topic;

import org.ros.node.ConnectedNode;


/**
* Provides a way of specifying network transport hints to
* {@link ConnectedNode#newSubscriber(org.ros.namespace.GraphName, String, TransportHints)} and
* {@link ConnectedNode#newSubscriber(String, String, TransportHints)}.
*
* @author [email protected] (Stefan Glaser)
*/
public class TransportHints {

private boolean tcpNoDelay;

public TransportHints() {
this(false);
}

public TransportHints(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}

public TransportHints tcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
return this;
}

public boolean getTcpNoDelay() {
return tcpNoDelay;
}
}