From c63801095c7d263d2ac8d04a08e8a79861ad99fa Mon Sep 17 00:00:00 2001 From: matthew Date: Mon, 19 Aug 2024 11:21:39 +0800 Subject: [PATCH] =?UTF-8?q?Fix=20the=20abnormal=20startup=20of=20the=20raf?= =?UTF-8?q?t=20cluster=EF=BC=88#10874=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../registry/api/RegistryClient.java | 26 +++++--- .../plugin/registry/raft/RaftRegistry.java | 40 +++++------- .../raft/RaftRegistryAutoConfiguration.java | 20 +++--- ...erClient.java => IRaftRegistryClient.java} | 2 +- ...terClient.java => RaftRegistryClient.java} | 26 ++++---- .../raft/manage/RaftSubscribeDataManager.java | 65 +++++++++---------- ...terServer.java => RaftRegistryServer.java} | 14 ++-- .../raft/RaftRegistryTestCase.java | 4 +- 8 files changed, 101 insertions(+), 96 deletions(-) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/{IRaftRegisterClient.java => IRaftRegistryClient.java} (98%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/{RaftRegisterClient.java => RaftRegistryClient.java} (91%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/{RaftRegisterServer.java => RaftRegistryServer.java} (88%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/{register => registry}/raft/RaftRegistryTestCase.java (98%) diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java index 53ae5f9449a4a..856aa9daff3cd 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java @@ -41,6 +41,8 @@ import java.util.Map; import java.util.Set; +import javax.annotation.PostConstruct; + import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -59,14 +61,22 @@ public class RegistryClient { public RegistryClient(Registry registry) { this.registry = registry; - if (!registry.exists(RegistryNodeType.MASTER.getRegistryPath())) { - registry.put(RegistryNodeType.MASTER.getRegistryPath(), EMPTY, false); - } - if (!registry.exists(RegistryNodeType.WORKER.getRegistryPath())) { - registry.put(RegistryNodeType.WORKER.getRegistryPath(), EMPTY, false); - } - if (!registry.exists(RegistryNodeType.ALERT_SERVER.getRegistryPath())) { - registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(), EMPTY, false); + } + + @PostConstruct + public void initializeRegistryPaths() { + try { + if (!registry.exists(RegistryNodeType.MASTER.getRegistryPath())) { + registry.put(RegistryNodeType.MASTER.getRegistryPath(), EMPTY, false); + } + if (!registry.exists(RegistryNodeType.WORKER.getRegistryPath())) { + registry.put(RegistryNodeType.WORKER.getRegistryPath(), EMPTY, false); + } + if (!registry.exists(RegistryNodeType.ALERT_SERVER.getRegistryPath())) { + registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(), EMPTY, false); + } + } catch (Exception e) { + log.error("Failed to initialize registry paths", e); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java index c4700b45b9358..f416cacaf00c3 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java @@ -20,8 +20,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.plugin.registry.raft.client.IRaftRegisterClient; -import org.apache.dolphinscheduler.plugin.registry.raft.client.RaftRegisterClient; +import org.apache.dolphinscheduler.plugin.registry.raft.client.IRaftRegistryClient; +import org.apache.dolphinscheduler.plugin.registry.raft.client.RaftRegistryClient; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.Registry; import org.apache.dolphinscheduler.registry.api.RegistryException; @@ -37,21 +37,19 @@ public class RaftRegistry implements Registry { private static final long RECONNECT_WAIT_TIME_MS = 50L; - private final IRaftRegisterClient raftRegisterClient; + private final IRaftRegistryClient raftRegistryClient; public RaftRegistry(RaftRegistryProperties raftRegistryProperties) { - this.raftRegisterClient = new RaftRegisterClient(raftRegistryProperties); + this.raftRegistryClient = new RaftRegistryClient(raftRegistryProperties); } @Override public void start() { - log.info("starting raft registry..."); - raftRegisterClient.start(); - log.info("raft registry started successfully"); + raftRegistryClient.start(); } @Override public boolean isConnected() { - return raftRegisterClient.isConnectivity(); + return raftRegistryClient.isConnectivity(); } @Override @@ -61,7 +59,7 @@ public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryExcept long endTimeMillis = timeout.toMillis() > 0 ? startTimeMillis + timeout.toMillis() : Long.MAX_VALUE; while (System.currentTimeMillis() < endTimeMillis) { - if (raftRegisterClient.isConnectivity()) { + if (raftRegistryClient.isConnectivity()) { return; } } @@ -75,68 +73,66 @@ public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryExcept public void subscribe(String path, SubscribeListener listener) { checkNotNull(path); checkNotNull(listener); - raftRegisterClient.subscribeRaftRegistryDataChange(path, listener); + raftRegistryClient.subscribeRaftRegistryDataChange(path, listener); } @Override public void addConnectionStateListener(ConnectionListener listener) { checkNotNull(listener); - raftRegisterClient.subscribeConnectionStateChange(listener); + raftRegistryClient.subscribeConnectionStateChange(listener); } @Override public String get(String key) { checkNotNull(key); - return raftRegisterClient.getRegistryDataByKey(key); + return raftRegistryClient.getRegistryDataByKey(key); } @Override public void put(String key, String value, boolean deleteOnDisconnect) { checkNotNull(key); - raftRegisterClient.putRegistryData(key, value, deleteOnDisconnect); + raftRegistryClient.putRegistryData(key, value, deleteOnDisconnect); } @Override public void delete(String key) { checkNotNull(key); - raftRegisterClient.deleteRegistryDataByKey(key); + raftRegistryClient.deleteRegistryDataByKey(key); } @Override public Collection children(String key) { checkNotNull(key); - return raftRegisterClient.getRegistryDataChildren(key); + return raftRegistryClient.getRegistryDataChildren(key); } @Override public boolean exists(String key) { checkNotNull(key); - return raftRegisterClient.existRaftRegistryDataKey(key); + return raftRegistryClient.existRaftRegistryDataKey(key); } @Override public boolean acquireLock(String key) { checkNotNull(key); - return raftRegisterClient.acquireRaftRegistryLock(key); + return raftRegistryClient.acquireRaftRegistryLock(key); } @Override public boolean acquireLock(String key, long timeout) { checkNotNull(key); - return raftRegisterClient.acquireRaftRegistryLock(key, timeout); + return raftRegistryClient.acquireRaftRegistryLock(key, timeout); } @Override public boolean releaseLock(String key) { checkNotNull(key); - return raftRegisterClient.releaseRaftRegistryLock(key); + return raftRegistryClient.releaseRaftRegistryLock(key); } @Override public void close() { - log.info("closing raft registry..."); - raftRegisterClient.close(); - log.info("raft registry closed"); + raftRegistryClient.close(); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryAutoConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryAutoConfiguration.java index 9051aef9387b5..647c665d5f673 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryAutoConfiguration.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryAutoConfiguration.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.plugin.registry.raft; -import org.apache.dolphinscheduler.plugin.registry.raft.server.RaftRegisterServer; +import org.apache.dolphinscheduler.plugin.registry.raft.server.RaftRegistryServer; import lombok.extern.slf4j.Slf4j; @@ -34,21 +34,21 @@ public class RaftRegistryAutoConfiguration { public RaftRegistryAutoConfiguration() { - log.info("Load RaftRegisterAutoConfiguration"); + log.info("Load RaftRegistryAutoConfiguration"); } @Bean @ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "master") - public RaftRegisterServer raftRegisterServer(RaftRegistryProperties raftRegistryProperties) { - RaftRegisterServer raftRegisterServer = new RaftRegisterServer(raftRegistryProperties); - raftRegisterServer.start(); - return raftRegisterServer; + public RaftRegistryServer raftRegistryServer(RaftRegistryProperties raftRegistryProperties) { + RaftRegistryServer raftRegistryServer = new RaftRegistryServer(raftRegistryProperties); + raftRegistryServer.start(); + return raftRegistryServer; } @Bean - @DependsOn("raftRegisterServer") + @DependsOn("raftRegistryServer") @ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "master") - public RaftRegistry masterRegisterClient(RaftRegistryProperties raftRegistryProperties) { + public RaftRegistry masterRaftRegistryClient(RaftRegistryProperties raftRegistryProperties) { RaftRegistry raftRegistry = new RaftRegistry(raftRegistryProperties); raftRegistry.start(); return raftRegistry; @@ -56,7 +56,7 @@ public RaftRegistry masterRegisterClient(RaftRegistryProperties raftRegistryProp @Bean @ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "worker") - public RaftRegistry workerRegisterClient(RaftRegistryProperties raftRegistryProperties) { + public RaftRegistry workerRaftRegistryClient(RaftRegistryProperties raftRegistryProperties) { RaftRegistry raftRegistry = new RaftRegistry(raftRegistryProperties); raftRegistry.start(); return raftRegistry; @@ -64,7 +64,7 @@ public RaftRegistry workerRegisterClient(RaftRegistryProperties raftRegistryProp @Bean @ConditionalOnProperty(prefix = "registry", name = "module", havingValue = "api") - public RaftRegistry apiRegisterClient(RaftRegistryProperties raftRegistryProperties) { + public RaftRegistry apiRaftRegistryClient(RaftRegistryProperties raftRegistryProperties) { RaftRegistry raftRegistry = new RaftRegistry(raftRegistryProperties); raftRegistry.start(); return raftRegistry; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegisterClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegistryClient.java similarity index 98% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegisterClient.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegistryClient.java index 365dbdccc17cf..3a6fd29d39d3d 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegisterClient.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegistryClient.java @@ -22,7 +22,7 @@ import java.util.Collection; -public interface IRaftRegisterClient extends AutoCloseable { +public interface IRaftRegistryClient extends AutoCloseable { /** * Start the raft registry client. Once started, the client will connect to the raft registry server and then it can be used. diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegisterClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegistryClient.java similarity index 91% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegisterClient.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegistryClient.java index 7a0d6755b2a58..525f192b85fab 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegisterClient.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegistryClient.java @@ -52,7 +52,7 @@ import com.alipay.sofa.jraft.rhea.storage.KVEntry; @Slf4j -public class RaftRegisterClient implements IRaftRegisterClient { +public class RaftRegistryClient implements IRaftRegistryClient { private final RheaKVStore rheaKvStore; private final RaftRegistryProperties raftRegistryProperties; @@ -61,7 +61,7 @@ public class RaftRegisterClient implements IRaftRegisterClient { private final IRaftLockManager raftLockManager; private volatile boolean started; private static final String MASTER_MODULE = "master"; - public RaftRegisterClient(RaftRegistryProperties raftRegistryProperties) { + public RaftRegistryClient(RaftRegistryProperties raftRegistryProperties) { this.raftRegistryProperties = raftRegistryProperties; this.rheaKvStore = new DefaultRheaKVStore(); this.raftConnectionStateManager = new RaftConnectionStateManager(raftRegistryProperties); @@ -90,14 +90,16 @@ private void initRheakv() { @Override public void start() { if (this.started) { - log.info("RaftRegisterClient is already started"); + log.info("RaftRegistryClient is already started"); return; } + log.info("starting raft client registry..."); if (raftRegistryProperties.getModule().equals(MASTER_MODULE)) { raftSubscribeDataManager.start(); } raftConnectionStateManager.start(); this.started = true; + log.info("raft client registry started successfully"); } @Override @@ -154,7 +156,7 @@ public Collection getRegistryDataChildren(String key) { } List kvEntries = rheaKvStore.bScan(basePath + Constants.SINGLE_SLASH, basePath + Constants.SINGLE_SLASH + Constants.RAFT_END_KEY); - return getRegisterList(kvEntries); + return getRegistryList(kvEntries); } @Override @@ -162,17 +164,17 @@ public boolean existRaftRegistryDataKey(String key) { return rheaKvStore.bContainsKey(key); } - private Collection getRegisterList(List kvEntries) { + private Collection getRegistryList(List kvEntries) { if (kvEntries == null || kvEntries.isEmpty()) { return new ArrayList<>(); } - List registerList = new ArrayList<>(); + List registryList = new ArrayList<>(); for (KVEntry kvEntry : kvEntries) { String entryKey = readUtf8(kvEntry.getKey()); String childKey = entryKey.substring(entryKey.lastIndexOf(Constants.SINGLE_SLASH) + 1); - registerList.add(childKey); + registryList.add(childKey); } - return registerList; + return registryList; } @Override @@ -182,7 +184,7 @@ public boolean acquireRaftRegistryLock(String lockKey) { } catch (Exception ex) { log.error("acquire raft registry lock error", ex); raftLockManager.releaseLock(lockKey); - throw new RegistryException("acquire raft register lock error: " + lockKey, ex); + throw new RegistryException("acquire raft registry lock error: " + lockKey, ex); } } @@ -193,7 +195,7 @@ public boolean acquireRaftRegistryLock(String lockKey, long timeout) { } catch (Exception ex) { log.error("acquire raft registry lock error", ex); raftLockManager.releaseLock(lockKey); - throw new RegistryException("acquire raft register lock error: " + lockKey, ex); + throw new RegistryException("acquire raft registry lock error: " + lockKey, ex); } } @@ -209,11 +211,11 @@ public boolean releaseRaftRegistryLock(String lockKey) { @Override public void close() { - log.info("start close raft register client"); + log.info("ready to close raft registry client"); if (rheaKvStore != null) { rheaKvStore.shutdown(); } this.started = false; - log.info("closed raft register client"); + log.info("closed raft registry client"); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java index 2c5ce5f1429e7..2a35f96f99c1d 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -87,21 +88,24 @@ private class SubscribeCheckTask implements Runnable { public void run() { try { final Map newDataMap = getNodeDataMap(); - if (dataSubScribeMap.isEmpty() || newDataMap.isEmpty()) { + if (dataSubScribeMap.isEmpty() || newDataMap == null || newDataMap.isEmpty()) { return; } // find the different final Map addedData = new HashMap<>(); final Map deletedData = new HashMap<>(); final Map updatedData = new HashMap<>(); - for (Map.Entry entry : newDataMap.entrySet()) { + + Iterator> iterator = newDataMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); final NodeItem oldData = oldDataMap.get(entry.getKey()); if (oldData == null) { addedData.put(entry.getKey(), entry.getValue().getNodeValue()); } else if (NodeType.EPHEMERAL.getName().equals(entry.getValue().getNodeType()) && isUnHealthy(entry.getValue().getNodeValue())) { kvStore.bDelete(entry.getKey()); - newDataMap.remove(entry.getKey(), entry.getValue()); + iterator.remove(); } else if (!oldData.getNodeValue().equals(entry.getValue().getNodeValue())) { updatedData.put(entry.getKey(), entry.getValue().getNodeValue()); } @@ -155,40 +159,35 @@ private void triggerListener(Map addedData, Map } private Map getNodeDataMap() { - final Map nodeItemMap = new HashMap<>(); - final List entryList = kvStore.bScan(RegistryNodeType.ALL_SERVERS.getRegistryPath(), - RegistryNodeType.ALL_SERVERS.getRegistryPath() + Constants.SINGLE_SLASH + Constants.RAFT_END_KEY); - - for (KVEntry kvEntry : entryList) { - final String entryKey = readUtf8(kvEntry.getKey()); - final String compositeValue = readUtf8(kvEntry.getValue()); + try { + final Map nodeItemMap = new HashMap<>(); + final List entryList = kvStore.bScan(RegistryNodeType.ALL_SERVERS.getRegistryPath(), + RegistryNodeType.ALL_SERVERS.getRegistryPath() + Constants.SINGLE_SLASH + + Constants.RAFT_END_KEY); + + for (KVEntry kvEntry : entryList) { + final String entryKey = readUtf8(kvEntry.getKey()); + final String compositeValue = readUtf8(kvEntry.getValue()); + + if (StringUtils.isEmpty(compositeValue) + || !entryKey.startsWith(RegistryNodeType.ALL_SERVERS.getRegistryPath())) { + continue; + } - if (StringUtils.isEmpty(compositeValue) - || !entryKey.startsWith(RegistryNodeType.ALL_SERVERS.getRegistryPath())) { - continue; - } + String[] nodeTypeAndValue = compositeValue.split(Constants.AT_SIGN); + if (nodeTypeAndValue.length < 2) { + continue; + } + String nodeType = nodeTypeAndValue[0]; + String nodeValue = nodeTypeAndValue[1]; - String[] nodeTypeAndValue = parseCompositeValue(compositeValue); - if (nodeTypeAndValue.length < 2) { - continue; + nodeItemMap.put(entryKey, NodeItem.builder().nodeValue(nodeValue).nodeType(nodeType).build()); } - String nodeType = nodeTypeAndValue[0]; - String nodeValue = nodeTypeAndValue[1]; - - nodeItemMap.put(entryKey, NodeItem.builder().nodeValue(nodeValue).nodeType(nodeType).build()); - } - return nodeItemMap; - } - - private String[] parseCompositeValue(String compositeValue) { - String[] nodeTypeAndValue = compositeValue.split(Constants.AT_SIGN); - if (nodeTypeAndValue.length < 2) { - log.error("Invalid compositeValue: {}", compositeValue); - return new String[]{}; + return nodeItemMap; + } catch (Exception ex) { + log.error("Fail to getNodeDataMap", ex); + return null; } - String nodeType = nodeTypeAndValue[0]; - String nodeValue = nodeTypeAndValue[1]; - return new String[]{nodeType, nodeValue}; } private void triggerListener(Map nodeDataMap, String subscribeKey, diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/RaftRegisterServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/RaftRegistryServer.java similarity index 88% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/RaftRegisterServer.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/RaftRegistryServer.java index bc67d99c796da..74b92b5e51401 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/RaftRegisterServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/RaftRegistryServer.java @@ -33,12 +33,12 @@ import com.alipay.sofa.jraft.util.Endpoint; @Slf4j -public class RaftRegisterServer { +public class RaftRegistryServer { private final RheaKVStore rheaKVStore; private final RheaKVStoreOptions options; private volatile boolean started; - public RaftRegisterServer(RaftRegistryProperties raftRegistryProperties) { + public RaftRegistryServer(RaftRegistryProperties raftRegistryProperties) { final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured() .withFake(true) // use a fake pd .config(); @@ -59,21 +59,21 @@ public RaftRegisterServer(RaftRegistryProperties raftRegistryProperties) { public void start() { if (this.started) { - log.info("raft register server is already started"); + log.info("raft registry server has already started"); return; } - log.info("starting raft register server..."); + log.info("starting raft registry server..."); this.rheaKVStore.init(this.options); - log.info("raft register server started successfully"); + log.info("raft registry server started successfully"); this.started = true; } public void stop() { - log.info("stopping raft register server"); + log.info("stopping raft registry server"); if (this.rheaKVStore != null) { this.rheaKVStore.shutdown(); } this.started = false; - log.info("raft register server stopped successfully"); + log.info("raft registry server stopped successfully"); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/register/raft/RaftRegistryTestCase.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryTestCase.java similarity index 98% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/register/raft/RaftRegistryTestCase.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryTestCase.java index df284c74d2d58..5ccd2311e2245 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/register/raft/RaftRegistryTestCase.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryTestCase.java @@ -14,15 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.register.raft; +package org.apache.dolphinscheduler.plugin.registry.raft; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; import org.apache.dolphinscheduler.common.model.BaseHeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistry; -import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties; import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.RegistryException;