diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Member.java b/cluster-api/src/main/java/io/scalecube/cluster/Member.java index 162e963e..65dc7835 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Member.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Member.java @@ -8,6 +8,7 @@ import java.io.ObjectOutput; import java.util.Objects; import java.util.StringJoiner; +import java.util.UUID; /** * Cluster member which represents node in the cluster and contains its id and address. This class @@ -73,8 +74,6 @@ public String namespace() { * from other cluster members. * * @see io.scalecube.cluster.transport.api.TransportConfig#port(int) - * @see ClusterConfig#containerHost(String) - * @see ClusterConfig#containerPort(Integer) * @return member address */ public Address address() { @@ -131,13 +130,22 @@ public void readExternal(ObjectInput in) throws IOException { this.namespace = in.readUTF(); } + private static String stringifyId(String id) { + try { + final UUID uuid = UUID.fromString(id); + return Long.toHexString(uuid.getMostSignificantBits() & Long.MAX_VALUE); + } catch (Exception ex) { + return id; + } + } + @Override public String toString() { StringJoiner stringJoiner = new StringJoiner(":"); if (alias == null) { - return stringJoiner.add(namespace).add(id + "@" + address).toString(); + return stringJoiner.add(namespace).add(stringifyId(id) + "@" + address).toString(); } else { - return stringJoiner.add(namespace).add(alias).add(id + "@" + address).toString(); + return stringJoiner.add(namespace).add(alias).add(stringifyId(id) + "@" + address).toString(); } } } diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 6e65a42e..1f996c6e 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -100,7 +100,6 @@ public final class ClusterImpl implements Cluster { private MembershipProtocolImpl membership; private MetadataStore metadataStore; private Scheduler scheduler; - private CorrelationIdGenerator cidGenerator; private ClusterMonitorModel.Builder monitorModelBuilder; public ClusterImpl() { @@ -255,7 +254,6 @@ private Mono doStart0() { localMember = createLocalMember(boundTransport.address()); transport = new SenderAwareTransport(boundTransport, localMember.address()); - cidGenerator = new CorrelationIdGenerator(localMember.id()); scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true); monitorModelBuilder = new ClusterMonitorModel.Builder(); @@ -265,8 +263,7 @@ private Mono doStart0() { transport, membershipSink.asFlux().onBackpressureBuffer(), config.failureDetectorConfig(), - scheduler, - cidGenerator); + scheduler); gossip = new GossipProtocolImpl( @@ -278,7 +275,7 @@ private Mono doStart0() { metadataStore = new MetadataStoreImpl( - localMember, transport, config.metadata(), config, scheduler, cidGenerator); + localMember, transport, config.metadata(), config, scheduler); membership = new MembershipProtocolImpl( @@ -289,7 +286,6 @@ private Mono doStart0() { metadataStore, config, scheduler, - cidGenerator, monitorModelBuilder); actionsDisposables.add( diff --git a/cluster/src/main/java/io/scalecube/cluster/CorrelationIdGenerator.java b/cluster/src/main/java/io/scalecube/cluster/CorrelationIdGenerator.java deleted file mode 100644 index 689f34d3..00000000 --- a/cluster/src/main/java/io/scalecube/cluster/CorrelationIdGenerator.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.scalecube.cluster; - -import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; - -public class CorrelationIdGenerator { - private final String cidPrefix; - private final AtomicLong counter = new AtomicLong(System.currentTimeMillis()); - - public CorrelationIdGenerator(String cidPrefix) { - this.cidPrefix = Objects.requireNonNull(cidPrefix, "cidPrefix"); - } - - public String nextCid() { - return cidPrefix + "-" + counter.incrementAndGet(); - } -} diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index 1943d9b8..ac270c4e 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -2,7 +2,6 @@ import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; -import io.scalecube.cluster.CorrelationIdGenerator; import io.scalecube.cluster.Member; import io.scalecube.cluster.fdetector.PingData.AckType; import io.scalecube.cluster.membership.MemberStatus; @@ -16,6 +15,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -41,7 +41,6 @@ public final class FailureDetectorImpl implements FailureDetector { private final Member localMember; private final Transport transport; private final FailureDetectorConfig config; - private final CorrelationIdGenerator cidGenerator; // State @@ -69,21 +68,18 @@ public final class FailureDetectorImpl implements FailureDetector { * @param membershipProcessor membership event processor * @param config failure detector settings * @param scheduler scheduler - * @param cidGenerator correlationId generator */ public FailureDetectorImpl( Member localMember, Transport transport, Flux membershipProcessor, FailureDetectorConfig config, - Scheduler scheduler, - CorrelationIdGenerator cidGenerator) { + Scheduler scheduler) { this.localMember = Objects.requireNonNull(localMember); this.transport = Objects.requireNonNull(transport); this.config = Objects.requireNonNull(config); this.scheduler = Objects.requireNonNull(scheduler); - this.cidGenerator = Objects.requireNonNull(cidGenerator); // Subscribe actionsDisposables.addAll( @@ -144,7 +140,7 @@ private void doPing() { } // Send ping - String cid = cidGenerator.nextCid(); + String cid = UUID.randomUUID().toString(); PingData pingData = new PingData(localMember, pingMember); Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build(); diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 60896ac7..98800f7d 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -7,7 +7,6 @@ import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterMath; -import io.scalecube.cluster.CorrelationIdGenerator; import io.scalecube.cluster.Member; import io.scalecube.cluster.fdetector.FailureDetector; import io.scalecube.cluster.fdetector.FailureDetectorConfig; @@ -35,6 +34,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -80,7 +80,6 @@ private enum MembershipUpdateReason { private final FailureDetector failureDetector; private final GossipProtocol gossipProtocol; private final MetadataStore metadataStore; - private final CorrelationIdGenerator cidGenerator; private final ClusterMonitorModel.Builder monitorModelBuilder; // State @@ -112,7 +111,6 @@ private enum MembershipUpdateReason { * @param metadataStore metadata store * @param config cluster config parameters * @param scheduler scheduler - * @param cidGenerator correlation id generator * @param monitorModelBuilder monitor model builder */ public MembershipProtocolImpl( @@ -123,7 +121,6 @@ public MembershipProtocolImpl( MetadataStore metadataStore, ClusterConfig config, Scheduler scheduler, - CorrelationIdGenerator cidGenerator, ClusterMonitorModel.Builder monitorModelBuilder) { this.transport = Objects.requireNonNull(transport); @@ -132,7 +129,6 @@ public MembershipProtocolImpl( this.metadataStore = Objects.requireNonNull(metadataStore); this.localMember = Objects.requireNonNull(localMember); this.scheduler = Objects.requireNonNull(scheduler); - this.cidGenerator = Objects.requireNonNull(cidGenerator); this.monitorModelBuilder = Objects.requireNonNull(monitorModelBuilder); this.membershipConfig = Objects.requireNonNull(config).membershipConfig(); this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig(); @@ -277,7 +273,8 @@ private void start0(MonoSink sink) { .map( address -> transport - .requestResponse(address, prepareSyncDataMsg(SYNC, cidGenerator.nextCid())) + .requestResponse( + address, prepareSyncDataMsg(SYNC, UUID.randomUUID().toString())) .doOnError( ex -> LOGGER.warn( diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index 1f0f599d..35ba5328 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -1,7 +1,6 @@ package io.scalecube.cluster.metadata; import io.scalecube.cluster.ClusterConfig; -import io.scalecube.cluster.CorrelationIdGenerator; import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; @@ -12,6 +11,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; @@ -36,7 +36,6 @@ public class MetadataStoreImpl implements MetadataStore { private final Member localMember; private final Transport transport; private final ClusterConfig config; - private final CorrelationIdGenerator cidGenerator; // State @@ -58,20 +57,17 @@ public class MetadataStoreImpl implements MetadataStore { * @param localMetadata local metadata (optional) * @param config config * @param scheduler scheduler - * @param cidGenerator correlationId generator */ public MetadataStoreImpl( Member localMember, Transport transport, Object localMetadata, ClusterConfig config, - Scheduler scheduler, - CorrelationIdGenerator cidGenerator) { + Scheduler scheduler) { this.localMember = Objects.requireNonNull(localMember); this.transport = Objects.requireNonNull(transport); this.config = Objects.requireNonNull(config); this.scheduler = Objects.requireNonNull(scheduler); - this.cidGenerator = Objects.requireNonNull(cidGenerator); this.localMetadata = localMetadata; // optional } @@ -151,7 +147,7 @@ public ByteBuffer removeMetadata(Member member) { public Mono fetchMetadata(Member member) { return Mono.defer( () -> { - final String cid = cidGenerator.nextCid(); + final String cid = UUID.randomUUID().toString(); final Address targetAddress = member.address(); LOGGER.debug("[{}][{}] Getting metadata for member {}", localMember, cid, member); diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java index 03bfd58c..316f814e 100644 --- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java @@ -6,7 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.scalecube.cluster.BaseTest; -import io.scalecube.cluster.CorrelationIdGenerator; import io.scalecube.cluster.Member; import io.scalecube.cluster.membership.MemberStatus; import io.scalecube.cluster.membership.MembershipEvent; @@ -419,10 +418,7 @@ private FailureDetectorImpl createFd( .map(address -> new Member("member-" + address.port(), null, address, NAMESPACE)) .map(member -> MembershipEvent.createAdded(member, null, 0)); - CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id()); - - return new FailureDetectorImpl( - localMember, transport, membershipFlux, config, scheduler, cidGenerator); + return new FailureDetectorImpl(localMember, transport, membershipFlux, config, scheduler); } private void start(List fdetectors) { diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index 63cea74a..f348fdf1 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -5,7 +5,6 @@ import io.scalecube.cluster.BaseTest; import io.scalecube.cluster.ClusterConfig; -import io.scalecube.cluster.CorrelationIdGenerator; import io.scalecube.cluster.Member; import io.scalecube.cluster.fdetector.FailureDetectorImpl; import io.scalecube.cluster.gossip.GossipProtocolImpl; @@ -1129,16 +1128,13 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf Sinks.Many membershipProcessor = Sinks.many().multicast().directBestEffort(); - CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id()); - FailureDetectorImpl failureDetector = new FailureDetectorImpl( localMember, transport, membershipProcessor.asFlux().onBackpressureBuffer(), config.failureDetectorConfig(), - scheduler, - cidGenerator); + scheduler); GossipProtocolImpl gossipProtocol = new GossipProtocolImpl( @@ -1149,7 +1145,7 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf scheduler); MetadataStoreImpl metadataStore = - new MetadataStoreImpl(localMember, transport, null, config, scheduler, cidGenerator); + new MetadataStoreImpl(localMember, transport, null, config, scheduler); MembershipProtocolImpl membership = new MembershipProtocolImpl( @@ -1160,7 +1156,6 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf metadataStore, config, scheduler, - cidGenerator, new ClusterMonitorModel.Builder()); membership diff --git a/examples/pom.xml b/examples/pom.xml index 6d9b6476..99c2e068 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -41,23 +41,4 @@ - - - - org.apache.maven.plugins - maven-compiler-plugin - - - org.apache.maven.plugins - maven-surefire-plugin - - - maven-jar-plugin - - - maven-dependency-plugin - - - - diff --git a/examples/scripts/issues/187/README b/examples/scripts/issues/187/README deleted file mode 100644 index c830e470..00000000 --- a/examples/scripts/issues/187/README +++ /dev/null @@ -1,8 +0,0 @@ -# To disable 4800 -sudo iptables -A INPUT -p tcp --dport 4800 -j DROP - -# To list existing table -sudo iptables -L --line-numbers - -# To remove rule 3 of the INPUT chain -sudo iptables -D INPUT 3 diff --git a/examples/scripts/issues/187/node-i-th.sh b/examples/scripts/issues/187/node-i-th.sh deleted file mode 100755 index f69f4507..00000000 --- a/examples/scripts/issues/187/node-i-th.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env bash - -cd $(dirname $0) -cd ../../../ - -JAR_FILE=$(ls target |grep jar) -echo $JAR_FILE - -DEFAULT_JMX_OPTS="-Djava.rmi.server.hostname=0.0.0.0 --Dcom.sun.management.jmxremote.authenticate=false --Dcom.sun.management.jmxremote.ssl=false" - -DEFAULT_OOM_OPTS="-XX:+HeapDumpOnOutOfMemoryError --XX:HeapDumpPath=dumps/node-i-th_\`date\`.hprof --XX:+UseGCOverheadLimit" - -SEED=localhost:4545 - -export INSTANCE_ID=node-i-th-"$(date +%s%N | cut -b1-13)" - -java \ --cp target/${JAR_FILE}:target/lib/* \ --Dlog4j.configurationFile="log4j2-issue187-debug.xml" \ --Dlog4j2.contextSelector="org.apache.logging.log4j.core.async.AsyncLoggerContextSelector" \ -${JVM_OPTS} ${DEFAULT_JMX_OPTS} ${DEFAULT_OOM_OPTS} \ -io.scalecube.issues.i187.NodeIthRunner $SEED diff --git a/examples/scripts/issues/187/node-no-inbound-1.sh b/examples/scripts/issues/187/node-no-inbound-1.sh deleted file mode 100755 index 600e5cfd..00000000 --- a/examples/scripts/issues/187/node-no-inbound-1.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env bash - -cd $(dirname $0) -cd ../../../ - -JAR_FILE=$(ls target |grep jar) -echo $JAR_FILE - -SEED=localhost:4545 -PORT=4800 - -DEFAULT_JMX_OPTS="-Djava.rmi.server.hostname=0.0.0.0 --Dcom.sun.management.jmxremote.port=4801 --Dcom.sun.management.jmxremote.rmi.port=4801 --Dcom.sun.management.jmxremote.authenticate=false --Dcom.sun.management.jmxremote.ssl=false" - -DEFAULT_OOM_OPTS="-XX:+HeapDumpOnOutOfMemoryError --XX:HeapDumpPath=dumps/node-no-inbound_\`date\`.hprof --XX:+UseGCOverheadLimit" - -export INSTANCE_ID=node-no-inbound-$PORT - -java \ --cp target/${JAR_FILE}:target/lib/* \ --Dlog4j.configurationFile="log4j2-issue187-debug.xml" \ --Dlog4j2.contextSelector="org.apache.logging.log4j.core.async.AsyncLoggerContextSelector" \ -${JVM_OPTS} ${DEFAULT_JMX_OPTS} ${DEFAULT_OOM_OPTS} \ -io.scalecube.issues.i187.NodeNoInboundRunner $PORT $SEED diff --git a/examples/scripts/issues/187/node-no-inbound-2.sh b/examples/scripts/issues/187/node-no-inbound-2.sh deleted file mode 100755 index e8bca650..00000000 --- a/examples/scripts/issues/187/node-no-inbound-2.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env bash - -cd $(dirname $0) -cd ../../../ - -JAR_FILE=$(ls target |grep jar) -echo $JAR_FILE - -SEED=localhost:4545 -PORT=5800 - -DEFAULT_JMX_OPTS="-Djava.rmi.server.hostname=0.0.0.0 --Dcom.sun.management.jmxremote.port=5801 --Dcom.sun.management.jmxremote.rmi.port=5801 --Dcom.sun.management.jmxremote.authenticate=false --Dcom.sun.management.jmxremote.ssl=false" - -DEFAULT_OOM_OPTS="-XX:+HeapDumpOnOutOfMemoryError --XX:HeapDumpPath=dumps/node-no-inbound_\`date\`.hprof --XX:+UseGCOverheadLimit" - -export INSTANCE_ID=node-no-inbound-$PORT - -java \ --cp target/${JAR_FILE}:target/lib/* \ --Dlog4j.configurationFile="log4j2-issue187-debug.xml" \ --Dlog4j2.contextSelector="org.apache.logging.log4j.core.async.AsyncLoggerContextSelector" \ -${JVM_OPTS} ${DEFAULT_JMX_OPTS} ${DEFAULT_OOM_OPTS} \ -io.scalecube.issues.i187.NodeNoInboundRunner $PORT $SEED diff --git a/examples/scripts/issues/187/seed.sh b/examples/scripts/issues/187/seed.sh deleted file mode 100755 index ae58be82..00000000 --- a/examples/scripts/issues/187/seed.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env bash - -cd $(dirname $0) -cd ../../../ - -JAR_FILE=$(ls target |grep jar) -echo $JAR_FILE - -PORT=4545 - -DEFAULT_JMX_OPTS="-Djava.rmi.server.hostname=0.0.0.0 --Dcom.sun.management.jmxremote.port=5678 --Dcom.sun.management.jmxremote.rmi.port=5678 --Dcom.sun.management.jmxremote.authenticate=false --Dcom.sun.management.jmxremote.ssl=false" - -DEFAULT_OOM_OPTS="-XX:+HeapDumpOnOutOfMemoryError --XX:HeapDumpPath=dumps/seed_\`date\`.hprof --XX:+UseGCOverheadLimit" - -export INSTANCE_ID=seed - -java \ --cp target/${JAR_FILE}:target/lib/* \ --Dlog4j.configurationFile="log4j2-issue187-debug.xml" \ --Dlog4j2.contextSelector="org.apache.logging.log4j.core.async.AsyncLoggerContextSelector" \ -${JVM_OPTS} ${DEFAULT_JMX_OPTS} ${DEFAULT_OOM_OPTS} \ -io.scalecube.issues.i187.SeedRunner $PORT diff --git a/pom.xml b/pom.xml index 3087d19b..573b88b2 100644 --- a/pom.xml +++ b/pom.xml @@ -4,8 +4,8 @@ io.scalecube - scalecube-parent-pom - 0.2.19 + scalecube-parent + 0.2.20 scalecube-cluster-parent @@ -33,16 +33,19 @@ - 1.0.18 + 1.0.21 - 1.7.30 - 2.13.2 - 2020.0.10 - 2.11.0 + 1.7.36 + 2.17.2 + 2020.0.23 + 2.13.3 - 2.27.0 - 5.1.1 + 4.6.1 + 5.8.2 1.3 + + https://maven.pkg.github.com/scalecube/scalecube-cluster + @@ -80,12 +83,6 @@ ${hamcrest.version} test - - org.hamcrest - hamcrest-core - ${hamcrest.version} - test - io.projectreactor reactor-test @@ -102,6 +99,7 @@ ${slf4j.version} + org.apache.logging.log4j log4j-bom @@ -137,44 +135,4 @@ - - - deploy2Github - - - github - GitHub Packages - https://maven.pkg.github.com/scalecube/scalecube-cluster - - - - - deploy2Maven - - - ossrh - Central Repository OSSRH - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - - - - - org.sonatype.plugins - nexus-staging-maven-plugin - - - maven-source-plugin - - - maven-javadoc-plugin - - - maven-gpg-plugin - - - - - -