Skip to content

Commit

Permalink
Enable txn on both server side and client side (YahooArchive#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie authored and zhaijack committed Jan 24, 2018
1 parent ef6f871 commit 7464dcc
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* Txn result.
*/
public interface TxnResult<K, V> {
public interface TxnResult<K, V> extends Result<K, V> {

boolean isSuccess();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,42 @@
import static org.apache.distributedlog.stream.proto.storage.StorageContainerRequest.Type.KV_DELETE;
import static org.apache.distributedlog.stream.proto.storage.StorageContainerRequest.Type.KV_PUT;
import static org.apache.distributedlog.stream.proto.storage.StorageContainerRequest.Type.KV_RANGE;
import static org.apache.distributedlog.stream.proto.storage.StorageContainerRequest.Type.KV_TXN;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.List;
import org.apache.distributedlog.api.kv.op.CompareOp;
import org.apache.distributedlog.api.kv.op.DeleteOp;
import org.apache.distributedlog.api.kv.op.Op;
import org.apache.distributedlog.api.kv.op.PutOp;
import org.apache.distributedlog.api.kv.op.RangeOp;
import org.apache.distributedlog.api.kv.options.DeleteOption;
import org.apache.distributedlog.api.kv.options.PutOption;
import org.apache.distributedlog.api.kv.options.RangeOption;
import org.apache.distributedlog.api.kv.result.DeleteResult;
import org.apache.distributedlog.api.kv.result.PutResult;
import org.apache.distributedlog.api.kv.result.RangeResult;
import org.apache.distributedlog.api.kv.result.TxnResult;
import org.apache.distributedlog.clients.impl.kv.result.PutResultImpl;
import org.apache.distributedlog.clients.impl.kv.result.ResultFactory;
import org.apache.distributedlog.clients.impl.kv.result.TxnResultImpl;
import org.apache.distributedlog.stream.proto.kv.KeyValue;
import org.apache.distributedlog.stream.proto.kv.rpc.Compare;
import org.apache.distributedlog.stream.proto.kv.rpc.Compare.CompareResult;
import org.apache.distributedlog.stream.proto.kv.rpc.Compare.CompareTarget;
import org.apache.distributedlog.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.distributedlog.stream.proto.kv.rpc.DeleteRangeResponse;
import org.apache.distributedlog.stream.proto.kv.rpc.PutRequest;
import org.apache.distributedlog.stream.proto.kv.rpc.PutResponse;
import org.apache.distributedlog.stream.proto.kv.rpc.RangeRequest;
import org.apache.distributedlog.stream.proto.kv.rpc.RangeResponse;
import org.apache.distributedlog.stream.proto.kv.rpc.RequestOp;
import org.apache.distributedlog.stream.proto.kv.rpc.TxnRequest;
import org.apache.distributedlog.stream.proto.kv.rpc.TxnResponse;
import org.apache.distributedlog.stream.proto.storage.StorageContainerRequest;

/**
Expand Down Expand Up @@ -160,4 +174,145 @@ public static DeleteResult<ByteBuf, ByteBuf> newDeleteResult(
.prevKvs(fromProtoKeyValues(response.getPrevKvsList(), kvFactory));
}

public static CompareTarget toProtoTarget(org.apache.distributedlog.api.kv.op.CompareTarget target) {
switch (target) {
case MOD:
return CompareTarget.MOD;
case VALUE:
return CompareTarget.VALUE;
case CREATE:
return CompareTarget.CREATE;
case VERSION:
return CompareTarget.VERSION;
default:
return CompareTarget.UNRECOGNIZED;
}
}

public static CompareResult toProtoResult(org.apache.distributedlog.api.kv.op.CompareResult result) {
switch (result) {
case LESS:
return CompareResult.LESS;
case EQUAL:
return CompareResult.EQUAL;
case GREATER:
return CompareResult.GREATER;
case NOT_EQUAL:
return CompareResult.NOT_EQUAL;
default:
return CompareResult.UNRECOGNIZED;
}
}

public static Compare.Builder toProtoCompare(CompareOp<ByteBuf, ByteBuf> cmp) {
Compare.Builder builder = Compare.newBuilder()
.setTarget(toProtoTarget(cmp.target()))
.setResult(toProtoResult(cmp.result()))
.setKey(toProtoKey(cmp.key()));
switch (cmp.target()) {
case VERSION:
builder.setVersion(cmp.revision());
break;
case MOD:
builder.setModRevision(cmp.revision());
break;
case CREATE:
builder.setCreateRevision(cmp.revision());
break;
case VALUE:
builder.setValue(toProtoKey(cmp.value()));
break;
default:
break;
}
return builder;
}

public static PutRequest.Builder toProtoPutRequest(PutOp<ByteBuf, ByteBuf> op) {
return PutRequest.newBuilder()
.setPrevKv(op.option().prevKv())
.setKey(toProtoKey(op.key()))
.setValue(toProtoKey(op.value()));
}

public static DeleteRangeRequest.Builder toProtoDeleteRequest(DeleteOp<ByteBuf, ByteBuf> op) {
DeleteRangeRequest.Builder builder = DeleteRangeRequest.newBuilder()
.setKey(toProtoKey(op.key()))
.setPrevKv(op.option().prevKv());
if (null != op.option().endKey()) {
builder.setRangeEnd(toProtoKey(op.option().endKey()));
}
return builder;
}

public static RangeRequest.Builder toProtoRangeRequest(RangeOp<ByteBuf, ByteBuf> op) {
RangeRequest.Builder builder = RangeRequest.newBuilder()
.setKey(toProtoKey(op.key()))
.setCountOnly(op.option().countOnly())
.setKeysOnly(op.option().keysOnly())
.setLimit(op.option().limit());
if (null != op.option().endKey()) {
builder.setRangeEnd(toProtoKey(op.option().endKey()));
}
return builder;
}

public static RequestOp.Builder toProtoRequest(Op<ByteBuf, ByteBuf> op) {
RequestOp.Builder reqBuilder = RequestOp.newBuilder();
switch (op.type()) {
case DELETE:
reqBuilder.setRequestDeleteRange(toProtoDeleteRequest((DeleteOp<ByteBuf, ByteBuf>) op));
break;
case RANGE:
reqBuilder.setRequestRange(toProtoRangeRequest((RangeOp<ByteBuf, ByteBuf>) op));
break;
case PUT:
reqBuilder.setRequestPut(toProtoPutRequest((PutOp<ByteBuf, ByteBuf>) op));
break;
default:
throw new IllegalArgumentException("Type '" + op.type() + "' is not supported in a txn yet.");
}
return reqBuilder;
}

public static StorageContainerRequest newKvTxnRequest(
long scId,
TxnRequest.Builder txnReq) {
return StorageContainerRequest.newBuilder()
.setScId(scId)
.setType(KV_TXN)
.setKvTxnReq(txnReq)
.build();
}

public static TxnResult<ByteBuf, ByteBuf> newKvTxnResult(
TxnResponse txnResponse,
ResultFactory<ByteBuf, ByteBuf> resultFactory,
KeyValueFactory<ByteBuf, ByteBuf> kvFactory) {
TxnResultImpl<ByteBuf, ByteBuf> result = resultFactory.newTxnResult();
result.isSuccess(txnResponse.getSucceeded());
result.results(Lists.transform(txnResponse.getResponsesList(), op -> {
switch (op.getResponseCase()) {
case RESPONSE_PUT:
return newPutResult(
op.getResponsePut(),
resultFactory,
kvFactory);
case RESPONSE_RANGE:
return newRangeResult(
op.getResponseRange(),
resultFactory,
kvFactory);
case RESPONSE_DELETE_RANGE:
return newDeleteResult(
op.getResponseDeleteRange(),
resultFactory,
kvFactory);
default:
throw new IllegalArgumentException("Unknown response type '" + op.getResponseCase() + "'");
}
}));
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
import org.apache.bookkeeper.common.router.ByteBufHashRouter;
import org.apache.distributedlog.api.kv.PTable;
import org.apache.distributedlog.api.kv.Txn;
import org.apache.distributedlog.api.kv.op.CompareOp;
import org.apache.distributedlog.api.kv.op.Op;
import org.apache.distributedlog.api.kv.op.OpFactory;
import org.apache.distributedlog.api.kv.options.DeleteOption;
import org.apache.distributedlog.api.kv.options.PutOption;
import org.apache.distributedlog.api.kv.options.RangeOption;
import org.apache.distributedlog.api.kv.result.DeleteResult;
import org.apache.distributedlog.api.kv.result.PutResult;
import org.apache.distributedlog.api.kv.result.RangeResult;
import org.apache.distributedlog.api.kv.result.TxnResult;
import org.apache.distributedlog.clients.impl.internal.api.HashStreamRanges;
import org.apache.distributedlog.clients.impl.internal.api.StorageServerClientManager;
import org.apache.distributedlog.clients.impl.kv.op.OpFactoryImpl;
Expand All @@ -51,17 +54,40 @@
@Slf4j
public class PByteBufTableImpl implements PTable<ByteBuf, ByteBuf> {

static final IllegalStateException CAUSE =
new IllegalStateException("No range found for a given routing key");

private static class FailRequestTxn implements Txn<ByteBuf, ByteBuf> {

private static class FailRequestKeyValueSpace implements PTable<ByteBuf, ByteBuf> {
@Override
public Txn<ByteBuf, ByteBuf> If(CompareOp<ByteBuf, ByteBuf>... cmps) {
return this;
}

private static final IllegalStateException CAUSE =
new IllegalStateException("No range found for a given routing key");
@Override
public Txn<ByteBuf, ByteBuf> Then(Op<ByteBuf, ByteBuf>... ops) {
return this;
}

@Override
public Txn<ByteBuf, ByteBuf> Else(Op<ByteBuf, ByteBuf>... ops) {
return this;
}

@Override
public CompletableFuture<TxnResult<ByteBuf, ByteBuf>> commit() {
return FutureUtils.exception(CAUSE);
}
}

static class FailRequestKeyValueSpace implements PTable<ByteBuf, ByteBuf> {

private final OpFactory<ByteBuf, ByteBuf> opFactory;
private final FailRequestTxn txn;

private FailRequestKeyValueSpace(OpFactory<ByteBuf, ByteBuf> opFactory) {
this.opFactory = opFactory;
this.txn = new FailRequestTxn();
}

@Override
Expand All @@ -88,8 +114,7 @@ public CompletableFuture<DeleteResult<ByteBuf, ByteBuf>> delete(ByteBuf pKey,

@Override
public Txn<ByteBuf, ByteBuf> txn(ByteBuf pKey) {
// ToDO:
return null;
return txn;
}

@Override
Expand Down Expand Up @@ -250,8 +275,8 @@ public CompletableFuture<DeleteResult<ByteBuf, ByteBuf>> delete(ByteBuf pKey,

@Override
public Txn<ByteBuf, ByteBuf> txn(ByteBuf pKey) {
// TODO:
return null;
Long range = rangeRouter.getRange(pKey);
return getTableRange(range).txn(pKey);
}

@Override
Expand Down
Loading

0 comments on commit 7464dcc

Please sign in to comment.