Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8854]refactor(proxy): refactor proxy server with spi #8855

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 11 additions & 143 deletions proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,24 @@

package org.apache.rocketmq.proxy;

import com.google.common.collect.Lists;
import io.grpc.protobuf.services.ChannelzService;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.config.Configuration;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.GrpcServer;
import org.apache.rocketmq.proxy.grpc.GrpcServerBuilder;
import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
import org.apache.rocketmq.proxy.metrics.ProxyMetricsManager;
import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.proxy.spi.ProxyServer;
import org.apache.rocketmq.proxy.spi.ProxyServerFactory;
import org.apache.rocketmq.proxy.spi.ProxyServerInitializer;
import org.apache.rocketmq.srvutil.ServerUtil;

public class ProxyStartup {
Expand All @@ -69,28 +52,14 @@ public static void main(String[] args) {
try {
// parse argument from command line
CommandLineArgument commandLineArgument = parseCommandLineArgument(args);
initConfiguration(commandLineArgument);

// init thread pool monitor for proxy.
initThreadPoolMonitor();
ProxyServerFactory factory = ServiceProvider.<ProxyServerFactory>loadClass(ProxyServerFactory.class);
ProxyServer server = factory
.withInitializer(new ProxyServerInitializer(commandLineArgument))
.withAccessValidators(loadAccessValidators())
.get();

ThreadPoolExecutor executor = createServerExecutor();

MessagingProcessor messagingProcessor = createMessagingProcessor();

List<AccessValidator> accessValidators = loadAccessValidators();
// create grpcServer
GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort())
.addService(createServiceProcessor(messagingProcessor))
.addService(ChannelzService.newInstance(100))
.addService(ProtoReflectionService.newInstance())
.configInterceptor(accessValidators)
.shutdownTime(ConfigurationManager.getProxyConfig().getGrpcShutdownTimeSeconds(), TimeUnit.SECONDS)
.build();
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);

RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, accessValidators);
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer);
server.getStartAndShutdowns().forEach(PROXY_START_AND_SHUTDOWN::appendStartAndShutdown);

// start servers one by one.
PROXY_START_AND_SHUTDOWN.start();
Expand Down Expand Up @@ -123,17 +92,6 @@ protected static List<AccessValidator> loadAccessValidators() {
return accessValidators;
}

protected static void initConfiguration(CommandLineArgument commandLineArgument) throws Exception {
if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) {
System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath());
}
ConfigurationManager.initEnv();
ConfigurationManager.intConfig();
setConfigFromCommandLineArgument(commandLineArgument);
log.info("Current configuration: " + ConfigurationManager.formatProxyConfig());

}

protected static CommandLineArgument parseCommandLineArgument(String[] args) {
CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args,
buildCommandlineOptions(), new DefaultParser());
Expand Down Expand Up @@ -164,94 +122,4 @@ private static Options buildCommandlineOptions() {
return options;
}

private static void setConfigFromCommandLineArgument(CommandLineArgument commandLineArgument) {
if (StringUtils.isNotBlank(commandLineArgument.getNamesrvAddr())) {
ConfigurationManager.getProxyConfig().setNamesrvAddr(commandLineArgument.getNamesrvAddr());
}
if (StringUtils.isNotBlank(commandLineArgument.getBrokerConfigPath())) {
ConfigurationManager.getProxyConfig().setBrokerConfigPath(commandLineArgument.getBrokerConfigPath());
}
if (StringUtils.isNotBlank(commandLineArgument.getProxyMode())) {
ConfigurationManager.getProxyConfig().setProxyMode(commandLineArgument.getProxyMode());
}
}

protected static MessagingProcessor createMessagingProcessor() {
String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode();
MessagingProcessor messagingProcessor;

if (ProxyMode.isClusterMode(proxyModeStr)) {
messagingProcessor = DefaultMessagingProcessor.createForClusterMode();
ProxyMetricsManager proxyMetricsManager = ProxyMetricsManager.initClusterMode(ConfigurationManager.getProxyConfig());
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(proxyMetricsManager);
} else if (ProxyMode.isLocalMode(proxyModeStr)) {
BrokerController brokerController = createBrokerController();
ProxyMetricsManager.initLocalMode(brokerController.getBrokerMetricsManager(), ConfigurationManager.getProxyConfig());
StartAndShutdown brokerControllerWrapper = new StartAndShutdown() {
@Override
public void start() throws Exception {
brokerController.start();
String tip = "The broker[" + brokerController.getBrokerConfig().getBrokerName() + ", "
+ brokerController.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != brokerController.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + brokerController.getBrokerConfig().getNamesrvAddr();
}
log.info(tip);
}

@Override
public void shutdown() throws Exception {
brokerController.shutdown();
}
};
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(brokerControllerWrapper);
messagingProcessor = DefaultMessagingProcessor.createForLocalMode(brokerController);
} else {
throw new IllegalArgumentException("try to start grpc server with wrong mode, use 'local' or 'cluster'");
}
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(messagingProcessor);
return messagingProcessor;
}

private static GrpcMessagingApplication createServiceProcessor(MessagingProcessor messagingProcessor) {
GrpcMessagingApplication application = GrpcMessagingApplication.create(messagingProcessor);
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(application);
return application;
}

