From 0aa7503e0eeebef9fc686139ceecc419728087e9 Mon Sep 17 00:00:00 2001 From: huangchengming Date: Sun, 18 Feb 2024 14:13:42 +0800 Subject: [PATCH 1/5] The return complete/fail is determined based the ResultCode. --- .../java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java index 6bec906..e2e1063 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java @@ -34,7 +34,6 @@ import java.io.*; import java.lang.reflect.Constructor; import java.time.Instant; -import java.util.Base64; import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; @@ -233,7 +232,13 @@ Future create(String path, V v, Optional timeToLive) { : curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT); creator.inBackground((cl, el) -> { if (el.getType() == CuratorEventType.CREATE) { - vertx.runOnContext(aVoid -> future.complete(el.getStat())); + KeeperException.Code code = KeeperException.Code.get(el.getResultCode()); + if (code == KeeperException.Code.OK) { + vertx.runOnContext(aVoid -> future.complete(el.getStat())); + } else { + KeeperException ex = KeeperException.create(code, path); + vertx.runOnContext(aVoid -> future.fail(ex)); + } } }).forPath(path, asByte(v)); } catch (Exception ex) { From 9d1c4d690cab4b7502b739e08ba81cc340b981ab Mon Sep 17 00:00:00 2001 From: huangchengming Date: Sun, 18 Feb 2024 14:17:05 +0800 Subject: [PATCH 2/5] The return complete/fail is determined based the ResultCode. --- src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java index e2e1063..24a6da8 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java @@ -34,6 +34,7 @@ import java.io.*; import java.lang.reflect.Constructor; import java.time.Instant; +import java.util.Base64; import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; From cffc3f79baaf23034de522c2fabe4605761d4f07 Mon Sep 17 00:00:00 2001 From: huangchengming Date: Sun, 18 Feb 2024 15:06:24 +0800 Subject: [PATCH 3/5] hcmsxy@163.com Signed-off-by: huangchengming --- .../java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java index 6bec906..24a6da8 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java @@ -233,7 +233,13 @@ Future create(String path, V v, Optional timeToLive) { : curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT); creator.inBackground((cl, el) -> { if (el.getType() == CuratorEventType.CREATE) { - vertx.runOnContext(aVoid -> future.complete(el.getStat())); + KeeperException.Code code = KeeperException.Code.get(el.getResultCode()); + if (code == KeeperException.Code.OK) { + vertx.runOnContext(aVoid -> future.complete(el.getStat())); + } else { + KeeperException ex = KeeperException.create(code, path); + vertx.runOnContext(aVoid -> future.fail(ex)); + } } }).forPath(path, asByte(v)); } catch (Exception ex) { From 8ec094497b5f429d9cca223e04eb19104027822e Mon Sep 17 00:00:00 2001 From: Chengminghuang Date: Sun, 18 Feb 2024 15:29:10 +0800 Subject: [PATCH 4/5] Remove unused ref --- src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java index 24a6da8..4965b4f 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java @@ -34,7 +34,7 @@ import java.io.*; import java.lang.reflect.Constructor; import java.time.Instant; -import java.util.Base64; + import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; From 6948270c8bb4d681533adec071f4e33118049d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=AA=E5=B1=B1=E6=9C=89=E5=AE=A2?= Date: Sat, 24 Feb 2024 13:41:02 +0800 Subject: [PATCH 5/5] Add files via upload add test case for "extendedTypesEnabled" flag. --- ...redAsyncMapUnimplementedExceptionTest.java | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapUnimplementedExceptionTest.java diff --git a/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapUnimplementedExceptionTest.java b/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapUnimplementedExceptionTest.java new file mode 100644 index 0000000..dcfb8ab --- /dev/null +++ b/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapUnimplementedExceptionTest.java @@ -0,0 +1,163 @@ +/* + * Copyright 2018 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.vertx.core.shareddata; + +import io.vertx.core.Vertx; +import io.vertx.spi.cluster.zookeeper.ZookeeperClusterManager; +import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMap; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingServer; +import org.apache.zookeeper.KeeperException; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static com.jayway.awaitility.Awaitility.await; +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * Created by huang.chengming + */ +public class ZKClusteredAsyncMapUnimplementedExceptionTest { + + private int doneFlag = 0; + private Object doneResult = null; + + private void reset() { + doneFlag = 0; + doneResult = null; + } + + @Test + public void testZKUnimplementedException() throws Exception { + + TestingServer zkServer = mockZookeeperServer(false); + CuratorFramework curator = mockCuratorFramework(zkServer.getConnectString()); + curator.start(); + + Vertx vertx = Vertx.builder() + .withClusterManager(new ZookeeperClusterManager(curator)) + .buildClustered() + .toCompletionStage() + .toCompletableFuture() + .get(); + + + ZKAsyncMap zkMap = new ZKAsyncMap<>(vertx, curator, "test-Map"); + + // put value + reset(); + zkMap.put("1", "1-value").onComplete(ar -> doneFlag = 1); + await().atMost(5, SECONDS).until(() -> doneFlag == 1); + + + // check value + reset(); + zkMap.get("1") + .onSuccess(ok -> doneResult = ok) + .onFailure(err -> doneResult = err) + .onComplete(ar -> doneFlag = 1); + await().atMost(5, SECONDS).until(() -> doneFlag == 1); + Assert.assertEquals("1-value", doneResult); + + + // check TTL + zkMap.put("2", "2-value", 1000) + .onSuccess(ok -> doneResult = ok) + .onFailure(err -> doneResult = err) + .onComplete(ar -> doneFlag = 1); + await().atMost(5, SECONDS).until(() -> doneFlag == 1); + Assert.assertTrue(doneResult instanceof KeeperException); + + /* + * When "extendedTypesEnabled" is not set, TTL nodes cannot be used. + * TTL Nodes must be enabled via System property as they are disabled by default. + * Create TTL Nodes without System property set the server will throw KeeperException.UnimplementedException. + */ + Assert.assertTrue(doneResult instanceof KeeperException.UnimplementedException); + + curator.close(); + zkServer.close(); + } + + + @Test + public void testEnabledExtendedTypesFlag() throws Exception { + + TestingServer zkServer = mockZookeeperServer(true); + CuratorFramework curator = mockCuratorFramework(zkServer.getConnectString()); + curator.start(); + + Vertx vertx = Vertx.builder() + .withClusterManager(new ZookeeperClusterManager(curator)) + .buildClustered() + .toCompletionStage() + .toCompletableFuture() + .get(); + + + ZKAsyncMap zkMap = new ZKAsyncMap<>(vertx, curator, "test2-Map"); + + // check TTL + reset(); + zkMap.put("3", "3-value", 1000) + .onSuccess(ok -> doneResult = ok) + .onFailure(err -> doneResult = err) + .onComplete(ar -> doneFlag = 1); + await().atMost(5, SECONDS).until(() -> doneFlag == 1); + Assert.assertNull(doneResult); + + + reset(); + zkMap.get("3") + .onSuccess(ok -> doneResult = ok) + .onFailure(err -> doneResult = err) + .onComplete(ar -> doneFlag = 1); + await().atMost(5, SECONDS).until(() -> doneFlag == 1); + Assert.assertEquals("3-value", doneResult); + + curator.close(); + zkServer.close(); + + } + + + // -------------------- + + private TestingServer mockZookeeperServer(boolean extendedTypesEnabled) throws Exception { + Map prop = new HashMap<>(); + prop.put("extendedTypesEnabled", String.valueOf(extendedTypesEnabled)); + InstanceSpec spec = new InstanceSpec(null, -1, -1, -1, true, -1, 10000, 20, prop); + return new TestingServer(spec, true); + } + + private CuratorFramework mockCuratorFramework(String connectString) { + return CuratorFrameworkFactory.builder() + .connectString(connectString) + .namespace("io.vertx") + .sessionTimeoutMs(10 * 1000) + .connectionTimeoutMs(10 * 1000) + .retryPolicy(new RetryOneTime(1000)) + .build(); + } + +}