diff --git a/com.creditease.uav.cache.redis/pom.xml b/com.creditease.uav.cache.redis/pom.xml index 02e4532d..67c6b05c 100644 --- a/com.creditease.uav.cache.redis/pom.xml +++ b/com.creditease.uav.cache.redis/pom.xml @@ -37,6 +37,42 @@ commons-logging commons-logging 1.2 - + + + io.netty + netty-all + 4.1.11.Final + + + io.lettuce + lettuce-core + 5.0.5.RELEASE + + + io.netty + netty-common + + + io.netty + netty-transport + + + io.netty + netty-buffer + + + io.netty + netty-resolver + + + io.netty + netty-handler + + + io.netty + netty-codec + + + \ No newline at end of file diff --git a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/api/CacheManager.java b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/api/CacheManager.java index 828d422a..f073b02e 100644 --- a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/api/CacheManager.java +++ b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/api/CacheManager.java @@ -430,10 +430,20 @@ public static void build(String cacheServerAddress, int minConcurrent, int maxCo private CacheService service; private L1Cache l1cache; - + + private static CacheClientMode clientMode; + protected CacheManager(String cacheServerAddress, int minConcurrent, int maxConcurrent, int queueSize, String password) { - service = CacheFactory.instance().createCacheService(CacheClientMode.AREDIS, cacheServerAddress, minConcurrent, + + if (cacheServerAddress.contains(",")) { + clientMode = CacheClientMode.LETTUCE; + } + else { + clientMode = CacheClientMode.AREDIS; + } + + service = CacheFactory.instance().createCacheService(clientMode, cacheServerAddress, minConcurrent, maxConcurrent, queueSize, password); l1cache = new L1Cache(); } @@ -1239,7 +1249,8 @@ public void putHash(String region, String key, Map fieldValues, * @param fieldNames * @return */ - public Map getHash(String region, String key, String... fieldNames) { + @SuppressWarnings("unchecked") + public Map getHash(String region, String key, String... fieldNames) { if (null == fieldNames) { Collections.emptyMap(); @@ -1253,8 +1264,13 @@ public Map getHash(String region, String key, String... fieldNam if (null != results && results.length > 0) { - Map value = genHMGetResults(fieldNames, results); - + Map value = new HashMap(); + if (clientMode.equals(CacheClientMode.LETTUCE) && results[0] instanceof Map) { + value = (Map) results[0]; + } + else { + value = genHMGetResults(fieldNames, results); + } return value; } @@ -1321,8 +1337,13 @@ public Map getHashAll(String region, String key) { if (null != results && results.length > 0) { - Map value = genHMGetResults(null, results); - + Map value = new HashMap(); + if (clientMode.equals(CacheClientMode.LETTUCE) && results[0] instanceof Map) { + value = (Map) results[0]; + } + else { + value = genHMGetResults(null, results); + } this.l1cache.put(rkey, value); return value; @@ -1362,7 +1383,14 @@ public void getHashAll(String region, String key, AsyncCacheCallback 0) { - Map mresult = genHMGetResults(null, result); + Map mresult = new HashMap(); + + if (clientMode.equals(CacheClientMode.LETTUCE) && result[0] instanceof Map) { + mresult = (Map) result[0]; + } + else { + mresult = genHMGetResults(null, result); + } /** * get hash all是可以放L1Cache @@ -1403,12 +1431,18 @@ public void getHash(String region, String key, AsyncCacheCallback() { - @Override + @SuppressWarnings("unchecked") + @Override public void process(CommandInfo[] command, Object[] result, Throwable throwable) { - if (fcallback != null && null != result && result.length > 0) { - Map mresult = genHMGetResults(ffieldname, result); - + if (fcallback != null && null != result && result.length > 0) { + Map mresult = new HashMap(); + if (clientMode.equals(CacheClientMode.LETTUCE) && result[0] instanceof Map) { + mresult = (Map) result[0]; + } + else { + mresult = genHMGetResults(ffieldname, result); + } fcallback.onResult(mresult); } } @@ -1795,7 +1829,7 @@ public List lrange(String region, String key, int start, int end) { Object[] results = service.submitCommands( new CommandInfo(CommandInfo.RedisCommand.LRANGE, rkey, String.valueOf(start), String.valueOf(end))); - if (null != results && results.length > 0) { + if (null != results && results.length > 0 && results[0] != null) { Object[] objs = (Object[]) results[0]; diff --git a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/LettuceAsyncService.java b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/LettuceAsyncService.java new file mode 100644 index 00000000..daa604b2 --- /dev/null +++ b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/LettuceAsyncService.java @@ -0,0 +1,644 @@ +/*- + * << + * UAVStack + * == + * Copyright (C) 2016 - 2018 UAVStack + * == + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * >> + */ + +package com.creditease.uav.cache.redis; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.creditease.agent.helpers.StringHelper; +import com.creditease.agent.log.SystemLogger; +import com.creditease.agent.log.api.ISystemLogger; +import com.creditease.uav.cache.redis.api.AbstractAsyncHandler; +import com.creditease.uav.cache.redis.api.CacheService; +import com.creditease.uav.cache.redis.api.CommandInfo; + +import io.lettuce.core.KeyValue; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; + +/** + * LettuceAsyncService description: lettuce实现类 + * + * @param + * + */ +public class LettuceAsyncService implements CacheService { + + static ISystemLogger logger = SystemLogger.getLogger(LettuceAsyncService.class); + private static StatefulRedisClusterConnection connect; + private static RedisClusterClient client; + private static long expireTimeLong = 10; + + public enum EnumMethod { + HKEYS() { + + @Override + public Object send(String[] params) { + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture> result = commandAsync.hkeys(params[0]); + List res = null; + try { + res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + + HGETALL() { + + @Override + public Object send(String[] params) { + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture> result = commandAsync.hgetall(params[0]); + try { + Map res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + HDEL() { + + @Override + public Object send(String[] params) { + // 异步不要求返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.hdel(params[0], + Arrays.copyOfRange(params, 1, params.length)); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + + } + }, + RPUSH() { + + @Override + public Object send(String[] params) { + // 异步不要求返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture results = commandAsync.rpush(params[0], new String[] { params[1] }); + lcr.setResult(results); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + RPOP() { + + @Override + public Object send(String[] params) { + // 无论同步异步都要求返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.rpop(params[0]); + try { + String res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + LSET() { + + @Override + public Object send(String[] params) { + // 异步不要求返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.lset(params[0], Long.parseLong(params[1]), params[2]); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + LREM() { + + @Override + public Object send(String[] params) { + // 同步异步都要求返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + try { + RedisFuture result = commandAsync.lrem(params[0], Long.parseLong(params[1]), params[2]); + long res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult((int) res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setResult(0); + lcr.setRunState(false); + } + return lcr; + } + }, + LRANGE() { + + @Override + public Object send(String[] params) { + // 同步异步均需要返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + RedisFuture> result = commandAsync.lrange(params[0], Long.parseLong(params[1]), + Long.parseLong(params[2])); + + try { + List res = result.get(expireTimeLong, TimeUnit.SECONDS); + + if (res == null || res.isEmpty()) { + lcr.setResult(null); + } + else { + String[] resultArray = new String[res.size()]; + for (int i = 0; i < res.size(); i++) { + resultArray[i] = res.get(i).toString(); + } + lcr.setResult(resultArray); + } + + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setResult(Collections.emptyList()); + lcr.setRunState(false); + } + return lcr; + } + }, + LPUSH() { + + @Override + public Object send(String[] params) { + // 只有异步操作 不需要返回值 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.lpush(params[0], new String[] { params[1] }); + lcr.setResult(result); + lcr.setRunState(true); + + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + LPOP() { + + @Override + public Object send(String[] params) { + // 同步异步均需返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture results = commandAsync.lpop(params[0]); + try { + String res = results.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + LLEN() { + + @Override + public Object send(String[] params) { + // 同步异步均需返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + try { + RedisFuture result = commandAsync.llen(params[0]); + long res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult((int) res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setResult(0); + lcr.setRunState(false); + } + return lcr; + } + }, + LINDEX() { + + @Override + public Object send(String[] params) { + // 同步异步均需返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.lindex(params[0], Long.parseLong(params[1])); + String res = ""; + try { + res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + lcr.setResult(res); + return lcr; + } + }, + INCR() { + + @Override + public Object send(String[] params) { + // 只有同步操作 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.incr(params[0]); + String res = ""; + try { + res = String.valueOf(result.get(expireTimeLong, TimeUnit.SECONDS)); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + lcr.setResult(res); + return lcr; + } + }, + + DECR() { + + @Override + public Object send(String[] params) { + // 只有同步操作 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.decr(params[0]); + String res = ""; + try { + res = String.valueOf(result.get(expireTimeLong, TimeUnit.SECONDS)); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + lcr.setResult(res); + return lcr; + + } + }, + HMSET() { + + @Override + public Object send(String[] params) { + // 只有异步 + Map mapValues = new HashMap(); + for (int i = 1; i < params.length; i++) { + mapValues.put(params[i].toString(), params[i + 1].toString()); + i++; + } + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commands = connect.async(); + + try { + RedisFuture result = commands.hmset(params[0], mapValues); + String res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(result); + lcr.setRunState(res.equals("OK")); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + HMGET() { + + @Override + public Object send(String[] params) { + // 同步与异步均需要返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture>> result = commandAsync.hmget(params[0], + Arrays.copyOfRange(params, 1, params.length)); + Map results = new HashMap(); + List> resultMap = null; + try { + resultMap = result.get(expireTimeLong, TimeUnit.SECONDS); + } + catch (Exception e) { + lcr.setRunState(false); + return lcr; + } + + for (int i = 0; i < resultMap.size(); i++) { + if (resultMap.get(i).hasValue() == true) { + results.put(resultMap.get(i).getKey(), resultMap.get(i).getValue()); + } + } + lcr.setResult(results); + lcr.setRunState(true); + return lcr; + } + + }, + EXPIRE() { + + @Override + public Object send(String[] params) { + // 只有异步不需要返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + try { + + RedisFuture result = commandAsync.expire(params[0], Long.parseLong(params[1])); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + + return lcr; + } + }, + DEL() { + + @Override + public Object send(String[] params) { + // 只有异步不需要返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.del(params[0]); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + + return lcr; + } + }, + GET() { + + @Override + public Object send(String[] params) { + // 同步异步都需要返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commands = connect.async(); + RedisFuture resultSync = commands.get(params[0]); + String result = null; + try { + result = resultSync.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + + return lcr; + } + }, + EXISTS() { + + @Override + public Object send(String[] params) { + // 只有同步 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.exists(params[0]); + String res = ""; + try { + res = String.valueOf(result.get(expireTimeLong, TimeUnit.SECONDS)); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + lcr.setResult(res); + return lcr; + + } + + }, + SET() { + + @Override + public Object send(String[] params) { + // 异步不需要结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.set(params[0], params[1]); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + + }; + + private static Map map = new HashMap(); + + static { + for (EnumMethod legEnum : EnumMethod.values()) { + map.put(legEnum.name(), legEnum); + } + } + + public static EnumMethod getMethod(String symbol) { + return map.get(symbol); + } + + public abstract Object send(String[] params); + + } + + /** + * @param redisServerAddress + * @param minConcurrent + * @param maxConcurrent + * @param queueSize + * @param password + */ + public LettuceAsyncService(String redisServerAddress, int minConcurrent, int maxConcurrent, int queueSize) { + this(redisServerAddress, minConcurrent, maxConcurrent, queueSize, null); + } + + public LettuceAsyncService(String redisServerAddress, int minConcurrent, int maxConcurrent, int queueSize, + String password) { + String[] redisCluster = redisServerAddress.split(","); + List nodes = new ArrayList(); + for (int i = 0; i < redisCluster.length; i++) { + String[] uriArray = redisCluster[i].split(":"); + Integer port = Integer.valueOf(uriArray[1]); + if (!StringHelper.isEmpty(password)) { + nodes.add(RedisURI.Builder.redis(uriArray[0], port).withPassword(password).build()); + } + else { + nodes.add(RedisURI.create(uriArray[0], port)); + } + } + + client = RedisClusterClient.create(nodes); + + connect = client.connect(); + + } + + @Override + public void start() { + // Do nothing but must pass sonar check + + } + + @Override + public void shutdown() { + connect.close(); + client.shutdown(); + } + + static class LettuceCommandResult { + + private Object result; + private boolean runState; + + /** + * @return the result + */ + public Object getResult() { + return result; + } + + /** + * @param result + * the result to set + */ + public void setResult(Object result) { + this.result = result; + } + + /** + * @return the runState + */ + public boolean isRunState() { + return runState; + } + + /** + * @param runState + * the runState to set + */ + public void setRunState(boolean runState) { + this.runState = runState; + } + + } + + @Override + public Object[] submitCommands(CommandInfo... commands) { + Object[] result = new Object[commands.length]; + for (int i = 0; i < commands.length; i++) { + LettuceCommandResult lcr = (LettuceCommandResult) EnumMethod.getMethod(commands[i].getCommand().name()) + .send(commands[i].getParam()); + result[i] = lcr.getResult(); + + } + return result; + } + + @Override + public void submitCommands(AbstractAsyncHandler handler, CommandInfo... commands) { + Object[] infos = new Object[commands.length]; + for (int i = 0; i < commands.length; i++) { + LettuceCommandResult lcr = null; + try { + lcr = (LettuceCommandResult) EnumMethod.getMethod(commands[i].getCommand().name()) + .send(commands[i].getParam()); + infos[i] = lcr.getResult(); + commands[i].setState(lcr.isRunState()); + } + catch (Exception e) { + infos[i] = null; + commands[i].setState(false); + } + + } + handler.process(commands, infos, null); + } + +} diff --git a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/api/CacheFactory.java b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/api/CacheFactory.java index d55fff52..34d37ffa 100644 --- a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/api/CacheFactory.java +++ b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/api/CacheFactory.java @@ -21,11 +21,12 @@ package com.creditease.uav.cache.redis.api; import com.creditease.uav.cache.redis.AredisAsyncService; +import com.creditease.uav.cache.redis.LettuceAsyncService; public class CacheFactory { public enum CacheClientMode { - AREDIS + AREDIS, LETTUCE } private static CacheFactory factory = new CacheFactory(); @@ -56,8 +57,12 @@ public CacheService createCacheService(CacheClientMode mode, String redisServerA switch (mode) { case AREDIS: + service = new AredisAsyncService(redisServerAddress, minConcurrent, maxConcurrent, QueueSize, password); + break; + case LETTUCE: default: - service = new AredisAsyncService(redisServerAddress, minConcurrent, maxConcurrent, QueueSize, password); + service = new LettuceAsyncService(redisServerAddress, minConcurrent, maxConcurrent, QueueSize, + password); break; }