From 7d9ef6142bcfb9b414337a8567fc9a8864d2b4a5 Mon Sep 17 00:00:00 2001 From: yizzuide Date: Sun, 20 Feb 2022 19:00:08 +0800 Subject: [PATCH] feat(sundial): Support register `HashFunc` with `CachedConsistentHashRing.register()`. [Sundial]Fixed always create 4 replica node with ketama hash. [Sundial]Fixed lose spring transaction when use split schema type. [Sundial]Fixed duplicate generated sharing id within one millisecond. --- .../jupiter/JupiterOnglCompiler.java | 1 + .../milkomeda/sundial/DataSourceFactory.java | 6 ++ .../sundial/DelegatingDataSourceAdvice.java | 2 + .../milkomeda/sundial/EnableSundial.java | 2 +- .../sundial/MultiDataSourceTransaction.java | 32 +++--- .../milkomeda/sundial/ShardingFunction.java | 45 ++++----- .../milkomeda/sundial/ShardingId.java | 15 ++- .../milkomeda/sundial/SundialConfig.java | 2 +- .../milkomeda/sundial/SundialInterceptor.java | 15 ++- .../milkomeda/sundial/SundialTweakConfig.java | 16 ++- .../hash/CachedConsistentHashRing.java | 98 +++++++++++++++++++ .../algorithm/hash/ConsistentHashRing.java | 34 +++++-- .../universe/algorithm/hash/KetamaHash.java | 33 ++----- .../env/CustomPropertySourceLocator.java | 4 +- .../env/SourcesLogApplicationListener.java | 1 + .../milkomeda/wormhole/WormholeEventBus.java | 3 +- .../resources/META-INF/additional.properties | 2 +- .../demo/MilkomedaApplicationListener.java | 6 +- .../demo/sundial/SundialController.java | 6 +- .../demo/sundial/mapper/TOrder2Mapper.java | 3 + .../universe/RequestAstrolabeHandler.java | 2 +- .../milkomeda/demo/wormhole/QuotaService.java | 2 +- .../src/main/resources/application.yml | 12 ++- .../main/resources/config/mybatis-config.xml | 2 +- 24 files changed, 234 insertions(+), 110 deletions(-) create mode 100644 Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/CachedConsistentHashRing.java diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/jupiter/JupiterOnglCompiler.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/jupiter/JupiterOnglCompiler.java index a6b2485b..6b63bb43 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/jupiter/JupiterOnglCompiler.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/jupiter/JupiterOnglCompiler.java @@ -57,6 +57,7 @@ public T compile(String expression, Object root, Class resultType) { return null; } + // 缓存表达式解析抽象树对象 private Object parseExpression(String expression) throws OgnlException { Object node = expressionCache.get(expression); if (node == null) { diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/DataSourceFactory.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/DataSourceFactory.java index e2b76e49..0a801016 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/DataSourceFactory.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/DataSourceFactory.java @@ -62,7 +62,9 @@ public class DataSourceFactory implements EnvironmentAware { @PostConstruct public void init() { + // 设置数据源模板前缀 DEFAULT_DATASOURCE_PREFIX = sundialProperties.getConfigPrefix(); + // DataSource匿名属性匹配 ALIASES.addAliases("url", "jdbc-url"); ALIASES.addAliases("username", "user"); } @@ -90,11 +92,14 @@ public DataSource createDataSource() throws Exception { @SuppressWarnings({"rawtypes", "unchecked"}) public DataSource createDataSource(SundialProperties.Datasource dataSourceConf) throws Exception { Map dataSourceProperties = binder.bind(DEFAULT_DATASOURCE_PREFIX, Map.class).get(); + // 新的配置项覆盖spring.datasource原有配置来创建新的DataSource if (dataSourceConf != null) { Map dataSourceConfMap = DataTypeConvertUtil.beanToMap(dataSourceConf); dataSourceProperties.putAll(dataSourceConfMap); } + // 根据数据源类型创建实例 DataSource dataSource = sundialProperties.getDatasourceType().newInstance(); + // 绑定数据源配置值 bind(dataSource, dataSourceProperties); return dataSource; } @@ -102,6 +107,7 @@ public DataSource createDataSource(SundialProperties.Datasource dataSourceConf) @SuppressWarnings("rawtypes") private void bind(DataSource result, Map properties) { ConfigurationPropertySource source = new MapConfigurationPropertySource(properties); + // 创建绑定对象,拷贝并添加匿名key的值 Binder binder = new Binder(source.withAliases(ALIASES)); binder.bind(ConfigurationPropertyName.EMPTY, Bindable.ofInstance(result)); } diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/DelegatingDataSourceAdvice.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/DelegatingDataSourceAdvice.java index b2fd9960..6fca67a9 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/DelegatingDataSourceAdvice.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/DelegatingDataSourceAdvice.java @@ -45,10 +45,12 @@ public class DelegatingDataSourceAdvice implements MethodInterceptor { @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable { + // 获取Method对象 // ProxyMethodInvocation pmi = (ProxyMethodInvocation) methodInvocation; // ProceedingJoinPoint pjp = new MethodInvocationProceedingJoinPoint(pmi); // Method method = methodInvocation.getMethod(); try { + // 调用方法前,选择数据源 SundialHolder.setDataSourceType(getKeyName()); return methodInvocation.proceed(); } finally { diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/EnableSundial.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/EnableSundial.java index cdb7e37b..36415440 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/EnableSundial.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/EnableSundial.java @@ -26,7 +26,7 @@ import java.lang.annotation.*; /** - * Enable 模块 + * EnableSundial * @author jsq 786063250@qq.com * @since 3.4.0 * @version 3.7.1 diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/MultiDataSourceTransaction.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/MultiDataSourceTransaction.java index 574b3574..af7dfde8 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/MultiDataSourceTransaction.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/MultiDataSourceTransaction.java @@ -21,6 +21,7 @@ package com.github.yizzuide.milkomeda.sundial; +import com.github.yizzuide.milkomeda.universe.env.Environment; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.transaction.Transaction; import org.springframework.jdbc.CannotGetJdbcConnectionException; @@ -37,31 +38,31 @@ /** * MultiDataSourceTransaction - * 多数据源事务 + * 主从多数据源事务 * * @author yizzuide * @since 3.7.1 + * @version 3.12.10 * Create at 2020/05/31 11:48 */ @Slf4j public class MultiDataSourceTransaction implements Transaction { - + // 数据源 private final DataSource dataSource; - + // 主连接 private Connection mainConnection; - - private final String mainDataSourceType; - - private final ConcurrentMap otherConnectionMap; - + // 连接是否有事务 private boolean isConnectionTransactional; - + // 自动提交 private boolean autoCommit; + // 其它连接 + private final ConcurrentMap otherConnectionMap; + // 只读从库识别key + private static final String readOnlyType = "read-only"; public MultiDataSourceTransaction(DataSource dataSource) { Assert.notNull(dataSource, "No DataSource specified"); this.dataSource = dataSource; - mainDataSourceType = SundialHolder.getDataSourceType(); otherConnectionMap = new ConcurrentHashMap<>(); } @@ -69,7 +70,8 @@ public MultiDataSourceTransaction(DataSource dataSource) { public Connection getConnection() throws SQLException { // 动态的根据DatabaseType获取不同的Connection String dataSourceType = SundialHolder.getDataSourceType(); - if (dataSourceType.equals(mainDataSourceType)) { + // 如果当前为主连接 + if (!dataSourceType.startsWith(readOnlyType)) { if (mainConnection == null) { openMainConnection(); } @@ -77,6 +79,7 @@ public Connection getConnection() throws SQLException { } else { if (!otherConnectionMap.containsKey(dataSourceType)) { try { + // 从连接不支持事务(用于只读) Connection conn = dataSource.getConnection(); otherConnectionMap.put(dataSourceType, conn); return conn; @@ -89,10 +92,11 @@ public Connection getConnection() throws SQLException { } private void openMainConnection() throws SQLException { + // 将主连接交由Spring事务管理 this.mainConnection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.mainConnection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource); - if (log.isDebugEnabled()) { + if (Environment.isShowLog()) { log.debug("Sundial JDBC Connection [" + this.mainConnection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring"); } @@ -102,7 +106,7 @@ private void openMainConnection() throws SQLException { public void commit() throws SQLException { // 非Spring事务管理的提交处理 if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) { - if (log.isDebugEnabled()) { + if (Environment.isShowLog()) { log.debug("Sundial Committing JDBC Connection [" + this.mainConnection + "]"); } this.mainConnection.commit(); @@ -116,7 +120,7 @@ public void commit() throws SQLException { public void rollback() throws SQLException { // 非Spring事务管理的回滚处理 if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) { - if (log.isDebugEnabled()) { + if (Environment.isShowLog()) { log.debug("Sundial Rolling back JDBC Connection [" + this.mainConnection + "]"); } this.mainConnection.rollback(); diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/ShardingFunction.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/ShardingFunction.java index 25455eb7..1ad9af2e 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/ShardingFunction.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/ShardingFunction.java @@ -21,7 +21,7 @@ package com.github.yizzuide.milkomeda.sundial; -import com.github.yizzuide.milkomeda.universe.algorithm.hash.*; +import com.github.yizzuide.milkomeda.universe.algorithm.hash.CachedConsistentHashRing; import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; @@ -30,11 +30,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * ShardingFunction @@ -50,8 +46,6 @@ public class ShardingFunction { public static final String BEAN_ID = "ShardingFunction"; - private final Map> cachedConsistentHashMap = new ConcurrentHashMap<>(); - /** * 格式化函数 * @param format 格式字符串 @@ -101,7 +95,7 @@ public long seq(long key, long bitStart, long bitCount) { * @return 目标节点 */ public long fnv(String key, long nodeCount, int replicas) { - return hash(key, nodeCount, replicas, new FNV132Hash()); + return hash("fnv", key, nodeCount, replicas); } /** @@ -112,7 +106,7 @@ public long fnv(String key, long nodeCount, int replicas) { * @return 目标节点 */ public long murmur(String key, long nodeCount, int replicas) { - return hash(key, nodeCount, replicas, new MurmurHash()); + return hash("murmur", key, nodeCount, replicas); } /** @@ -123,25 +117,20 @@ public long murmur(String key, long nodeCount, int replicas) { * @return 目标节点 */ public long ketama(String key, long nodeCount, int replicas) { - return hash(key, nodeCount, replicas, new KetamaHash()); + return hash("ketama", key, nodeCount, replicas); } - private long hash(String key, long nodeCount, int replicas, HashFunc hashFunc) { - String cacheKey = nodeCount + "_" + replicas; - // 缓存哈希环 - ConsistentHashRing consistentHashRing = cachedConsistentHashMap.get(cacheKey); - if (consistentHashRing == null) { - // 生成节点 - List nodes = new ArrayList<>(); - for (long i = 0; i < nodeCount; i++) { - nodes.add(i); - } - // 创建哈希环 - consistentHashRing = new ConsistentHashRing<>(hashFunc, replicas, nodes); - // 添加到缓存 - cachedConsistentHashMap.put(cacheKey, consistentHashRing); - } - return consistentHashRing.get(key); + /** + * 自定义算法一致性哈希实现 + * @param hashName 哈希算法名 + * @param key 拆分键 + * @param nodeCount 节点数 + * @param replicas 每个节点复制的虚拟节点数,推荐设置4的倍数 + * @return 目标节点 + * @since 3.12.10 + */ + public long hash(String hashName, String key, long nodeCount, int replicas) { + return CachedConsistentHashRing.getInstance().lookForHashNode(key, nodeCount, replicas, hashName); } /** @@ -179,9 +168,9 @@ public long roll(long key, long slideWindow, double expandWarnPercent) { assert key >= 0 && slideWindow > 0; if (expandWarnPercent > 0) { // 获取系数 - double factor = (double)key / (double)slideWindow; + BigDecimal factor = BigDecimal.valueOf(key).divide(BigDecimal.valueOf(slideWindow), 2, RoundingMode.DOWN); // 截取小数得当前分配百分比 - double percent = factor - (long)factor; + double percent = factor.subtract(new BigDecimal(factor.longValue())).doubleValue(); if (percent > expandWarnPercent) { log.warn("Roll function of sundial sharding up to percent: {}", percent); } diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/ShardingId.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/ShardingId.java index 81cf8a91..75ee8460 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/ShardingId.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/ShardingId.java @@ -33,6 +33,7 @@ public class ShardingId { // 开始时间 2020-01-01 private static final long EPOCH = 1577808000000L; // 时间位(可用69年,也就是2089-01-01) + // 计算可用年数:BigDecimal.valueOf(Math.pow(2, 41)).divide(BigDecimal.valueOf(31536000000L/*1年的毫秒数*/), RoundingMode.DOWN) public static final long TIME_BITS = 41L; // 机器位(集群8台,满足一定量的并发需求) private final static long WORKER_ID_BITS = 3L; @@ -80,8 +81,8 @@ public static long nextId(long workerId, long businessId, long sharding) { } if (preTime == timestamp) { seqStart = (seqStart + 1) & MAX_SN; - // 如果序列号处于起点,获取比上次更新的时间 - if (seqStart == 0) { + // 如果序列号处于最大,循环到超过1毫秒 + if (seqStart == MAX_SN) { timestamp = nextTime(preTime); } } else { @@ -96,6 +97,16 @@ public static long nextId(long workerId, long businessId, long sharding) { } } + /** + * 从id获取存储的时间戳 + * @param id id值 + * @return 时间戳 + * @since 3.12.10 + */ + public static long extractEpochMill(long id) { + return (id >> (SN_BITS + SHARD_BITS + BUSINESS_ID_BITS + WORKER_ID_BITS)) + EPOCH; + } + private static long currentTime() { return System.currentTimeMillis(); } diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialConfig.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialConfig.java index c951516d..9c5b737b 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialConfig.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialConfig.java @@ -94,7 +94,7 @@ public DataSource dynamicRouteDataSource(DataSourceFactory dataSourceFactory) { if (CollectionUtils.isEmpty(datasourceConfigSet)) { return new DynamicRouteDataSource(defaultDataSource, targetDataSources); } - // 不回其它数据源节点 + // 添加其它数据源节点 for (Map.Entry entry : datasourceConfigSet) { DataSource dataSource = dataSourceFactory.createDataSource(entry.getValue()); targetDataSources.put(entry.getKey(), dataSource); diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialInterceptor.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialInterceptor.java index 0a1978dd..fd6ad0fa 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialInterceptor.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialInterceptor.java @@ -35,6 +35,7 @@ import org.apache.ibatis.session.RowBounds; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import javax.annotation.Resource; @@ -48,7 +49,7 @@ * * @author yizzuide * @since 3.8.0 - * @version 3.9.0 + * @version 3.12.10 * Create at 2020/06/16 11:18 */ @Slf4j @@ -104,10 +105,11 @@ public Object intercept(Invocation invocation) throws Throwable { root.setP(params); root.setFn(shardingFunction); String schema = null; - // 分库、分库分表 + // 包含分库的处理 if (sundial.shardingType() != ShardingType.TABLE && !StringUtils.isEmpty(nodeExp)) { String node = SimpleElParser.parse(nodeExp, root, String.class); SundialProperties.DataNode dataNode = props.getSharding().getNodes().get(node); + // node_001 --转--> node_1 if (dataNode == null) { int separatorIndex = node.lastIndexOf(props.getSharding().getIndexSeparator()); String index = node.substring(separatorIndex + 1); @@ -117,9 +119,12 @@ public Object intercept(Invocation invocation) throws Throwable { // 是否使用主连接 if (sundial.key().equals(DynamicRouteDataSource.MASTER_KEY)) { SundialHolder.setDataSourceType(dataNode.getLeader()); + } else if(sundial.key().equals("follows")) { // 从库任选 + if (!CollectionUtils.isEmpty(dataNode.getFollows())) { + SundialHolder.setDataSourceType(dataNode.getFollows().stream().findAny().orElse(DynamicRouteDataSource.MASTER_KEY)); + } } else { - // 暂时使用第一个从连接 - SundialHolder.setDataSourceType(dataNode.getFollows().get(0)); + SundialHolder.setDataSourceType(sundial.key()); } // 需要添加的数据库名 if (!StringUtils.isEmpty(dataNode.getSchema())) { @@ -156,7 +161,7 @@ public Object intercept(Invocation invocation) throws Throwable { sql = sql.replace(tableName, schema == null ? part : schema + "." + part); updateSql(sql, invocation, ms, args, boundSql); } - return invocation.proceed(); + return invocation.proceed(); } @Override diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialTweakConfig.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialTweakConfig.java index 46bd6733..7956f224 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialTweakConfig.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/sundial/SundialTweakConfig.java @@ -21,22 +21,20 @@ package com.github.yizzuide.milkomeda.sundial; -import com.github.yizzuide.milkomeda.util.ReflectUtil; -import org.apache.commons.lang3.tuple.Pair; import org.apache.ibatis.mapping.Environment; +import org.apache.ibatis.session.Configuration; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.boot.autoconfigure.MybatisAutoConfiguration; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import java.lang.reflect.Field; - /** * SundialTweakConfig * * @author yizzuide * @since 3.7.1 + * @version 3.12.10 * Create at 2020/05/31 14:57 */ @ConditionalOnClass(MybatisAutoConfiguration.class) @@ -45,11 +43,11 @@ public class SundialTweakConfig { @Autowired public void configSqlSessionFactory(SqlSessionFactory sqlSessionFactory) { MultiDataSourceTransactionFactory multiDataSourceTransactionFactory = new MultiDataSourceTransactionFactory(); - Pair envFieldBundle = ReflectUtil.getFieldBundlePath(sqlSessionFactory, "configuration.environment"); - Environment originEnvironment = (Environment) envFieldBundle.getValue(); + Configuration configuration = sqlSessionFactory.getConfiguration(); + // 以前使用反射路径方式(现在已有公开的API可以设置) + // Pair envFieldBundle = ReflectUtil.getFieldBundlePath(sqlSessionFactory, "configuration.environment"); + Environment originEnvironment = configuration.getEnvironment(); Environment environment = new Environment(originEnvironment.getId(), multiDataSourceTransactionFactory, originEnvironment.getDataSource()); - Pair confFieldBundle = ReflectUtil.getFieldBundlePath(sqlSessionFactory, "configuration"); - Object configuration = confFieldBundle.getValue(); - ReflectUtil.invokeMethod(configuration, "setEnvironment", new Class[]{Environment.class}, environment); + configuration.setEnvironment(environment); } } diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/CachedConsistentHashRing.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/CachedConsistentHashRing.java new file mode 100644 index 00000000..57cc85fa --- /dev/null +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/CachedConsistentHashRing.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2022 yizzuide All rights Reserved. + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.github.yizzuide.milkomeda.universe.algorithm.hash; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * CachedConsistentHashRing + * 可缓存的哈希环 + * + * @author yizzuide + * @since 3.12.10 + * Create at 2022/02/20 14:47 + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CachedConsistentHashRing { + // 哈希算法 + private final Map hashFuncMap = new HashMap<>(); + // 一致性Hash环 + private final Map> container = new ConcurrentHashMap<>(); + + private static class CachedConsistentHashRingHolder { + static CachedConsistentHashRing cachedConsistentHashRing = new CachedConsistentHashRing(); + } + + public static CachedConsistentHashRing getInstance() { + return CachedConsistentHashRingHolder.cachedConsistentHashRing; + } + + static { + getInstance().register("fnv", new FNV132Hash()) + .register("murmur", new MurmurHash()) + .register("ketama", new KetamaHash()); + } + + /** + * 查找命中的哈希节点 + * @param key 拆分键 + * @param nodeCount 节点数 + * @param replicas 每个节点复制的虚拟节点数 + * @param hashName 哈希算法名,已注册的有:fnv, murmur, ketama + * @return 哈希节点 + */ + public long lookForHashNode(String key, long nodeCount, int replicas, String hashName) { + String cacheKey = nodeCount + "_" + replicas; + // 缓存哈希环 + ConsistentHashRing consistentHashRing = container.get(cacheKey); + if (consistentHashRing == null) { + // 生成节点 + List nodes = new ArrayList<>(); + for (long i = 0; i < nodeCount; i++) { + nodes.add(i); + } + // 创建哈希环 + consistentHashRing = new ConsistentHashRing<>(hashFuncMap.get(hashName), replicas, nodes); + // 添加到缓存 + container.put(cacheKey, consistentHashRing); + } + return consistentHashRing.get(key); + } + + /** + * 注册hash算法实现 + * @param hashName hash算法 + * @param hashFunc hash实现 + * @return CachedConsistentHashRing + */ + public CachedConsistentHashRing register(String hashName, HashFunc hashFunc) { + hashFuncMap.put(hashName, hashFunc); + return this; + } +} diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/ConsistentHashRing.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/ConsistentHashRing.java index c4168246..d339fefb 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/ConsistentHashRing.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/ConsistentHashRing.java @@ -31,10 +31,11 @@ * * @author yizzuide * @since 3.8.0 + * @version 3.12.10 * Create at 2020/06/18 14:43 */ public class ConsistentHashRing { - // 虚拟节点个数 + // 默认虚拟节点个数 private final static int V_NODE_SIZE = 4; // 虚拟节点后缀 private final static String V_NODE_SUFFIX = "#"; @@ -67,40 +68,44 @@ public ConsistentHashRing(HashFunc hashFunc, int replicas, Collection nodes) } /** - * 根据设置的虚拟节点来拷贝 + * 每添加一个实体节点,生成多个映射的虚拟节点 * @param node 节点 */ public void add(T node) { + // 如果是平衡一致性hash的实现 if (hashFunc instanceof ConsistentHashFunc) { ConsistentHashFunc consistentHashFunc = (ConsistentHashFunc) this.hashFunc; int balanceFactor = consistentHashFunc.balanceFactor(); - int count = V_NODE_SIZE / balanceFactor; + int count = replicas / balanceFactor; for (int i = 0; i < count; i++) { - consistentHashFunc.balanceHash(node.toString() + V_NODE_SUFFIX + i, (hk) -> hashRing.put(hk, node)); + consistentHashFunc.balanceHash(genVirtualKey(node, i), (hk) -> hashRing.put(hk, node)); } return; } + // 其它hash实现 for (int i = 0; i < replicas; i++) { - hashRing.put(hashFunc.hash(node.toString() + V_NODE_SUFFIX + i), node); + hashRing.put(hashFunc.hash(genVirtualKey(node, i)), node); } } /** - * 移除一点节点的所有虚拟节点 + * 移除一个节点的所有虚拟节点 * @param node 节点 */ public void remove(T node) { + // 如果是平衡一致性hash的实现 if (hashFunc instanceof ConsistentHashFunc) { ConsistentHashFunc consistentHashFunc = (ConsistentHashFunc) this.hashFunc; int balanceFactor = consistentHashFunc.balanceFactor(); - int count = V_NODE_SIZE / balanceFactor; + int count = replicas / balanceFactor; for (int i = 0; i < count; i++) { - consistentHashFunc.balanceHash(node.toString() + V_NODE_SUFFIX + i, hashRing::remove); + consistentHashFunc.balanceHash(genVirtualKey(node, i), hashRing::remove); } return; } + // 其它hash实现 for (int i = 0; i < replicas; i++) { - hashRing.remove(hashFunc.hash(node.toString() + V_NODE_SUFFIX + i)); + hashRing.remove(hashFunc.hash(genVirtualKey(node, i))); } } @@ -122,4 +127,15 @@ public T get(Object key) { } return locateEntry.getValue(); } + + /** + * 生成虚拟节点key + * @param node 实体节点 + * @param seq 序号 + * @return 虚拟节点key + * @since 3.12.10 + */ + private String genVirtualKey(Object node, int seq) { + return node.toString() + V_NODE_SUFFIX + seq; + } } diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/KetamaHash.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/KetamaHash.java index 223cb801..50f2310d 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/KetamaHash.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/algorithm/hash/KetamaHash.java @@ -21,12 +21,12 @@ package com.github.yizzuide.milkomeda.universe.algorithm.hash; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; +import org.springframework.util.DigestUtils; + import java.util.function.Consumer; /** - * KetamaHash算法:区别于其它Hash算法,针对高效的一致性哈希算法方式,超高负载平衡性,高不变流量 + * KetamaHash算法:区别于其它Hash算法,针对高效的一致性哈希算法方式,超高负载平衡性,高不变流量,高分散性 * * @author yizzuide * @since 3.8.0 @@ -34,15 +34,7 @@ */ public class KetamaHash implements ConsistentHashFunc { - private static final MessageDigest md5Digest; - - static { - try { - md5Digest = MessageDigest.getInstance("MD5"); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException("MD5 not supported", e); - } - } + public static final int DEFAULT_BALANCE_FACTOR = 4; @Override public long hash(Object key) { @@ -51,7 +43,7 @@ public long hash(Object key) { @Override public long rawHash(Object key) { - byte[] bKey = computeMd5(key.toString()); + byte[] bKey = DigestUtils.md5Digest(key.toString().getBytes()); return ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8) | @@ -60,12 +52,12 @@ public long rawHash(Object key) { @Override public int balanceFactor() { - return 4; + return DEFAULT_BALANCE_FACTOR; } @Override public void balanceHash(Object key, Consumer generator) { - byte[] digest = computeMd5(key.toString()); + byte[] digest = DigestUtils.md5Digest(key.toString().getBytes()); int factor = balanceFactor(); for (int h = 0; h < factor; h++) { long hk = ((long) (digest[3 + h * factor] & 0xFF) << 24) | @@ -75,15 +67,4 @@ public void balanceHash(Object key, Consumer generator) { generator.accept(hk); } } - - private static byte[] computeMd5(String k) { - MessageDigest md5; - try { - md5 = (MessageDigest) md5Digest.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException("clone of MD5 not supported", e); - } - md5.update(k.getBytes()); - return md5.digest(); - } } diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/env/CustomPropertySourceLocator.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/env/CustomPropertySourceLocator.java index 656743ad..697ce968 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/env/CustomPropertySourceLocator.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/env/CustomPropertySourceLocator.java @@ -44,8 +44,8 @@ public PropertySource locate(Environment environment) { return new MapPropertySource("milkomeda", mapResource); }*/ - // 添加自定义属性来源方式二:实现EnvironmentPostProcessor接口,把CollectionsPropertySourceLocator注册到spring.factories - // Spring启动时发出ApplicationEnvironmentPreparedEvent事件,通过ConfigFileApplicationListener加载SPI配置的所有排好序的EnvironmentPostProcessor实例 + // 添加自定义属性来源方式二:实现EnvironmentPostProcessor接口,把CustomPropertySourceLocator注册到spring.factories + // Spring启动时发出ApplicationEnvironmentPreparedEvent事件,通过ConfigFileApplicationListener.onApplicationEnvironmentPreparedEvent(...)加载SPI配置的所有排好序的EnvironmentPostProcessor实例 @Override public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { CollectionsPropertySource.addToEnvironment(environment); diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/env/SourcesLogApplicationListener.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/env/SourcesLogApplicationListener.java index 644fedad..713a5712 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/env/SourcesLogApplicationListener.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/universe/env/SourcesLogApplicationListener.java @@ -41,6 +41,7 @@ * @see org.springframework.boot.context.config.AnsiOutputApplicationListener * Create at 2020/04/11 11:56 */ +// ApplicationEnvironmentPreparedEvent:环境配置准备好了,可以查看或修改 @Slf4j public class SourcesLogApplicationListener implements ApplicationListener, Ordered { diff --git a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/wormhole/WormholeEventBus.java b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/wormhole/WormholeEventBus.java index 1b8ba78b..2181d24a 100644 --- a/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/wormhole/WormholeEventBus.java +++ b/Milkomeda/src/main/java/com/github/yizzuide/milkomeda/wormhole/WormholeEventBus.java @@ -35,6 +35,7 @@ /** * WormholeEventBus + * 事件总线 * * @author yizzuide * @since 3.3.0 @@ -63,7 +64,7 @@ public void publish(WormholeEvent event, String action) { * 发布领域事件 * @param event 领域事件 * @param action 动作名 - * @param callback 回调 + * @param callback 通知事件方执行完成回调 * @param 领域事件数据类型 */ public void publish(WormholeEvent event, String action, diff --git a/Milkomeda/src/main/resources/META-INF/additional.properties b/Milkomeda/src/main/resources/META-INF/additional.properties index 8515adbe..88049018 100644 --- a/Milkomeda/src/main/resources/META-INF/additional.properties +++ b/Milkomeda/src/main/resources/META-INF/additional.properties @@ -4,6 +4,6 @@ collections.emptyMap={} # Create empty list with response addition yml config. collections.emptyList=[] -# Calculated value equal +# Calculated value is equality condition.equals= condition.diff= \ No newline at end of file diff --git a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/MilkomedaApplicationListener.java b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/MilkomedaApplicationListener.java index ecce0bd0..84bfb718 100644 --- a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/MilkomedaApplicationListener.java +++ b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/MilkomedaApplicationListener.java @@ -2,6 +2,7 @@ import com.github.yizzuide.milkomeda.ice.IceHolder; import com.github.yizzuide.milkomeda.universe.context.ApplicationContextHolder; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.lang.NonNull; @@ -13,6 +14,7 @@ * @author yizzuide * Create at 2020/04/16 18:24 */ +@Slf4j // Springboot 2.3: WebServerInitializedEvent is now published before the ContextRefreshedEvent @Component public class MilkomedaApplicationListener implements ApplicationListener { @@ -28,8 +30,10 @@ public void onApplicationEvent(@NonNull ContextRefreshedEvent event) { // 激活Dead queue里的job(重试超过次数的job) IceHolder.activeDeadJobs(); - // 调用环境变量,给Spring EL表达 #env 提供数据源 + // 添加自定义环境变量到milkomedaProperties,给Spring EL表达 #env 提供数据源,如:#env.product ApplicationContextHolder.getEnvironment().put("product", "milkomeda"); + // 可以从Spring ConfigurableEnvironment中获取 + log.info("#env - {}", ApplicationContextHolder.getEnvironment().get("product")); // 从json配置加载覆盖yml配置 // String ruleItems = "[{\"match\":\"true\"},{\"domain\":\"t_order\",\"fields\":\"id\",\"filter\":\"user_id={$attr.userInfo.uid}\",\"match\":\"size()==0\",\"syntax\":\"el\"}]"; diff --git a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/sundial/SundialController.java b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/sundial/SundialController.java index a9d67dfc..201887a9 100644 --- a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/sundial/SundialController.java +++ b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/sundial/SundialController.java @@ -32,9 +32,9 @@ public Object add(@PathVariable("orderNo") Long orderNo) { TOrder tOrder = new TOrder(); tOrder.setOrderNo(orderNo); tOrder.setCreateTime(new Date()); - tOrder.setProductId(1L); - tOrder.setProductName("测试"); - tOrder.setUserId(122L); + tOrder.setProductId(180L); + tOrder.setProductName("小爱"); + tOrder.setUserId(12L); dataSourceService.insert(tOrder); return tOrder; } diff --git a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/sundial/mapper/TOrder2Mapper.java b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/sundial/mapper/TOrder2Mapper.java index 567cdd21..57620558 100644 --- a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/sundial/mapper/TOrder2Mapper.java +++ b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/sundial/mapper/TOrder2Mapper.java @@ -25,6 +25,9 @@ public interface TOrder2Mapper { // - 自定义序列号截取:seq // - 定制序列号截取: id(需要通过ShardingId类生成) // - 一致性Hash函数:ketama、fnv、murmur + // - 自定义Hash函数:hash + // 1. 调用前注册:CachedConsistentHashRing.getInstance().register("hashName", HashFunc实现); + // 2. 表达式调用:fn.hash("hashName", key, nodeCount, replicas) // @Sundial(shardingType = ShardingType.TABLE, partExp = "fn.format(table + '_%03d', fn.ketama(p.orderNo, 2, 4))") // ShardingType.SCHEMA:仅分库 @Sundial(shardingType = ShardingType.SCHEMA, nodeExp = "fn.format('node_%03d', fn.ketama(p.orderNo, 2, 4))") diff --git a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/universe/RequestAstrolabeHandler.java b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/universe/RequestAstrolabeHandler.java index af8a4d08..8bf7e7b5 100644 --- a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/universe/RequestAstrolabeHandler.java +++ b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/universe/RequestAstrolabeHandler.java @@ -18,7 +18,7 @@ public class RequestAstrolabeHandler implements AstrolabeHandler { @Override public void preHandle(ServletRequest request) { -// log.info("AstrolabeHandler请求前:{}", ((HttpServletRequest)request).getRequestURI()); + // ((HttpServletRequest)request).getRequestURI() log.info("AstrolabeHandler请求前:{}", WebContext.getRequest().getRequestURI()); } } diff --git a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/wormhole/QuotaService.java b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/wormhole/QuotaService.java index a9e8d387..53aa44b8 100644 --- a/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/wormhole/QuotaService.java +++ b/MilkomedaDemo/src/main/java/com/github/yizzuide/milkomeda/demo/wormhole/QuotaService.java @@ -16,8 +16,8 @@ * Create at 2020/05/05 15:39 */ @Slf4j -@WormholeEventHandler @Service +@WormholeEventHandler public class QuotaService { @Async diff --git a/MilkomedaDemo/src/main/resources/application.yml b/MilkomedaDemo/src/main/resources/application.yml index 435f860a..b5256180 100644 --- a/MilkomedaDemo/src/main/resources/application.yml +++ b/MilkomedaDemo/src/main/resources/application.yml @@ -25,7 +25,9 @@ server: spring: application: - name: milkomeda + name: @spring.application.name@ + profiles: + active: @spring.profiles.active@ # springboot 2.3优雅关机等待时间 lifecycle: timeout-per-shutdown-phase: 20s @@ -269,7 +271,7 @@ milkomeda: ice: # 在分布式布署时,需要设置实例名 - instance-name: @spring.application.name@ + instance-name: ${spring.application.name} # 开启Job作业(作为消费端使用时,设置为false) # enable-job-timer: false # 是否用于分布式job作业 @@ -399,7 +401,7 @@ milkomeda: filter: #enable: true # 测试表达式计算 - enable: ${condition.equals(${spring.application.name}, milkomeda)} + enable: ${condition.equals(${spring.application.name}, milkomeda-demo)} filters: - name: ipLimiterFilter @@ -414,8 +416,10 @@ milkomeda: ## 数据源配置 sundial: datasource-type: com.alibaba.druid.pool.DruidDataSource + # 设置原有数据源配置前缀,作为下面instances数据源创建新的DataSource的模板 config-prefix: spring.datasource.druid instances: + # 只读从库必需命名为以read-only为前缀,用于解决主从多数据源事务问题(特别是与分库功能一起使用时的事务管理问题) read-only: username: root password: ${MYSQL_PWD} @@ -426,7 +430,7 @@ milkomeda: url: jdbc:mysql://localhost:3306?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=false&rewriteBatchedStatements=true&allowMultiQueries=true&allowPublicKeyRetrieval=true # 主从切面策略 strategy: - - key-name: read-only # 这里上面instances里配置的key + - key-name: read-only # 这里用上面instances里配置的key pointcut-expression: execution(* com..mapper.*.query*(..)) # 开启分库分表 diff --git a/MilkomedaDemo/src/main/resources/config/mybatis-config.xml b/MilkomedaDemo/src/main/resources/config/mybatis-config.xml index d138eef5..029eb288 100644 --- a/MilkomedaDemo/src/main/resources/config/mybatis-config.xml +++ b/MilkomedaDemo/src/main/resources/config/mybatis-config.xml @@ -76,7 +76,7 @@ - +