Skip to content

Commit

Permalink
feat(sundial): Support register HashFunc with `CachedConsistentHash…
Browse files Browse the repository at this point in the history
…Ring.register()`.

[Sundial] Fixed always creating 4 replica nodes with ketama hash.
[Sundial] Fixed losing Spring transaction when using split schema type.
[Sundial] Fixed duplicated generated sharding id within one millisecond.
  • Loading branch information
yizzuide committed Feb 20, 2022
1 parent b2e58f9 commit 7d9ef61
Show file tree
Hide file tree
Showing 24 changed files with 234 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public <T> T compile(String expression, Object root, Class<T> resultType) {
return null;
}

// 缓存表达式解析抽象树对象
private Object parseExpression(String expression) throws OgnlException {
Object node = expressionCache.get(expression);
if (node == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public class DataSourceFactory implements EnvironmentAware {

@PostConstruct
public void init() {
    // Set the config-key prefix used as the datasource property template;
    // read back later when binding the base property map for new DataSources.
    DEFAULT_DATASOURCE_PREFIX = sundialProperties.getConfigPrefix();
    // Register alias property names so either spelling binds to the same
    // DataSource property (e.g. "jdbc-url" -> "url", "user" -> "username").
    ALIASES.addAliases("url", "jdbc-url");
    ALIASES.addAliases("username", "user");
}
Expand Down Expand Up @@ -90,18 +92,22 @@ public DataSource createDataSource() throws Exception {
/**
 * Create a new {@code DataSource} from the base {@code spring.datasource}
 * configuration, optionally overlaid with node-specific settings.
 * @param dataSourceConf extra per-node configuration; may be {@code null},
 *                       in which case only the base configuration is used
 * @return a freshly created and fully bound DataSource instance
 * @throws Exception if instantiation or property binding fails
 */
@SuppressWarnings({"rawtypes", "unchecked"})
public DataSource createDataSource(SundialProperties.Datasource dataSourceConf) throws Exception {
    // Start from the base configuration under the configured prefix.
    Map dataSourceProperties = binder.bind(DEFAULT_DATASOURCE_PREFIX, Map.class).get();
    // Overlay node-specific settings on top of the base configuration, if any.
    if (dataSourceConf != null) {
        Map<String, Object> dataSourceConfMap = DataTypeConvertUtil.beanToMap(dataSourceConf);
        dataSourceProperties.putAll(dataSourceConfMap);
    }
    // Instantiate the configured DataSource implementation.
    // Class.newInstance() is deprecated (since Java 9) because it silently
    // propagates checked exceptions; invoke the no-arg constructor instead.
    DataSource dataSource = sundialProperties.getDatasourceType().getDeclaredConstructor().newInstance();
    // Bind the merged property map onto the new instance.
    bind(dataSource, dataSourceProperties);
    return dataSource;
}

/**
 * Push the given property map onto a DataSource instance, honoring the
 * alias property names registered at startup.
 * @param result     the DataSource instance to populate
 * @param properties raw configuration key/value pairs
 */
@SuppressWarnings("rawtypes")
private void bind(DataSource result, Map properties) {
    // Wrap the raw map as a Spring configuration source with alias support,
    // then bind every value directly onto the existing DataSource instance.
    ConfigurationPropertySource mapSource = new MapConfigurationPropertySource(properties);
    Binder propertyBinder = new Binder(mapSource.withAliases(ALIASES));
    propertyBinder.bind(ConfigurationPropertyName.EMPTY, Bindable.ofInstance(result));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ public class DelegatingDataSourceAdvice implements MethodInterceptor {

@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
// 获取Method对象
// ProxyMethodInvocation pmi = (ProxyMethodInvocation) methodInvocation;
// ProceedingJoinPoint pjp = new MethodInvocationProceedingJoinPoint(pmi);
// Method method = methodInvocation.getMethod();
try {
// 调用方法前,选择数据源
SundialHolder.setDataSourceType(getKeyName());
return methodInvocation.proceed();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.lang.annotation.*;

/**
* Enable 模块
* EnableSundial
* @author jsq [email protected]
* @since 3.4.0
* @version 3.7.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package com.github.yizzuide.milkomeda.sundial;

import com.github.yizzuide.milkomeda.universe.env.Environment;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.transaction.Transaction;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
Expand All @@ -37,46 +38,48 @@

/**
* MultiDataSourceTransaction
* 多数据源事务
* 主从多数据源事务
*
* @author yizzuide
* @since 3.7.1
* @version 3.12.10
* Create at 2020/05/31 11:48
*/
@Slf4j
public class MultiDataSourceTransaction implements Transaction {

// 数据源
private final DataSource dataSource;

// 主连接
private Connection mainConnection;

private final String mainDataSourceType;

private final ConcurrentMap<String, Connection> otherConnectionMap;

// 连接是否有事务
private boolean isConnectionTransactional;

// 自动提交
private boolean autoCommit;
// 其它连接
private final ConcurrentMap<String, Connection> otherConnectionMap;
// 只读从库识别key
private static final String readOnlyType = "read-only";

public MultiDataSourceTransaction(DataSource dataSource) {
Assert.notNull(dataSource, "No DataSource specified");
this.dataSource = dataSource;
mainDataSourceType = SundialHolder.getDataSourceType();
otherConnectionMap = new ConcurrentHashMap<>();
}

@Override
public Connection getConnection() throws SQLException {
// 动态的根据DatabaseType获取不同的Connection
String dataSourceType = SundialHolder.getDataSourceType();
if (dataSourceType.equals(mainDataSourceType)) {
// 如果当前为主连接
if (!dataSourceType.startsWith(readOnlyType)) {
if (mainConnection == null) {
openMainConnection();
}
return mainConnection;
} else {
if (!otherConnectionMap.containsKey(dataSourceType)) {
try {
// 从连接不支持事务(用于只读)
Connection conn = dataSource.getConnection();
otherConnectionMap.put(dataSourceType, conn);
return conn;
Expand All @@ -89,10 +92,11 @@ public Connection getConnection() throws SQLException {
}

private void openMainConnection() throws SQLException {
// 将主连接交由Spring事务管理
this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.mainConnection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);
if (log.isDebugEnabled()) {
if (Environment.isShowLog()) {
log.debug("Sundial JDBC Connection [" + this.mainConnection + "] will" +
(this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
}
Expand All @@ -102,7 +106,7 @@ private void openMainConnection() throws SQLException {
public void commit() throws SQLException {
// 非Spring事务管理的提交处理
if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
if (log.isDebugEnabled()) {
if (Environment.isShowLog()) {
log.debug("Sundial Committing JDBC Connection [" + this.mainConnection + "]");
}
this.mainConnection.commit();
Expand All @@ -116,7 +120,7 @@ public void commit() throws SQLException {
public void rollback() throws SQLException {
// 非Spring事务管理的回滚处理
if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
if (log.isDebugEnabled()) {
if (Environment.isShowLog()) {
log.debug("Sundial Rolling back JDBC Connection [" + this.mainConnection + "]");
}
this.mainConnection.rollback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

package com.github.yizzuide.milkomeda.sundial;

import com.github.yizzuide.milkomeda.universe.algorithm.hash.*;
import com.github.yizzuide.milkomeda.universe.algorithm.hash.CachedConsistentHashRing;
import lombok.extern.slf4j.Slf4j;

import java.math.BigDecimal;
Expand All @@ -30,11 +30,7 @@
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* ShardingFunction
Expand All @@ -50,8 +46,6 @@ public class ShardingFunction {

public static final String BEAN_ID = "ShardingFunction";

private final Map<String, ConsistentHashRing<Long>> cachedConsistentHashMap = new ConcurrentHashMap<>();

/**
* 格式化函数
* @param format 格式字符串
Expand Down Expand Up @@ -101,7 +95,7 @@ public long seq(long key, long bitStart, long bitCount) {
* @return 目标节点
*/
// Consistent-hash lookup using the FNV1_32 algorithm.
// NOTE(review): the pre-refactor direct-hash call was left behind as an
// unreachable duplicate return by the diff; only the delegation remains.
public long fnv(String key, long nodeCount, int replicas) {
    return hash("fnv", key, nodeCount, replicas);
}

/**
Expand All @@ -112,7 +106,7 @@ public long fnv(String key, long nodeCount, int replicas) {
* @return 目标节点
*/
// Consistent-hash lookup using the Murmur algorithm.
// NOTE(review): the pre-refactor direct-hash call was left behind as an
// unreachable duplicate return by the diff; only the delegation remains.
public long murmur(String key, long nodeCount, int replicas) {
    return hash("murmur", key, nodeCount, replicas);
}

/**
Expand All @@ -123,25 +117,20 @@ public long murmur(String key, long nodeCount, int replicas) {
* @return 目标节点
*/
// Consistent-hash lookup using the Ketama algorithm.
// NOTE(review): the pre-refactor direct-hash call was left behind as an
// unreachable duplicate return by the diff; only the delegation remains.
public long ketama(String key, long nodeCount, int replicas) {
    return hash("ketama", key, nodeCount, replicas);
}

private long hash(String key, long nodeCount, int replicas, HashFunc hashFunc) {
String cacheKey = nodeCount + "_" + replicas;
// 缓存哈希环
ConsistentHashRing<Long> consistentHashRing = cachedConsistentHashMap.get(cacheKey);
if (consistentHashRing == null) {
// 生成节点
List<Long> nodes = new ArrayList<>();
for (long i = 0; i < nodeCount; i++) {
nodes.add(i);
}
// 创建哈希环
consistentHashRing = new ConsistentHashRing<>(hashFunc, replicas, nodes);
// 添加到缓存
cachedConsistentHashMap.put(cacheKey, consistentHashRing);
}
return consistentHashRing.get(key);
/**
 * Consistent-hash lookup through a named (possibly custom-registered) hash
 * algorithm, backed by the shared cached hash ring.
 * @param hashName  name of the registered hash algorithm (e.g. "fnv", "murmur", "ketama")
 * @param key       sharding key
 * @param nodeCount number of nodes
 * @param replicas  virtual replicas per node; a multiple of 4 is recommended
 * @return target node index
 * @since 3.12.10
 */
public long hash(String hashName, String key, long nodeCount, int replicas) {
    return CachedConsistentHashRing.getInstance().lookForHashNode(key, nodeCount, replicas, hashName);
}

/**
Expand Down Expand Up @@ -179,9 +168,9 @@ public long roll(long key, long slideWindow, double expandWarnPercent) {
assert key >= 0 && slideWindow > 0;
if (expandWarnPercent > 0) {
// 获取系数
double factor = (double)key / (double)slideWindow;
BigDecimal factor = BigDecimal.valueOf(key).divide(BigDecimal.valueOf(slideWindow), 2, RoundingMode.DOWN);
// 截取小数得当前分配百分比
double percent = factor - (long)factor;
double percent = factor.subtract(new BigDecimal(factor.longValue())).doubleValue();
if (percent > expandWarnPercent) {
log.warn("Roll function of sundial sharding up to percent: {}", percent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ShardingId {
// 开始时间 2020-01-01
private static final long EPOCH = 1577808000000L;
// 时间位(可用69年,也就是2089-01-01)
// 计算可用年数:BigDecimal.valueOf(Math.pow(2, 41)).divide(BigDecimal.valueOf(31536000000L/*1年的毫秒数*/), RoundingMode.DOWN)
public static final long TIME_BITS = 41L;
// 机器位(集群8台,满足一定量的并发需求)
private final static long WORKER_ID_BITS = 3L;
Expand Down Expand Up @@ -80,8 +81,8 @@ public static long nextId(long workerId, long businessId, long sharding) {
}
if (preTime == timestamp) {
seqStart = (seqStart + 1) & MAX_SN;
// 如果序列号处于起点,获取比上次更新的时间
if (seqStart == 0) {
// 如果序列号处于最大,循环到超过1毫秒
if (seqStart == MAX_SN) {
timestamp = nextTime(preTime);
}
} else {
Expand All @@ -96,6 +97,16 @@ public static long nextId(long workerId, long businessId, long sharding) {
}
}

/**
 * Recover the embedded creation timestamp (epoch millis) from a generated id.
 * @param id a previously generated sharding id
 * @return creation time as epoch milliseconds
 * @since 3.12.10
 */
public static long extractEpochMill(long id) {
    // The timestamp occupies the high bits above the worker, business,
    // shard and sequence fields; shift them away and re-add the epoch base.
    long timestampShift = SN_BITS + SHARD_BITS + BUSINESS_ID_BITS + WORKER_ID_BITS;
    return (id >> timestampShift) + EPOCH;
}

// Current wall-clock time in epoch milliseconds.
private static long currentTime() {
    return System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public DataSource dynamicRouteDataSource(DataSourceFactory dataSourceFactory) {
if (CollectionUtils.isEmpty(datasourceConfigSet)) {
return new DynamicRouteDataSource(defaultDataSource, targetDataSources);
}
// 不回其它数据源节点
// 添加其它数据源节点
for (Map.Entry<String, SundialProperties.Datasource> entry : datasourceConfigSet) {
DataSource dataSource = dataSourceFactory.createDataSource(entry.getValue());
targetDataSources.put(entry.getKey(), dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.ibatis.session.RowBounds;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
Expand All @@ -48,7 +49,7 @@
*
* @author yizzuide
* @since 3.8.0
* @version 3.9.0
* @version 3.12.10
* Create at 2020/06/16 11:18
*/
@Slf4j
Expand Down Expand Up @@ -104,10 +105,11 @@ public Object intercept(Invocation invocation) throws Throwable {
root.setP(params);
root.setFn(shardingFunction);
String schema = null;
// 分库、分库分表
// 包含分库的处理
if (sundial.shardingType() != ShardingType.TABLE && !StringUtils.isEmpty(nodeExp)) {
String node = SimpleElParser.parse(nodeExp, root, String.class);
SundialProperties.DataNode dataNode = props.getSharding().getNodes().get(node);
// node_001 --转--> node_1
if (dataNode == null) {
int separatorIndex = node.lastIndexOf(props.getSharding().getIndexSeparator());
String index = node.substring(separatorIndex + 1);
Expand All @@ -117,9 +119,12 @@ public Object intercept(Invocation invocation) throws Throwable {
// 是否使用主连接
if (sundial.key().equals(DynamicRouteDataSource.MASTER_KEY)) {
SundialHolder.setDataSourceType(dataNode.getLeader());
} else if(sundial.key().equals("follows")) { // 从库任选
if (!CollectionUtils.isEmpty(dataNode.getFollows())) {
SundialHolder.setDataSourceType(dataNode.getFollows().stream().findAny().orElse(DynamicRouteDataSource.MASTER_KEY));
}
} else {
// 暂时使用第一个从连接
SundialHolder.setDataSourceType(dataNode.getFollows().get(0));
SundialHolder.setDataSourceType(sundial.key());
}
// 需要添加的数据库名
if (!StringUtils.isEmpty(dataNode.getSchema())) {
Expand Down Expand Up @@ -156,7 +161,7 @@ public Object intercept(Invocation invocation) throws Throwable {
sql = sql.replace(tableName, schema == null ? part : schema + "." + part);
updateSql(sql, invocation, ms, args, boundSql);
}
return invocation.proceed();
return invocation.proceed();
}

@Override
Expand Down
Loading

0 comments on commit 7d9ef61

Please sign in to comment.