diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecord.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecord.java index 7639a2fa..4d7bafe3 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecord.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecord.java @@ -112,10 +112,11 @@ static SqlFunction computeMapping( } else { return resultSet -> nullableBytes(resultSet.getBytes(column)); } + case ARRAY: + return resultSet -> resultSet.getArray(column); case BINARY: case VARBINARY: case LONGVARBINARY: - case ARRAY: case BLOB: return resultSet -> nullableBytes(resultSet.getBytes(column)); case DOUBLE: diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java index b657778e..cc74d66b 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java @@ -97,24 +97,39 @@ public ByteBuffer convertResultSetIntoAvroBytes() throws SQLException, IOExcepti binaryEncoder.writeNull(); } else { binaryEncoder.writeIndex(1); - if (value instanceof String) { - binaryEncoder.writeString((String) value); - } else if (value instanceof Long) { - binaryEncoder.writeLong((Long) value); - } else if (value instanceof Integer) { - binaryEncoder.writeInt((Integer) value); - } else if (value instanceof Boolean) { - binaryEncoder.writeBoolean((Boolean) value); - } else if (value instanceof ByteBuffer) { - binaryEncoder.writeBytes((ByteBuffer) value); - } else if (value instanceof Double) { - binaryEncoder.writeDouble((Double) value); - } else if (value instanceof Float) { - binaryEncoder.writeFloat((Float) value); - } + writeValue(value, binaryEncoder); } } binaryEncoder.flush(); return ByteBuffer.wrap(out.getBufffer(), 0, out.size()); } + + private void writeValue(Object value, BinaryEncoder binaryEncoder) + throws SQLException, IOException { + if (value instanceof String) { + binaryEncoder.writeString((String) value); + } else if (value instanceof Long) { + binaryEncoder.writeLong((Long) value); + } else if (value instanceof Integer) { + binaryEncoder.writeInt((Integer) value); + } else if (value instanceof Boolean) { + binaryEncoder.writeBoolean((Boolean) value); + } else if (value instanceof ByteBuffer) { + binaryEncoder.writeBytes((ByteBuffer) value); + } else if (value instanceof Double) { + binaryEncoder.writeDouble((Double) value); + } else if (value instanceof Float) { + binaryEncoder.writeFloat((Float) value); + } else if (value instanceof java.sql.Array) { + binaryEncoder.writeArrayStart(); + Object[] array = (Object[]) ((java.sql.Array) value).getArray(); + binaryEncoder.setItemCount(array.length); + for (Object arrayItem : array) { + binaryEncoder.startItem(); + writeValue(arrayItem, binaryEncoder); + } + + binaryEncoder.writeArrayEnd(); + } + } } diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java index 9b07d4c0..1e846b6a 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java @@ -74,6 +74,8 @@ public static Schema createSchemaByReadingOneRow( try (Statement statement = connection.createStatement()) { final ResultSet resultSet = statement.executeQuery(queryBuilderArgs.sqlQueryWithLimitOne()); + resultSet.next(); + final Schema schema = createAvroSchema( resultSet, @@ -107,7 +109,7 @@ public static Schema createAvroSchema( .prop("tableName", tableName) .prop("connectionUrl", connectionUrl) .fields(); - return createAvroFields(meta, builder, useLogicalTypes).endRecord(); + return createAvroFields(resultSet, builder, useLogicalTypes).endRecord(); } static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLException { @@ -123,11 +125,13 @@ static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLExcep } private static SchemaBuilder.FieldAssembler createAvroFields( - final ResultSetMetaData meta, - final SchemaBuilder.FieldAssembler builder, + final ResultSet resultSet, + final SchemaBuilder.FieldAssembler builder, final boolean useLogicalTypes) throws SQLException { + ResultSetMetaData meta = resultSet.getMetaData(); + for (int i = 1; i <= meta.getColumnCount(); i++) { final String columnName; @@ -140,7 +144,8 @@ private static SchemaBuilder.FieldAssembler createAvroFields( final int columnType = meta.getColumnType(i); final String typeName = JDBCType.valueOf(columnType).getName(); final String columnClassName = meta.getColumnClassName(i); - final SchemaBuilder.FieldBuilder field = + final String columnTypeName = meta.getColumnTypeName(i); + SchemaBuilder.FieldBuilder field = builder .name(normalizeForAvro(columnName)) .doc(String.format("From sqlType %d %s (%s)", columnType, typeName, columnClassName)) @@ -149,13 +154,21 @@ private static SchemaBuilder.FieldAssembler createAvroFields( .prop("typeName", typeName) .prop("columnClassName", columnClassName); + if (columnTypeName != null) { + field = field.prop("columnTypeName", columnTypeName); + } + final SchemaBuilder.BaseTypeBuilder< SchemaBuilder.UnionAccumulator>> fieldSchemaBuilder = field.type().unionOf().nullBuilder().endNull().and(); + Integer arrayItemType = resultSet.isFirst() && columnType == ARRAY + ? resultSet.getArray(i).getBaseType() : null; + final SchemaBuilder.UnionAccumulator> schemaFieldAssembler = setAvroColumnType( columnType, + arrayItemType, meta.getPrecision(i), columnClassName, useLogicalTypes, @@ -181,6 +194,7 @@ private static SchemaBuilder.FieldAssembler createAvroFields( private static SchemaBuilder.UnionAccumulator> setAvroColumnType( final int columnType, + final Integer arrayItemType, final int precision, final String columnClassName, final boolean useLogicalTypes, @@ -225,10 +239,12 @@ private static SchemaBuilder.FieldAssembler createAvroFields( } else { return field.bytesType(); } + case ARRAY: + return setAvroColumnType(arrayItemType, null, precision, columnClassName, + useLogicalTypes, field.array().items()); case BINARY: case VARBINARY: case LONGVARBINARY: - case ARRAY: case BLOB: return field.bytesType(); case DOUBLE: diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java b/dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java index 310f4bd3..af181643 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java @@ -22,9 +22,12 @@ import com.google.auto.value.AutoValue; import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; // A fictitious DB model to test different SQL types @AutoValue @@ -42,7 +45,9 @@ public static Coffee create( final java.sql.Timestamp created, final Optional updated, final UUID uid, - final Long rownum) { + final Long rownum, + final List intArr, + final List textArr) { return new AutoValue_Coffee( name, supId, @@ -55,7 +60,9 @@ public static Coffee create( created, updated, uid, - rownum); + rownum, + new ArrayList<>(intArr), + new ArrayList<>(textArr)); } public abstract String name(); @@ -82,10 +89,15 @@ public static Coffee create( public abstract Long rownum(); + public abstract List intArr(); + + public abstract List textArr(); + public String insertStatement() { return String.format( Locale.ENGLISH, - "INSERT INTO COFFEES " + "VALUES ('%s', %s, '%s', %f, %f, %b, %d, %d, '%s', %s, '%s', %d)", + "INSERT INTO COFFEES " + "VALUES ('%s', %s, '%s', %f, %f, %b, %d, %d, '%s', %s, '%s', %d," + + " ARRAY [%s], ARRAY ['%s'])", name(), supId().orElse(null), price().toString(), @@ -97,7 +109,9 @@ public String insertStatement() { created(), updated().orElse(null), uid(), - rownum()); + rownum(), + String.join(",", intArr().stream().map(x -> (CharSequence) x.toString())::iterator), + String.join("','", textArr())); } public static String ddl() { @@ -114,7 +128,9 @@ public static String ddl() { + "\"CREATED\" TIMESTAMP NOT NULL," + "\"UPDATED\" TIMESTAMP," + "\"UID\" UUID NOT NULL," - + "\"ROWNUM\" BIGINT NOT NULL);"; + + "\"ROWNUM\" BIGINT NOT NULL," + + "\"INT_ARR\" INTEGER ARRAY NOT NULL," + + "\"TEXT_ARR\" VARCHAR ARRAY NOT NULL);"; } public static Coffee COFFEE1 = @@ -130,7 +146,19 @@ public static String ddl() { new java.sql.Timestamp(1488300933000L), Optional.empty(), UUID.fromString("123e4567-e89b-12d3-a456-426655440000"), - 1L); + 1L, + new ArrayList() {{ + add(5); + add(7); + add(11); + }}, + new ArrayList() {{ + add("rock"); + add("scissors"); + add("paper"); + }} + ); + public static Coffee COFFEE2 = create( "colombian caffee", @@ -144,5 +172,16 @@ public static String ddl() { new java.sql.Timestamp(1488300723000L), Optional.empty(), UUID.fromString("123e4567-e89b-a456-12d3-426655440000"), - 2L); + 2L, + new ArrayList() {{ + add(7); + add(11); + add(23); + }}, + new ArrayList() {{ + add("scissors"); + add("paper"); + add("rock"); + }} + ); } diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java index 2004b4fd..0ea2b6f3 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java @@ -34,6 +34,9 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -42,9 +45,11 @@ import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -62,7 +67,7 @@ public static void beforeAll() throws SQLException, ClassNotFoundException { @Test public void shouldCreateSchema() throws ClassNotFoundException, SQLException { - final int fieldCount = 12; + final int fieldCount = 14; final Schema actual = JdbcAvroSchema.createSchemaByReadingOneRow( DbTestHelper.createConnection(CONNECTION_URL), @@ -92,7 +97,9 @@ public void shouldCreateSchema() throws ClassNotFoundException, SQLException { "CREATED", "UPDATED", "UID", - "ROWNUM"), + "ROWNUM", + "INT_ARR", + "TEXT_ARR"), actual.getFields().stream().map(Schema.Field::name).collect(Collectors.toList())); for (Schema.Field f : actual.getFields()) { Assert.assertEquals(Schema.Type.UNION, f.schema().getType()); @@ -128,7 +135,7 @@ public void shouldCreateSchema() throws ClassNotFoundException, SQLException { @Test public void shouldCreateSchemaWithLogicalTypes() throws ClassNotFoundException, SQLException { - final int fieldCount = 12; + final int fieldCount = 14; final Schema actual = JdbcAvroSchema.createSchemaByReadingOneRow( DbTestHelper.createConnection(CONNECTION_URL), @@ -163,8 +170,10 @@ public void shouldEncodeResultSetToValidAvro() throws ClassNotFoundException, SQLException, IOException { final ResultSet rs = DbTestHelper.createConnection(CONNECTION_URL) - .createStatement() + .createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY) .executeQuery("SELECT * FROM COFFEES"); + + rs.first(); final Schema schema = JdbcAvroSchema.createAvroSchema( rs, "dbeam_generated", "connection", Optional.empty(), "doc", false); @@ -173,6 +182,7 @@ public void shouldEncodeResultSetToValidAvro() new DataFileWriter<>(new GenericDatumWriter<>(schema)); final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); dataFileWriter.create(schema, outputStream); + rs.previous(); // convert and write while (rs.next()) { dataFileWriter.appendEncoded(converter.convertResultSetIntoAvroBytes()); @@ -194,8 +204,11 @@ public void shouldEncodeResultSetToValidAvro() .findFirst() .orElseThrow(() -> new IllegalArgumentException("not found")); - Assert.assertEquals(12, record.getSchema().getFields().size()); + Assert.assertEquals(14, record.getSchema().getFields().size()); Assert.assertEquals(schema, record.getSchema()); + List actualTxtArray = + ((GenericData.Array) record.get(13)).stream().map(x -> x.toString()).collect( + Collectors.toList()); final Coffee actual = Coffee.create( record.get(0).toString(), @@ -209,7 +222,9 @@ public void shouldEncodeResultSetToValidAvro() new java.sql.Timestamp((Long) record.get(8)), Optional.ofNullable((Long) record.get(9)).map(Timestamp::new), TestHelper.byteBufferToUuid((ByteBuffer) record.get(10)), - (Long) record.get(11)); + (Long) record.get(11), + new ArrayList<>((GenericData.Array) record.get(12)), + actualTxtArray); Assert.assertEquals(Coffee.COFFEE1, actual); } diff --git a/docs/type-conversion.md b/docs/type-conversion.md index fcbc9e43..cad81e97 100644 --- a/docs/type-conversion.md +++ b/docs/type-conversion.md @@ -5,31 +5,31 @@ When applicable and `--useAvroLogicalTypes` parameter is set to `true`, Avro log To represent nullable columns, unions with the Avro NULL type are used. - **Java SQL type** | **Avro type** | **Avro schema annotations** | **Comments** ---- | --- | --- | --- -BIGINT | long / string | | Depends on a SQL column precision -INTEGER | int | | -SMALLINT | int | | -TINYINT | int | | -TIMESTAMP | long |logicalType: time-millis | -DATE | long | logicalType: time-millis | -TIME | long | logicalType: time-millis | -TIME_WITH_TIMEZONE | long | logicalType: time-millis | -BOOLEAN | boolean | | -BIT | boolean / bytes | | Depends on a SQL column precision -BINARY | bytes | | -BINARY | bytes | | -VARBINARY | bytes | | -LONGVARBINARY | bytes | | -ARRAY | bytes | | -BLOB | bytes | | -DOUBLE | double | | -FLOAT | float | | -REAL | float | | -VARCHAR | string | | -CHAR | string | | -CLOB | string | | -LONGNVARCHAR | string | | -LONGVARCHAR | string | | -NCHAR | string | | -all other Java SQL types | string | | +| **Java SQL type** | **Avro type** | **Avro schema annotations** | **Comments** | +|--------------------------|-----------------|-----------------------------|---------------------------------------| +| BIGINT | long / string | | Depends on a SQL column precision | +| INTEGER | int | | | +| SMALLINT | int | | | +| TINYINT | int | | | +| TIMESTAMP | long | logicalType: time-millis | | +| DATE | long | logicalType: time-millis | | +| TIME | long | logicalType: time-millis | | +| TIME_WITH_TIMEZONE | long | logicalType: time-millis | | +| BOOLEAN | boolean | | | +| BIT | boolean / bytes | | Depends on a SQL column precision | +| BINARY | bytes | | | +| BINARY | bytes | | | +| VARBINARY | bytes | | | +| LONGVARBINARY | bytes | | | +| ARRAY | array | | Propagates an array item type as well | +| BLOB | bytes | | | +| DOUBLE | double | | | +| FLOAT | float | | | +| REAL | float | | | +| VARCHAR | string | | | +| CHAR | string | | | +| CLOB | string | | | +| LONGNVARCHAR | string | | | +| LONGVARCHAR | string | | | +| NCHAR | string | | | +| all other Java SQL types | string | | | diff --git a/e2e/e2e.sh b/e2e/e2e.sh index e4934bc5..0c26305a 100755 --- a/e2e/e2e.sh +++ b/e2e/e2e.sh @@ -68,6 +68,7 @@ run_docker_dbeam() { time docker run --interactive --rm \ --net="$DOCKER_NETWORK" \ --mount="type=bind,source=$PROJECT_PATH/dbeam-core/target,target=/dbeam" \ + --mount="type=bind,source=$SCRIPT_PATH,target=$SCRIPT_PATH" \ --memory=1G \ --entrypoint=/usr/bin/java \ "$JAVA_DOCKER_IMAGE" \