protected static BrokerController createBrokerController() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
List<String> brokerStartupArgList = Lists.newArrayList("-c", config.getBrokerConfigPath());
if (StringUtils.isNotBlank(config.getNamesrvAddr())) {
brokerStartupArgList.add("-n");
brokerStartupArgList.add(config.getNamesrvAddr());
}
String[] brokerStartupArgs = brokerStartupArgList.toArray(new String[0]);
return BrokerStartup.createBrokerController(brokerStartupArgs);
}

public static ThreadPoolExecutor createServerExecutor() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
int threadPoolNums = config.getGrpcThreadPoolNums();
int threadPoolQueueCapacity = config.getGrpcThreadPoolQueueCapacity();
ThreadPoolExecutor executor = ThreadPoolMonitor.createAndMonitor(
threadPoolNums,
threadPoolNums,
1, TimeUnit.MINUTES,
"GrpcRequestExecutorThread",
threadPoolQueueCapacity
);
PROXY_START_AND_SHUTDOWN.appendShutdown(executor::shutdown);
return executor;
}

public static void initThreadPoolMonitor() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
ThreadPoolMonitor.config(
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME),
LoggerFactory.getLogger(LoggerName.PROXY_WATER_MARK_LOGGER_NAME),
config.isEnablePrintJstack(), config.getPrintJstackInMillis(),
config.getPrintThreadPoolStatusInMillis());
ThreadPoolMonitor.init();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.grpc;

import java.util.concurrent.TimeUnit;
import io.grpc.Server;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.spi.ProxyServerBase;

public class GrpcServer implements StartAndShutdown {
public class GrpcServer extends ProxyServerBase {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

private final Server server;
Expand All @@ -39,11 +38,13 @@ protected GrpcServer(Server server, long timeout, TimeUnit unit) {
this.unit = unit;
}

@Override
public void start() throws Exception {
this.server.start();
log.info("grpc server start successfully.");
}

@Override
public void shutdown() {
try {
this.server.shutdown().awaitTermination(timeout, unit);
Expand All @@ -52,4 +53,4 @@ public void shutdown() {
e.printStackTrace();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,60 @@
*/
package org.apache.rocketmq.proxy.grpc;

import io.grpc.BindableService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.List;
import io.grpc.protobuf.services.ChannelzService;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase;

public class GrpcServerBuilder {
public class GrpcServerBuilder extends ProxyServerFactoryBase {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected NettyServerBuilder serverBuilder;

protected long time = 30;

protected TimeUnit unit = TimeUnit.SECONDS;

public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port) {
return new GrpcServerBuilder(executor, port);
}

protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
serverBuilder = NettyServerBuilder.forPort(port);
@Override
public GrpcServer build() {
int port = ConfigurationManager.getProxyConfig().getGrpcServerPort();
//
ThreadPoolExecutor executor = createServerExecutor();
appendStartAndShutdown(new StartAndShutdown() {
@Override
public void shutdown() throws Exception {
executor.shutdown();
}

@Override
public void start() throws Exception {

}
});
//
GrpcMessagingApplication application = GrpcMessagingApplication.create(initializer.getMessagingProcessor());
appendStartAndShutdown(application);
//
NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port);
serverBuilder.addService(application)
.addService(ChannelzService.newInstance(100))
.addService(ProtoReflectionService.newInstance())
.intercept(new AuthenticationInterceptor(validators))
.intercept(new GlobalExceptionInterceptor())
.intercept(new ContextInterceptor())
.intercept(new HeaderInterceptor());

serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());

Expand All @@ -73,49 +92,26 @@ protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
}

serverBuilder.maxInboundMessageSize(maxInboundMessageSize)
.maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS);
.maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS);

log.info(
"grpc server has built. port: {}, tlsKeyPath: {}, tlsCertPath: {}, threadPool: {}, queueCapacity: {}, "
+ "boosLoop: {}, workerLoop: {}, maxInboundMessageSize: {}",
port, bossLoopNum, workerLoopNum, maxInboundMessageSize);
}

public GrpcServerBuilder shutdownTime(long time, TimeUnit unit) {
this.time = time;
this.unit = unit;
return this;
return new GrpcServer(serverBuilder.build(), ConfigurationManager.getProxyConfig().getGrpcShutdownTimeSeconds(), TimeUnit.SECONDS);
}

public GrpcServerBuilder addService(BindableService service) {
this.serverBuilder.addService(service);
return this;
}

public GrpcServerBuilder addService(ServerServiceDefinition service) {
this.serverBuilder.addService(service);
return this;
}

public GrpcServerBuilder appendInterceptor(ServerInterceptor interceptor) {
this.serverBuilder.intercept(interceptor);
return this;
}

public GrpcServer build() {
return new GrpcServer(this.serverBuilder.build(), time, unit);
}

public GrpcServerBuilder configInterceptor(List<AccessValidator> accessValidators) {
// grpc interceptors, including acl, logging etc.
this.serverBuilder
.intercept(new AuthenticationInterceptor(accessValidators));

this.serverBuilder
.intercept(new GlobalExceptionInterceptor())
.intercept(new ContextInterceptor())
.intercept(new HeaderInterceptor());

return this;
private ThreadPoolExecutor createServerExecutor() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
int threadPoolNums = config.getGrpcThreadPoolNums();
int threadPoolQueueCapacity = config.getGrpcThreadPoolQueueCapacity();
return ThreadPoolMonitor.createAndMonitor(
threadPoolNums,
threadPoolNums,
1, TimeUnit.MINUTES,
"GrpcRequestExecutorThread",
threadPoolQueueCapacity
);
}
}
Loading