diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java index f43aac68dc..3fef77703b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java @@ -174,9 +174,7 @@ public boolean verifyClusterConnection() { } else if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) { try { String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + this.clusterId + "/overview"; - String result = - HttpClientUtils.httpGetRequest( - restUrl, RequestConfig.custom().setConnectTimeout(2000).build()); + String result = YarnUtils.restRequest(restUrl, 2000); JacksonUtils.read(result, Overview.class); return true; } catch (Exception e) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index 1cc9a82f4a..9c1c54b450 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -185,7 +185,7 @@ public void start(Long id) { // 2) setAddress if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) { String address = - YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/"; + YarnUtils.getRMWebAppProxyURL() + "/proxy/" + deployResponse.clusterId() + "/"; flinkCluster.setAddress(address); } else { flinkCluster.setAddress(deployResponse.address()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java index bc6d712e71..9bf47b5e32 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java @@ -17,6 +17,8 @@ package org.apache.streampark.console.core.service.impl; +import org.apache.streampark.common.util.HadoopUtils; +import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.base.util.EncryptUtils; import org.apache.streampark.console.core.entity.Application; @@ -34,6 +36,12 @@ import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.web.client.RestTemplateBuilder; @@ -43,6 +51,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.http.client.ClientHttpResponse; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.stereotype.Service; import org.springframework.web.client.DefaultResponseErrorHandler; import org.springframework.web.client.RestTemplate; @@ -52,6 +61,8 @@ import java.io.ByteArrayOutputStream; import java.io.InputStream; +import java.net.URI; +import java.security.PrivilegedExceptionAction; import java.util.Enumeration; @Service @@ -73,6 +84,10 @@ public class ProxyServiceImpl implements ProxyService { private final RestTemplate proxyRestTemplate; + private final boolean hasYarnHttpKerberosAuth; + + private String lastUsername = ""; + public ProxyServiceImpl(RestTemplateBuilder restTemplateBuilder) { this.proxyRestTemplate = restTemplateBuilder @@ -84,6 +99,9 @@ public void handleError(@Nonnull ClientHttpResponse response) { } }) .build(); + + String yarnHttpAuth = SystemPropertyUtils.get("streampark.yarn.http-auth"); + this.hasYarnHttpKerberosAuth = "kerberos".equalsIgnoreCase(yarnHttpAuth); } @Override @@ -117,7 +135,8 @@ public ResponseEntity proxyFlinkUI(HttpServletRequest request, Long appId) th case YARN_SESSION: String yarnURL = YarnUtils.getRMWebAppProxyURL(); url = yarnURL + "/proxy/" + app.getClusterId(); - break; + url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, ""); + return proxyYarnRequest(request, url); case KUBERNETES_NATIVE_APPLICATION: case KUBERNETES_NATIVE_SESSION: String jobManagerUrl = app.getJobManagerUrl(); @@ -143,7 +162,7 @@ public ResponseEntity proxyYarn(HttpServletRequest request, String appId) thr String yarnURL = YarnUtils.getRMWebAppProxyURL(); String url = yarnURL + "/proxy/" + appId + "/"; url += getRequestURL(request).replace("/proxy/yarn/" + appId, ""); - return proxyRequest(request, url); + return proxyYarnRequest(request, url); } @Override @@ -155,7 +174,8 @@ public ResponseEntity proxyJobManager(HttpServletRequest request, Long logId) return proxyRequest(request, url); } - private ResponseEntity proxyRequest(HttpServletRequest request, String url) throws Exception { + private HttpEntity getRequestEntity(HttpServletRequest request, String url, boolean setAuth) + throws Exception { HttpHeaders headers = new HttpHeaders(); Enumeration headerNames = request.getHeaderNames(); while (headerNames.hasMoreElements()) { @@ -163,9 +183,15 @@ private ResponseEntity proxyRequest(HttpServletRequest request, String url) t headers.set(headerName, request.getHeader(headerName)); } - String token = serviceHelper.getAuthorization(); - if (token != null) { - headers.set("Authorization", EncryptUtils.encrypt(token)); + // Ensure the Host header is set correctly. + URI uri = new URI(url); + headers.set("Host", uri.getHost()); + + if (setAuth) { + String token = serviceHelper.getAuthorization(); + if (token != null) { + headers.set("Authorization", EncryptUtils.encrypt(token)); + } } byte[] body = null; @@ -177,12 +203,69 @@ private ResponseEntity proxyRequest(HttpServletRequest request, String url) t } HttpEntity requestEntity = new HttpEntity<>(body, headers); + return requestEntity; + } + + private ResponseEntity proxyRequest(HttpServletRequest request, String url) throws Exception { + HttpEntity requestEntity = getRequestEntity(request, url, true); return proxyRestTemplate.exchange( url, HttpMethod.valueOf(request.getMethod()), requestEntity, byte[].class); } + private ResponseEntity proxyYarnRequest(HttpServletRequest request, String url) + throws Exception { + if (hasYarnHttpKerberosAuth) { + UserGroupInformation ugi = HadoopUtils.getUgi(); + + HttpEntity requestEntity = getRequestEntity(request, url, false); + setRestTemplateCredentials(ugi.getShortUserName()); + + return ugi.doAs( + new PrivilegedExceptionAction>() { + @Override + public ResponseEntity run() throws Exception { + return proxyRestTemplate.exchange( + url, HttpMethod.valueOf(request.getMethod()), requestEntity, byte[].class); + } + }); + } else { + return proxyRequest(request, url); + } + } + private String getRequestURL(HttpServletRequest request) { return request.getRequestURI() + (request.getQueryString() != null ? "?" + request.getQueryString() : ""); } + + private void setRestTemplateCredentials(String username) { + setRestTemplateCredentials(username, null); + } + + /** + * Configures the RestTemplate's HttpClient connector. This method is primarily used to configure + * the HttpClient authentication information and SSL certificate validation policies. + * + * @param username The username for HTTP basic authentication. + * @param password The password for HTTP basic authentication. + */ + private void setRestTemplateCredentials(String username, String password) { + // Check if the username is not null and has changed since the last configuration + if (username != null && !this.lastUsername.equals(username)) { + // Create a new credentials provider + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + // Add the username and password for HTTP basic authentication + credentialsProvider.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + + // Customize the HttpClient with the credentials provider + CloseableHttpClient httpClient = + HttpClients.custom().setDefaultCredentialsProvider(credentialsProvider).build(); + // Set the HttpClient request factory for the RestTemplate + this.proxyRestTemplate.setRequestFactory( + new HttpComponentsClientHttpRequestFactory(httpClient)); + // Update the last known username + this.lastUsername = username; + } + } }