Skip to content

Commit

Permalink
[Bug] [connector-jdbc] Nullable Column source have null data could be…
Browse files Browse the repository at this point in the history
… unexpected results. (apache#5560) (#5)

Co-authored-by: MoSence <[email protected]>
  • Loading branch information
tedshim and mosence authored Dec 1, 2023
1 parent 1233aa0 commit 3be869e
Show file tree
Hide file tree
Showing 5 changed files with 366 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;

import java.math.BigDecimal;
import java.sql.Date;
Expand Down Expand Up @@ -51,51 +52,51 @@ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws S
int resultSetIndex = fieldIndex + 1;
switch (seaTunnelDataType.getSqlType()) {
case STRING:
fields[fieldIndex] = rs.getString(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex);
break;
case BOOLEAN:
fields[fieldIndex] = rs.getBoolean(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex);
break;
case TINYINT:
fields[fieldIndex] = rs.getByte(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex);
break;
case SMALLINT:
fields[fieldIndex] = rs.getShort(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex);
break;
case INT:
fields[fieldIndex] = rs.getInt(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex);
break;
case BIGINT:
fields[fieldIndex] = rs.getLong(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex);
break;
case FLOAT:
fields[fieldIndex] = rs.getFloat(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex);
break;
case DOUBLE:
fields[fieldIndex] = rs.getDouble(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex);
break;
case DECIMAL:
fields[fieldIndex] = rs.getBigDecimal(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex);
break;
case DATE:
Date sqlDate = rs.getDate(resultSetIndex);
Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null);
break;
case TIME:
Time sqlTime = rs.getTime(resultSetIndex);
Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null);
break;
case TIMESTAMP:
Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex);
Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
case BYTES:
fields[fieldIndex] = rs.getBytes(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

public class DatabaseIdentifier {
public static final String DB_2 = "DB2";
public static final String DAMENG = "Dameng";
public static final String GBASE_8A = "Gbase8a";
public static final String HIVE = "HIVE";
public static final String INFORMIX = "Informix";
public static final String KINGBASE = "KingBase";
public static final String MYSQL = "MySQL";
public static final String ORACLE = "Oracle";
public static final String PHOENIX = "Phoenix";
public static final String POSTGRESQL = "Postgres";
public static final String REDSHIFT = "Redshift";
public static final String SAP_HANA = "SapHana";
public static final String SNOWFLAKE = "Snowflake";
public static final String SQLITE = "Sqlite";
public static final String SQLSERVER = "SqlServer";
public static final String TABLE_STORE = "Tablestore";
public static final String TERADATA = "Teradata";
public static final String VERTICA = "Vertica";
public static final String OCENABASE = "OceanBase";
public static final String TIDB = "TiDB";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Optional;

public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return DatabaseIdentifier.KINGBASE;
}

@Override
@SuppressWarnings("checkstyle:Indentation")
public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
Object[] fields = new Object[typeInfo.getTotalFields()];
for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
int resultSetIndex = fieldIndex + 1;
switch (seaTunnelDataType.getSqlType()) {
case STRING:
fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex);
break;
case BOOLEAN:
fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex);
break;
case TINYINT:
fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex);
break;
case SMALLINT:
fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex);
break;
case INT:
fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex);
break;
case BIGINT:
fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex);
break;
case FLOAT:
fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex);
break;
case DOUBLE:
fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex);
break;
case DECIMAL:
fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex);
break;
case DATE:
Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(Date::toLocalDate).orElse(null);
break;
case TIME:
Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(Time::toLocalTime).orElse(null);
break;
case TIMESTAMP:
Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(Timestamp::toLocalDateTime)
.orElse(null);
break;
case BYTES:
fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
break;
case ROW:
case MAP:
case ARRAY:
default:
throw new JdbcConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected value: " + seaTunnelDataType);
}
}
return new SeaTunnelRow(fields);
}

