Skip to content

Commit

Permalink
[KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize con…
Browse files Browse the repository at this point in the history
…figurable

# 🔍 Description

Fix #6594.

This PR ports HIVE-26633(apache/hive#3674): Make thrift client maxMessageSize configurable to fix a regression after upgrading Thrift 0.16 in 1.9.0.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6631 from pan3793/thrift-max-size.

Closes #6594

e4841c8 [Cheng Pan] [KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize configurable

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
pan3793 committed Aug 27, 2024
1 parent 2d883e7 commit 11de72f
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 17 deletions.
1 change: 1 addition & 0 deletions docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.frontend.thrift.binary.ssl.disallowed.protocols | SSLv2,SSLv3 | SSL versions to disallow for Kyuubi thrift binary frontend. | set | 1.7.0 |
| kyuubi.frontend.thrift.binary.ssl.enabled | false | Set this to true for using SSL encryption in thrift binary frontend server. | boolean | 1.7.0 |
| kyuubi.frontend.thrift.binary.ssl.include.ciphersuites || A comma-separated list of include SSL cipher suite names for thrift binary frontend. | seq | 1.7.0 |
| kyuubi.frontend.thrift.client.max.message.size | 1073741824 | Maximum message size in bytes a thrift client will receive. | int | 1.9.3 |
| kyuubi.frontend.thrift.http.bind.host | &lt;undefined&gt; | Hostname or IP of the machine on which to run the thrift frontend service via http protocol. | string | 1.6.0 |
| kyuubi.frontend.thrift.http.bind.port | 10010 | Port of the machine on which to run the thrift frontend service via http protocol. | int | 1.6.0 |
| kyuubi.frontend.thrift.http.compression.enabled | true | Enable thrift http compression via Jetty compression support | boolean | 1.6.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,13 @@ object KyuubiConf {
.version("1.4.0")
.fallbackConf(FRONTEND_MAX_MESSAGE_SIZE)

val FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.frontend.thrift.client.max.message.size")
.doc("Maximum message size in bytes a thrift client will receive.")
.version("1.9.3")
.intConf
.createWithDefault(1 * 1024 * 1024 * 1024) // follow HIVE-26633 to use 1g as default value

val FRONTEND_THRIFT_HTTP_REQUEST_HEADER_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.frontend.thrift.http.request.header.size")
.doc("Request header size in bytes, when using HTTP transport mode. Jetty defaults used.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class JdbcConnectionParams {

static final String CONNECT_TIMEOUT = "connectTimeout";
static final String SOCKET_TIMEOUT = "socketTimeout";
static final String THRIFT_CLIENT_MAX_MESSAGE_SIZE = "thrift.client.max.message.size";

// We support ways to specify application name modeled after some existing DBs, since
// there's no standard approach.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory;
import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.*;
import org.apache.kyuubi.shaded.thrift.TConfiguration;
import org.apache.kyuubi.shaded.thrift.TException;
import org.apache.kyuubi.shaded.thrift.protocol.TBinaryProtocol;
import org.apache.kyuubi.shaded.thrift.transport.THttpClient;
Expand Down Expand Up @@ -419,7 +420,13 @@ private TTransport createHttpTransport() throws SQLException, TTransportExceptio
boolean useSsl = isSslConnection();
// Create an http client from the configs
httpClient = getHttpClient(useSsl);
transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
int maxMessageSize = getMaxMessageSize();
TConfiguration.Builder tConfBuilder = TConfiguration.custom();
if (maxMessageSize > 0) {
tConfBuilder.setMaxMessageSize(maxMessageSize);
}
TConfiguration tConf = tConfBuilder.build();
transport = new THttpClient(tConf, getServerHttpUrl(useSsl), httpClient);
return transport;
}

Expand Down Expand Up @@ -629,7 +636,8 @@ private String getJWTStringFromSession() {
}

/** Create underlying SSL or non-SSL transport */
private TTransport createUnderlyingTransport() throws TTransportException {
private TTransport createUnderlyingTransport() throws TTransportException, SQLException {
int maxMessageSize = getMaxMessageSize();
TTransport transport = null;
// Note: Thrift returns an SSL socket that is already bound to the specified host:port
// Therefore an open called on this would be a no-op later
Expand All @@ -643,19 +651,46 @@ private TTransport createUnderlyingTransport() throws TTransportException {
Utils.getPassword(sessConfMap, JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);

if (sslTrustStore == null || sslTrustStore.isEmpty()) {
transport = ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout);
transport =
ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout, maxMessageSize);
} else {
transport =
ThriftUtils.getSSLSocket(
host, port, connectTimeout, socketTimeout, sslTrustStore, sslTrustStorePassword);
host,
port,
connectTimeout,
socketTimeout,
sslTrustStore,
sslTrustStorePassword,
maxMessageSize);
}
} else {
// get non-SSL socket transport
transport = ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout);
transport =
ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout, maxMessageSize);
}
return transport;
}

private int getMaxMessageSize() throws SQLException {
String maxMessageSize = sessConfMap.get(JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE);
if (maxMessageSize == null) {
return -1;
}

try {
return Integer.parseInt(maxMessageSize);
} catch (Exception e) {
String errFormat =
"Invalid {} configuration of '{}'. Expected an integer specifying number of bytes. "
+ "A configuration of <= 0 uses default max message size.";
String errMsg =
String.format(
errFormat, JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE, maxMessageSize);
throw new SQLException(errMsg, "42000", e);
}
}

