diff --git a/api/src/main/java/org/apache/distributedlog/api/kv/result/TxnResult.java b/api/src/main/java/org/apache/distributedlog/api/kv/result/TxnResult.java index b956e3eab5a..a91f472ebdb 100644 --- a/api/src/main/java/org/apache/distributedlog/api/kv/result/TxnResult.java +++ b/api/src/main/java/org/apache/distributedlog/api/kv/result/TxnResult.java @@ -22,7 +22,7 @@ /** * Txn result. */ -public interface TxnResult { +public interface TxnResult extends Result { boolean isSuccess(); diff --git a/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/KvUtils.java b/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/KvUtils.java index 923545d8a0b..20ad4980bbc 100644 --- a/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/KvUtils.java +++ b/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/KvUtils.java @@ -17,6 +17,7 @@ import static org.apache.distributedlog.stream.proto.storage.StorageContainerRequest.Type.KV_DELETE; import static org.apache.distributedlog.stream.proto.storage.StorageContainerRequest.Type.KV_PUT; import static org.apache.distributedlog.stream.proto.storage.StorageContainerRequest.Type.KV_RANGE; +import static org.apache.distributedlog.stream.proto.storage.StorageContainerRequest.Type.KV_TXN; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; @@ -24,21 +25,34 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.util.List; +import org.apache.distributedlog.api.kv.op.CompareOp; +import org.apache.distributedlog.api.kv.op.DeleteOp; +import org.apache.distributedlog.api.kv.op.Op; +import org.apache.distributedlog.api.kv.op.PutOp; +import org.apache.distributedlog.api.kv.op.RangeOp; import org.apache.distributedlog.api.kv.options.DeleteOption; import org.apache.distributedlog.api.kv.options.PutOption; import org.apache.distributedlog.api.kv.options.RangeOption; import org.apache.distributedlog.api.kv.result.DeleteResult; import org.apache.distributedlog.api.kv.result.PutResult; import org.apache.distributedlog.api.kv.result.RangeResult; +import org.apache.distributedlog.api.kv.result.TxnResult; import org.apache.distributedlog.clients.impl.kv.result.PutResultImpl; import org.apache.distributedlog.clients.impl.kv.result.ResultFactory; +import org.apache.distributedlog.clients.impl.kv.result.TxnResultImpl; import org.apache.distributedlog.stream.proto.kv.KeyValue; +import org.apache.distributedlog.stream.proto.kv.rpc.Compare; +import org.apache.distributedlog.stream.proto.kv.rpc.Compare.CompareResult; +import org.apache.distributedlog.stream.proto.kv.rpc.Compare.CompareTarget; import org.apache.distributedlog.stream.proto.kv.rpc.DeleteRangeRequest; import org.apache.distributedlog.stream.proto.kv.rpc.DeleteRangeResponse; import org.apache.distributedlog.stream.proto.kv.rpc.PutRequest; import org.apache.distributedlog.stream.proto.kv.rpc.PutResponse; import org.apache.distributedlog.stream.proto.kv.rpc.RangeRequest; import org.apache.distributedlog.stream.proto.kv.rpc.RangeResponse; +import org.apache.distributedlog.stream.proto.kv.rpc.RequestOp; +import org.apache.distributedlog.stream.proto.kv.rpc.TxnRequest; +import org.apache.distributedlog.stream.proto.kv.rpc.TxnResponse; import org.apache.distributedlog.stream.proto.storage.StorageContainerRequest; /** @@ -160,4 +174,145 @@ public static DeleteResult newDeleteResult( .prevKvs(fromProtoKeyValues(response.getPrevKvsList(), kvFactory)); } + public static CompareTarget toProtoTarget(org.apache.distributedlog.api.kv.op.CompareTarget target) { + switch (target) { + case MOD: + return CompareTarget.MOD; + case VALUE: + return CompareTarget.VALUE; + case CREATE: + return CompareTarget.CREATE; + case VERSION: + return CompareTarget.VERSION; + default: + return CompareTarget.UNRECOGNIZED; + } + } + + public static CompareResult toProtoResult(org.apache.distributedlog.api.kv.op.CompareResult result) { + switch (result) { + case LESS: + return CompareResult.LESS; + case EQUAL: + return CompareResult.EQUAL; + case GREATER: + return CompareResult.GREATER; + case NOT_EQUAL: + return CompareResult.NOT_EQUAL; + default: + return CompareResult.UNRECOGNIZED; + } + } + + public static Compare.Builder toProtoCompare(CompareOp cmp) { + Compare.Builder builder = Compare.newBuilder() + .setTarget(toProtoTarget(cmp.target())) + .setResult(toProtoResult(cmp.result())) + .setKey(toProtoKey(cmp.key())); + switch (cmp.target()) { + case VERSION: + builder.setVersion(cmp.revision()); + break; + case MOD: + builder.setModRevision(cmp.revision()); + break; + case CREATE: + builder.setCreateRevision(cmp.revision()); + break; + case VALUE: + builder.setValue(toProtoKey(cmp.value())); + break; + default: + break; + } + return builder; + } + + public static PutRequest.Builder toProtoPutRequest(PutOp op) { + return PutRequest.newBuilder() + .setPrevKv(op.option().prevKv()) + .setKey(toProtoKey(op.key())) + .setValue(toProtoKey(op.value())); + } + + public static DeleteRangeRequest.Builder toProtoDeleteRequest(DeleteOp op) { + DeleteRangeRequest.Builder builder = DeleteRangeRequest.newBuilder() + .setKey(toProtoKey(op.key())) + .setPrevKv(op.option().prevKv()); + if (null != op.option().endKey()) { + builder.setRangeEnd(toProtoKey(op.option().endKey())); + } + return builder; + } + + public static RangeRequest.Builder toProtoRangeRequest(RangeOp op) { + RangeRequest.Builder builder = RangeRequest.newBuilder() + .setKey(toProtoKey(op.key())) + .setCountOnly(op.option().countOnly()) + .setKeysOnly(op.option().keysOnly()) + .setLimit(op.option().limit()); + if (null != op.option().endKey()) { + builder.setRangeEnd(toProtoKey(op.option().endKey())); + } + return builder; + } + + public static RequestOp.Builder toProtoRequest(Op op) { + RequestOp.Builder reqBuilder = RequestOp.newBuilder(); + switch (op.type()) { + case DELETE: + reqBuilder.setRequestDeleteRange(toProtoDeleteRequest((DeleteOp) op)); + break; + case RANGE: + reqBuilder.setRequestRange(toProtoRangeRequest((RangeOp) op)); + break; + case PUT: + reqBuilder.setRequestPut(toProtoPutRequest((PutOp) op)); + break; + default: + throw new IllegalArgumentException("Type '" + op.type() + "' is not supported in a txn yet."); + } + return reqBuilder; + } + + public static StorageContainerRequest newKvTxnRequest( + long scId, + TxnRequest.Builder txnReq) { + return StorageContainerRequest.newBuilder() + .setScId(scId) + .setType(KV_TXN) + .setKvTxnReq(txnReq) + .build(); + } + + public static TxnResult newKvTxnResult( + TxnResponse txnResponse, + ResultFactory resultFactory, + KeyValueFactory kvFactory) { + TxnResultImpl result = resultFactory.newTxnResult(); + result.isSuccess(txnResponse.getSucceeded()); + result.results(Lists.transform(txnResponse.getResponsesList(), op -> { + switch (op.getResponseCase()) { + case RESPONSE_PUT: + return newPutResult( + op.getResponsePut(), + resultFactory, + kvFactory); + case RESPONSE_RANGE: + return newRangeResult( + op.getResponseRange(), + resultFactory, + kvFactory); + case RESPONSE_DELETE_RANGE: + return newDeleteResult( + op.getResponseDeleteRange(), + resultFactory, + kvFactory); + default: + throw new IllegalArgumentException("Unknown response type '" + op.getResponseCase() + "'"); + } + })); + return result; + } + } diff --git a/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/PByteBufTableImpl.java b/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/PByteBufTableImpl.java index e84c83636a9..a6d827bb560 100644 --- a/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/PByteBufTableImpl.java +++ b/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/PByteBufTableImpl.java @@ -31,6 +31,8 @@ import org.apache.bookkeeper.common.router.ByteBufHashRouter; import org.apache.distributedlog.api.kv.PTable; import org.apache.distributedlog.api.kv.Txn; +import org.apache.distributedlog.api.kv.op.CompareOp; +import org.apache.distributedlog.api.kv.op.Op; import org.apache.distributedlog.api.kv.op.OpFactory; import org.apache.distributedlog.api.kv.options.DeleteOption; import org.apache.distributedlog.api.kv.options.PutOption; @@ -38,6 +40,7 @@ import org.apache.distributedlog.api.kv.result.DeleteResult; import org.apache.distributedlog.api.kv.result.PutResult; import org.apache.distributedlog.api.kv.result.RangeResult; +import org.apache.distributedlog.api.kv.result.TxnResult; import org.apache.distributedlog.clients.impl.internal.api.HashStreamRanges; import org.apache.distributedlog.clients.impl.internal.api.StorageServerClientManager; import org.apache.distributedlog.clients.impl.kv.op.OpFactoryImpl; @@ -51,17 +54,40 @@ @Slf4j public class PByteBufTableImpl implements PTable { + static final IllegalStateException CAUSE = + new IllegalStateException("No range found for a given routing key"); + private static class FailRequestTxn implements Txn { - private static class FailRequestKeyValueSpace implements PTable { + @Override + public Txn If(CompareOp... cmps) { + return this; + } - private static final IllegalStateException CAUSE = - new IllegalStateException("No range found for a given routing key"); + @Override + public Txn Then(Op... ops) { + return this; + } + + @Override + public Txn Else(Op... ops) { + return this; + } + + @Override + public CompletableFuture> commit() { + return FutureUtils.exception(CAUSE); + } + } + + static class FailRequestKeyValueSpace implements PTable { private final OpFactory opFactory; + private final FailRequestTxn txn; private FailRequestKeyValueSpace(OpFactory opFactory) { this.opFactory = opFactory; + this.txn = new FailRequestTxn(); } @Override @@ -88,8 +114,7 @@ public CompletableFuture> delete(ByteBuf pKey, @Override public Txn txn(ByteBuf pKey) { - // ToDO: - return null; + return txn; } @Override @@ -250,8 +275,8 @@ public CompletableFuture> delete(ByteBuf pKey, @Override public Txn txn(ByteBuf pKey) { - // TODO: - return null; + Long range = rangeRouter.getRange(pKey); + return getTableRange(range).txn(pKey); } @Override diff --git a/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/PByteBufTableRangeImpl.java b/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/PByteBufTableRangeImpl.java index afc3349bed4..69f673f32eb 100644 --- a/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/PByteBufTableRangeImpl.java +++ b/clients/java/kv/src/main/java/org/apache/distributedlog/clients/impl/kv/PByteBufTableRangeImpl.java @@ -14,13 +14,20 @@ package org.apache.distributedlog.clients.impl.kv; +import static org.apache.distributedlog.clients.impl.kv.KvUtils.toProtoCompare; +import static org.apache.distributedlog.clients.impl.kv.KvUtils.toProtoRequest; + +import com.google.common.collect.Lists; import com.google.protobuf.UnsafeByteOperations; import io.netty.buffer.ByteBuf; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.kv.PTable; import org.apache.distributedlog.api.kv.Txn; +import org.apache.distributedlog.api.kv.op.CompareOp; +import org.apache.distributedlog.api.kv.op.Op; import org.apache.distributedlog.api.kv.op.OpFactory; import org.apache.distributedlog.api.kv.options.DeleteOption; import org.apache.distributedlog.api.kv.options.PutOption; @@ -28,10 +35,12 @@ import org.apache.distributedlog.api.kv.result.DeleteResult; import org.apache.distributedlog.api.kv.result.PutResult; import org.apache.distributedlog.api.kv.result.RangeResult; +import org.apache.distributedlog.api.kv.result.TxnResult; import org.apache.distributedlog.clients.impl.container.StorageContainerChannel; import org.apache.distributedlog.clients.impl.kv.result.ResultFactory; import org.apache.distributedlog.stream.proto.RangeProperties; import org.apache.distributedlog.stream.proto.kv.rpc.RoutingHeader; +import org.apache.distributedlog.stream.proto.kv.rpc.TxnRequest; /** * A range of a table. @@ -65,9 +74,9 @@ class PByteBufTableRangeImpl implements PTable { private RoutingHeader.Builder newRoutingHeader(ByteBuf pKey) { return RoutingHeader.newBuilder() - .setStreamId(streamId) - .setRangeId(rangeProps.getRangeId()) - .setRKey(UnsafeByteOperations.unsafeWrap(pKey.nioBuffer())); + .setStreamId(streamId) + .setRangeId(rangeProps.getRangeId()) + .setRKey(UnsafeByteOperations.unsafeWrap(pKey.nioBuffer())); } @Override @@ -79,13 +88,13 @@ public CompletableFuture> get( option.endKey().retain(); } return TableRequestProcessor.of( - KvUtils.newKvRangeRequest( - scChannel.getStorageContainerId(), - KvUtils.newRangeRequest(lKey, option) - .setHeader(newRoutingHeader(pKey))), - response -> KvUtils.newRangeResult(response.getKvRangeResp(), resultFactory, kvFactory), - scChannel, - executor + KvUtils.newKvRangeRequest( + scChannel.getStorageContainerId(), + KvUtils.newRangeRequest(lKey, option) + .setHeader(newRoutingHeader(pKey))), + response -> KvUtils.newRangeResult(response.getKvRangeResp(), resultFactory, kvFactory), + scChannel, + executor ).process().whenComplete((value, cause) -> { pKey.release(); lKey.release(); @@ -104,13 +113,13 @@ public CompletableFuture> put(ByteBuf pKey, lKey.retain(); value.retain(); return TableRequestProcessor.of( - KvUtils.newKvPutRequest( - scChannel.getStorageContainerId(), - KvUtils.newPutRequest(lKey, value, option) - .setHeader(newRoutingHeader(pKey))), - response -> KvUtils.newPutResult(response.getKvPutResp(), resultFactory, kvFactory), - scChannel, - executor + KvUtils.newKvPutRequest( + scChannel.getStorageContainerId(), + KvUtils.newPutRequest(lKey, value, option) + .setHeader(newRoutingHeader(pKey))), + response -> KvUtils.newPutResult(response.getKvPutResp(), resultFactory, kvFactory), + scChannel, + executor ).process().whenComplete((ignored, cause) -> { pKey.release(); lKey.release(); @@ -128,13 +137,13 @@ public CompletableFuture> delete(ByteBuf pKey, option.endKey().retain(); } return TableRequestProcessor.of( - KvUtils.newKvDeleteRequest( - scChannel.getStorageContainerId(), - KvUtils.newDeleteRequest(lKey, option) - .setHeader(newRoutingHeader(pKey))), - response -> KvUtils.newDeleteResult(response.getKvDeleteResp(), resultFactory, kvFactory), - scChannel, - executor + KvUtils.newKvDeleteRequest( + scChannel.getStorageContainerId(), + KvUtils.newDeleteRequest(lKey, option) + .setHeader(newRoutingHeader(pKey))), + response -> KvUtils.newDeleteResult(response.getKvDeleteResp(), resultFactory, kvFactory), + scChannel, + executor ).process().whenComplete((ignored, cause) -> { pKey.release(); lKey.release(); @@ -146,8 +155,7 @@ public CompletableFuture> delete(ByteBuf pKey, @Override public Txn txn(ByteBuf pKey) { - // TODO: - return null; + return new TxnImpl(pKey); } @Override @@ -159,4 +167,73 @@ public void close() { public OpFactory opFactory() { return opFactory; } + + // + // Txn Implementation + // + + class TxnImpl implements Txn { + + private final ByteBuf pKey; + private final TxnRequest.Builder txnBuilder; + private final List resourcesToRelease; + + TxnImpl(ByteBuf pKey) { + this.pKey = pKey.retain(); + this.txnBuilder = TxnRequest.newBuilder(); + this.resourcesToRelease = Lists.newArrayList(); + } + + @Override + public Txn If(CompareOp... cmps) { + for (CompareOp cmp : cmps) { + txnBuilder.addCompare(toProtoCompare(cmp)); + resourcesToRelease.add(cmp); + } + return this; + } + + @Override + public Txn Then(Op... ops) { + for (Op op : ops) { + txnBuilder.addSuccess(toProtoRequest(op)); + resourcesToRelease.add(op); + } + return this; + } + + @Override + public Txn Else(Op... ops) { + for (Op op : ops) { + txnBuilder.addFailure(toProtoRequest(op)); + resourcesToRelease.add(op); + } + return this; + } + + @Override + public CompletableFuture> commit() { + return TableRequestProcessor.of( + KvUtils.newKvTxnRequest( + scChannel.getStorageContainerId(), + txnBuilder.setHeader(newRoutingHeader(pKey))), + response -> KvUtils.newKvTxnResult(response.getKvTxnResp(), resultFactory, kvFactory), + scChannel, + executor + ).process().whenComplete((ignored, cause) -> { + pKey.release(); + for (AutoCloseable resource : resourcesToRelease) { + closeResource(resource); + } + }); + } + + private void closeResource(AutoCloseable resource) { + try { + resource.close(); + } catch (Exception e) { + log.warn("Fail to close resource {}", resource, e); + } + } + } } diff --git a/clients/java/kv/src/test/java/org/apache/distributedlog/clients/impl/kv/TestPByteBufTableImpl.java b/clients/java/kv/src/test/java/org/apache/distributedlog/clients/impl/kv/TestPByteBufTableImpl.java index 61614cbdb55..ef046a5736b 100644 --- a/clients/java/kv/src/test/java/org/apache/distributedlog/clients/impl/kv/TestPByteBufTableImpl.java +++ b/clients/java/kv/src/test/java/org/apache/distributedlog/clients/impl/kv/TestPByteBufTableImpl.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.common.util.Bytes; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.distributedlog.api.kv.PTable; +import org.apache.distributedlog.api.kv.Txn; import org.apache.distributedlog.api.kv.options.DeleteOption; import org.apache.distributedlog.api.kv.options.OptionFactory; import org.apache.distributedlog.api.kv.options.PutOption; @@ -217,5 +218,13 @@ public void testBasicOperations() throws Exception { .delete(eq(pkey), eq(lkey), eq(option)); } } + + // test txn + for (RangeProperties rangeProps : streamRanges1.getRanges().values()) { + ByteBuf pkey = Unpooled.wrappedBuffer(Bytes.toBytes(rangeProps.getRangeId())); + Txn txn = table.txn(pkey); + verify(tableRanges.get(rangeProps.getRangeId()), times(1)) + .txn(eq(pkey)); + } } } diff --git a/server/src/main/java/org/apache/distributedlog/stream/server/grpc/GrpcTableService.java b/server/src/main/java/org/apache/distributedlog/stream/server/grpc/GrpcTableService.java index d3b219fc82c..a76de21acef 100644 --- a/server/src/main/java/org/apache/distributedlog/stream/server/grpc/GrpcTableService.java +++ b/server/src/main/java/org/apache/distributedlog/stream/server/grpc/GrpcTableService.java @@ -59,4 +59,9 @@ public void delete(StorageContainerRequest request, StorageContainerResponseHandler.of(responseObserver)); } + @Override + public void txn(StorageContainerRequest request, StreamObserver responseObserver) { + rangeStore.txn(request).whenComplete( + StorageContainerResponseHandler.of(responseObserver)); + } } diff --git a/tests/integration/src/test/java/org/apache/distributedlog/stream/tests/integration/TableClientTest.java b/tests/integration/src/test/java/org/apache/distributedlog/stream/tests/integration/TableClientTest.java index c09b8e450a9..1312dd24983 100644 --- a/tests/integration/src/test/java/org/apache/distributedlog/stream/tests/integration/TableClientTest.java +++ b/tests/integration/src/test/java/org/apache/distributedlog/stream/tests/integration/TableClientTest.java @@ -20,17 +20,23 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.distributedlog.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.distributedlog.api.StorageClient; import org.apache.distributedlog.api.kv.PTable; +import org.apache.distributedlog.api.kv.Txn; +import org.apache.distributedlog.api.kv.op.CompareResult; +import org.apache.distributedlog.api.kv.op.OpType; import org.apache.distributedlog.api.kv.options.DeleteOption; import org.apache.distributedlog.api.kv.options.DeleteOptionBuilder; import org.apache.distributedlog.api.kv.options.OptionFactory; @@ -42,6 +48,8 @@ import org.apache.distributedlog.api.kv.result.KeyValue; import org.apache.distributedlog.api.kv.result.PutResult; import org.apache.distributedlog.api.kv.result.RangeResult; +import org.apache.distributedlog.api.kv.result.Result; +import org.apache.distributedlog.api.kv.result.TxnResult; import org.apache.distributedlog.clients.StorageClientBuilder; import org.apache.distributedlog.clients.admin.StorageAdminClient; import org.apache.distributedlog.clients.config.StorageClientSettings; @@ -264,5 +272,60 @@ public void testTableAPI() throws Exception { } } } + + // test txn + byte[] lTxnKey = "txn-key".getBytes(UTF_8); + ByteBuf lTxnKeyBuf = Unpooled.wrappedBuffer(lTxnKey); + byte[] txnValue = "txn-value".getBytes(UTF_8); + ByteBuf txnValueBuf = Unpooled.wrappedBuffer(txnValue); + Txn txn = table.txn(lTxnKeyBuf); + + CompletableFuture> commitFuture = txn + .If( + table.opFactory().compareValue(CompareResult.EQUAL, lTxnKeyBuf, Unpooled.wrappedBuffer(new byte[0])) + ) + .Then( + table.opFactory().newPut( + lTxnKeyBuf, txnValueBuf, table.opFactory().optionFactory().newPutOption().build())) + .commit(); + try (TxnResult txnResult = FutureUtils.result(commitFuture)) { + assertTrue(txnResult.isSuccess()); + assertEquals(1, txnResult.results().size()); + Result opResult = txnResult.results().get(0); + assertEquals(OpType.PUT, opResult.type()); + } + + // get key + try (RangeOptionBuilder optionBuilder = optionFactory.newRangeOption()) { + try (RangeOption option = optionBuilder.build()) { + try (RangeResult getResult = FutureUtils.result(table.get( + lTxnKeyBuf, + lTxnKeyBuf, + option + ))) { + assertEquals(1, getResult.count()); + assertEquals(1, getResult.kvs().size()); + KeyValue kv = getResult.kvs().get(0); + assertEquals("txn-key", new String(ByteBufUtil.getBytes(kv.key()), UTF_8)); + assertEquals("txn-value", new String(ByteBufUtil.getBytes(kv.value()), UTF_8)); + } + } + } + + txn = table.txn(lTxnKeyBuf); + // txn failure + commitFuture = txn + .If( + table.opFactory().compareValue(CompareResult.EQUAL, lTxnKeyBuf, Unpooled.wrappedBuffer(new byte[0])) + ) + .Then( + table.opFactory().newPut( + lTxnKeyBuf, valBuf, table.opFactory().optionFactory().newPutOption().build())) + .commit(); + try (TxnResult txnResult = FutureUtils.result(commitFuture)) { + assertFalse(txnResult.isSuccess()); + assertEquals(0, txnResult.results().size()); + } + } }