@Override
public PreparedStatement toExternal(
SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement)
throws SQLException {
for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
int statementIndex = fieldIndex + 1;
Object fieldValue = row.getField(fieldIndex);
if (fieldValue == null) {
statement.setObject(statementIndex, null);
continue;
}

switch (seaTunnelDataType.getSqlType()) {
case STRING:
statement.setString(statementIndex, (String) row.getField(fieldIndex));
break;
case BOOLEAN:
statement.setBoolean(statementIndex, (Boolean) row.getField(fieldIndex));
break;
case TINYINT:
statement.setByte(statementIndex, (Byte) row.getField(fieldIndex));
break;
case SMALLINT:
statement.setShort(statementIndex, (Short) row.getField(fieldIndex));
break;
case INT:
statement.setInt(statementIndex, (Integer) row.getField(fieldIndex));
break;
case BIGINT:
statement.setLong(statementIndex, (Long) row.getField(fieldIndex));
break;
case FLOAT:
statement.setFloat(statementIndex, (Float) row.getField(fieldIndex));
break;
case DOUBLE:
statement.setDouble(statementIndex, (Double) row.getField(fieldIndex));
break;
case DECIMAL:
statement.setBigDecimal(statementIndex, (BigDecimal) row.getField(fieldIndex));
break;
case DATE:
LocalDate localDate = (LocalDate) row.getField(fieldIndex);
statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
break;
case TIME:
LocalTime localTime = (LocalTime) row.getField(fieldIndex);
statement.setTime(statementIndex, java.sql.Time.valueOf(localTime));
break;
case TIMESTAMP:
LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex);
statement.setTimestamp(
statementIndex, java.sql.Timestamp.valueOf(localDateTime));
break;
case BYTES:
statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex));
break;
case NULL:
statement.setNull(statementIndex, java.sql.Types.NULL);
break;
case ROW:
case MAP:
case ARRAY:
default:
throw new JdbcConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected value: " + seaTunnelDataType);
}
}
return statement;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;

import java.sql.Date;
import java.sql.ResultSet;
Expand All @@ -39,11 +41,10 @@ public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return null;
return DatabaseIdentifier.POSTGRESQL;
}

@Override
@SuppressWarnings("checkstyle:Indentation")
public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
Object[] fields = new Object[typeInfo.getTotalFields()];
for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
Expand All @@ -60,52 +61,52 @@ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws S
? null
: rs.getObject(resultSetIndex).toString();
} else {
fields[fieldIndex] = rs.getString(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getString(rs, resultSetIndex);
}
break;
case BOOLEAN:
fields[fieldIndex] = rs.getBoolean(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getBoolean(rs, resultSetIndex);
break;
case TINYINT:
fields[fieldIndex] = rs.getByte(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex);
break;
case SMALLINT:
fields[fieldIndex] = rs.getShort(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getShort(rs, resultSetIndex);
break;
case INT:
fields[fieldIndex] = rs.getInt(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex);
break;
case BIGINT:
fields[fieldIndex] = rs.getLong(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex);
break;
case FLOAT:
fields[fieldIndex] = rs.getFloat(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getFloat(rs, resultSetIndex);
break;
case DOUBLE:
fields[fieldIndex] = rs.getDouble(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getDouble(rs, resultSetIndex);
break;
case DECIMAL:
fields[fieldIndex] = rs.getBigDecimal(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getBigDecimal(rs, resultSetIndex);
break;
case DATE:
Date sqlDate = rs.getDate(resultSetIndex);
Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null);
break;
case TIME:
Time sqlTime = rs.getTime(resultSetIndex);
Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null);
break;
case TIMESTAMP:
Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex);
Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
case BYTES:
fields[fieldIndex] = rs.getBytes(resultSetIndex);
fields[fieldIndex] = JdbcUtils.getBytes(rs, resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
Expand Down
Loading

0 comments on commit 3be869e

Please sign in to comment.