Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring cloud version upgraded, loadBalancer replaced ribbon #4958

Merged
merged 44 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
c7c86d2
[WIP] Spring cloud version upgraded and loadBalancer replaced ribbon
ChengJie1053 Nov 7, 2023
c604486
Formatted code
ChengJie1053 Nov 7, 2023
f094821
Modifying the spring cloud gateway
ChengJie1053 Nov 10, 2023
fa3011f
optimize code
peacewong Nov 16, 2023
7935cde
Modifying linkis rpc
ChengJie1053 Nov 20, 2023
edf866d
Optimized code linkis rpc module
ChengJie1053 Nov 21, 2023
1278847
Modify known-dependencies.txt
ChengJie1053 Nov 21, 2023
bf755dd
Modify known-dependencies.txt
ChengJie1053 Nov 21, 2023
b5b5f1a
Modify RPCReceiveRemote
ChengJie1053 Nov 21, 2023
d73b4d3
Add EurekaClientCacheManualRefresher
ChengJie1053 Nov 22, 2023
3070308
Move the loadbalancer module
ChengJie1053 Nov 22, 2023
0c1c4ed
optimize ServiceInstancePriorityLoadBalancer
peacewong Nov 22, 2023
95b655e
Formatted code
ChengJie1053 Nov 23, 2023
e264fb3
Modify application-engineconn.yml
ChengJie1053 Nov 24, 2023
faf973d
Merge branch 'master' into master-spring
ChengJie1053 Nov 24, 2023
e534f23
Temporarily hide the gateway test case
ChengJie1053 Nov 24, 2023
8036be6
Temporarily hide test case
ChengJie1053 Nov 24, 2023
8181aa1
Unhide test cases
ChengJie1053 Nov 24, 2023
9871501
Modify application.yml
ChengJie1053 Nov 24, 2023
0014c04
Temporarily hide the gateway test case
ChengJie1053 Nov 24, 2023
4aa8664
Temporarily hide the gateway test case
ChengJie1053 Nov 24, 2023
c4ceada
Modify configmap-linkis-config.yaml
ChengJie1053 Nov 24, 2023
3976ae3
Modify configmap-linkis-config.yaml
ChengJie1053 Nov 25, 2023
2a7ef56
Modify configmap-linkis-config.yaml
ChengJie1053 Nov 25, 2023
3aab001
Modify configmap-linkis-config.yaml
ChengJie1053 Nov 25, 2023
812f49b
add application.yml
ChengJie1053 Nov 25, 2023
86a499d
Modify configuration
ChengJie1053 Nov 25, 2023
5f37f9c
Modify configuration
ChengJie1053 Nov 25, 2023
e2f0ab4
Modify configuration
ChengJie1053 Nov 25, 2023
9388af1
Modify configuration
ChengJie1053 Nov 25, 2023
62f76af
Modify configuration
ChengJie1053 Nov 25, 2023
1ba7db8
Modify configmap-linkis-config.yaml
ChengJie1053 Nov 25, 2023
9a5eb86
Modify configmap-linkis-config.yaml
ChengJie1053 Nov 27, 2023
a8d6684
Modify configmap-linkis-config.yaml
ChengJie1053 Nov 27, 2023
c41f3f1
Modify configmap-linkis-config.yaml
ChengJie1053 Nov 27, 2023
4e5b39e
Merge branch 'master' into master-spring
ChengJie1053 Jan 25, 2024
e74f0ab
Merge the latest code and resolve conflicts
ChengJie1053 Jan 25, 2024
805ddbf
using commons-lang3 ExceptionUtils instead of commons-lang and remove…
ChengJie1053 Jan 26, 2024
22cf6ea
Eureka refresh is called via reflection instead
ChengJie1053 Jan 27, 2024
d027088
common-lang relies on eureka-client, replacing common-lang with commo…
ChengJie1053 Jan 27, 2024
e65fb90
Compatible with nacos
ChengJie1053 Feb 26, 2024
f505c25
Compatible with nacos
ChengJie1053 Mar 7, 2024
cff6add
add nacos refresher
ChengJie1053 Mar 13, 2024
e6cb65b
Code optimization
ChengJie1053 Mar 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions linkis-commons/linkis-module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-openfeign-core</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
Expand All @@ -66,6 +67,7 @@
@SpringBootApplication(scanBasePackages = {"org.apache.linkis", "com.webank.wedatasphere"})
@EnableDiscoveryClient
@RefreshScope
@EnableFeignClients
public class DataWorkCloudApplication extends SpringBootServletInitializer {
private static final Log logger = LogFactory.getLog(DataWorkCloudApplication.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.swagger;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.servlet.mvc.method.RequestMappingInfoHandlerMapping;

import java.lang.reflect.Field;
import java.util.List;
import java.util.stream.Collectors;

import springfox.documentation.spring.web.plugins.WebFluxRequestHandlerProvider;
import springfox.documentation.spring.web.plugins.WebMvcRequestHandlerProvider;

@Configuration
public class SwaggerBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof WebMvcRequestHandlerProvider
|| bean instanceof WebFluxRequestHandlerProvider) {
List<RequestMappingInfoHandlerMapping> handlerMappings = getHandlerMappings(bean);
customizeSpringfoxHandlerMappings(handlerMappings);
}
return bean;
}

