Skip to content

Commit

Permalink
feature: support sadd&smemebers&spop protocol (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Apr 9, 2024
1 parent ba16ec6 commit 55bd678
Show file tree
Hide file tree
Showing 16 changed files with 493 additions and 42 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# redis2asp
# redispike-proxy
High-performance Aerospike proxy for the Redis protocold

### compatibility
Expand All @@ -10,15 +10,15 @@ Redis 3.x - 7.x

Details can be found here: [redispike-proxy/src/test/java/icu/funkye/redispike/ServerTest.java at main · funky-eyes/redispike-proxy (github.com)](https://github.com/funky-eyes/redispike-proxy/blob/main/src/test/java/icu/funkye/redispike/ServerTest.java)

| feature | support | note |
|---------|---------|------|
| String | done | |
| Hash | done | HSETNX only supports the key level, not the column level |
| Scan | | |
| List | | |
| Set | wait | |
| ZSet | wait | |
| keys | done | |
| feature | support | note |
|---------|-----------------------------------------------------------|----------------------------------------------------------|
| String | done | |
| Hash | done | hsetnx only supports the key level, not the column level |
| Scan | | |
| List | | |
| Set | sadd done<br/>spop done<br/>smembers done <br/>other wait | |
| ZSet | wait | |
| keys | done | |

### Performance Test Report
aerospike 3.x 2c4g redispike-proxy 2c4g:
Expand Down
2 changes: 1 addition & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ coverage:
patch: no
project:
default:
threshold: 1%
threshold: 5%
if_not_found: success
changes: no
precision: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import icu.funkye.redispike.handler.process.impl.HGetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.HSetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.KeysRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SPopRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.CommandRequestProcessor;
import icu.funkye.redispike.handler.process.impl.DelRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SAddRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SMembersRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequest;
import icu.funkye.redispike.protocol.response.BulkResponse;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,6 +66,12 @@ public RedisCommandHandler() {
processorMap.put(hGetAllRequestProcessor.getCmdCode().value(), hGetAllRequestProcessor);
HGetRequestProcessor hGetRequestProcessor = new HGetRequestProcessor();
processorMap.put(hGetRequestProcessor.getCmdCode().value(), hGetRequestProcessor);
SMembersRequestProcessor smembersRequestProcessor = new SMembersRequestProcessor();
processorMap.put(smembersRequestProcessor.getCmdCode().value(), smembersRequestProcessor);
SAddRequestProcessor sAddRequestProcessor = new SAddRequestProcessor();
processorMap.put(sAddRequestProcessor.getCmdCode().value(), sAddRequestProcessor);
SPopRequestProcessor sPopRequestProcessor = new SPopRequestProcessor();
processorMap.put(sPopRequestProcessor.getCmdCode().value(), sPopRequestProcessor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@
import icu.funkye.redispike.util.IntegerUtils;

public class HSetRequestProcessor extends AbstractRedisRequestProcessor<HSetRequest> {
WritePolicy defaultWritePolicy;

public HSetRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(HSetRequest.class.hashCode()));
this.defaultWritePolicy = client.getWritePolicyDefault();
this.defaultWritePolicy.sendKey = true;
}

@Override
Expand All @@ -44,10 +48,10 @@ public void handle(RemotingContext ctx, HSetRequest request) {
request.getKv().forEach((k, v) -> list.add(new Bin(k, v)));
WritePolicy writePolicy;
if (request.getOperate() != null && request.getOperate() == Operate.NX) {
writePolicy = new WritePolicy(client.getWritePolicyDefault());
writePolicy = new WritePolicy(defaultWritePolicy);
writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
} else {
writePolicy = client.getWritePolicyDefault();
writePolicy = defaultWritePolicy;
}
client.put(AeroSpikeClientFactory.eventLoops.next(), new WriteListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import java.util.ArrayList;
import java.util.List;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.policy.WritePolicy;
import com.alipay.remoting.RemotingContext;

import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.SAddRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class SAddRequestProcessor extends AbstractRedisRequestProcessor<SAddRequest> {
WritePolicy defaultWritePolicy;

public SAddRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(SAddRequest.class.hashCode()));
this.defaultWritePolicy = client.getWritePolicyDefault();
this.defaultWritePolicy.sendKey = true;
}

@Override
public void handle(RemotingContext ctx, SAddRequest request) {
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey());
List<Bin> list = new ArrayList<>();
for (String field : request.getFields()) {
list.add(new Bin(field, ""));
}
client.put(AeroSpikeClientFactory.eventLoops.next(), new WriteListener() {
@Override
public void onSuccess(Key key) {
request.setResponse(String.valueOf(request.getFields().size()));
ctx.writeAndFlush(request.getResponse());
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
}
}, defaultWritePolicy, key, list.toArray(new Bin[0]));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.listener.RecordListener;
import com.alipay.remoting.RemotingContext;

import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.SMembersRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class SMembersRequestProcessor extends AbstractRedisRequestProcessor<SMembersRequest> {

public SMembersRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(SMembersRequest.class.hashCode()));
}

@Override
public void handle(RemotingContext ctx, SMembersRequest request) {
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey());
client.get(AeroSpikeClientFactory.eventLoops.next(), new RecordListener() {
@Override
public void onSuccess(Key key, Record record) {
if (record == null) {
ctx.writeAndFlush(request.getResponse());
return;
}
record.bins.keySet().forEach(request::setResponse);
ctx.writeAndFlush(request.getResponse());
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
}
}, client.getReadPolicyDefault(), key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import java.util.List;
import java.util.Objects;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Language;
import com.aerospike.client.Value;
import com.aerospike.client.listener.ExecuteListener;
import com.aerospike.client.task.RegisterTask;
import com.alipay.remoting.RemotingContext;

import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.SPopRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class SPopRequestProcessor extends AbstractRedisRequestProcessor<SPopRequest> {
String luaScriptPath = Objects.requireNonNull(this.getClass().getClassLoader().getResource("lua/spop.lua"))
.getPath();

public SPopRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(SPopRequest.class.hashCode()));
RegisterTask task = client.register(null, SPopRequestProcessor.class.getClassLoader(), "lua/spop.lua",
"spop.lua", Language.LUA);
task.waitTillComplete();
}

@Override
public void handle(RemotingContext ctx, SPopRequest request) {
// Call the Lua script
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey());
client.execute(AeroSpikeClientFactory.eventLoops.next(), new ExecuteListener() {
@Override
public void onSuccess(Key key, Object obj) {
if (obj instanceof String) {
String[] response = ((String) obj).split(",");
for (String s : response) {
request.setResponse(s);
}
}
ctx.writeAndFlush(request.getResponse());
}

@Override
public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
ctx.writeAndFlush(request.getResponse());
}
}, client.getWritePolicyDefault(), key, "spop", "random_delete_bins", request.getCount() == null ? Value.get(1)
: Value.get(request.getCount()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import icu.funkye.redispike.protocol.request.HGetRequest;
import icu.funkye.redispike.protocol.request.HSetRequest;
import icu.funkye.redispike.protocol.request.KeysRequest;
import icu.funkye.redispike.protocol.request.SAddRequest;
import icu.funkye.redispike.protocol.request.SMembersRequest;
import icu.funkye.redispike.protocol.request.SPopRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
Expand Down Expand Up @@ -58,8 +61,8 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {

private RedisRequest<?> convert2RedisRequest(List<String> params) {
String cmd = params.get(0);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("cmd: {}", params);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("cmd: {}", params);
}
switch (cmd) {
case "hdel":
Expand All @@ -85,6 +88,13 @@ private RedisRequest<?> convert2RedisRequest(List<String> params) {
return new HGetRequest(params.get(1), params.size() > 2 ? params.get(2) : null);
case "hgetall":
return new HGetAllRequest(params.get(1));
case "sadd":
return new SAddRequest(params);
case "smembers":
return new SMembersRequest(params.get(1));
case "spop":
params.remove(0);
return new SPopRequest(params.remove(0), params.size() > 0 ? Integer.parseInt(params.get(0)) : null);
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ public RedisResponse<String> getResponse() {

@Override
public String toString() {
return "GetRequest{" + "key='" + key + '\'' + ", response=" + response + '}';
return "HGetAllRequest{" + "key='" + key + '\'' + ", response=" + response + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,10 @@ public void setResponse(RedisResponse<String> response) {
public String getKey() {
return key;
}

@Override
public String toString() {
return "HSetRequest{" + "originalCommand='" + originalCommand + '\'' + ", key='" + key + '\'' + ", kv=" + kv
+ ", operate=" + operate + ", response=" + response + '}';
}
}
Loading

0 comments on commit 55bd678

Please sign in to comment.