Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(streampark-console): Proxy and Session Issues with YARN HTTP Kerb… #3979

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ public boolean verifyClusterConnection() {
} else if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
try {
String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + this.clusterId + "/overview";
String result =
HttpClientUtils.httpGetRequest(
restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
String result = YarnUtils.restRequest(restUrl, 2000);
JacksonUtils.read(result, Overview.class);
return true;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void start(Long id) {
// 2) setAddress
if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
String address =
YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/";
YarnUtils.getRMWebAppProxyURL() + "/proxy/" + deployResponse.clusterId() + "/";
flinkCluster.setAddress(address);
} else {
flinkCluster.setAddress(deployResponse.address());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.EncryptUtils;
import org.apache.streampark.console.core.entity.Application;
Expand All @@ -34,6 +36,12 @@
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.client.RestTemplateBuilder;
Expand All @@ -43,6 +51,7 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.web.client.RestTemplate;
Expand All @@ -52,6 +61,8 @@

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Enumeration;

@Service
Expand All @@ -73,6 +84,10 @@ public class ProxyServiceImpl implements ProxyService {

private final RestTemplate proxyRestTemplate;

private final boolean hasYarnHttpKerberosAuth;

private String lastUsername = "";

public ProxyServiceImpl(RestTemplateBuilder restTemplateBuilder) {
this.proxyRestTemplate =
restTemplateBuilder
Expand All @@ -84,6 +99,9 @@ public void handleError(@Nonnull ClientHttpResponse response) {
}
})
.build();

String yarnHttpAuth = SystemPropertyUtils.get("streampark.yarn.http-auth");
this.hasYarnHttpKerberosAuth = "kerberos".equalsIgnoreCase(yarnHttpAuth);
}

@Override
Expand Down Expand Up @@ -117,7 +135,8 @@ public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long appId) th
case YARN_SESSION:
String yarnURL = YarnUtils.getRMWebAppProxyURL();
url = yarnURL + "/proxy/" + app.getClusterId();
break;
url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
return proxyYarnRequest(request, url);
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
String jobManagerUrl = app.getJobManagerUrl();
Expand All @@ -143,7 +162,7 @@ public ResponseEntity<?> proxyYarn(HttpServletRequest request, String appId) thr
String yarnURL = YarnUtils.getRMWebAppProxyURL();
String url = yarnURL + "/proxy/" + appId + "/";
url += getRequestURL(request).replace("/proxy/yarn/" + appId, "");
return proxyRequest(request, url);
return proxyYarnRequest(request, url);
}

@Override
Expand All @@ -155,17 +174,24 @@ public ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long logId)
return proxyRequest(request, url);
}

private ResponseEntity<?> proxyRequest(HttpServletRequest request, String url) throws Exception {
private HttpEntity<?> getRequestEntity(HttpServletRequest request, String url, boolean setAuth)
throws Exception {
HttpHeaders headers = new HttpHeaders();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
headers.set(headerName, request.getHeader(headerName));
}

String token = serviceHelper.getAuthorization();
if (token != null) {
headers.set("Authorization", EncryptUtils.encrypt(token));
// Ensure the Host header is set correctly.
URI uri = new URI(url);
headers.set("Host", uri.getHost());

if (setAuth) {
String token = serviceHelper.getAuthorization();
if (token != null) {
headers.set("Authorization", EncryptUtils.encrypt(token));
}
}

byte[] body = null;
Expand All @@ -177,12 +203,69 @@ private ResponseEntity<?> proxyRequest(HttpServletRequest request, String url) t
}

HttpEntity<?> requestEntity = new HttpEntity<>(body, headers);
return requestEntity;
}

private ResponseEntity<?> proxyRequest(HttpServletRequest request, String url) throws Exception {
HttpEntity<?> requestEntity = getRequestEntity(request, url, true);
return proxyRestTemplate.exchange(
url, HttpMethod.valueOf(request.getMethod()), requestEntity, byte[].class);
}

private ResponseEntity<?> proxyYarnRequest(HttpServletRequest request, String url)
throws Exception {
if (hasYarnHttpKerberosAuth) {
UserGroupInformation ugi = HadoopUtils.getUgi();

HttpEntity<?> requestEntity = getRequestEntity(request, url, false);
setRestTemplateCredentials(ugi.getShortUserName());

return ugi.doAs(
new PrivilegedExceptionAction<ResponseEntity<?>>() {
@Override
public ResponseEntity<?> run() throws Exception {
return proxyRestTemplate.exchange(
url, HttpMethod.valueOf(request.getMethod()), requestEntity, byte[].class);
}
});
} else {
return proxyRequest(request, url);
}
}

private String getRequestURL(HttpServletRequest request) {
return request.getRequestURI()
+ (request.getQueryString() != null ? "?" + request.getQueryString() : "");
}

private void setRestTemplateCredentials(String username) {
setRestTemplateCredentials(username, null);
}

/**
* Configures the RestTemplate's HttpClient connector. This method is primarily used to configure
* the HttpClient authentication information and SSL certificate validation policies.
*
* @param username The username for HTTP basic authentication.
* @param password The password for HTTP basic authentication.
*/
private void setRestTemplateCredentials(String username, String password) {
// Check if the username is not null and has changed since the last configuration
if (username != null && !this.lastUsername.equals(username)) {
// Create a new credentials provider
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// Add the username and password for HTTP basic authentication
credentialsProvider.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials(username, password));

// Customize the HttpClient with the credentials provider
CloseableHttpClient httpClient =
HttpClients.custom().setDefaultCredentialsProvider(credentialsProvider).build();
// Set the HttpClient request factory for the RestTemplate
this.proxyRestTemplate.setRequestFactory(
new HttpComponentsClientHttpRequestFactory(httpClient));
// Update the last known username
this.lastUsername = username;
}
}
}
Loading