Skip to content

Commit

Permalink
use feign requestInterceptor to fixed server
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Apr 14, 2024
1 parent ced29f2 commit 2eefd7c
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import feign.RequestInterceptor;
import feign.RequestTemplate;

@Component
public class FeignRequestInterceptor implements RequestInterceptor {

@Override
Expand All @@ -55,8 +54,6 @@ public void apply(RequestTemplate requestTemplate) {
requestTemplate.body(),
org.apache.linkis.common.conf.Configuration.BDP_ENCODING().getValue());
Message message = BDPJettyServerHelper.gson().fromJson(body, Message.class);
headers.put(
RpcConstant.FIXED_INSTANCE, Arrays.asList(BaseRPCSender.getFixedInstanceInfo(message)));
requestTemplate.headers(headers);
} catch (UnsupportedEncodingException e) {
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

@Configuration
@LoadBalancerClients(defaultConfiguration = {LinkisLoadBalancerClientConfiguration.class})

public class LinkisLoadBalancerClientConfiguration {
@Bean
public ReactorLoadBalancer<ServiceInstance> customLoadBalancer(
Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,27 @@ import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.exception.WarnException
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.protocol.Protocol
import org.apache.linkis.rpc.conf.DynamicFeignClient
import org.apache.linkis.rpc.conf.RPCConfiguration.{
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX,
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_MAX,
BDP_RPC_SENDER_ASYN_QUEUE_CAPACITY
}
import org.apache.linkis.rpc.constant.RpcConstant
import org.apache.linkis.rpc.interceptor._
import org.apache.linkis.rpc.transform.{RPCConsumer, RPCProduct}
import org.apache.linkis.server.Message

import org.apache.commons.lang3.StringUtils
import org.apache.linkis.server.conf.ServerConfiguration

import java.util

import scala.concurrent.duration.Duration
import scala.runtime.BoxedUnit

import feign.{Feign, Retryer}
import feign.slf4j.Slf4jLogger

private[rpc] class BaseRPCSender extends Sender with Logging {
private var name: String = _
private var rpc: RPCReceiveRemote = _
private var dynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = _

protected def getRPCInterceptors: Array[RPCInterceptor] = Array.empty

Expand All @@ -68,21 +67,18 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
rpc
}

private def getDynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = {
if (dynamicFeignClient == null) this synchronized {
if (dynamicFeignClient == null) dynamicFeignClient = new DynamicFeignClient()
}
dynamicFeignClient
}

private[rpc] def getApplicationName = name

def getSenderInstance(): String = {
null
}
protected def doBuilder(builder: Feign.Builder): Unit =
builder.retryer(Retryer.NEVER_RETRY)

protected def newRPC: RPCReceiveRemote = {
getDynamicFeignClient.getFeignClient(classOf[RPCReceiveRemote], name)
val builder = Feign.builder.logger(new Slf4jLogger()).logLevel(feign.Logger.Level.FULL)
doBuilder(builder)
var url = if (name.startsWith("http://")) name else "http://" + name
if (url.endsWith("/")) url = url.substring(0, url.length - 1)
url += ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue
builder.target(classOf[RPCReceiveRemote], url)
}

private def execute(message: Any)(op: => Any): Any = message match {
Expand All @@ -94,9 +90,6 @@ private[rpc] class BaseRPCSender extends Sender with Logging {

override def ask(message: Any): Any = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
if (StringUtils.isNotBlank(getSenderInstance())) {
BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance())
}
BaseRPCSender.addInstanceInfo(msg.getData)
val response = getRPC.receiveAndReply(msg)
RPCConsumer.getRPCConsumer.toObject(response)
Expand All @@ -105,19 +98,13 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
override def ask(message: Any, timeout: Duration): Any = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
msg.data("duration", timeout.toMillis)
if (StringUtils.isNotBlank(getSenderInstance())) {
BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance())
}
BaseRPCSender.addInstanceInfo(msg.getData)
val response = getRPC.receiveAndReplyInMills(msg)
RPCConsumer.getRPCConsumer.toObject(response)
}

private def sendIt(message: Any, op: Message => Message): Unit = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
if (StringUtils.isNotBlank(getSenderInstance())) {
BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance())
}
BaseRPCSender.addInstanceInfo(msg.getData)
RPCConsumer.getRPCConsumer.toObject(op(msg)) match {
case w: WarnException => logger.warn("RPC requests an alarm!(RPC请求出现告警!)", w)
Expand Down Expand Up @@ -188,16 +175,4 @@ private[rpc] object BaseRPCSender extends Logging {
ServiceInstance(name, instance)
}

def addFixedInstanceInfo(map: util.Map[String, Object], fixedInstance: String): Unit = {
map.put(RpcConstant.FIXED_INSTANCE, fixedInstance)
}

def getFixedInstanceInfo(message: Message): String = {
if (null != message && null != message.getData) {
message.getData.getOrDefault(RpcConstant.FIXED_INSTANCE, null).asInstanceOf[String]
} else {
null
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,40 @@

package org.apache.linkis.rpc.sender

import feign._
import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.rpc.{BaseRPCSender, RPCMessageEvent, RPCSpringBeanCache}
import org.apache.linkis.rpc.interceptor.{RPCInterceptor, ServiceInstanceRPCInterceptorChain}

import org.apache.commons.lang3.StringUtils

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.core.env.Environment
import org.apache.linkis.rpc.{BaseRPCSender, RPCMessageEvent, RPCSpringBeanCache}

private[rpc] class SpringMVCRPCSender private[rpc] (
private[rpc] val serviceInstance: ServiceInstance
) extends BaseRPCSender(serviceInstance.getApplicationName) {

import SpringCloudFeignConfigurationCache._

override protected def getRPCInterceptors: Array[RPCInterceptor] =
RPCSpringBeanCache.getRPCInterceptors

override protected def createRPCInterceptorChain() =
new ServiceInstanceRPCInterceptorChain(0, getRPCInterceptors, serviceInstance)

@Autowired
private var env: Environment = _
override protected def doBuilder(builder: Feign.Builder): Unit = {
if (serviceInstance != null && StringUtils.isNotBlank(serviceInstance.getInstance)) {
builder.requestInterceptor(new RequestInterceptor() {
def apply(template: RequestTemplate ): Unit = {
// Fixed instance
template.target(s"http://${serviceInstance.getInstance}")
}
})
}
super.doBuilder(builder)
builder
.contract(getContract)
.encoder(getEncoder)
.decoder(getDecoder)
.requestInterceptor(getRPCTicketIdRequestInterceptor)
}

/**
* Deliver is an asynchronous method that requests the target microservice asynchronously,
Expand Down Expand Up @@ -66,12 +79,4 @@ private[rpc] class SpringMVCRPCSender private[rpc] (
s"RPCSender(${serviceInstance.getApplicationName})"
} else s"RPCSender($getApplicationName, ${serviceInstance.getInstance})"

override def getSenderInstance(): String = {
if (null != serviceInstance) {
serviceInstance.getInstance
} else {
null
}
}

}

0 comments on commit 2eefd7c

Please sign in to comment.