private <T extends RequestMappingInfoHandlerMapping> void customizeSpringfoxHandlerMappings(
List<T> mappings) {
List<T> copy =
mappings.stream()
.filter(mapping -> mapping.getPatternParser() == null)
.collect(Collectors.toList());
mappings.clear();
mappings.addAll(copy);
}

@SuppressWarnings("unchecked")
private List<RequestMappingInfoHandlerMapping> getHandlerMappings(Object bean) {
try {
Field field = ReflectionUtils.findField(bean.getClass(), "handlerMappings");
field.setAccessible(true);
return (List<RequestMappingInfoHandlerMapping>) field.get(bean);
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.rpc.conf;

import org.apache.linkis.DataWorkCloudApplication;

import org.apache.commons.lang3.StringUtils;

import org.springframework.cloud.openfeign.FeignClientBuilder;
import org.springframework.cloud.openfeign.FeignClientFactoryBean;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

@Component
public class DynamicFeignClient<T> {

private FeignClientBuilder feignClientBuilder;

private final ConcurrentHashMap<String, T> CACHE_BEAN = new ConcurrentHashMap();

public DynamicFeignClient() {
this.feignClientBuilder =
new FeignClientBuilder(DataWorkCloudApplication.getApplicationContext());
}

public T getFeignClient(final Class<T> type, final String serviceName) {
return getFeignClient(type, serviceName, null);
}

public T getFeignClient(
final Class<T> type, final Class<?> fallbackFactory, final String serviceName) {
return getFeignClient(type, fallbackFactory, serviceName, null);
}

public T getFeignClient(
final Class<T> type,
final FeignClientFactoryBean clientFactoryBean,
final String serviceName) {
return getFeignClient(type, clientFactoryBean, serviceName, null);
}

public T getFeignClient(final Class<T> type, String serviceName, final String serviceUrl) {
String k = serviceName;
if (StringUtils.isNotEmpty(serviceUrl)) {
k = serviceUrl;
}
return CACHE_BEAN.computeIfAbsent(
k,
(t) -> {
FeignClientBuilder.Builder<T> builder =
this.feignClientBuilder.forType(type, serviceName);
if (StringUtils.isNotEmpty(serviceUrl)) {
builder.url(serviceUrl);
}
return builder.build();
});
}

public T getFeignClient(
final Class<T> type,
final Class<?> fallbackFactory,
final String serviceName,
final String serviceUrl) {
String k = serviceName;
if (StringUtils.isNotEmpty(serviceUrl)) {
k = serviceUrl;
}
return CACHE_BEAN.computeIfAbsent(
k,
(t) -> {
FeignClientFactoryBean feignClientFactoryBean = new FeignClientFactoryBean();
feignClientFactoryBean.setFallbackFactory(fallbackFactory);
FeignClientBuilder.Builder<T> builder =
this.feignClientBuilder.forType(type, feignClientFactoryBean, serviceName);
if (StringUtils.isNotEmpty(serviceUrl)) {
builder.url(serviceUrl);
}
return builder.build();
});
}

public T getFeignClient(
final Class<T> type,
final FeignClientFactoryBean clientFactoryBean,
final String serviceName,
final String serviceUrl) {
String k = serviceName;
if (StringUtils.isNotEmpty(serviceUrl)) {
k = serviceUrl;
}
return CACHE_BEAN.computeIfAbsent(
k,
(t) -> {
FeignClientBuilder.Builder<T> builder =
this.feignClientBuilder.forType(type, clientFactoryBean, serviceName);
if (StringUtils.isNotEmpty(serviceUrl)) {
builder.url(serviceUrl);
}
return builder.build();
});
}

private T getFromCache(final String serviceName, final String serviceUrl) {
if (StringUtils.isNotEmpty(serviceUrl)) {
return CACHE_BEAN.get(serviceUrl);
} else {
return CACHE_BEAN.get(serviceName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.rpc.conf;

import org.apache.commons.lang.exception.ExceptionUtils;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.TimedSupervisorTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
public class EurekaClientCacheManualRefresher {
jackxu2011 marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger logger =
LoggerFactory.getLogger(EurekaClientCacheManualRefresher.class);
private final AtomicBoolean isRefreshing = new AtomicBoolean(false);
private final ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
private final String cacheRefreshTaskField = "cacheRefreshTask";
private TimedSupervisorTask cacheRefreshTask;

private long lastRefreshMillis = 0;
private final Duration refreshIntervalDuration = Duration.ofSeconds(3);

@Autowired private BeanFactory beanFactory;

public void refreshOnExceptions(Exception e, List<Class<? extends Exception>> clazzs) {
if (null == clazzs || clazzs.size() == 0) {
throw new IllegalArgumentException();
}

if (clazzs.stream()
.anyMatch(
clazz -> clazz.isInstance(e) || clazz.isInstance(ExceptionUtils.getRootCause(e)))) {
jackxu2011 marked this conversation as resolved.
Show resolved Hide resolved
refresh();
}
}

public void refresh() {
jackxu2011 marked this conversation as resolved.
Show resolved Hide resolved
if (isRefreshing.compareAndSet(false, true)) {
refreshExecutor.execute(
() -> {
try {
if (System.currentTimeMillis()
<= lastRefreshMillis + refreshIntervalDuration.toMillis()) {
logger.warn(
"Not manually refresh eureka client cache as refresh interval was not exceeded:{}",
refreshIntervalDuration.getSeconds());
return;
}

if (null == cacheRefreshTask) {
Field field =
ReflectionUtils.findField(DiscoveryClient.class, cacheRefreshTaskField);
if (null != field) {
ReflectionUtils.makeAccessible(field);
DiscoveryClient discoveryClient = beanFactory.getBean(DiscoveryClient.class);
cacheRefreshTask =
(TimedSupervisorTask) ReflectionUtils.getField(field, discoveryClient);
}
}

if (null == cacheRefreshTask) {
logger.error(
"Field ({}) not found in class '{}'",
cacheRefreshTaskField,
DiscoveryClient.class.getSimpleName());
return;
}

lastRefreshMillis = System.currentTimeMillis();
cacheRefreshTask.run();
logger.info(
"Manually refresh eureka client cache completed(DiscoveryClient.cacheRefreshTask#run())");
} catch (Exception e) {
logger.error("An exception occurred when manually refresh eureka client cache", e);
} finally {
isRefreshing.set(false);
}
});
} else {
logger.warn(
"Not manually refresh eureka client cache as another thread is refreshing it already");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.rpc.conf;

import org.apache.linkis.rpc.BaseRPCSender;
import org.apache.linkis.rpc.constant.RpcConstant;
import org.apache.linkis.server.BDPJettyServerHelper;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.security.SSOUtils$;
import org.apache.linkis.server.security.SecurityFilter$;

import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import feign.RequestInterceptor;
import feign.RequestTemplate;

@Component
public class FeignRequestInterceptor implements RequestInterceptor {

@Override
public void apply(RequestTemplate requestTemplate) {
Map<String, Collection<String>> headers = new HashMap<>(requestTemplate.headers());
headers.put(
RpcConstant.LINKIS_LOAD_BALANCER_TYPE,
Arrays.asList(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC));
Tuple2<String, String> userTicketKV =
SSOUtils$.MODULE$.getUserTicketKV(SecurityFilter$.MODULE$.OTHER_SYSTEM_IGNORE_UM_USER());
headers.put(userTicketKV._1, Arrays.asList(userTicketKV._2));
try {
String body =
new String(
requestTemplate.body(),
org.apache.linkis.common.conf.Configuration.BDP_ENCODING().getValue());
Message message = BDPJettyServerHelper.gson().fromJson(body, Message.class);
headers.put(
RpcConstant.FIXED_INSTANCE, Arrays.asList(BaseRPCSender.getFixedInstanceInfo(message)));
requestTemplate.headers(headers);
} catch (UnsupportedEncodingException e) {
}
}
}
Loading
Loading