From e2937e354b4c015c92d1144336f0b674285003fe Mon Sep 17 00:00:00 2001 From: tiboun Date: Mon, 24 Jan 2022 12:01:52 +0100 Subject: [PATCH] feat: add proxy authentication to schema registry Add new property for schema registry (proxy.user and proxy.password) in order to authenticate with proxy requiring authentication. The authentication rely on Authenticator which is globally set and configuration of the RestService will be a replace of the globally set configuration. http proxy with authentication has been disabled by default since JDK8.111 and in order to reach https, system property must be set to empty : - jdk.http.auth.tunneling.disabledSchemes - jdk.http.auth.proxying.disabledSchemes System.setProperty("jdk.http.auth.tunneling.disabledSchemes", ""); System.setProperty("jdk.http.auth.proxying.disabledSchemes", ""); --- .../client/SchemaRegistryClientConfig.java | 2 ++ .../client/rest/RestService.java | 28 +++++++++++++++++-- .../AbstractKafkaSchemaSerDeConfig.java | 18 +++++++++++- 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java index 2a47f8a9f0b..d32b4c0d085 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java @@ -38,6 +38,8 @@ public class SchemaRegistryClientConfig { public static final String PROXY_HOST = "proxy.host"; public static final String PROXY_PORT = "proxy.port"; + public static final String PROXY_USER = "proxy.user"; + public static final String PROXY_PASSWORD = "proxy.password"; public static final String MISSING_CACHE_SIZE_CONFIG = "missing.cache.size"; public static final String MISSING_ID_CACHE_TTL_CONFIG = "missing.id.cache.ttl.sec"; diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java index 96897ebfd69..441374543d2 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java @@ -40,16 +40,19 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.InetSocketAddress; import java.net.Proxy; +import java.net.Authenticator; +import java.net.HttpURLConnection; import java.net.URL; +import java.net.InetSocketAddress; +import java.net.PasswordAuthentication; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; @@ -218,6 +221,27 @@ public void configure(Map configs) { if (isValidProxyConfig(proxyHost, proxyPort)) { setProxy(proxyHost, proxyPort); + String user = Optional.ofNullable((String) configs.get(SchemaRegistryClientConfig.PROXY_USER)).orElse(""); + String password = Optional.ofNullable((String) configs.get(SchemaRegistryClientConfig.PROXY_PASSWORD)).orElse(""); + PasswordAuthentication passwordAuthentication = new PasswordAuthentication(user, password.toCharArray()); + String hostLowercase = proxyHost.toLowerCase(); + // override authenticator setting since probability to have only + // one corporate proxy setting to reach internet is very high + Authenticator.setDefault(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + if (getRequestorType() == RequestorType.PROXY) { + if (hostLowercase.equals(getRequestingHost().toLowerCase())) { + if (proxyPort == getRequestingPort()) { + return passwordAuthentication; + } + } + } + // Don't have access to Authenticator.getDefault() in order to relay the Authenticator Provider + // to another definition because it's available from 1.9+ + return null; + } + }); } } diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java index e30c1c0144d..468b6a73443 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java @@ -162,6 +162,18 @@ public class AbstractKafkaSchemaSerDeConfig extends AbstractConfig { "The port number of the proxy server that will be used to connect to the schema registry " + "instances."; + public static final String PROXY_USER = SchemaRegistryClientConfig.PROXY_USER; + public static final String PROXY_USER_DEFAULT = ""; + public static final String PROXY_USER_DOC = + "The username used to authenticate with the proxy that will be used to connect to the schema " + + " registry instances."; + + public static final String PROXY_PASSWORD = SchemaRegistryClientConfig.PROXY_PASSWORD; + public static final String PROXY_PASSWORD_DEFAULT = ""; + public static final String PROXY_PASSWORD_DOC = + "The password used to authenticate with the proxy that will be used to connect to the schema " + + " registry instances."; + public static ConfigDef baseConfigDef() { ConfigDef configDef = new ConfigDef() .define(SCHEMA_REGISTRY_URL_CONFIG, Type.LIST, @@ -201,7 +213,11 @@ public static ConfigDef baseConfigDef() { .define(PROXY_HOST, Type.STRING, PROXY_HOST_DEFAULT, Importance.LOW, PROXY_HOST_DOC) .define(PROXY_PORT, Type.INT, PROXY_PORT_DEFAULT, - Importance.LOW, PROXY_PORT_DOC); + Importance.LOW, PROXY_PORT_DOC) + .define(PROXY_USER, Type.INT, PROXY_USER_DEFAULT, + Importance.LOW, PROXY_USER_DOC) + .define(PROXY_PASSWORD, Type.INT, PROXY_PASSWORD_DEFAULT, + Importance.LOW, PROXY_PASSWORD_DOC); SchemaRegistryClientConfig.withClientSslSupport( configDef, SchemaRegistryClientConfig.CLIENT_NAMESPACE); return configDef;