diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java index 3b2ca99bfd0..92d0e877a90 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java @@ -17,41 +17,24 @@ package org.apache.rocketmq.proxy; -import com.google.common.collect.Lists; -import io.grpc.protobuf.services.ChannelzService; -import io.grpc.protobuf.services.ProtoReflectionService; import java.util.Date; import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plain.PlainAccessValidator; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.common.utils.ServiceProvider; +import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; -import org.apache.rocketmq.common.utils.StartAndShutdown; -import org.apache.rocketmq.proxy.config.Configuration; -import org.apache.rocketmq.proxy.config.ConfigurationManager; -import org.apache.rocketmq.proxy.config.ProxyConfig; -import org.apache.rocketmq.proxy.grpc.GrpcServer; -import org.apache.rocketmq.proxy.grpc.GrpcServerBuilder; -import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication; -import org.apache.rocketmq.proxy.metrics.ProxyMetricsManager; -import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; -import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.proxy.spi.ProxyServer; +import org.apache.rocketmq.proxy.spi.ProxyServerFactory; +import org.apache.rocketmq.proxy.spi.ProxyServerInitializer; import org.apache.rocketmq.srvutil.ServerUtil; public class ProxyStartup { @@ -69,28 +52,14 @@ public static void main(String[] args) { try { // parse argument from command line CommandLineArgument commandLineArgument = parseCommandLineArgument(args); - initConfiguration(commandLineArgument); - // init thread pool monitor for proxy. - initThreadPoolMonitor(); + ProxyServerFactory factory = ServiceProvider.loadClass(ProxyServerFactory.class); + ProxyServer server = factory + .withInitializer(new ProxyServerInitializer(commandLineArgument)) + .withAccessValidators(loadAccessValidators()) + .get(); - ThreadPoolExecutor executor = createServerExecutor(); - - MessagingProcessor messagingProcessor = createMessagingProcessor(); - - List accessValidators = loadAccessValidators(); - // create grpcServer - GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort()) - .addService(createServiceProcessor(messagingProcessor)) - .addService(ChannelzService.newInstance(100)) - .addService(ProtoReflectionService.newInstance()) - .configInterceptor(accessValidators) - .shutdownTime(ConfigurationManager.getProxyConfig().getGrpcShutdownTimeSeconds(), TimeUnit.SECONDS) - .build(); - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer); - - RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, accessValidators); - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer); + server.getStartAndShutdowns().forEach(PROXY_START_AND_SHUTDOWN::appendStartAndShutdown); // start servers one by one. PROXY_START_AND_SHUTDOWN.start(); @@ -123,17 +92,6 @@ protected static List loadAccessValidators() { return accessValidators; } - protected static void initConfiguration(CommandLineArgument commandLineArgument) throws Exception { - if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) { - System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath()); - } - ConfigurationManager.initEnv(); - ConfigurationManager.intConfig(); - setConfigFromCommandLineArgument(commandLineArgument); - log.info("Current configuration: " + ConfigurationManager.formatProxyConfig()); - - } - protected static CommandLineArgument parseCommandLineArgument(String[] args) { CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args, buildCommandlineOptions(), new DefaultParser()); @@ -164,94 +122,4 @@ private static Options buildCommandlineOptions() { return options; } - private static void setConfigFromCommandLineArgument(CommandLineArgument commandLineArgument) { - if (StringUtils.isNotBlank(commandLineArgument.getNamesrvAddr())) { - ConfigurationManager.getProxyConfig().setNamesrvAddr(commandLineArgument.getNamesrvAddr()); - } - if (StringUtils.isNotBlank(commandLineArgument.getBrokerConfigPath())) { - ConfigurationManager.getProxyConfig().setBrokerConfigPath(commandLineArgument.getBrokerConfigPath()); - } - if (StringUtils.isNotBlank(commandLineArgument.getProxyMode())) { - ConfigurationManager.getProxyConfig().setProxyMode(commandLineArgument.getProxyMode()); - } - } - - protected static MessagingProcessor createMessagingProcessor() { - String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode(); - MessagingProcessor messagingProcessor; - - if (ProxyMode.isClusterMode(proxyModeStr)) { - messagingProcessor = DefaultMessagingProcessor.createForClusterMode(); - ProxyMetricsManager proxyMetricsManager = ProxyMetricsManager.initClusterMode(ConfigurationManager.getProxyConfig()); - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(proxyMetricsManager); - } else if (ProxyMode.isLocalMode(proxyModeStr)) { - BrokerController brokerController = createBrokerController(); - ProxyMetricsManager.initLocalMode(brokerController.getBrokerMetricsManager(), ConfigurationManager.getProxyConfig()); - StartAndShutdown brokerControllerWrapper = new StartAndShutdown() { - @Override - public void start() throws Exception { - brokerController.start(); - String tip = "The broker[" + brokerController.getBrokerConfig().getBrokerName() + ", " - + brokerController.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); - if (null != brokerController.getBrokerConfig().getNamesrvAddr()) { - tip += " and name server is " + brokerController.getBrokerConfig().getNamesrvAddr(); - } - log.info(tip); - } - - @Override - public void shutdown() throws Exception { - brokerController.shutdown(); - } - }; - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(brokerControllerWrapper); - messagingProcessor = DefaultMessagingProcessor.createForLocalMode(brokerController); - } else { - throw new IllegalArgumentException("try to start grpc server with wrong mode, use 'local' or 'cluster'"); - } - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(messagingProcessor); - return messagingProcessor; - } - - private static GrpcMessagingApplication createServiceProcessor(MessagingProcessor messagingProcessor) { - GrpcMessagingApplication application = GrpcMessagingApplication.create(messagingProcessor); - PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(application); - return application; - } - - protected static BrokerController createBrokerController() { - ProxyConfig config = ConfigurationManager.getProxyConfig(); - List brokerStartupArgList = Lists.newArrayList("-c", config.getBrokerConfigPath()); - if (StringUtils.isNotBlank(config.getNamesrvAddr())) { - brokerStartupArgList.add("-n"); - brokerStartupArgList.add(config.getNamesrvAddr()); - } - String[] brokerStartupArgs = brokerStartupArgList.toArray(new String[0]); - return BrokerStartup.createBrokerController(brokerStartupArgs); - } - - public static ThreadPoolExecutor createServerExecutor() { - ProxyConfig config = ConfigurationManager.getProxyConfig(); - int threadPoolNums = config.getGrpcThreadPoolNums(); - int threadPoolQueueCapacity = config.getGrpcThreadPoolQueueCapacity(); - ThreadPoolExecutor executor = ThreadPoolMonitor.createAndMonitor( - threadPoolNums, - threadPoolNums, - 1, TimeUnit.MINUTES, - "GrpcRequestExecutorThread", - threadPoolQueueCapacity - ); - PROXY_START_AND_SHUTDOWN.appendShutdown(executor::shutdown); - return executor; - } - - public static void initThreadPoolMonitor() { - ProxyConfig config = ConfigurationManager.getProxyConfig(); - ThreadPoolMonitor.config( - LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME), - LoggerFactory.getLogger(LoggerName.PROXY_WATER_MARK_LOGGER_NAME), - config.isEnablePrintJstack(), config.getPrintJstackInMillis(), - config.getPrintThreadPoolStatusInMillis()); - ThreadPoolMonitor.init(); - } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java index d5b896fe144..caab6ae60a7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java @@ -14,17 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.rocketmq.proxy.grpc; -import java.util.concurrent.TimeUnit; import io.grpc.Server; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.spi.ProxyServerBase; -public class GrpcServer implements StartAndShutdown { +public class GrpcServer extends ProxyServerBase { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); private final Server server; @@ -39,11 +38,13 @@ protected GrpcServer(Server server, long timeout, TimeUnit unit) { this.unit = unit; } + @Override public void start() throws Exception { this.server.start(); log.info("grpc server start successfully."); } + @Override public void shutdown() { try { this.server.shutdown().awaitTermination(timeout, unit); @@ -52,4 +53,4 @@ public void shutdown() { e.printStackTrace(); } } -} \ No newline at end of file +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java index a26ed6b9c90..20a2441e6f4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java @@ -16,41 +16,60 @@ */ package org.apache.rocketmq.proxy.grpc; -import io.grpc.BindableService; -import io.grpc.ServerInterceptor; -import io.grpc.ServerServiceDefinition; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup; import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel; import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; -import java.util.List; +import io.grpc.protobuf.services.ChannelzService; +import io.grpc.protobuf.services.ProtoReflectionService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor; +import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication; +import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase; -public class GrpcServerBuilder { +public class GrpcServerBuilder extends ProxyServerFactoryBase { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - protected NettyServerBuilder serverBuilder; - protected long time = 30; - - protected TimeUnit unit = TimeUnit.SECONDS; - - public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port) { - return new GrpcServerBuilder(executor, port); - } - - protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) { - serverBuilder = NettyServerBuilder.forPort(port); + @Override + public GrpcServer build() { + int port = ConfigurationManager.getProxyConfig().getGrpcServerPort(); + // + ThreadPoolExecutor executor = createServerExecutor(); + appendStartAndShutdown(new StartAndShutdown() { + @Override + public void shutdown() throws Exception { + executor.shutdown(); + } + + @Override + public void start() throws Exception { + + } + }); + // + GrpcMessagingApplication application = GrpcMessagingApplication.create(initializer.getMessagingProcessor()); + appendStartAndShutdown(application); + // + NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port); + serverBuilder.addService(application) + .addService(ChannelzService.newInstance(100)) + .addService(ProtoReflectionService.newInstance()) + .intercept(new AuthenticationInterceptor(validators)) + .intercept(new GlobalExceptionInterceptor()) + .intercept(new ContextInterceptor()) + .intercept(new HeaderInterceptor()); serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator()); @@ -73,49 +92,26 @@ protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) { } serverBuilder.maxInboundMessageSize(maxInboundMessageSize) - .maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS); + .maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS); log.info( "grpc server has built. port: {}, tlsKeyPath: {}, tlsCertPath: {}, threadPool: {}, queueCapacity: {}, " + "boosLoop: {}, workerLoop: {}, maxInboundMessageSize: {}", port, bossLoopNum, workerLoopNum, maxInboundMessageSize); - } - public GrpcServerBuilder shutdownTime(long time, TimeUnit unit) { - this.time = time; - this.unit = unit; - return this; + return new GrpcServer(serverBuilder.build(), ConfigurationManager.getProxyConfig().getGrpcShutdownTimeSeconds(), TimeUnit.SECONDS); } - public GrpcServerBuilder addService(BindableService service) { - this.serverBuilder.addService(service); - return this; - } - - public GrpcServerBuilder addService(ServerServiceDefinition service) { - this.serverBuilder.addService(service); - return this; - } - - public GrpcServerBuilder appendInterceptor(ServerInterceptor interceptor) { - this.serverBuilder.intercept(interceptor); - return this; - } - - public GrpcServer build() { - return new GrpcServer(this.serverBuilder.build(), time, unit); - } - - public GrpcServerBuilder configInterceptor(List accessValidators) { - // grpc interceptors, including acl, logging etc. - this.serverBuilder - .intercept(new AuthenticationInterceptor(accessValidators)); - - this.serverBuilder - .intercept(new GlobalExceptionInterceptor()) - .intercept(new ContextInterceptor()) - .intercept(new HeaderInterceptor()); - - return this; + private ThreadPoolExecutor createServerExecutor() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + int threadPoolNums = config.getGrpcThreadPoolNums(); + int threadPoolQueueCapacity = config.getGrpcThreadPoolQueueCapacity(); + return ThreadPoolMonitor.createAndMonitor( + threadPoolNums, + threadPoolNums, + 1, TimeUnit.MINUTES, + "GrpcRequestExecutorThread", + threadPoolQueueCapacity + ); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java new file mode 100644 index 00000000000..02a46749f48 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.mixed; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.spi.ProxyServer; +import org.apache.rocketmq.proxy.spi.ProxyServerBase; + +public class MixedProxyServer extends ProxyServerBase { + + private List proxyServers; + + @Override + public void shutdown() throws Exception { + for (ProxyServer proxyServer : proxyServers) { + proxyServer.shutdown(); + } + } + + @Override + public void start() throws Exception { + for (ProxyServer proxyServer : this.proxyServers) { + proxyServer.start(); + } + } + + @Override + public void preShutdown() throws Exception { + for (ProxyServer proxyServer : this.proxyServers) { + proxyServer.preShutdown(); + } + } + + @Override + public List getStartAndShutdowns() { + Set startAndShutdowns = new HashSet<>(); + for (ProxyServer proxyServer : this.proxyServers) { + startAndShutdowns.addAll(proxyServer.getStartAndShutdowns()); + } + return new ArrayList<>(startAndShutdowns); + } + + public void setProxyServers(List proxyServers) { + this.proxyServers = proxyServers; + } + + public List getProxyServers() { + return proxyServers; + } + +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java new file mode 100644 index 00000000000..80f89d1d9d9 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/mixed/MixedProxyServerBuilder.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.mixed; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.common.utils.ServiceProvider; +import org.apache.rocketmq.proxy.spi.ProxyServer; +import org.apache.rocketmq.proxy.spi.ProxyServerBase; +import org.apache.rocketmq.proxy.spi.ProxyServerFactory; +import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase; + +public class MixedProxyServerBuilder extends ProxyServerFactoryBase { + + @Override + protected ProxyServerBase build() { + String prefix = "META-INF/services/" + ProxyServerFactory.class.getName(); + List factories = ServiceProvider.load(prefix, ProxyServerFactory.class); + List proxyServers = new ArrayList(); + for (ProxyServerFactory psf : factories) { + if (psf instanceof MixedProxyServerBuilder) { + continue; + } + proxyServers.add(psf.withAccessValidators(validators).withInitializer(initializer).get()); + } + MixedProxyServer server = new MixedProxyServer(); + server.setProxyServers(proxyServers); + return server; + } + +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index 14c7c0db6fa..12cf7481490 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -31,7 +31,6 @@ import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor; -import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -52,6 +51,7 @@ import org.apache.rocketmq.proxy.remoting.pipeline.AuthorizationPipeline; import org.apache.rocketmq.proxy.remoting.pipeline.ContextInitPipeline; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.proxy.spi.ProxyServerBase; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingServer; @@ -64,7 +64,7 @@ import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient { +public class RemotingProtocolServer extends ProxyServerBase implements RemotingProxyOutClient { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final MessagingProcessor messagingProcessor; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java new file mode 100644 index 00000000000..8fc2617492d --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerBuilder.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.remoting; + +import org.apache.rocketmq.proxy.spi.ProxyServerBase; +import org.apache.rocketmq.proxy.spi.ProxyServerFactoryBase; + +public class RemotingProtocolServerBuilder extends ProxyServerFactoryBase { + + @Override + protected ProxyServerBase build() { + return new RemotingProtocolServer(initializer.getMessagingProcessor(), validators); + } + +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java new file mode 100644 index 00000000000..f2e5ef92761 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServer.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.spi; + +import java.util.List; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; + +public interface ProxyServer extends StartAndShutdown { + + BrokerController getBrokerController(); + + List getStartAndShutdowns(); + + MessagingProcessor getMessagingProcessor(); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java new file mode 100644 index 00000000000..c6a7f5a2d44 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerBase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.spi; + +import java.util.List; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; + +public abstract class ProxyServerBase implements ProxyServer { + + private BrokerController brokerController; + private List startAndShutdowns; + private MessagingProcessor messagingProcessor; + + @Override + public List getStartAndShutdowns() { + return startAndShutdowns; + } + + @Override + public BrokerController getBrokerController() { + return brokerController; + } + + @Override + public MessagingProcessor getMessagingProcessor() { + return messagingProcessor; + } + + public ProxyServerBase setBrokerController(BrokerController brokerController) { + this.brokerController = brokerController; + return this; + } + + public ProxyServerBase setStartAndShutdowns(List startAndShutdowns) { + this.startAndShutdowns = startAndShutdowns; + return this; + } + + public ProxyServerBase setMessagingProcessor(MessagingProcessor messagingProcessor) { + this.messagingProcessor = messagingProcessor; + return this; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java new file mode 100644 index 00000000000..7d5f77e90f1 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.spi; + +import java.util.List; +import java.util.function.Supplier; +import org.apache.rocketmq.acl.AccessValidator; + +public interface ProxyServerFactory extends Supplier { + + ProxyServerFactory withAccessValidators(List accessValidators); + + ProxyServerFactory withInitializer(ProxyServerInitializer proxyServerInitializer); + +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java new file mode 100644 index 00000000000..3b5ce78ff95 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerFactoryBase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.spi; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.common.utils.StartAndShutdown; + +public abstract class ProxyServerFactoryBase implements ProxyServerFactory { + + protected List validators; + protected ProxyServerInitializer initializer; + protected final List startAndShutdowns = new ArrayList(); + + @Override + public ProxyServerFactory withAccessValidators(List accessValidators) { + this.validators = accessValidators; + return this; + } + + @Override + public ProxyServerFactory withInitializer(ProxyServerInitializer proxyServerInitializer) { + this.initializer = proxyServerInitializer; + return this; + } + + @Override + public final ProxyServerBase get() { + ProxyServerBase serverBase = build(); + this.initializer.getStartAndShutdowns().forEach(this::appendStartAndShutdown); + serverBase.setBrokerController(this.initializer.getBrokerController()); + serverBase.setStartAndShutdowns(this.startAndShutdowns); + serverBase.setMessagingProcessor(this.initializer.getMessagingProcessor()); + return serverBase; + } + + protected void appendStartAndShutdown(StartAndShutdown sas) { + if (sas != null && !this.startAndShutdowns.contains(sas)) { + this.startAndShutdowns.add(sas); + } + } + + protected abstract ProxyServerBase build(); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java new file mode 100644 index 00000000000..511bef21fe4 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/spi/ProxyServerInitializer.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.spi; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerStartup; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.CommandLineArgument; +import org.apache.rocketmq.proxy.ProxyMode; +import org.apache.rocketmq.proxy.config.Configuration; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.metrics.ProxyMetricsManager; +import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; + +public class ProxyServerInitializer { + + private final CommandLineArgument commandLineArgument; + + private final MessagingProcessor messagingProcessor; + private final ProxyMetricsManager proxyMetricsManager; + private final BrokerController brokerController; + private final StartAndShutdown brokerStartAndShutdownWrapper; + private final List startAndShutdowns = new ArrayList(); + + public ProxyServerInitializer(CommandLineArgument commandLineArgument) throws Exception { + this.commandLineArgument = Objects.requireNonNull(commandLineArgument, "commandLineArgument is null"); + initializeConfiguration(); + validateConfiguration(); + initializeThreadPoolMonitor(); + // init services + String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode(); + if (ProxyMode.isClusterMode(proxyModeStr)) { + this.brokerController = null; + this.brokerStartAndShutdownWrapper = new StartAndShutdown() { + @Override + public void shutdown() throws Exception { + + } + + @Override + public void start() throws Exception { + + } + }; + this.messagingProcessor = DefaultMessagingProcessor.createForClusterMode(); + this.proxyMetricsManager = ProxyMetricsManager.initClusterMode(ConfigurationManager.getProxyConfig()); + } else { + this.brokerController = createBrokerController(); + ProxyMetricsManager.initLocalMode(brokerController.getBrokerMetricsManager(), ConfigurationManager.getProxyConfig()); + this.brokerStartAndShutdownWrapper = new StartAndShutdown() { + @Override + public void start() throws Exception { + brokerController.start(); + } + + @Override + public void shutdown() throws Exception { + brokerController.shutdown(); + } + }; + this.messagingProcessor = DefaultMessagingProcessor.createForLocalMode(brokerController); + this.proxyMetricsManager = null; + } + // + initializeStartAndShutdowns(); + } + + private void initializeConfiguration() throws Exception { + if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) { + System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath()); + } + ConfigurationManager.initEnv(); + ConfigurationManager.intConfig(); + if (StringUtils.isNotBlank(commandLineArgument.getNamesrvAddr())) { + ConfigurationManager.getProxyConfig().setNamesrvAddr(commandLineArgument.getNamesrvAddr()); + } + if (StringUtils.isNotBlank(commandLineArgument.getBrokerConfigPath())) { + ConfigurationManager.getProxyConfig().setBrokerConfigPath(commandLineArgument.getBrokerConfigPath()); + } + if (StringUtils.isNotBlank(commandLineArgument.getProxyMode())) { + ConfigurationManager.getProxyConfig().setProxyMode(commandLineArgument.getProxyMode()); + } + } + + private void validateConfiguration() throws Exception { + String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode(); + if (!ProxyMode.isClusterMode(proxyModeStr) && !ProxyMode.isLocalMode(proxyModeStr)) { + throw new IllegalArgumentException("try to start proxy server with wrong mode, use 'local' or 'cluster'"); + } + } + + private void initializeThreadPoolMonitor() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + ThreadPoolMonitor.config( + LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME), + LoggerFactory.getLogger(LoggerName.PROXY_WATER_MARK_LOGGER_NAME), + config.isEnablePrintJstack(), config.getPrintJstackInMillis(), + config.getPrintThreadPoolStatusInMillis()); + ThreadPoolMonitor.init(); + } + + private BrokerController createBrokerController() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + List brokerStartupArgList = Lists.newArrayList("-c", config.getBrokerConfigPath()); + if (StringUtils.isNotBlank(config.getNamesrvAddr())) { + brokerStartupArgList.add("-n"); + brokerStartupArgList.add(config.getNamesrvAddr()); + } + String[] brokerStartupArgs = brokerStartupArgList.toArray(new String[0]); + return BrokerStartup.createBrokerController(brokerStartupArgs); + } + + private void initializeStartAndShutdowns() throws Exception { + this.startAndShutdowns.add(this.brokerStartAndShutdownWrapper); + this.startAndShutdowns.add(this.messagingProcessor); + if (this.proxyMetricsManager != null) { + this.startAndShutdowns.add(this.proxyMetricsManager); + } + } + + public CommandLineArgument getCommandLineArgument() { + return commandLineArgument; + } + + public MessagingProcessor getMessagingProcessor() { + return messagingProcessor; + } + + public ProxyMetricsManager getProxyMetricsManager() { + return proxyMetricsManager; + } + + public BrokerController getBrokerController() { + return brokerController; + } + + public StartAndShutdown getBrokerStartAndShutdownWrapper() { + return brokerStartAndShutdownWrapper; + } + + public List getStartAndShutdowns() { + return startAndShutdowns; + } +} diff --git a/proxy/src/main/resources/META-INF/service/org.apache.rocketmq.proxy.spi.ProxyServerFactory b/proxy/src/main/resources/META-INF/service/org.apache.rocketmq.proxy.spi.ProxyServerFactory new file mode 100644 index 00000000000..2a33699c9be --- /dev/null +++ b/proxy/src/main/resources/META-INF/service/org.apache.rocketmq.proxy.spi.ProxyServerFactory @@ -0,0 +1 @@ +org.apache.rocketmq.proxy.mixed.MixedProxyServerBuilder diff --git a/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory b/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory new file mode 100644 index 00000000000..1db063b17b6 --- /dev/null +++ b/proxy/src/main/resources/META-INF/services/org.apache.rocketmq.proxy.spi.ProxyServerFactory @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.rocketmq.proxy.grpc.GrpcServerBuilder +org.apache.rocketmq.proxy.remoting.RemotingProtocolServerBuilder +org.apache.rocketmq.proxy.mixed.MixedProxyServerBuilder diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java index 58213df4adf..f78de87dec9 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/ProxyStartupTest.java @@ -19,7 +19,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import io.opentelemetry.sdk.OpenTelemetrySdk; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; @@ -30,30 +29,17 @@ import java.nio.file.Path; import java.util.Iterator; import java.util.UUID; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.BrokerStartup; -import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.proxy.config.Configuration; -import org.apache.rocketmq.proxy.config.ConfigurationManager; -import org.apache.rocketmq.proxy.config.ProxyConfig; -import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.MockedStatic; -import org.mockito.Mockito; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import static org.apache.rocketmq.proxy.config.ConfigurationManager.RMQ_PROXY_HOME; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; public class ProxyStartupTest { @@ -160,121 +146,6 @@ public void testParseAndInitCommandLineArgument() throws Exception { assertEquals(proxyMode, commandLineArgument.getProxyMode()); assertEquals(namesrvAddr, commandLineArgument.getNamesrvAddr()); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(brokerConfigPath, config.getBrokerConfigPath()); - assertEquals(proxyMode, config.getProxyMode()); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - } - - @Test - public void testLocalModeWithNameSrvAddrByProperty() throws Exception { - String namesrvAddr = "namesrvAddr"; - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr); - CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { - "-pm", "local" - }); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - - validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); - } - - private void validateBrokerCreateArgsWithNamsrvAddr(ProxyConfig config, String namesrvAddr) { - try (MockedStatic brokerStartupMocked = mockStatic(BrokerStartup.class); - MockedStatic messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) { - ArgumentCaptor args = ArgumentCaptor.forClass(Object.class); - BrokerController brokerControllerMocked = mock(BrokerController.class); - BrokerMetricsManager brokerMetricsManagerMocked = mock(BrokerMetricsManager.class); - Mockito.when(brokerMetricsManagerMocked.getBrokerMeter()).thenReturn(OpenTelemetrySdk.builder().build().getMeter("test")); - Mockito.when(brokerControllerMocked.getBrokerMetricsManager()).thenReturn(brokerMetricsManagerMocked); - brokerStartupMocked.when(() -> BrokerStartup.createBrokerController((String[]) args.capture())) - .thenReturn(brokerControllerMocked); - messagingProcessorMocked.when(() -> DefaultMessagingProcessor.createForLocalMode(any(), any())) - .thenReturn(mock(DefaultMessagingProcessor.class)); - - ProxyStartup.createMessagingProcessor(); - String[] passArgs = (String[]) args.getValue(); - assertEquals("-c", passArgs[0]); - assertEquals(config.getBrokerConfigPath(), passArgs[1]); - assertEquals("-n", passArgs[2]); - assertEquals(namesrvAddr, passArgs[3]); - assertEquals(4, passArgs.length); - } - } - - @Test - public void testLocalModeWithNameSrvAddrByConfigFile() throws Exception { - String namesrvAddr = "namesrvAddr"; - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo"); - Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByConfigFile", ".json"); - String configData = "{\n" + - " \"namesrvAddr\": \"namesrvAddr\"\n" + - "}"; - Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); - - CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { - "-pm", "local", - "-pc", configFilePath.toAbsolutePath().toString() - }); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - - validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); - } - - @Test - public void testLocalModeWithNameSrvAddrByCommandLine() throws Exception { - String namesrvAddr = "namesrvAddr"; - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo"); - Path configFilePath = Files.createTempFile("testLocalModeWithNameSrvAddrByCommandLine", ".json"); - String configData = "{\n" + - " \"namesrvAddr\": \"foo\"\n" + - "}"; - Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); - - CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { - "-pm", "local", - "-pc", configFilePath.toAbsolutePath().toString(), - "-n", namesrvAddr - }); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - - validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); - } - - @Test - public void testLocalModeWithAllArgs() throws Exception { - String namesrvAddr = "namesrvAddr"; - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "foo"); - Path configFilePath = Files.createTempFile("testLocalMode", ".json"); - String configData = "{\n" + - " \"namesrvAddr\": \"foo\"\n" + - "}"; - Files.write(configFilePath, configData.getBytes(StandardCharsets.UTF_8)); - Path brokerConfigFilePath = Files.createTempFile("testLocalModeBrokerConfig", ".json"); - - CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { - "-pm", "local", - "-pc", configFilePath.toAbsolutePath().toString(), - "-n", namesrvAddr, - "-bc", brokerConfigFilePath.toAbsolutePath().toString() - }); - ProxyStartup.initConfiguration(commandLineArgument); - - ProxyConfig config = ConfigurationManager.getProxyConfig(); - assertEquals(namesrvAddr, config.getNamesrvAddr()); - assertEquals(brokerConfigFilePath.toAbsolutePath().toString(), config.getBrokerConfigPath()); - - validateBrokerCreateArgsWithNamsrvAddr(config, namesrvAddr); } @Test @@ -282,14 +153,6 @@ public void testClusterMode() throws Exception { CommandLineArgument commandLineArgument = ProxyStartup.parseCommandLineArgument(new String[] { "-pm", "cluster" }); - ProxyStartup.initConfiguration(commandLineArgument); - - try (MockedStatic messagingProcessorMocked = mockStatic(DefaultMessagingProcessor.class)) { - DefaultMessagingProcessor processor = mock(DefaultMessagingProcessor.class); - messagingProcessorMocked.when(DefaultMessagingProcessor::createForClusterMode) - .thenReturn(processor); - - assertSame(processor, ProxyStartup.createMessagingProcessor()); - } + assertEquals("cluster", commandLineArgument.getProxyMode()); } }