Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Entrance result set directory unified optimization #5126

Merged
merged 21 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ object Configuration extends Logging {

val JOB_HISTORY_ADMIN = CommonVars("wds.linkis.jobhistory.admin", "hadoop")

val JOB_HISTORY_DEPARTMENT_ADMIN = CommonVars("wds.linkis.jobhistory.department.admin", "hadoop")

// Only the specified token has permission to call some api
val GOVERNANCE_STATION_ADMIN_TOKEN_STARTWITH = "ADMIN-"

Expand Down Expand Up @@ -124,6 +126,11 @@ object Configuration extends Logging {
.exists(username.equalsIgnoreCase)
}

def isDepartmentAdmin(username: String): Boolean = {
val departmentAdminUsers = JOB_HISTORY_DEPARTMENT_ADMIN.getHotValue.split(",")
departmentAdminUsers.exists(username.equalsIgnoreCase)
}

def getJobHistoryAdmin(): Array[String] = {
val adminUsers = GOVERNANCE_STATION_ADMIN.getHotValue.split(",")
val historyAdminUsers = JOB_HISTORY_ADMIN.getHotValue.split(",")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.linkis.hadoop.common.utils;

import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.hadoop.common.conf.HadoopConf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,6 +35,10 @@
public class KerberosUtils {
private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);

private static boolean kerberosRefreshStarted = false;

private static final Object kerberosRefreshLock = new Object();

private KerberosUtils() {}

private static Configuration createKerberosSecurityConfiguration() {
Expand Down Expand Up @@ -81,40 +87,105 @@ public static boolean runRefreshKerberosLogin() {

public static Long getKerberosRefreshInterval() {
long refreshInterval;
String refreshIntervalString = "86400000";
// defined in linkis-env.sh, if not initialized then the default value is 86400000 ms (1d).
if (System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL") != null) {
refreshIntervalString = System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL");
String refreshIntervalString = "43200";
// defined in linkis-env.sh, if not initialized then the default value is 43200 s (0.5d).
if (System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL") != null) {
refreshIntervalString = System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL");
}
try {
refreshInterval = Long.parseLong(refreshIntervalString);
} catch (NumberFormatException e) {
LOG.error(
"Cannot get time in MS for the given string, "
"Cannot get time in S for the given string, "
+ refreshIntervalString
+ " defaulting to 86400000 ",
+ " defaulting to 43200 ",
e);
refreshInterval = 86400000L;
refreshInterval = 43200;
}
return refreshInterval;
}

public static Integer kinitFailTimesThreshold() {
Integer kinitFailThreshold = 5;
// defined in linkis-env.sh, if not initialized then the default value is 5.
if (System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
if (System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
try {
kinitFailThreshold =
new Integer(System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD"));
kinitFailThreshold = new Integer(System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD"));
} catch (Exception e) {
LOG.error(
"Cannot get integer value from the given string, "
+ System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD")
+ System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD")
+ " defaulting to "
+ kinitFailThreshold,
e);
}
}
return kinitFailThreshold;
}

public static void checkStatus() {
try {
LOG.info("isSecurityEnabled:" + UserGroupInformation.isSecurityEnabled());
LOG.info(
"userAuthenticationMethod:"
+ UserGroupInformation.getLoginUser().getAuthenticationMethod());
UserGroupInformation loginUsr = UserGroupInformation.getLoginUser();
UserGroupInformation curUsr = UserGroupInformation.getCurrentUser();
LOG.info("LoginUser: " + loginUsr);
LOG.info("CurrentUser: " + curUsr);
if (curUsr == null) {
LOG.info("CurrentUser is null");
} else {
LOG.info("CurrentUser is not null");
}
if (loginUsr.getClass() != curUsr.getClass()) {
LOG.info("getClass() is different");
} else {
LOG.info("getClass() is same");
}
if (loginUsr.equals(curUsr)) {
LOG.info("subject is equal");
} else {
LOG.info("subject is not equal");
}
} catch (Exception e) {
LOG.error("UGI error: ", e.getMessage());
}
}

public static void startKerberosRefreshThread() {

if (kerberosRefreshStarted || !HadoopConf.KERBEROS_ENABLE()) {
LOG.warn(
"kerberos refresh thread had start or not kerberos {}", HadoopConf.HDFS_ENABLE_CACHE());
return;
}
synchronized (kerberosRefreshLock) {
if (kerberosRefreshStarted) {
LOG.warn("kerberos refresh thread had start");
return;
}
kerberosRefreshStarted = true;
LOG.info("kerberos Refresh tread started");
Utils.defaultScheduler()
.scheduleAtFixedRate(
() -> {
try {
checkStatus();
if (UserGroupInformation.isLoginKeytabBased()) {
LOG.info("Trying re-login from keytab");
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
} else if (UserGroupInformation.isLoginTicketBased()) {
LOG.info("Trying re-login from ticket cache");
UserGroupInformation.getLoginUser().reloginFromTicketCache();
}
} catch (Exception e) {
LOG.error("Unable to re-login", e);
}
},
getKerberosRefreshInterval(),
getKerberosRefreshInterval(),
TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object HadoopConf {

val HADOOP_ROOT_USER = CommonVars("wds.linkis.hadoop.root.user", "hadoop")

val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false)
val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false).getValue

val KERBEROS_ENABLE_MAP =
CommonVars("linkis.keytab.enable.map", "cluster1=false,cluster2=true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ object HDFSUtils extends Logging {
)
}
.foreach { hdfsFileSystemContainer =>
val locker =
hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel + LOCKER_SUFFIX
val locker = hdfsFileSystemContainer.getUser + LOCKER_SUFFIX
locker.intern() synchronized {
if (hdfsFileSystemContainer.canRemove()) {
fileSystemCache.remove(
Expand Down Expand Up @@ -248,7 +247,7 @@ object HDFSUtils extends Logging {

def isKerberosEnabled(label: String): Boolean = {
if (label == null) {
KERBEROS_ENABLE.getValue
KERBEROS_ENABLE
} else {
kerberosValueMapParser(KERBEROS_ENABLE_MAP.getValue).get(label).contains("true")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class KerberosUtilsTest {
public void getKerberosRefreshIntervalTest() {

Long refreshInterval = KerberosUtils.getKerberosRefreshInterval();
Assertions.assertTrue(86400000L == refreshInterval.longValue());
Assertions.assertTrue(43200L == refreshInterval.longValue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class HadoopConfTest {
def constTest(): Unit = {

Assertions.assertEquals("hadoop", HadoopConf.HADOOP_ROOT_USER.getValue)
Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE.getValue)
Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE)
Assertions.assertEquals("/appcom/keytab/", HadoopConf.KEYTAB_FILE.getValue)
Assertions.assertEquals("127.0.0.1", HadoopConf.KEYTAB_HOST.getValue)
Assertions.assertFalse(HadoopConf.KEYTAB_HOST_ENABLED.getValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,21 @@ import org.apache.http.conn.{
ConnectTimeoutException,
HttpHostConnectException
}
import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustSelfSignedStrategy}
import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.impl.client.{BasicCookieStore, CloseableHttpClient, HttpClients}
import org.apache.http.impl.client.{
BasicCookieStore,
CloseableHttpClient,
HttpClientBuilder,
HttpClients
}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.ssl.SSLContextBuilder
import org.apache.http.util.EntityUtils

import javax.net.ssl.{HostnameVerifier, SSLContext, SSLSession}

import java.net.URI
import java.nio.charset.Charset
import java.util
Expand All @@ -81,12 +90,26 @@ abstract class AbstractHttpClient(clientConfig: ClientConfig, clientName: String

protected val cookieStore = new BasicCookieStore

protected val httpClient: CloseableHttpClient = HttpClients
private val httpClientBuilder: HttpClientBuilder = HttpClients
.custom()
.setDefaultCookieStore(cookieStore)
.setMaxConnTotal(clientConfig.getMaxConnection)
.setMaxConnPerRoute(clientConfig.getMaxConnection / 2)
.build

protected val httpClient: CloseableHttpClient = if (clientConfig.isSSL) {
val sslContext: SSLContext =
SSLContextBuilder.create.loadTrustMaterial(null, new TrustSelfSignedStrategy).build

val sslConnectionFactory = new SSLConnectionSocketFactory(
sslContext,
new HostnameVerifier() {
override def verify(hostname: String, session: SSLSession) = true
}
)
httpClientBuilder.setSSLSocketFactory(sslConnectionFactory).build()
} else {
httpClientBuilder.build()
}

if (clientConfig.getAuthenticationStrategy != null) {
clientConfig.getAuthenticationStrategy match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ClientConfig private () {
private var maxConnection: Int = 20
private var retryEnabled: Boolean = _
private var retryHandler: RetryHandler = _
private var ssl: Boolean = false

protected[config] def this(
serverUrl: String,
Expand All @@ -59,7 +60,8 @@ class ClientConfig private () {
retryEnabled: Boolean,
retryHandler: RetryHandler,
authTokenKey: String,
authTokenValue: String
authTokenValue: String,
isSSL: Boolean = false
) = {
this()
this.serverUrl = serverUrl
Expand All @@ -78,6 +80,7 @@ class ClientConfig private () {
this.retryHandler = retryHandler
this.authTokenKey = authTokenKey
this.authTokenValue = authTokenValue
this.ssl = isSSL
authenticationStrategy match {
case ab: AbstractAuthenticationStrategy => ab.setClientConfig(this)
case _ =>
Expand Down Expand Up @@ -123,4 +126,6 @@ class ClientConfig private () {

def getRetryHandler: RetryHandler = retryHandler

def isSSL: Boolean = ssl

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class ClientConfigBuilder protected () {
protected var maxConnection: Int = _
protected var retryEnabled: Boolean = true

protected var ssl: Boolean = false

protected var retryHandler: RetryHandler = {
val retryHandler = new DefaultRetryHandler
retryHandler.addRetryException(classOf[LinkisRetryException])
Expand Down Expand Up @@ -112,6 +114,11 @@ class ClientConfigBuilder protected () {
this
}

def setSSL(isSSL: Boolean): this.type = {
this.ssl = isSSL
this
}

def build(): ClientConfig = new ClientConfig(
serverUrl,
discoveryEnabled,
Expand All @@ -126,7 +133,8 @@ class ClientConfigBuilder protected () {
retryEnabled,
retryHandler,
authTokenKey,
authTokenValue
authTokenValue,
ssl
)

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.proxy;

public interface ProxyUserService {

ProxyUserEntity getProxyUserEntity(String proxyUser, String loginUser);
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,8 @@ public static String getTokenUser(HttpServletRequest httpServletRequest) {
}
return tokenUser;
}

public static void printAuditLog(String auditLogMsg) {
LOGGER.info(auditLogMsg);
}
}
Loading
Loading