Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable http proxy support for mqtt3 #1010

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public class MqttConnectOptions {
private int maxReconnectDelay = 128000;
private boolean skipPortDuringHandshake = false;
private Map<String, String> customWebSocketHeaders = null;
private String httpProxyHost;
private int httpProxyPort;
private String httpProxyUser;
private String httpProxyPassword;

// Client Operation Parameters
private int executorServiceTimeout = 1; // How long to wait in seconds when terminating the executor service.
Expand Down Expand Up @@ -716,6 +720,38 @@ public void setCustomWebSocketHeaders(Map<String, String> props) {
public Map<String, String> getCustomWebSocketHeaders() {
return customWebSocketHeaders;
}
public String getHttpProxyHost() {
return httpProxyHost;
}

public void setHttpProxyHost(String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
}

public int getHttpProxyPort() {
return httpProxyPort;
}

public void setHttpProxyPort(int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
}

public String getHttpProxyUser() {
return httpProxyUser;
}

public void setHttpProxyUser(String httpProxyUser) {
this.httpProxyUser = httpProxyUser;
}

public String getHttpProxyPassword() {
return httpProxyPassword;
}

public void setHttpProxyPassword(String httpProxyPassword) {
this.httpProxyPassword = httpProxyPassword;
}


public String toString() {
return Debug.dumpProperties(getDebug(), "Connection options");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ public class MqttException extends Exception {
* state. New up a new client to continue.
*/
public static final short REASON_CODE_CLIENT_CLOSED = 32111;

/** Unable to connect to server though http proxy*/
public static final short REASON_CODE_HTTP_PROXY_CONNECT_ERROR = 32112;

/**
* A request has been made to use a token that is already associated with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
netModule.setEnabledCiphers(enabledCiphers);
}
}

netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
options.getHttpProxyUser(), options.getHttpProxyPassword());
return netModule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;

import javax.net.SocketFactory;

import javax.net.ssl.SSLSocketFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.internal.websocket.Base64;
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;

Expand All @@ -42,6 +45,11 @@ public class TCPNetworkModule implements NetworkModule {
private int port;
private int conTimeout;

private String httpProxyHost;
private int httpProxyPort;
private String httpProxyUser;
private String httpProxyPassword;

/**
* Constructs a new TCPNetworkModule using the specified host and
* port. The supplied SocketFactory is used to supply the network
Expand All @@ -66,19 +74,139 @@ public TCPNetworkModule(SocketFactory factory, String host, int port, String res
*/
public void start() throws IOException, MqttException {
final String methodName = "start";

// @TRACE 252=connect to host {0} port {1} timeout {2}
log.fine(CLASS_NAME,methodName, "252", new Object[] {host, Integer.valueOf(port), Long.valueOf(conTimeout*1000)});

if(httpProxyHost != null) {
Socket tunnel;

/*
* Set up a socket to do tunneling through the proxy.
* Start it off as a regular socket, then layer SSL
* over the top of it.
*/
try {
tunnel = new Socket(httpProxyHost, httpProxyPort);
doTunnelHandshake(tunnel, host, port, httpProxyUser, httpProxyPassword);
}catch (IOException ex) {
//@TRACE 251=Failed to create TCP tunnel
log.fine(CLASS_NAME,methodName,"251",null,ex);
throw new MqttException(MqttException.REASON_CODE_HTTP_PROXY_CONNECT_ERROR, ex);
}

try {
socket = ((SSLSocketFactory) factory).createSocket(tunnel, host, port, true);
} catch (ConnectException ex) {
//@TRACE 250=Failed to create TCP socket
log.fine(CLASS_NAME,methodName,"250",null,ex);
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
}

} else {
try {
SocketAddress sockaddr = new InetSocketAddress(host, port);
socket = factory.createSocket();
socket.connect(sockaddr, conTimeout * 1000);
socket.setSoTimeout(1000);
} catch (ConnectException ex) {
//@TRACE 250=Failed to create TCP socket
log.fine(CLASS_NAME, methodName, "250", null, ex);
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
}
}
}

/*
* Tell our tunnel where we want to CONNECT, and look for the
* right reply. Throw IOException if anything goes wrong.
*/
private void doTunnelHandshake(Socket tunnel, String host, int port, String proxyUser, String proxyPassword)
throws IOException {
OutputStream out = tunnel.getOutputStream();

String msg;
if(proxyUser != null) {
String proxyUserPass = String.format("%s:%s", proxyUser, proxyPassword);
msg = "CONNECT " + host + ":" + port + " HTTP/1.1\n"
+ "Proxy-Authorization: Basic " + Base64.encode(proxyUserPass) + "\n"
+ "User-Agent: Paho MQTT3 Client\n"
+ "Proxy-Connection: Keep-Alive"
+ "\r\n\r\n";
} else {
msg = "CONNECT " + host + ":" + port + " HTTP/1.0\n"
+ "User-Agent: "
+ "User-Agent: Paho MQTT3 Client\n"
+ "Proxy-Connection: Keep-Alive"
+ "\r\n\r\n";
}

byte b[];
try {
// @TRACE 252=connect to host {0} port {1} timeout {2}
log.fine(CLASS_NAME,methodName, "252", new Object[] {host, Integer.valueOf(port), Long.valueOf(conTimeout*1000)});
SocketAddress sockaddr = new InetSocketAddress(host, port);
socket = factory.createSocket();
socket.connect(sockaddr, conTimeout*1000);
socket.setSoTimeout(1000);
/*
* We really do want ASCII7 -- the http protocol doesn't change
* with locale.
*/
b = msg.getBytes("ASCII7");
} catch (UnsupportedEncodingException ignored) {
/*
* If ASCII7 isn't there, something serious is wrong, but
* Paranoia Is Good (tm)
*/
b = msg.getBytes();
}
catch (ConnectException ex) {
//@TRACE 250=Failed to create TCP socket
log.fine(CLASS_NAME,methodName,"250",null,ex);
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
out.write(b);
out.flush();

/*
* We need to store the reply so we can create a detailed
* error message to the user.
*/
byte reply[] = new byte[200];
int replyLen = 0;
int newlinesSeen = 0;
boolean headerDone = false; /* Done on first newline */

InputStream in = tunnel.getInputStream();
boolean error = false;

while (newlinesSeen < 2) {
int i = in.read();
if (i < 0) {
throw new IOException("Unexpected EOF from proxy");
}
if (i == '\n') {
headerDone = true;
++newlinesSeen;
} else if (i != '\r') {
newlinesSeen = 0;
if (!headerDone && replyLen < reply.length) {
reply[replyLen++] = (byte) i;
}
}
}

/*
* Converting the byte array to a string is slightly wasteful
* in the case where the connection was successful, but it's
* insignificant compared to the network overhead.
*/
String replyStr;
try {
replyStr = new String(reply, 0, replyLen, "ASCII7");
} catch (UnsupportedEncodingException ignored) {
replyStr = new String(reply, 0, replyLen);
}

/* We asked for HTTP/1.0, so we should get that back */
// if (!replyStr.startsWith("HTTP/1.0 200")) {
if(replyStr.indexOf("200") == -1) {
throw new IOException("Unable to tunnel through "
+ tunnel.getInetAddress().getHostName() + ":" + tunnel.getPort()
+ ". Proxy returns \"" + replyStr + "\"");
}

/* tunneling Handshake was successful! */
}

public InputStream getInputStream() throws IOException {
Expand Down Expand Up @@ -110,4 +238,33 @@ public void setConnectTimeout(int timeout) {
public String getServerURI() {
return "tcp://" + host + ":" + port;
}
}

public void setHttpProxyHost(String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
}

public void setHttpProxyPort(int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
}

public void setHttpProxyUser(String httpProxyUser) {
this.httpProxyUser = httpProxyUser;
}

public void setHttpProxyPassword(String httpProxyPassword) {
this.httpProxyPassword = httpProxyPassword;
}

public void configHttpProxy(String proxyHost, int proxyPort, String user, String password) {
if(proxyHost != null && proxyHost.length() > 0 &&
proxyPort > 0){
setHttpProxyHost(proxyHost);
setHttpProxyPort(proxyPort);
if(user != null && user.length() > 0 &&
password != null && password.length() > 0) {
setHttpProxyUser(user);
setHttpProxyPassword(password);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
}
TCPNetworkModule networkModule = new TCPNetworkModule(factory, host, port, clientId);
networkModule.setConnectTimeout(options.getConnectionTimeout());

networkModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
options.getHttpProxyUser(), options.getHttpProxyPassword());
return networkModule;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
WebSocketNetworkModule netModule = new WebSocketNetworkModule(factory, brokerUri.toString(), host, port,
clientId, options.getCustomWebSocketHeaders(), options.isSkipPortDuringHandshake());
netModule.setConnectTimeout(options.getConnectionTimeout());

netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
options.getHttpProxyUser(), options.getHttpProxyPassword());
return netModule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
}
}

netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
options.getHttpProxyUser(), options.getHttpProxyPassword());

return netModule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
223=failed: in closed state
224=failed: not disconnected
250=Failed to create TCP socket
251=Failed to create TCP tunnel
252=connect to host {0} port {1} timeout {2}
260=setEnabledCiphers ciphers={0}
300=key={0} message={1}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@
32200=Persistence already in use
32201=Token already in use
32202=Too many publishes in progress
32203=Unable to connect to Http Proxy