diff --git a/compatible/src/test/java/io/seata/rm/datasource/DataSourceProxyTest.java b/compatible/src/test/java/io/seata/rm/datasource/DataSourceProxyTest.java index e707c4c568f..4a6a02c7920 100644 --- a/compatible/src/test/java/io/seata/rm/datasource/DataSourceProxyTest.java +++ b/compatible/src/test/java/io/seata/rm/datasource/DataSourceProxyTest.java @@ -17,6 +17,7 @@ package io.seata.rm.datasource; import java.sql.Connection; +import java.util.Properties; import javax.sql.DataSource; @@ -61,6 +62,7 @@ public void testNotSupportDb() { dataSource.setDriver(mockDriver); dataSource.setUsername(username); dataSource.setPassword("password"); + dataSource.setConnectProperties(new Properties()); Throwable throwable = Assertions.assertThrows(IllegalStateException.class, () -> new DataSourceProxy(dataSource)); assertThat(throwable).hasMessageContaining("AT mode don't support the dbtype"); diff --git a/compatible/src/test/java/io/seata/rm/datasource/mock/MockDataSource.java b/compatible/src/test/java/io/seata/rm/datasource/mock/MockDataSource.java index 7854c43b0eb..32893fded8c 100644 --- a/compatible/src/test/java/io/seata/rm/datasource/mock/MockDataSource.java +++ b/compatible/src/test/java/io/seata/rm/datasource/mock/MockDataSource.java @@ -20,6 +20,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.util.Properties; import java.util.logging.Logger; import javax.sql.DataSource; @@ -27,7 +28,7 @@ public class MockDataSource implements DataSource { @Override public Connection getConnection() throws SQLException { - return new MockConnection(new MockDriver(), "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true", null); + return new MockConnection(new MockDriver(), "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true", new Properties()); } @Override diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java index dfab706294e..8caa2cf9854 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java @@ -16,37 +16,22 @@ */ package org.apache.seata.rm.datasource; +import javax.sql.DataSource; import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; - -import javax.sql.DataSource; -import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.Constants; -import org.apache.seata.common.loader.EnhancedServiceNotFoundException; -import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.context.RootContext; import org.apache.seata.core.model.BranchType; import org.apache.seata.core.model.Resource; import org.apache.seata.rm.DefaultResourceManager; import org.apache.seata.rm.datasource.sql.struct.TableMetaCacheFactory; -import org.apache.seata.rm.datasource.undo.UndoLogManager; -import org.apache.seata.rm.datasource.undo.UndoLogManagerFactory; -import org.apache.seata.rm.datasource.util.JdbcUtils; import org.apache.seata.sqlparser.util.JdbcConstants; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_TABLE; - /** * The type Data source proxy. - * */ public class DataSourceProxy extends AbstractDataSourceProxy implements Resource { @@ -64,18 +49,7 @@ public class DataSourceProxy extends AbstractDataSourceProxy implements Resource private String userName; - private String kernelVersion; - - private String productVersion; - - private final Map variables = new HashMap<>(); - - /** - * POLARDB-X 1.X -> TDDL - * POLARDB-X 2.X & MySQL 5.6 -> PXC - * POLARDB-X 2.X & MySQL 5.7 -> AliSQL-X - */ - private static final String[] POLARDB_X_PRODUCT_KEYWORD = {"TDDL","AliSQL-X","PXC"}; + private SeataDataSourceProxyMetadata dataSourceProxyMetadata; /** * Instantiates a new Data source proxy. @@ -103,21 +77,15 @@ public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) { private void init(DataSource dataSource, String resourceGroupId) { this.resourceGroupId = resourceGroupId; - try (Connection connection = dataSource.getConnection()) { - jdbcUrl = connection.getMetaData().getURL(); - dbType = JdbcUtils.getDbType(jdbcUrl); - if (JdbcConstants.ORACLE.equals(dbType)) { - userName = connection.getMetaData().getUserName(); - } else if (JdbcConstants.MYSQL.equals(dbType)) { - validMySQLVersion(connection); - checkDerivativeProduct(); - } - checkUndoLogTableExist(connection); - + try { + dataSourceProxyMetadata = SeataDataSourceProxyMetadataFactory.create(dataSource); } catch (SQLException e) { - throw new IllegalStateException("can not init dataSource", e); + throw new IllegalStateException("can not init datasource metadata", e); } - if (JdbcConstants.SQLSERVER.equals(dbType)) { + jdbcUrl = dataSourceProxyMetadata.getJdbcUrl(); + dbType = dataSourceProxyMetadata.getDbType(); + userName = dataSourceProxyMetadata.getUserName(); + if (JdbcConstants.SQLSERVER.equals(dataSourceProxyMetadata.getDbType())) { LOGGER.info("SQLServer support in AT mode is currently an experimental function, " + "if you have any problems in use, please feedback to us"); } @@ -128,59 +96,6 @@ private void init(DataSource dataSource, String resourceGroupId) { RootContext.setDefaultBranchType(this.getBranchType()); } - /** - * Define derivative product version for MySQL Kernel - * - */ - private void checkDerivativeProduct() { - if (!JdbcConstants.MYSQL.equals(dbType)) { - return; - } - // check for polardb-x - if (isPolardbXProduct()) { - dbType = JdbcConstants.POLARDBX; - return; - } - // check for other products base on mysql kernel - } - - private boolean isPolardbXProduct() { - if (StringUtils.isBlank(productVersion)) { - return false; - } - for (String keyword : POLARDB_X_PRODUCT_KEYWORD) { - if (productVersion.contains(keyword)) { - return true; - } - } - return false; - } - - /** - * check existence of undolog table - * - * if the table not exist fast fail, or else keep silence - * - * @param conn db connection - */ - private void checkUndoLogTableExist(Connection conn) { - UndoLogManager undoLogManager; - try { - undoLogManager = UndoLogManagerFactory.getUndoLogManager(dbType); - } catch (EnhancedServiceNotFoundException e) { - String errMsg = String.format("AT mode don't support the dbtype: %s", dbType); - throw new IllegalStateException(errMsg, e); - } - - boolean undoLogTableExist = undoLogManager.hasUndoLogTable(conn); - if (!undoLogTableExist) { - String undoLogTableName = ConfigurationFactory.getInstance() - .getConfig(ConfigurationKeys.TRANSACTION_UNDO_LOG_TABLE, DEFAULT_TRANSACTION_UNDO_LOG_TABLE); - String errMsg = String.format("in AT mode, %s table not exist", undoLogTableName); - throw new IllegalStateException(errMsg); - } - } - /** * publish tableMeta refresh event */ @@ -207,6 +122,14 @@ public String getDbType() { return dbType; } + /** + * Get datasource proxy metadata + * @return seata datasource proxy metadata + */ + public SeataDataSourceProxyMetadata getDataSourceProxyMetadata() { + return dataSourceProxyMetadata; + } + @Override public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); @@ -365,7 +288,6 @@ private void initPGResourceId() { * The general form of the connection URL for SqlServer is * jdbc:sqlserver://[serverName[\instanceName][:portNumber]][;property=value[;property=value]] * required connection properties: [INSTANCENAME], [databaseName,database] - * */ private void initSqlServerResourceId() { if (jdbcUrl.contains(";")) { @@ -399,42 +321,4 @@ public BranchType getBranchType() { return BranchType.AT; } - public String getKernelVersion() { - return kernelVersion; - } - - public String getVariableValue(String name) { - return variables.get(name); - } - - private void validMySQLVersion(Connection connection) { - if (!JdbcConstants.MYSQL.equals(dbType)) { - return; - } - try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW VARIABLES"); - ResultSet rs = preparedStatement.executeQuery()) { - while (rs.next()) { - String name = rs.getString(1); - String value = rs.getString(2); - if (StringUtils.isNotBlank(name)) { - variables.put(name.toLowerCase(), value); - } - } - String version = variables.get("version"); - if (StringUtils.isBlank(version)) { - return; - } - int dashIdx = version.indexOf('-'); - // in mysql: 5.6.45, in polardb-x: 5.6.45-TDDL-xxx - if (dashIdx > 0) { - kernelVersion = version.substring(0, dashIdx); - productVersion = version.substring(dashIdx + 1); - } else { - kernelVersion = version; - productVersion = version; - } - } catch (Exception e) { - LOGGER.error("check mysql version fail error: {}", e.getMessage()); - } - } } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/SeataDataSourceProxyMetadata.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/SeataDataSourceProxyMetadata.java new file mode 100644 index 00000000000..4decac141a8 --- /dev/null +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/SeataDataSourceProxyMetadata.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.datasource; + +import javax.sql.DataSource; +import java.sql.SQLException; +import java.util.Map; + +/** + * The type Data source proxy metadata. + */ +public interface SeataDataSourceProxyMetadata { + + /** + * Init datasource metadata + * + * @param dataSource the datasource + * @throws SQLException sql exception + */ + void init(DataSource dataSource) throws SQLException; + + /** + * Get variable value by name + * + * @param name the name + * @return value + */ + String getVariableValue(String name); + + /** + * Get variables + * + * @return all variable + */ + Map getVariables(); + + /** + * Get jdbc url + * + * @return jdbc url + */ + String getJdbcUrl(); + + /** + * Gets db type. + * + * @return the db type + */ + String getDbType(); + + /** + * Get database connection username + * + * @return username + */ + String getUserName(); + + /** + * Get kernel version + * + * @return kernel version + */ + String getKernelVersion(); + +} diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/SeataDataSourceProxyMetadataFactory.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/SeataDataSourceProxyMetadataFactory.java new file mode 100644 index 00000000000..f816bc0caa4 --- /dev/null +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/SeataDataSourceProxyMetadataFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.datasource; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +import org.apache.seata.rm.datasource.metadata.DefaultDataSourceProxyMetadata; +import org.apache.seata.rm.datasource.metadata.MySQLDataSourceProxyMetadata; +import org.apache.seata.rm.datasource.util.JdbcUtils; +import org.apache.seata.sqlparser.util.JdbcConstants; + +/** + * datasource proxy metadata factory + */ +public class SeataDataSourceProxyMetadataFactory { + + public static SeataDataSourceProxyMetadata create(DataSource dataSource) throws SQLException { + SeataDataSourceProxyMetadata dataSourceProxyMetadata = null; + try (Connection connection = dataSource.getConnection()) { + String jdbcUrl = connection.getMetaData().getURL(); + String dbType = JdbcUtils.getDbType(jdbcUrl); + if (JdbcConstants.MYSQL.equals(dbType) + || JdbcConstants.MARIADB.equals(dbType) + || JdbcConstants.POLARDBX.equals(dbType)) { + dataSourceProxyMetadata = new MySQLDataSourceProxyMetadata(); + } else { + dataSourceProxyMetadata = new DefaultDataSourceProxyMetadata(); + } + } + dataSourceProxyMetadata.init(dataSource); + return dataSourceProxyMetadata; + } + +} diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/mysql/MySQLInsertExecutor.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/mysql/MySQLInsertExecutor.java index 923f2b5c416..761cc954537 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/mysql/MySQLInsertExecutor.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/mysql/MySQLInsertExecutor.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.seata.common.exception.NotSupportYetException; import org.apache.seata.common.exception.ShouldNeverHappenException; @@ -37,8 +36,8 @@ import org.apache.seata.rm.datasource.StatementProxy; import org.apache.seata.rm.datasource.exec.BaseInsertExecutor; import org.apache.seata.rm.datasource.exec.StatementCallback; -import org.apache.seata.sqlparser.struct.ColumnMeta; import org.apache.seata.sqlparser.SQLRecognizer; +import org.apache.seata.sqlparser.struct.ColumnMeta; import org.apache.seata.sqlparser.struct.Defaultable; import org.apache.seata.sqlparser.struct.Null; import org.apache.seata.sqlparser.struct.SqlMethodExpr; @@ -48,7 +47,6 @@ /** * The type My sql insert executor. - * */ @LoadLevel(name = JdbcConstants.MYSQL, scope = Scope.PROTOTYPE) public class MySQLInsertExecutor extends BaseInsertExecutor implements Defaultable { @@ -60,13 +58,6 @@ public class MySQLInsertExecutor extends BaseInsertExecutor implements Defaultab */ public static final String ERR_SQL_STATE = "S1009"; - /** - * The cache of auto increment step of database - * the key is the db's resource id - * the value is the step - */ - public static final Map RESOURCE_ID_STEP_CACHE = new ConcurrentHashMap<>(8); - /** * Instantiates a new Abstract dml base executor. * @@ -80,19 +71,17 @@ public MySQLInsertExecutor(StatementProxy statementProxy, StatementCallback stat } @Override - public Map> getPkValues() throws SQLException { - Map> pkValuesMap = null; + public Map> getPkValues() throws SQLException { + Map> pkValuesMap = null; List pkColumnNameList = getTableMeta().getPrimaryKeyOnlyName(); boolean isContainsPk = containsPK(); //when there is only one pk in the table if (pkColumnNameList.size() == 1) { if (isContainsPk) { pkValuesMap = getPkValuesByColumn(); - } - else if (containsColumns()) { + } else if (containsColumns()) { pkValuesMap = getPkValuesByAuto(); - } - else { + } else { pkValuesMap = getPkValuesByColumn(); } } else { @@ -100,7 +89,7 @@ else if (containsColumns()) { //1,all pk columns are filled value. //2,the auto increment pk column value is null, and other pk value are not null. pkValuesMap = getPkValuesByColumn(); - for (String columnName:pkColumnNameList) { + for (String columnName : pkColumnNameList) { if (!pkValuesMap.containsKey(columnName)) { ColumnMeta pkColumnMeta = getTableMeta().getColumnMeta(columnName); if (Objects.nonNull(pkColumnMeta) && pkColumnMeta.isAutoincrement()) { @@ -181,11 +170,11 @@ public Map> getPkValuesByAuto() throws SQLException { } @Override - public Map> getPkValuesByColumn() throws SQLException { - Map> pkValuesMap = parsePkValuesFromStatement(); + public Map> getPkValuesByColumn() throws SQLException { + Map> pkValuesMap = parsePkValuesFromStatement(); Set keySet = new HashSet<>(pkValuesMap.keySet()); //auto increment - for (String pkKey:keySet) { + for (String pkKey : keySet) { List pkValues = pkValuesMap.get(pkKey); // pk auto generated while single insert primary key is expression if (pkValues.size() == 1 && (pkValues.get(0) instanceof SqlMethodExpr)) { @@ -215,22 +204,10 @@ public List getPkValuesByDefault(String pkKey) throws SQLException { } protected Map> autoGeneratePks(BigDecimal cursor, String autoColumnName, Integer updateCount) throws SQLException { - BigDecimal step = BigDecimal.ONE; - String resourceId = statementProxy.getConnectionProxy().getDataSourceProxy().getResourceId(); - if (RESOURCE_ID_STEP_CACHE.containsKey(resourceId)) { - step = RESOURCE_ID_STEP_CACHE.get(resourceId); - } else { - ResultSet increment = null; - try { - increment = statementProxy.getTargetStatement().executeQuery("SHOW VARIABLES LIKE 'auto_increment_increment'"); - increment.next(); - step = new BigDecimal(increment.getString(2)); - RESOURCE_ID_STEP_CACHE.put(resourceId, step); - } finally { - IOUtil.close(increment); - } - } + String increment = statementProxy.getConnectionProxy().getDataSourceProxy() + .getDataSourceProxyMetadata().getVariableValue("auto_increment_increment"); + BigDecimal step = new BigDecimal(increment); List pkValues = new ArrayList<>(); for (int i = 0; i < updateCount; i++) { @@ -239,7 +216,7 @@ protected Map> autoGeneratePks(BigDecimal cursor, String au } Map> pkValuesMap = new HashMap<>(1, 1.001f); - pkValuesMap.put(autoColumnName,pkValues); + pkValuesMap.put(autoColumnName, pkValues); return pkValuesMap; } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java index b682105fa0a..81a65e54d52 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java @@ -311,6 +311,6 @@ private String buildGroupBy(List pkColumns,List allSelectColumns } private String getDbVersion() { - return statementProxy.getConnectionProxy().getDataSourceProxy().getKernelVersion(); + return statementProxy.getConnectionProxy().getDataSourceProxy().getDataSourceProxyMetadata().getKernelVersion(); } } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/metadata/AbstractDataSourceProxyMetadata.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/metadata/AbstractDataSourceProxyMetadata.java new file mode 100644 index 00000000000..a6fb3bf946c --- /dev/null +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/metadata/AbstractDataSourceProxyMetadata.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.datasource.metadata; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.loader.EnhancedServiceNotFoundException; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.rm.datasource.SeataDataSourceProxyMetadata; +import org.apache.seata.rm.datasource.undo.UndoLogManager; +import org.apache.seata.rm.datasource.undo.UndoLogManagerFactory; +import org.apache.seata.rm.datasource.util.JdbcUtils; + +import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_TABLE; + +public abstract class AbstractDataSourceProxyMetadata implements SeataDataSourceProxyMetadata { + + protected String jdbcUrl; + protected String dbType; + protected String userName; + + @Override + public void init(DataSource dataSource) throws SQLException { + try (Connection connection = dataSource.getConnection()) { + jdbcUrl = connection.getMetaData().getURL(); + dbType = JdbcUtils.getDbType(jdbcUrl); + userName = connection.getMetaData().getUserName(); + checkUndoLogTableExist(connection); + } + } + + /** + * check existence of undolog table + *

+ * if the table not exist fast fail, or else keep silence + * + * @param conn db connection + */ + protected void checkUndoLogTableExist(Connection conn) { + UndoLogManager undoLogManager; + try { + undoLogManager = UndoLogManagerFactory.getUndoLogManager(dbType); + } catch (EnhancedServiceNotFoundException e) { + String errMsg = String.format("AT mode don't support the dbtype: %s", dbType); + throw new IllegalStateException(errMsg, e); + } + + boolean undoLogTableExist = undoLogManager.hasUndoLogTable(conn); + if (!undoLogTableExist) { + String undoLogTableName = ConfigurationFactory.getInstance() + .getConfig(ConfigurationKeys.TRANSACTION_UNDO_LOG_TABLE, DEFAULT_TRANSACTION_UNDO_LOG_TABLE); + String errMsg = String.format("in AT mode, %s table not exist", undoLogTableName); + throw new IllegalStateException(errMsg); + } + } + + @Override + public String getJdbcUrl() { + return jdbcUrl; + } + + @Override + public String getDbType() { + return dbType; + } + + @Override + public String getUserName() { + return userName; + } +} diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/metadata/DefaultDataSourceProxyMetadata.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/metadata/DefaultDataSourceProxyMetadata.java new file mode 100644 index 00000000000..e44d3218256 --- /dev/null +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/metadata/DefaultDataSourceProxyMetadata.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.datasource.metadata; + +import javax.sql.DataSource; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class DefaultDataSourceProxyMetadata extends AbstractDataSourceProxyMetadata { + + @Override + public void init(DataSource dataSource) throws SQLException { + super.init(dataSource); + } + + @Override + public String getVariableValue(String name) { + return null; + } + + @Override + public Map getVariables() { + return new HashMap<>(); + } + + @Override + public String getKernelVersion() { + return null; + } + +} diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/metadata/MySQLDataSourceProxyMetadata.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/metadata/MySQLDataSourceProxyMetadata.java new file mode 100644 index 00000000000..c883e71f039 --- /dev/null +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/metadata/MySQLDataSourceProxyMetadata.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.datasource.metadata; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.rm.datasource.util.JdbcUtils; +import org.apache.seata.sqlparser.util.JdbcConstants; + +/** + * mysql datasource proxy metadata + */ +public class MySQLDataSourceProxyMetadata extends AbstractDataSourceProxyMetadata { + + /** + * POLARDB-X 1.X -> TDDL + * POLARDB-X 2.X & MySQL 5.6 -> PXC + * POLARDB-X 2.X & MySQL 5.7 -> AliSQL-X + */ + private static final String[] POLARDB_X_PRODUCT_KEYWORD = {"TDDL", "AliSQL-X", "PXC"}; + + private final Map variables = new HashMap<>(); + private String kernelVersion; + private String productVersion; + + @Override + public void init(DataSource dataSource) throws SQLException { + try (Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement("SHOW VARIABLES"); + ResultSet rs = preparedStatement.executeQuery()) { + jdbcUrl = connection.getMetaData().getURL(); + dbType = JdbcUtils.getDbType(jdbcUrl); + userName = connection.getMetaData().getUserName(); + while (rs.next()) { + String name = rs.getString(1); + String value = rs.getString(2); + if (StringUtils.isNotBlank(name)) { + variables.put(name.toLowerCase(), value); + } + } + checkUndoLogTableExist(connection); + } + + validMySQLVersion(); + checkDerivativeProduct(); + } + + @Override + public String getVariableValue(String name) { + return variables.get(name); + } + + @Override + public Map getVariables() { + return new HashMap<>(variables); + } + + @Override + public String getKernelVersion() { + return kernelVersion; + } + + private void validMySQLVersion() { + String version = variables.get("version"); + if (org.apache.commons.lang.StringUtils.isBlank(version)) { + return; + } + int dashIdx = version.indexOf('-'); + // in mysql: 5.6.45, in polardb-x: 5.6.45-TDDL-xxx + if (dashIdx > 0) { + kernelVersion = version.substring(0, dashIdx); + productVersion = version.substring(dashIdx + 1); + } else { + kernelVersion = version; + productVersion = version; + } + } + + /** + * Define derivative product version for MySQL Kernel + */ + private void checkDerivativeProduct() { + // check for polardb-x + if (isPolardbXProduct()) { + dbType = JdbcConstants.POLARDBX; + } + // check for other products base on mysql kernel + } + + private boolean isPolardbXProduct() { + if (org.apache.commons.lang.StringUtils.isBlank(productVersion)) { + return false; + } + for (String keyword : POLARDB_X_PRODUCT_KEYWORD) { + if (productVersion.contains(keyword)) { + return true; + } + } + return false; + } +} diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/mysql/MySQLUndoLogManager.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/mysql/MySQLUndoLogManager.java index bc299af723d..b75a7a6efa9 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/mysql/MySQLUndoLogManager.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/mysql/MySQLUndoLogManager.java @@ -122,7 +122,7 @@ protected Pair> getSubRollbackInfo(Connection conn, String @Override protected String getMaxAllowedPacket(DataSourceProxy dataSourceProxy) { - return dataSourceProxy.getVariableValue("max_allowed_packet"); + return dataSourceProxy.getDataSourceProxyMetadata().getVariableValue("max_allowed_packet"); } @Override diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/DataSourceProxyTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/DataSourceProxyTest.java index 1f4bf9659eb..becf5696b64 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/DataSourceProxyTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/DataSourceProxyTest.java @@ -16,14 +16,13 @@ */ package org.apache.seata.rm.datasource; +import javax.sql.DataSource; import java.lang.reflect.Field; import java.sql.Connection; import java.sql.SQLException; -import javax.sql.DataSource; +import java.util.Properties; import com.alibaba.druid.pool.DruidDataSource; - -import org.apache.seata.rm.datasource.DataSourceProxy; import org.apache.seata.rm.datasource.mock.MockDataSource; import org.apache.seata.rm.datasource.mock.MockDriver; import org.apache.seata.rm.datasource.sql.struct.TableMetaCacheFactory; @@ -37,7 +36,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; public class DataSourceProxyTest { @@ -64,6 +64,7 @@ public void testNotSupportDb() { dataSource.setDriver(mockDriver); dataSource.setUsername(username); dataSource.setPassword("password"); + dataSource.setConnectProperties(new Properties()); Throwable throwable = Assertions.assertThrows(IllegalStateException.class, () -> new DataSourceProxy(dataSource)); assertThat(throwable).hasMessageContaining("AT mode don't support the dbtype"); diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MariadbInsertExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MariadbInsertExecutorTest.java index 51b5336af15..3d9522c4a05 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MariadbInsertExecutorTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MariadbInsertExecutorTest.java @@ -19,7 +19,6 @@ import java.lang.reflect.Field; import java.sql.SQLException; import java.sql.Types; -import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -32,19 +31,16 @@ import org.apache.seata.rm.datasource.DataSourceProxyTest; import org.apache.seata.rm.datasource.PreparedStatementProxy; import org.apache.seata.rm.datasource.StatementProxy; -import org.apache.seata.rm.datasource.exec.StatementCallback; import org.apache.seata.rm.datasource.exec.mysql.MySQLInsertExecutor; import org.apache.seata.rm.datasource.mock.MockDriver; import org.apache.seata.rm.datasource.mock.MockMariadbDataSource; -import org.apache.seata.rm.datasource.mock.MockResultSet; -import org.apache.seata.sqlparser.struct.TableMeta; import org.apache.seata.sqlparser.SQLInsertRecognizer; +import org.apache.seata.sqlparser.struct.TableMeta; import org.apache.seata.sqlparser.util.JdbcConstants; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.mockito.Mockito; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -63,10 +59,6 @@ public void init() throws SQLException { when(statementProxy.getConnectionProxy()).thenReturn(connectionProxy); when(statementProxy.getTargetStatement()).thenReturn(statementProxy); - MockResultSet resultSet = new MockResultSet(statementProxy); - resultSet.mockResultSet(Arrays.asList("Variable_name", "Value"), new Object[][]{{"auto_increment_increment", "1"}}); - when(statementProxy.getTargetStatement().executeQuery("SHOW VARIABLES LIKE 'auto_increment_increment'")).thenReturn(resultSet); - StatementCallback statementCallback = mock(StatementCallback.class); sqlInsertRecognizer = mock(SQLInsertRecognizer.class); tableMeta = mock(TableMeta.class); diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MultiExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MultiExecutorTest.java index ffcd118cd95..b0163ce1545 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MultiExecutorTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MultiExecutorTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; @@ -77,6 +78,7 @@ public static void init() throws Throwable { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mock:xxx"); dataSource.setDriver(mockDriver); + dataSource.setConnectProperties(new Properties()); DataSourceProxy dataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource); diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MySQLInsertExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MySQLInsertExecutorTest.java index eaed8d4abb9..dd517d5c970 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MySQLInsertExecutorTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/MySQLInsertExecutorTest.java @@ -36,7 +36,6 @@ import com.alibaba.druid.sql.SQLUtils; import com.alibaba.druid.sql.ast.SQLStatement; import com.google.common.collect.Lists; -import org.apache.seata.rm.datasource.exec.StatementCallback; import org.apache.seata.common.exception.ShouldNeverHappenException; import org.apache.seata.rm.datasource.ConnectionProxy; import org.apache.seata.rm.datasource.DataSourceProxy; @@ -44,25 +43,24 @@ import org.apache.seata.rm.datasource.PreparedStatementProxy; import org.apache.seata.rm.datasource.StatementProxy; import org.apache.seata.rm.datasource.exec.mysql.MySQLInsertExecutor; +import org.apache.seata.rm.datasource.metadata.MySQLDataSourceProxyMetadata; import org.apache.seata.rm.datasource.mock.MockDataSource; import org.apache.seata.rm.datasource.mock.MockDriver; -import org.apache.seata.rm.datasource.mock.MockResultSet; -import org.apache.seata.sqlparser.druid.mysql.MySQLInsertRecognizer; -import org.apache.seata.sqlparser.struct.ColumnMeta; -import org.apache.seata.sqlparser.struct.TableMeta; import org.apache.seata.rm.datasource.sql.struct.TableRecords; import org.apache.seata.sqlparser.SQLInsertRecognizer; +import org.apache.seata.sqlparser.druid.mysql.MySQLInsertRecognizer; +import org.apache.seata.sqlparser.struct.ColumnMeta; import org.apache.seata.sqlparser.struct.Null; import org.apache.seata.sqlparser.struct.SqlDefaultExpr; import org.apache.seata.sqlparser.struct.SqlMethodExpr; import org.apache.seata.sqlparser.struct.SqlSequenceExpr; +import org.apache.seata.sqlparser.struct.TableMeta; import org.apache.seata.sqlparser.util.JdbcConstants; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; - import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -91,7 +89,7 @@ public class MySQLInsertExecutorTest { protected MySQLInsertExecutor newInsertExecutor; protected final int pkIndex = 0; - protected HashMap pkIndexMap; + protected HashMap pkIndexMap; @BeforeEach public void init() throws SQLException { @@ -104,16 +102,12 @@ public void init() throws SQLException { when(statementProxy.getConnectionProxy()).thenReturn(connectionProxy); when(statementProxy.getTargetStatement()).thenReturn(statementProxy); - MockResultSet resultSet = new MockResultSet(statementProxy); - resultSet.mockResultSet(Arrays.asList("Variable_name", "Value"), new Object[][]{{"auto_increment_increment", "1"}}); - when(statementProxy.getTargetStatement().executeQuery("SHOW VARIABLES LIKE 'auto_increment_increment'")).thenReturn(resultSet); - StatementCallback statementCallback = mock(StatementCallback.class); sqlInsertRecognizer = mock(SQLInsertRecognizer.class); tableMeta = mock(TableMeta.class); insertExecutor = Mockito.spy(new MySQLInsertExecutor(statementProxy, statementCallback, sqlInsertRecognizer)); - pkIndexMap = new HashMap(){ + pkIndexMap = new HashMap() { { put(ID_COLUMN, pkIndex); } @@ -121,21 +115,21 @@ public void init() throws SQLException { // new test init property List returnValueColumnLabels = Lists.newArrayList("id", "user_id", "name", "sex", "update_time"); - Object[][] returnValue = new Object[][] { - new Object[] {1, 1, "will", 1, 0}, + Object[][] returnValue = new Object[][]{ + new Object[]{1, 1, "will", 1, 0}, }; - Object[][] columnMetas = new Object[][] { - new Object[] {"", "", "table_insert_executor_test", "id", Types.INTEGER, "INTEGER", 64, 0, 10, 1, "", "", 0, 0, 64, 2, "NO", "NO"}, - new Object[] {"", "", "table_insert_executor_test", "user_id", Types.INTEGER, "INTEGER", 64, 0, 10, 1, "", "", 0, 0, 64, 2, "NO", "NO"}, - new Object[] {"", "", "table_insert_executor_test", "name", Types.VARCHAR, "VARCHAR", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "NO", "NO"}, - new Object[] {"", "", "table_insert_executor_test", "sex", Types.INTEGER, "INTEGER", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "NO", "NO"}, - new Object[] {"", "", "table_insert_executor_test", "update_time", Types.INTEGER, "INTEGER", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "YES", "NO"}, + Object[][] columnMetas = new Object[][]{ + new Object[]{"", "", "table_insert_executor_test", "id", Types.INTEGER, "INTEGER", 64, 0, 10, 1, "", "", 0, 0, 64, 2, "NO", "NO"}, + new Object[]{"", "", "table_insert_executor_test", "user_id", Types.INTEGER, "INTEGER", 64, 0, 10, 1, "", "", 0, 0, 64, 2, "NO", "NO"}, + new Object[]{"", "", "table_insert_executor_test", "name", Types.VARCHAR, "VARCHAR", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "NO", "NO"}, + new Object[]{"", "", "table_insert_executor_test", "sex", Types.INTEGER, "INTEGER", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "NO", "NO"}, + new Object[]{"", "", "table_insert_executor_test", "update_time", Types.INTEGER, "INTEGER", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "YES", "NO"}, }; - Object[][] indexMetas = new Object[][] { - new Object[] {"PRIMARY", "id", false, "", 3, 1, "A", 34}, - new Object[] {"PRIMARY", "user_id", false, "", 3, 1, "A", 34}, + Object[][] indexMetas = new Object[][]{ + new Object[]{"PRIMARY", "id", false, "", 3, 1, "A", 34}, + new Object[]{"PRIMARY", "user_id", false, "", 3, 1, "A", 34}, }; - Object[][] onUpdateColumnsReturnValue = new Object[][] { + Object[][] onUpdateColumnsReturnValue = new Object[][]{ new Object[]{0, "update_time", Types.INTEGER, "INTEGER", 64, 10, 0, 0} }; @@ -238,8 +232,8 @@ public void testBeforeAndAfterImageUpperColumn() throws SQLException { @Test public void testAfterImage_ByColumn() throws SQLException { doReturn(true).when(insertExecutor).containsPK(); - Map> pkValuesMap =new HashMap<>(); - pkValuesMap.put("id",Arrays.asList(new Object[]{PK_VALUE})); + Map> pkValuesMap = new HashMap<>(); + pkValuesMap.put("id", Arrays.asList(new Object[]{PK_VALUE})); doReturn(pkValuesMap).when(insertExecutor).getPkValuesByColumn(); TableRecords tableRecords = new TableRecords(); doReturn(tableRecords).when(insertExecutor).buildTableRecords(pkValuesMap); @@ -253,8 +247,8 @@ public void testAfterImage_ByColumn() throws SQLException { public void testAfterImage_ByAuto() throws SQLException { doReturn(false).when(insertExecutor).containsPK(); doReturn(true).when(insertExecutor).containsColumns(); - Map> pkValuesMap =new HashMap<>(); - pkValuesMap.put("id",Arrays.asList(new Object[]{PK_VALUE})); + Map> pkValuesMap = new HashMap<>(); + pkValuesMap.put("id", Arrays.asList(new Object[]{PK_VALUE})); doReturn(pkValuesMap).when(insertExecutor).getPkValuesByAuto(); TableRecords tableRecords = new TableRecords(); doReturn(tableRecords).when(insertExecutor).buildTableRecords(pkValuesMap); @@ -269,8 +263,8 @@ public void testAfterImage_Exception() { Assertions.assertThrows(SQLException.class, () -> { doReturn(false).when(insertExecutor).containsPK(); doReturn(true).when(insertExecutor).containsColumns(); - Map> pkValuesMap =new HashMap<>(); - pkValuesMap.put("id",Arrays.asList(new Object[]{PK_VALUE})); + Map> pkValuesMap = new HashMap<>(); + pkValuesMap.put("id", Arrays.asList(new Object[]{PK_VALUE})); doReturn(pkValuesMap).when(insertExecutor).getPkValuesByAuto(); doReturn(null).when(insertExecutor).buildTableRecords(pkValuesMap); doReturn(tableMeta).when(insertExecutor).getTableMeta(); @@ -301,7 +295,7 @@ public void testGetPkValuesByColumn() throws SQLException { List pkValues = new ArrayList<>(); pkValues.add(PK_VALUE); doReturn(pkIndexMap).when(insertExecutor).getPkIndex(); - Map> pkValuesList = insertExecutor.getPkValuesByColumn(); + Map> pkValuesList = insertExecutor.getPkValuesByColumn(); Assertions.assertIterableEquals(pkValuesList.get(ID_COLUMN), pkValues); } @@ -326,13 +320,17 @@ public void testGetPkValuesByColumn_PkValue_Null() throws SQLException { ColumnMeta cm = new ColumnMeta(); cm.setColumnName(ID_COLUMN); cm.setIsAutoincrement("YES"); - when(tableMeta.getPrimaryKeyMap()).thenReturn(new HashMap(){{put(ID_COLUMN,cm);}}); + when(tableMeta.getPrimaryKeyMap()).thenReturn(new HashMap() {{ + put(ID_COLUMN, cm); + }}); List pkValuesAuto = new ArrayList<>(); pkValuesAuto.add(PK_VALUE); //mock getPkValuesByAuto - doReturn(new HashMap>(){{put(ID_COLUMN,pkValuesAuto);}}).when(insertExecutor).getPkValuesByAuto(); + doReturn(new HashMap>() {{ + put(ID_COLUMN, pkValuesAuto); + }}).when(insertExecutor).getPkValuesByAuto(); doReturn(pkIndexMap).when(insertExecutor).getPkIndex(); - Map> pkValuesList = insertExecutor.getPkValuesByColumn(); + Map> pkValuesList = insertExecutor.getPkValuesByColumn(); //pk value = Null so getPkValuesByAuto verify(insertExecutor).getPkValuesByAuto(); Assertions.assertIterableEquals(pkValuesList.get(ID_COLUMN), pkValuesAuto); @@ -383,7 +381,7 @@ public void testGetPkValuesByAuto_SQLException_WarnLog() throws SQLException { when(statementProxy.getGeneratedKeys()).thenThrow(e); ResultSet genKeys = mock(ResultSet.class); when(statementProxy.getTargetStatement().executeQuery("SELECT LAST_INSERT_ID()")).thenReturn(genKeys); - Map> pkValueMap=insertExecutor.getPkValuesByAuto(); + Map> pkValueMap = insertExecutor.getPkValuesByAuto(); Assertions.assertTrue(pkValueMap.get(ID_COLUMN).isEmpty()); } @@ -401,8 +399,8 @@ public void testGetPkValuesByAuto_GeneratedKeys_NoResult() throws SQLException { when(statementProxy.getGeneratedKeys()).thenReturn(resultSet); when(resultSet.next()).thenReturn(false); when(resultSet.getObject(1)).thenReturn(PK_VALUE); - Map> pkValues = insertExecutor.getPkValuesByAuto(); - Assertions.assertEquals(pkValues.get(ID_COLUMN).size(),0); + Map> pkValues = insertExecutor.getPkValuesByAuto(); + Assertions.assertEquals(pkValues.get(ID_COLUMN).size(), 0); } @Test @@ -421,7 +419,7 @@ public void testGetPkValuesByAuto_GeneratedKeys_HasResult() throws SQLException when(resultSet.getObject(1)).thenReturn(PK_VALUE); List pkValues = new ArrayList<>(); pkValues.add(PK_VALUE); - Map> pkValuesList = insertExecutor.getPkValuesByAuto(); + Map> pkValuesList = insertExecutor.getPkValuesByAuto(); Assertions.assertIterableEquals(pkValuesList.get(ID_COLUMN), pkValues); } @@ -442,7 +440,7 @@ public void testGetPkValuesByAuto_ExecuteQuery_HasResult() throws SQLException { when(resultSet.getObject(1)).thenReturn(PK_VALUE); List pkValues = new ArrayList<>(); pkValues.add(PK_VALUE); - Map> pkValuesList = insertExecutor.getPkValuesByAuto(); + Map> pkValuesList = insertExecutor.getPkValuesByAuto(); Assertions.assertIterableEquals(pkValuesList.get(ID_COLUMN), pkValues); } @@ -456,13 +454,12 @@ public void test_getPkIndex() { @Test - public void test_checkPkValuesForMultiPk() - { - Map> pkValues = new HashMap<>(); + public void test_checkPkValuesForMultiPk() { + Map> pkValues = new HashMap<>(); List pkValues1 = new ArrayList(); List pkValues2 = new ArrayList(); - pkValues.put("id",pkValues1); - pkValues.put("userCode",pkValues2); + pkValues.put("id", pkValues1); + pkValues.put("userCode", pkValues2); //all pk support value pkValues1.add(1); @@ -692,7 +689,15 @@ public void test_checkPkValues() { } @Test - public void test_autoGeneratePks() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + public void test_autoGeneratePks() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException { + MySQLDataSourceProxyMetadata dataSourceProxyMetadata = (MySQLDataSourceProxyMetadata) + insertExecutor.statementProxy.getConnectionProxy().getDataSourceProxy().getDataSourceProxyMetadata(); + Map variables = new HashMap<>(); + variables.put("auto_increment_increment", "1"); + Field variablesField = dataSourceProxyMetadata.getClass().getDeclaredField("variables"); + variablesField.setAccessible(true); + variablesField.set(dataSourceProxyMetadata, variables); + Method method = MySQLInsertExecutor.class.getDeclaredMethod("autoGeneratePks", new Class[]{BigDecimal.class, String.class, Integer.class}); method.setAccessible(true); Object resp = method.invoke(insertExecutor, BigDecimal.ONE, "ID", 3); @@ -716,7 +721,7 @@ private List mockInsertColumns() { } private void mockParameters() { - Map> paramters = new HashMap<>(4); + Map> paramters = new HashMap<>(4); ArrayList arrayList0 = new ArrayList<>(); arrayList0.add(PK_VALUE); ArrayList arrayList1 = new ArrayList<>(); @@ -734,7 +739,7 @@ private void mockParameters() { } private void mockParametersPkWithNull() { - Map> parameters = new HashMap<>(4); + Map> parameters = new HashMap<>(4); ArrayList arrayList0 = new ArrayList<>(); arrayList0.add(Null.get()); ArrayList arrayList1 = new ArrayList<>(); @@ -752,7 +757,7 @@ private void mockParametersPkWithNull() { } private void mockParametersOfOnePk() { - Map> paramters = new HashMap<>(4); + Map> paramters = new HashMap<>(4); ArrayList arrayList1 = new ArrayList<>(); arrayList1.add(PK_VALUE); paramters.put(1, arrayList1); diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/PlainExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/PlainExecutorTest.java index 6e2d0f76a70..3e1bceecca4 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/PlainExecutorTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/PlainExecutorTest.java @@ -19,6 +19,8 @@ import java.sql.SQLException; import java.sql.Types; import java.util.List; +import java.util.Properties; + import com.alibaba.druid.mock.MockStatement; import com.alibaba.druid.mock.MockStatementBase; import com.alibaba.druid.pool.DruidDataSource; @@ -56,6 +58,7 @@ public void init() throws SQLException { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mock:xxx"); dataSource.setDriver(mockDriver); + dataSource.setConnectProperties(new Properties()); DataSourceProxy dataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource); diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/PolarDBXInsertExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/PolarDBXInsertExecutorTest.java index 1bfe853f421..0e0d6a57446 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/PolarDBXInsertExecutorTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/PolarDBXInsertExecutorTest.java @@ -19,9 +19,9 @@ import java.lang.reflect.Field; import java.sql.SQLException; import java.sql.Types; -import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Properties; import com.alibaba.druid.mock.MockStatement; import com.alibaba.druid.mock.MockStatementBase; @@ -32,10 +32,9 @@ import org.apache.seata.rm.datasource.DataSourceProxyTest; import org.apache.seata.rm.datasource.PreparedStatementProxy; import org.apache.seata.rm.datasource.StatementProxy; -import org.apache.seata.rm.datasource.exec.StatementCallback; import org.apache.seata.rm.datasource.exec.polardbx.PolarDBXInsertExecutor; +import org.apache.seata.rm.datasource.metadata.MySQLDataSourceProxyMetadata; import org.apache.seata.rm.datasource.mock.MockDriver; -import org.apache.seata.rm.datasource.mock.MockResultSet; import org.apache.seata.sqlparser.SQLInsertRecognizer; import org.apache.seata.sqlparser.struct.TableMeta; import org.apache.seata.sqlparser.util.JdbcConstants; @@ -47,7 +46,6 @@ /** * Insert executor test for PolarDB-X - * */ public class PolarDBXInsertExecutorTest extends MySQLInsertExecutorTest { @BeforeEach @@ -59,6 +57,9 @@ public void init() throws SQLException { DataSourceProxy dataSourceProxy = mock(DataSourceProxy.class); when(dataSourceProxy.getResourceId()).thenReturn("jdbc:mysql://127.0.0.1:3306/seata"); when(dataSourceProxy.getDbType()).thenReturn(JdbcConstants.POLARDBX); + MySQLDataSourceProxyMetadata dataSourceProxyMetadata = Mockito.mock(MySQLDataSourceProxyMetadata.class); + when(dataSourceProxyMetadata.getVariableValue("auto_increment_increment")).thenReturn("1"); + when(dataSourceProxy.getDataSourceProxyMetadata()).thenReturn(dataSourceProxyMetadata); when(connectionProxy.getDataSourceProxy()).thenReturn(dataSourceProxy); @@ -66,10 +67,6 @@ public void init() throws SQLException { when(statementProxy.getConnectionProxy()).thenReturn(connectionProxy); when(statementProxy.getTargetStatement()).thenReturn(statementProxy); - MockResultSet resultSet = new MockResultSet(statementProxy); - resultSet.mockResultSet(Arrays.asList("Variable_name", "Value"), new Object[][]{{"auto_increment_increment", "1"}}); - when(statementProxy.getTargetStatement().executeQuery("SHOW VARIABLES LIKE 'auto_increment_increment'")).thenReturn(resultSet); - StatementCallback statementCallback = mock(StatementCallback.class); sqlInsertRecognizer = mock(SQLInsertRecognizer.class); tableMeta = mock(TableMeta.class); @@ -83,21 +80,21 @@ public void init() throws SQLException { // new test init property List returnValueColumnLabels = Lists.newArrayList("id", "user_id", "name", "sex", "update_time"); - Object[][] returnValue = new Object[][] { - new Object[] {1, 1, "will", 1, 0}, + Object[][] returnValue = new Object[][]{ + new Object[]{1, 1, "will", 1, 0}, }; - Object[][] columnMetas = new Object[][] { - new Object[] {"", "", "table_insert_executor_test", "id", Types.INTEGER, "INTEGER", 64, 0, 10, 1, "", "", 0, 0, 64, 2, "NO", "NO"}, - new Object[] {"", "", "table_insert_executor_test", "user_id", Types.INTEGER, "INTEGER", 64, 0, 10, 1, "", "", 0, 0, 64, 2, "NO", "NO"}, - new Object[] {"", "", "table_insert_executor_test", "name", Types.VARCHAR, "VARCHAR", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "NO", "NO"}, - new Object[] {"", "", "table_insert_executor_test", "sex", Types.INTEGER, "INTEGER", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "NO", "NO"}, - new Object[] {"", "", "table_insert_executor_test", "update_time", Types.INTEGER, "INTEGER", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "YES", "NO"}, + Object[][] columnMetas = new Object[][]{ + new Object[]{"", "", "table_insert_executor_test", "id", Types.INTEGER, "INTEGER", 64, 0, 10, 1, "", "", 0, 0, 64, 2, "NO", "NO"}, + new Object[]{"", "", "table_insert_executor_test", "user_id", Types.INTEGER, "INTEGER", 64, 0, 10, 1, "", "", 0, 0, 64, 2, "NO", "NO"}, + new Object[]{"", "", "table_insert_executor_test", "name", Types.VARCHAR, "VARCHAR", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "NO", "NO"}, + new Object[]{"", "", "table_insert_executor_test", "sex", Types.INTEGER, "INTEGER", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "NO", "NO"}, + new Object[]{"", "", "table_insert_executor_test", "update_time", Types.INTEGER, "INTEGER", 64, 0, 10, 0, "", "", 0, 0, 64, 2, "YES", "NO"}, }; - Object[][] indexMetas = new Object[][] { - new Object[] {"PRIMARY", "id", false, "", 3, 1, "A", 34}, - new Object[] {"PRIMARY", "user_id", false, "", 3, 1, "A", 34}, + Object[][] indexMetas = new Object[][]{ + new Object[]{"PRIMARY", "id", false, "", 3, 1, "A", 34}, + new Object[]{"PRIMARY", "user_id", false, "", 3, 1, "A", 34}, }; - Object[][] onUpdateColumnsReturnValue = new Object[][] { + Object[][] onUpdateColumnsReturnValue = new Object[][]{ new Object[]{0, "update_time", Types.INTEGER, "INTEGER", 64, 10, 0, 0} }; @@ -105,6 +102,7 @@ public void init() throws SQLException { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mock:xxx"); dataSource.setDriver(mockDriver); + dataSource.setConnectProperties(new Properties()); DataSourceProxy newDataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource); try { diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/SelectForUpdateExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/SelectForUpdateExecutorTest.java index ca3dc38fd5a..e19320031a9 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/SelectForUpdateExecutorTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/SelectForUpdateExecutorTest.java @@ -19,6 +19,8 @@ import java.lang.reflect.Field; import java.sql.Types; import java.util.List; +import java.util.Properties; + import com.alibaba.druid.mock.MockStatement; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.sql.SQLUtils; @@ -70,6 +72,7 @@ public static void init() { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mock:xxx"); dataSource.setDriver(mockDriver); + dataSource.setConnectProperties(new Properties()); DataSourceProxy dataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource); diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/UpdateExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/UpdateExecutorTest.java index 9de02c73654..f3509118930 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/UpdateExecutorTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/UpdateExecutorTest.java @@ -20,6 +20,7 @@ import java.sql.SQLException; import java.sql.Types; import java.util.List; +import java.util.Properties; import com.alibaba.druid.mock.MockStatement; import com.alibaba.druid.mock.MockStatementBase; @@ -72,6 +73,7 @@ public static void init() { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mock:xxx"); dataSource.setDriver(mockDriver); + dataSource.setConnectProperties(new Properties()); DataSourceProxy dataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource); diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/UpdateJoinExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/UpdateJoinExecutorTest.java index 3a8ce32ce72..4baff651859 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/UpdateJoinExecutorTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/exec/UpdateJoinExecutorTest.java @@ -20,6 +20,7 @@ import java.sql.SQLException; import java.sql.Types; import java.util.List; +import java.util.Properties; import com.alibaba.druid.mock.MockStatement; import com.alibaba.druid.mock.MockStatementBase; @@ -81,6 +82,7 @@ private StatementProxy mockStatementProxy(List returnValueColumnLabels, DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mock:xxx"); dataSource.setDriver(mockDriver); + dataSource.setConnectProperties(new Properties()); DataSourceProxy dataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource); try { diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/mock/MockDataSource.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/mock/MockDataSource.java index c9f946446f6..90580b7261e 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/mock/MockDataSource.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/mock/MockDataSource.java @@ -21,13 +21,14 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.util.Properties; import java.util.logging.Logger; public class MockDataSource implements DataSource { @Override public Connection getConnection() throws SQLException { - return new MockConnection(new MockDriver(), "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true", null); + return new MockConnection(new MockDriver(), "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true", new Properties()); } @Override diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/mock/MockMariadbDataSource.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/mock/MockMariadbDataSource.java index cdc28780c6e..15ae9d3d265 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/mock/MockMariadbDataSource.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/mock/MockMariadbDataSource.java @@ -18,11 +18,12 @@ import java.sql.Connection; import java.sql.SQLException; +import java.util.Properties; public class MockMariadbDataSource extends MockDataSource { @Override public Connection getConnection() throws SQLException { - return new MockConnection(new MockDriver(), "jdbc:mariadb://127.0.0.1:3306/seata?rewriteBatchedStatements=true", null); + return new MockConnection(new MockDriver(), "jdbc:mariadb://127.0.0.1:3306/seata?rewriteBatchedStatements=true", new Properties()); } }