Skip to content

Commit

Permalink
fix(streampark-console): Proxy and Session Issues with YARN HTTP Kerb…
Browse files Browse the repository at this point in the history
…eros Authentication
  • Loading branch information
Tidra committed Aug 20, 2024
1 parent 37402fd commit f8102c3
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ public boolean verifyClusterConnection() {
} else if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
try {
String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + this.clusterId + "/overview";
String result =
HttpClientUtils.httpGetRequest(
restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
String result = YarnUtils.restRequest(restUrl, 2000);
JacksonUtils.read(result, Overview.class);
return true;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void start(Long id) {
// 2) setAddress
if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
String address =
YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/";
YarnUtils.getRMWebAppProxyURL() + "/proxy/" + deployResponse.clusterId() + "/";
flinkCluster.setAddress(address);
} else {
flinkCluster.setAddress(deployResponse.address());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.EncryptUtils;
import org.apache.streampark.console.core.entity.Application;
Expand All @@ -34,6 +36,12 @@
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.client.RestTemplateBuilder;
Expand All @@ -43,6 +51,7 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.web.client.RestTemplate;
Expand All @@ -52,6 +61,8 @@

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Enumeration;

@Service
Expand All @@ -73,6 +84,10 @@ public class ProxyServiceImpl implements ProxyService {

private final RestTemplate proxyRestTemplate;

private final boolean hasYarnHttpKerberosAuth;

private String lastUsername = "";

public ProxyServiceImpl(RestTemplateBuilder restTemplateBuilder) {
this.proxyRestTemplate =
restTemplateBuilder
Expand All @@ -84,6 +99,9 @@ public void handleError(@Nonnull ClientHttpResponse response) {
}
})
.build();

String yarnHttpAuth = SystemPropertyUtils.get("streampark.yarn.http-auth");
this.hasYarnHttpKerberosAuth = "kerberos".equalsIgnoreCase(yarnHttpAuth);
}

@Override
Expand Down Expand Up @@ -117,7 +135,8 @@ public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long appId) th
case YARN_SESSION:
String yarnURL = YarnUtils.getRMWebAppProxyURL();
url = yarnURL + "/proxy/" + app.getClusterId();
break;
url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
return proxyYarnRequest(request, url);
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
String jobManagerUrl = app.getJobManagerUrl();
Expand All @@ -143,7 +162,7 @@ public ResponseEntity<?> proxyYarn(HttpServletRequest request, String appId) thr
String yarnURL = YarnUtils.getRMWebAppProxyURL();
String url = yarnURL + "/proxy/" + appId + "/";
url += getRequestURL(request).replace("/proxy/yarn/" + appId, "");
return proxyRequest(request, url);
return proxyYarnRequest(request, url);
}

@Override
Expand All @@ -155,17 +174,24 @@ public ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long logId)
return proxyRequest(request, url);
}

private ResponseEntity<?> proxyRequest(HttpServletRequest request, String url) throws Exception {
private HttpEntity<?> getRequestEntity(HttpServletRequest request, String url, boolean setAuth)
throws Exception {
HttpHeaders headers = new HttpHeaders();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
headers.set(headerName, request.getHeader(headerName));
}

String token = serviceHelper.getAuthorization();
if (token != null) {
headers.set("Authorization", EncryptUtils.encrypt(token));
// Ensure the Host header is set correctly.
URI uri = new URI(url);
headers.set("Host", uri.getHost());

if (setAuth) {
String token = serviceHelper.getAuthorization();
if (token != null) {
headers.set("Authorization", EncryptUtils.encrypt(token));
}
}

byte[] body = null;
Expand All @@ -177,12 +203,69 @@ private ResponseEntity<?> proxyRequest(HttpServletRequest request, String url) t
}

HttpEntity<?> requestEntity = new HttpEntity<>(body, headers);
return requestEntity;
}

private ResponseEntity<?> proxyRequest(HttpServletRequest request, String url) throws Exception {
HttpEntity<?> requestEntity = getRequestEntity(request, url, true);
return proxyRestTemplate.exchange(
url, HttpMethod.valueOf(request.getMethod()), requestEntity, byte[].class);
}

private ResponseEntity<?> proxyYarnRequest(HttpServletRequest request, String url)
throws Exception {
if (hasYarnHttpKerberosAuth) {
UserGroupInformation ugi = HadoopUtils.getUgi();

HttpEntity<?> requestEntity = getRequestEntity(request, url, false);
setRestTemplateCredentials(ugi.getShortUserName());

return ugi.doAs(
new PrivilegedExceptionAction<ResponseEntity<?>>() {
@Override
public ResponseEntity<?> run() throws Exception {
return proxyRestTemplate.exchange(
url, HttpMethod.valueOf(request.getMethod()), requestEntity, byte[].class);
}
});
} else {
return proxyRequest(request, url);
}
}

private String getRequestURL(HttpServletRequest request) {
return request.getRequestURI()
+ (request.getQueryString() != null ? "?" + request.getQueryString() : "");
}

private void setRestTemplateCredentials(String username) {
setRestTemplateCredentials(username, null);
}

/**
* Configures the RestTemplate's HttpClient connector. This method is primarily used to configure
* the HttpClient authentication information and SSL certificate validation policies.
*
* @param username The username for HTTP basic authentication.
* @param password The password for HTTP basic authentication.
*/
private void setRestTemplateCredentials(String username, String password) {
// Check if the username is not null and has changed since the last configuration
if (username != null && !this.lastUsername.equals(username)) {
// Create a new credentials provider
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// Add the username and password for HTTP basic authentication
credentialsProvider.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials(username, password));

// Customize the HttpClient with the credentials provider
CloseableHttpClient httpClient =
HttpClients.custom().setDefaultCredentialsProvider(credentialsProvider).build();
// Set the HttpClient request factory for the RestTemplate
this.proxyRestTemplate.setRequestFactory(
new HttpComponentsClientHttpRequestFactory(httpClient));
// Update the last known username
this.lastUsername = username;
}
}
}

0 comments on commit f8102c3

Please sign in to comment.