From 3be869e42af79029e3bedaf8ff8337e10d5688bd Mon Sep 17 00:00:00 2001 From: "ted.shim" <79561371+tedshim@users.noreply.github.com> Date: Fri, 1 Dec 2023 18:57:01 +0900 Subject: [PATCH] [Bug] [connector-jdbc] Nullable Column source have null data could be unexpected results. (#5560) (#5) Co-authored-by: MoSence --- .../converter/AbstractJdbcRowConverter.java | 27 +-- .../internal/dialect/DatabaseIdentifier.java | 41 ++++ .../kingbase/KingbaseJdbcRowConverter.java | 189 ++++++++++++++++++ .../psql/PostgresJdbcRowConverter.java | 31 +-- .../seatunnel/jdbc/utils/JdbcUtils.java | 106 ++++++++++ 5 files changed, 366 insertions(+), 28 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java index 07aa7959946..a247a883c31 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCode; import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils; import java.math.BigDecimal; import java.sql.Date; @@ -51,51 +52,51 @@ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws S int resultSetIndex = fieldIndex + 1; switch (seaTunnelDataType.getSqlType()) { case STRING: - fields[fieldIndex] = rs.getString(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex); break; case BOOLEAN: - fields[fieldIndex] = rs.getBoolean(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex); break; case TINYINT: - fields[fieldIndex] = rs.getByte(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex); break; case SMALLINT: - fields[fieldIndex] = rs.getShort(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex); break; case INT: - fields[fieldIndex] = rs.getInt(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex); break; case BIGINT: - fields[fieldIndex] = rs.getLong(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex); break; case FLOAT: - fields[fieldIndex] = rs.getFloat(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex); break; case DOUBLE: - fields[fieldIndex] = rs.getDouble(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex); break; case DECIMAL: - fields[fieldIndex] = rs.getBigDecimal(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex); break; case DATE: - Date sqlDate = rs.getDate(resultSetIndex); + Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null); break; case TIME: - Time sqlTime = rs.getTime(resultSetIndex); + Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlTime).map(e -> 
e.toLocalTime()).orElse(null); break; case TIMESTAMP: - Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex); + Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlTimestamp) .map(e -> e.toLocalDateTime()) .orElse(null); break; case BYTES: - fields[fieldIndex] = rs.getBytes(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex); break; case NULL: fields[fieldIndex] = null; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java new file mode 100644 index 00000000000..3b1738afb27 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+/**
+ * String names identifying database dialects.
+ *
+ * <p>Within this change, {@link #KINGBASE} and {@link #POSTGRESQL} are the values returned by
+ * {@code converterName()} of the Kingbase and Postgres row converters.
+ */
+public class DatabaseIdentifier {
+    public static final String DB_2 = "DB2";
+    public static final String DAMENG = "Dameng";
+    public static final String GBASE_8A = "Gbase8a";
+    public static final String HIVE = "HIVE";
+    public static final String INFORMIX = "Informix";
+    public static final String KINGBASE = "KingBase";
+    public static final String MYSQL = "MySQL";
+    public static final String ORACLE = "Oracle";
+    public static final String PHOENIX = "Phoenix";
+    public static final String POSTGRESQL = "Postgres";
+    public static final String REDSHIFT = "Redshift";
+    public static final String SAP_HANA = "SapHana";
+    public static final String SNOWFLAKE = "Snowflake";
+    public static final String SQLITE = "Sqlite";
+    public static final String SQLSERVER = "SqlServer";
+    public static final String TABLE_STORE = "Tablestore";
+    public static final String TERADATA = "Teradata";
+    public static final String VERTICA = "Vertica";
+    // NOTE(review): the identifier is spelled OCENABASE while its value is "OceanBase"; this
+    // looks like a typo for OCEANBASE. Renaming would break existing references - confirm
+    // before changing.
+    public static final String OCENABASE = "OceanBase";
+    public static final String TIDB = "TiDB";
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java new file mode 100644 index 00000000000..1bf590e69ae --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Optional;
+
+/**
+ * Row converter registered under {@link DatabaseIdentifier#KINGBASE}.
+ *
+ * <p>Translates between JDBC {@link ResultSet} / {@link PreparedStatement} values and {@link
+ * SeaTunnelRow} fields, one column per field, in field order.
+ */
+public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    /** Returns {@link DatabaseIdentifier#KINGBASE} as the name of this converter. */
+    @Override
+    public String converterName() {
+        return DatabaseIdentifier.KINGBASE;
+    }
+
+    /**
+     * Builds a {@link SeaTunnelRow} from the current row of {@code rs}.
+     *
+     * <p>Field {@code i} of {@code typeInfo} is read from result-set column {@code i + 1} through
+     * the {@link JdbcUtils} getter matching the field's SQL type. DATE, TIME and TIMESTAMP values
+     * are converted to their {@code java.time} counterparts; a {@code null} stays {@code null}.
+     *
+     * @param rs result set to read from
+     * @param typeInfo row type describing the fields to read
+     * @return a row holding one value per field of {@code typeInfo}
+     * @throws SQLException if a column cannot be read
+     * @throws JdbcConnectorException if a field is ROW, MAP, ARRAY or another unhandled type
+     */
+    @Override
+    @SuppressWarnings("checkstyle:Indentation")
+    public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
+        Object[] values = new Object[typeInfo.getTotalFields()];
+        for (int i = 0; i < values.length; i++) {
+            // JDBC columns are 1-based, row fields are 0-based.
+            values[i] = readColumn(rs, i + 1, typeInfo.getFieldType(i));
+        }
+        return new SeaTunnelRow(values);
+    }
+
+    /** Reads a single column as the Java value used for the given SeaTunnel type. */
+    @SuppressWarnings("checkstyle:Indentation")
+    private static Object readColumn(ResultSet rs, int column, SeaTunnelDataType type)
+            throws SQLException {
+        switch (type.getSqlType()) {
+            case STRING:
+                return JdbcUtils.getString(rs, column);
+            case BOOLEAN:
+                return JdbcUtils.getBoolean(rs, column);
+            case TINYINT:
+                return JdbcUtils.getByte(rs, column);
+            case SMALLINT:
+                return JdbcUtils.getShort(rs, column);
+            case INT:
+                return JdbcUtils.getInt(rs, column);
+            case BIGINT:
+                return JdbcUtils.getLong(rs, column);
+            case FLOAT:
+                return JdbcUtils.getFloat(rs, column);
+            case DOUBLE:
+                return JdbcUtils.getDouble(rs, column);
+            case DECIMAL:
+                return JdbcUtils.getBigDecimal(rs, column);
+            case DATE:
+                return Optional.ofNullable(JdbcUtils.getDate(rs, column))
+                        .map(Date::toLocalDate)
+                        .orElse(null);
+            case TIME:
+                return Optional.ofNullable(JdbcUtils.getTime(rs, column))
+                        .map(Time::toLocalTime)
+                        .orElse(null);
+            case TIMESTAMP:
+                return Optional.ofNullable(JdbcUtils.getTimestamp(rs, column))
+                        .map(Timestamp::toLocalDateTime)
+                        .orElse(null);
+            case BYTES:
+                return JdbcUtils.getBytes(rs, column);
+            case NULL:
+                return null;
+            case ROW:
+            case MAP:
+            case ARRAY:
+            default:
+                throw new JdbcConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unexpected value: " + type);
+        }
+    }
+
+    /**
+     * Binds every field of {@code row} to the matching parameter of {@code statement}.
+     *
+     * <p>Field {@code i} goes to parameter {@code i + 1}. A {@code null} field value is bound with
+     * {@code setObject(index, null)} whatever its declared type; any other value is bound with
+     * the setter matching the field's SQL type.
+     *
+     * @param rowType row type describing the fields of {@code row}
+     * @param row row whose field values are bound
+     * @param statement statement receiving the parameters
+     * @return the same {@code statement} instance
+     * @throws SQLException if a parameter cannot be set
+     * @throws JdbcConnectorException if a non-null field is ROW, MAP, ARRAY or another unhandled
+     *     type
+     */
+    @Override
+    public PreparedStatement toExternal(
+            SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement)
+            throws SQLException {
+        for (int i = 0, n = rowType.getTotalFields(); i < n; i++) {
+            SeaTunnelDataType type = rowType.getFieldType(i);
+            int parameter = i + 1;
+            Object value = row.getField(i);
+            if (value == null) {
+                statement.setObject(parameter, null);
+            } else {
+                bindParameter(statement, parameter, type, value);
+            }
+        }
+        return statement;
+    }
+
+    /** Binds one non-null value using the setter that matches the given SeaTunnel type. */
+    private static void bindParameter(
+            PreparedStatement statement, int parameter, SeaTunnelDataType type, Object value)
+            throws SQLException {
+        switch (type.getSqlType()) {
+            case STRING:
+                statement.setString(parameter, (String) value);
+                return;
+            case BOOLEAN:
+                statement.setBoolean(parameter, (Boolean) value);
+                return;
+            case TINYINT:
+                statement.setByte(parameter, (Byte) value);
+                return;
+            case SMALLINT:
+                statement.setShort(parameter, (Short) value);
+                return;
+            case INT:
+                statement.setInt(parameter, (Integer) value);
+                return;
+            case BIGINT:
+                statement.setLong(parameter, (Long) value);
+                return;
+            case FLOAT:
+                statement.setFloat(parameter, (Float) value);
+                return;
+            case DOUBLE:
+                statement.setDouble(parameter, (Double) value);
+                return;
+            case DECIMAL:
+                statement.setBigDecimal(parameter, (BigDecimal) value);
+                return;
+            case DATE:
+                statement.setDate(parameter, Date.valueOf((LocalDate) value));
+                return;
+            case TIME:
+                statement.setTime(parameter, Time.valueOf((LocalTime) value));
+                return;
+            case TIMESTAMP:
+                statement.setTimestamp(parameter, Timestamp.valueOf((LocalDateTime) value));
+                return;
+            case BYTES:
+                statement.setBytes(parameter, (byte[]) value);
+                return;
+            case NULL:
+                statement.setNull(parameter, java.sql.Types.NULL);
+                return;
+            case ROW:
+            case MAP:
+            case ARRAY:
+            default:
+                throw new JdbcConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unexpected value: " + type);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java index 2b7dc47a9e0..26b56589a8c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java @@ -23,6 +23,8 @@ import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils; import java.sql.Date; import java.sql.ResultSet; @@ -39,11 +41,10 @@ public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter { @Override public String converterName() { - return null; + return DatabaseIdentifier.POSTGRESQL; } @Override - @SuppressWarnings("checkstyle:Indentation") public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException { Object[] fields = new Object[typeInfo.getTotalFields()]; for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) { @@ -60,52 +61,52 @@ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws S ?
null : rs.getObject(resultSetIndex).toString(); } else { - fields[fieldIndex] = rs.getString(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex); } break; case BOOLEAN: - fields[fieldIndex] = rs.getBoolean(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex); break; case TINYINT: - fields[fieldIndex] = rs.getByte(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex); break; case SMALLINT: - fields[fieldIndex] = rs.getShort(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex); break; case INT: - fields[fieldIndex] = rs.getInt(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex); break; case BIGINT: - fields[fieldIndex] = rs.getLong(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex); break; case FLOAT: - fields[fieldIndex] = rs.getFloat(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex); break; case DOUBLE: - fields[fieldIndex] = rs.getDouble(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex); break; case DECIMAL: - fields[fieldIndex] = rs.getBigDecimal(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex); break; case DATE: - Date sqlDate = rs.getDate(resultSetIndex); + Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null); break; case TIME: - Time sqlTime = rs.getTime(resultSetIndex); + Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null); break; case TIMESTAMP: - Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex); + Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex); fields[fieldIndex] = Optional.ofNullable(sqlTimestamp) .map(e -> e.toLocalDateTime()) .orElse(null); break; case BYTES: - fields[fieldIndex] = 
rs.getBytes(resultSetIndex); + fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex); break; case NULL: fields[fieldIndex] = null; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java new file mode 100644 index 00000000000..b9f7f1eac3f --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Null-preserving column readers for {@link ResultSet}.
+ *
+ * <p>The primitive JDBC getters ({@code getInt}, {@code getBoolean}, ...) report SQL {@code NULL}
+ * as {@code 0} or {@code false}. The helpers below return the boxed type instead and yield
+ * {@code null} for SQL {@code NULL}, so a nullable column is not silently replaced by a default.
+ *
+ * <p>Each helper reads the column exactly once; the primitive ones then consult {@link
+ * ResultSet#wasNull()}. Probing with {@code getObject} before calling the typed getter would read
+ * the column twice, which the {@link ResultSet} documentation advises against for portability.
+ */
+public final class JdbcUtils {
+
+    private JdbcUtils() {}
+
+    /**
+     * Reads a column as {@link String}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static String getString(ResultSet resultSet, int columnIndex) throws SQLException {
+        return resultSet.getString(columnIndex);
+    }
+
+    /**
+     * Reads a column as {@link Boolean}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Boolean getBoolean(ResultSet resultSet, int columnIndex) throws SQLException {
+        boolean value = resultSet.getBoolean(columnIndex);
+        // wasNull() refers to the getter call just made, so it must come directly after it.
+        return resultSet.wasNull() ? null : Boolean.valueOf(value);
+    }
+
+    /**
+     * Reads a column as {@link Byte}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Byte getByte(ResultSet resultSet, int columnIndex) throws SQLException {
+        byte value = resultSet.getByte(columnIndex);
+        return resultSet.wasNull() ? null : Byte.valueOf(value);
+    }
+
+    /**
+     * Reads a column as {@link Short}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Short getShort(ResultSet resultSet, int columnIndex) throws SQLException {
+        short value = resultSet.getShort(columnIndex);
+        return resultSet.wasNull() ? null : Short.valueOf(value);
+    }
+
+    /**
+     * Reads a column as {@link Integer}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Integer getInt(ResultSet resultSet, int columnIndex) throws SQLException {
+        int value = resultSet.getInt(columnIndex);
+        return resultSet.wasNull() ? null : Integer.valueOf(value);
+    }
+
+    /**
+     * Reads a column as {@link Long}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Long getLong(ResultSet resultSet, int columnIndex) throws SQLException {
+        long value = resultSet.getLong(columnIndex);
+        return resultSet.wasNull() ? null : Long.valueOf(value);
+    }
+
+    /**
+     * Reads a column as {@link Float}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Float getFloat(ResultSet resultSet, int columnIndex) throws SQLException {
+        float value = resultSet.getFloat(columnIndex);
+        return resultSet.wasNull() ? null : Float.valueOf(value);
+    }
+
+    /**
+     * Reads a column as {@link Double}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Double getDouble(ResultSet resultSet, int columnIndex) throws SQLException {
+        double value = resultSet.getDouble(columnIndex);
+        return resultSet.wasNull() ? null : Double.valueOf(value);
+    }
+
+    /**
+     * Reads a column as {@link BigDecimal}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static BigDecimal getBigDecimal(ResultSet resultSet, int columnIndex)
+            throws SQLException {
+        return resultSet.getBigDecimal(columnIndex);
+    }
+
+    /**
+     * Reads a column as {@link Date}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Date getDate(ResultSet resultSet, int columnIndex) throws SQLException {
+        return resultSet.getDate(columnIndex);
+    }
+
+    /**
+     * Reads a column as {@link Time}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Time getTime(ResultSet resultSet, int columnIndex) throws SQLException {
+        return resultSet.getTime(columnIndex);
+    }
+
+    /**
+     * Reads a column as {@link Timestamp}.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static Timestamp getTimestamp(ResultSet resultSet, int columnIndex) throws SQLException {
+        return resultSet.getTimestamp(columnIndex);
+    }
+
+    /**
+     * Reads a column as a byte array.
+     *
+     * @return the column value, or {@code null} for SQL {@code NULL}
+     * @throws SQLException if the column cannot be read
+     */
+    public static byte[] getBytes(ResultSet resultSet, int columnIndex) throws SQLException {
+        // getBytes already returns null for SQL NULL, so no separate null probe is needed.
+        return resultSet.getBytes(columnIndex);
+    }
+}