/**
* Create transport per the connection options Supported transport options are: - SASL based
* transports over + Kerberos + SSL + non-SSL - Raw (non-SASL) socket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,70 @@
import org.apache.kyuubi.shaded.thrift.transport.TSocket;
import org.apache.kyuubi.shaded.thrift.transport.TTransport;
import org.apache.kyuubi.shaded.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class helps in some aspects of authentication. It creates the proper Thrift classes for the
* given configuration as well as helps with authenticating requests.
*/
public class ThriftUtils {

private static final Logger LOG = LoggerFactory.getLogger(ThriftUtils.class);

/**
* Configure the provided T transport's max message size.
*
* @param transport Transport to configure maxMessage for
* @param maxMessageSize Maximum allowed message size in bytes, less than or equal to 0 means use
* the Thrift library default.
* @return The passed in T transport configured with desired max message size. The same object
* passed in is returned.
*/
public static <T extends TTransport> T configureThriftMaxMessageSize(
T transport, int maxMessageSize) {
if (maxMessageSize > 0) {
if (transport.getConfiguration() == null) {
LOG.warn(
"TTransport {} is returning a null Configuration, Thrift max message size is not getting configured",
transport.getClass().getName());
return transport;
}
transport.getConfiguration().setMaxMessageSize(maxMessageSize);
}
return transport;
}

/**
* Create a TSocket for the provided host and port with specified connectTimeout, loginTimeout and
* maxMessageSize.
*
* @param host Host to connect to.
* @param port Port to connect to.
* @param connectTimeout Socket connect timeout (0 means no timeout).
* @param socketTimeout Socket read/write timeout (0 means no timeout).
* @param maxMessageSize Size in bytes for max allowable Thrift message size, less than or equal
* to 0 results in using the Thrift library default.
* @return TTransport TSocket for host/port
*/
public static TTransport getSocketTransport(
String host, int port, int connectTimeout, int socketTimeout) throws TTransportException {
return new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectTimeout);
String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize)
throws TTransportException {
TConfiguration.Builder tConfBuilder = TConfiguration.custom();
if (maxMessageSize > 0) {
tConfBuilder.setMaxMessageSize(maxMessageSize);
}
TConfiguration tConf = tConfBuilder.build();
return new TSocket(tConf, host, port, socketTimeout, connectTimeout);
}

public static TTransport getSSLSocket(
String host, int port, int connectTimeout, int socketTimeout) throws TTransportException {
String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize)
throws TTransportException {
// The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout);
tSSLSocket.setConnectTimeout(connectTimeout);
return getSSLSocketWithHttps(tSSLSocket);
return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
}

public static TTransport getSSLSocket(
Expand All @@ -49,7 +96,8 @@ public static TTransport getSSLSocket(
int connectTimeout,
int socketTimeout,
String trustStorePath,
String trustStorePassWord)
String trustStorePassWord,
int maxMessageSize)
throws TTransportException {
TSSLTransportFactory.TSSLTransportParameters params =
new TSSLTransportFactory.TSSLTransportParameters();
Expand All @@ -59,16 +107,18 @@ public static TTransport getSSLSocket(
// SSLContext created with the given params
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout, params);
tSSLSocket.setConnectTimeout(connectTimeout);
return getSSLSocketWithHttps(tSSLSocket);
return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
}

// Using endpoint identification algorithm as HTTPS enables us to do
// CNAMEs/subjectAltName verification
private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException {
private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket, int maxMessageSize)
throws TTransportException {
SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket();
SSLParameters sslParams = sslSocket.getSSLParameters();
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
sslSocket.setSSLParameters(sslParams);
return new TSocket(sslSocket);
TSocket tSocket = new TSocket(sslSocket);
return configureThriftMaxMessageSize(tSocket, maxMessageSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,10 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging {
host: String,
port: Int,
socketTimeout: Int,
connectionTimeout: Int): TProtocol = {
val tSocket = new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectionTimeout)
connectionTimeout: Int,
maxMessageSize: Int): TProtocol = {
val tConf = TConfiguration.custom().setMaxMessageSize(maxMessageSize).build()
val tSocket = new TSocket(tConf, host, port, socketTimeout, connectionTimeout)
val tTransport = PlainSASLHelper.getPlainTransport(user, passwd, tSocket)
tTransport.open()
new TBinaryProtocol(tTransport)
Expand All @@ -485,15 +487,23 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging {
conf: KyuubiConf): KyuubiSyncThriftClient = {
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val loginTimeout = conf.get(ENGINE_LOGIN_TIMEOUT).toInt
val maxMessageSize = conf.get(KyuubiConf.FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE)
val aliveProbeEnabled = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
val aliveProbeInterval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt
val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)

val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout)
val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout, maxMessageSize)

val aliveProbeProtocol =
if (aliveProbeEnabled) {
Option(createTProtocol(user, passwd, host, port, aliveProbeInterval, loginTimeout))
Some(createTProtocol(
user,
passwd,
host,
port,
aliveProbeInterval,
loginTimeout,
maxMessageSize))
} else {
None
}
Expand Down

0 comments on commit 11de72f

Please sign in to comment.