From 5b46c84c1f99092f26a9db2da7dbdef806bf3a57 Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Mon, 28 Oct 2024 22:46:43 +0800 Subject: [PATCH] [refactor](hudi, iceberg) optimize some code (#42636) make code more readable --- .../doris/datasource/hudi/HudiUtils.java | 242 ++++++++++-------- .../datasource/hudi/source/HudiScanNode.java | 2 +- .../dlf/client/DLFCachedClientPool.java | 27 +- 3 files changed, 153 insertions(+), 118 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java index 3885f1de3ee95e..d7803b1a516f9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java @@ -24,7 +24,6 @@ import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; -import com.google.common.base.Preconditions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -41,30 +40,58 @@ public class HudiUtils { private static final SimpleDateFormat defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd"); - public static String fromAvroHudiTypeToHiveTypeString(Schema avroSchema) { - Schema.Type columnType = avroSchema.getType(); - LogicalType logicalType = avroSchema.getLogicalType(); - switch (columnType) { + /** + * Convert different query instant time format to the commit time format. + * Currently we support three kinds of instant time format for time travel query: + * 1、yyyy-MM-dd HH:mm:ss + * 2、yyyy-MM-dd + * This will convert to 'yyyyMMdd000000'. + * 3、yyyyMMddHHmmss + */ + public static String formatQueryInstant(String queryInstant) throws ParseException { + int instantLength = queryInstant.length(); + if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS] + if (instantLength == 19) { + queryInstant += ".000"; + } + return HoodieInstantTimeGenerator.getInstantForDateString(queryInstant); + } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH + || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS] + HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); // validate the format + return queryInstant; + } else if (instantLength == 10) { // for yyyy-MM-dd + return HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant)); + } else { + throw new IllegalArgumentException("Unsupported query instant time format: " + queryInstant + + ", Supported time format are: 'yyyy-MM-dd HH:mm:ss[.SSS]' " + + "or 'yyyy-MM-dd' or 'yyyyMMddHHmmss[SSS]'"); + } + } + + public static String convertAvroToHiveType(Schema schema) { + Schema.Type type = schema.getType(); + LogicalType logicalType = schema.getLogicalType(); + + switch (type) { case BOOLEAN: return "boolean"; case INT: if (logicalType instanceof LogicalTypes.Date) { return "date"; - } else if (logicalType instanceof LogicalTypes.TimeMillis) { - break; - } else { - return "int"; } + if (logicalType instanceof LogicalTypes.TimeMillis) { + return handleUnsupportedType(schema); + } + return "int"; case LONG: + if (logicalType instanceof LogicalTypes.TimestampMillis + || logicalType instanceof LogicalTypes.TimestampMicros) { + return logicalType.getName(); + } if (logicalType instanceof LogicalTypes.TimeMicros) { - break; - } else if (logicalType instanceof LogicalTypes.TimestampMillis) { - return "timestamp(3)"; - } else if (logicalType instanceof LogicalTypes.TimestampMicros) { - return "timestamp(6)"; - } else { - return "bigint"; + return handleUnsupportedType(schema); } + return "bigint"; case FLOAT: return "float"; case DOUBLE: @@ -74,71 +101,57 @@ public static String fromAvroHudiTypeToHiveTypeString(Schema avroSchema) { case FIXED: case BYTES: if (logicalType instanceof LogicalTypes.Decimal) { - int precision = ((LogicalTypes.Decimal) logicalType).getPrecision(); - int scale = ((LogicalTypes.Decimal) logicalType).getScale(); - return String.format("decimal(%s,%s)", precision, scale); - } else { - if (columnType == Schema.Type.BYTES) { - return "binary"; - } - return "string"; + LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType; + return String.format("decimal(%d,%d)", decimalType.getPrecision(), decimalType.getScale()); } + return "string"; case ARRAY: - String elementType = fromAvroHudiTypeToHiveTypeString(avroSchema.getElementType()); - return String.format("array<%s>", elementType); + String arrayElementType = convertAvroToHiveType(schema.getElementType()); + return String.format("array<%s>", arrayElementType); case RECORD: - List fields = avroSchema.getFields(); - Preconditions.checkArgument(fields.size() > 0); - String nameToType = fields.stream() - .map(f -> String.format("%s:%s", f.name(), - fromAvroHudiTypeToHiveTypeString(f.schema()))) + List recordFields = schema.getFields(); + if (recordFields.isEmpty()) { + throw new IllegalArgumentException("Record must have fields"); + } + String structFields = recordFields.stream() + .map(field -> String.format("%s:%s", field.name(), convertAvroToHiveType(field.schema()))) .collect(Collectors.joining(",")); - return String.format("struct<%s>", nameToType); + return String.format("struct<%s>", structFields); case MAP: - Schema value = avroSchema.getValueType(); - String valueType = fromAvroHudiTypeToHiveTypeString(value); - return String.format("map<%s,%s>", "string", valueType); + Schema mapValueType = schema.getValueType(); + String mapValueTypeString = convertAvroToHiveType(mapValueType); + return String.format("map", mapValueTypeString); case UNION: - List nonNullMembers = avroSchema.getTypes().stream() - .filter(schema -> !Schema.Type.NULL.equals(schema.getType())) + List unionTypes = schema.getTypes().stream() + .filter(s -> s.getType() != Schema.Type.NULL) .collect(Collectors.toList()); - // The nullable column in hudi is the union type with schemas [null, real column type] - if (nonNullMembers.size() == 1) { - return fromAvroHudiTypeToHiveTypeString(nonNullMembers.get(0)); + if (unionTypes.size() == 1) { + return convertAvroToHiveType(unionTypes.get(0)); } break; default: break; } - String errorMsg = String.format("Unsupported hudi %s type of column %s", avroSchema.getType().getName(), - avroSchema.getName()); - throw new IllegalArgumentException(errorMsg); + + throw new IllegalArgumentException( + String.format("Unsupported type: %s for column: %s", type.getName(), schema.getName())); + } + + private static String handleUnsupportedType(Schema schema) { + throw new IllegalArgumentException(String.format("Unsupported logical type: %s", schema.getLogicalType())); } public static Type fromAvroHudiTypeToDorisType(Schema avroSchema) { Schema.Type columnType = avroSchema.getType(); LogicalType logicalType = avroSchema.getLogicalType(); + switch (columnType) { case BOOLEAN: return Type.BOOLEAN; case INT: - if (logicalType instanceof LogicalTypes.Date) { - return ScalarType.createDateV2Type(); - } else if (logicalType instanceof LogicalTypes.TimeMillis) { - return ScalarType.createTimeV2Type(3); - } else { - return Type.INT; - } + return handleIntType(logicalType); case LONG: - if (logicalType instanceof LogicalTypes.TimeMicros) { - return ScalarType.createTimeV2Type(6); - } else if (logicalType instanceof LogicalTypes.TimestampMillis) { - return ScalarType.createDatetimeV2Type(3); - } else if (logicalType instanceof LogicalTypes.TimestampMicros) { - return ScalarType.createDatetimeV2Type(6); - } else { - return Type.BIGINT; - } + return handleLongType(logicalType); case FLOAT: return Type.FLOAT; case DOUBLE: @@ -147,64 +160,75 @@ public static Type fromAvroHudiTypeToDorisType(Schema avroSchema) { return Type.STRING; case FIXED: case BYTES: - if (logicalType instanceof LogicalTypes.Decimal) { - int precision = ((LogicalTypes.Decimal) logicalType).getPrecision(); - int scale = ((LogicalTypes.Decimal) logicalType).getScale(); - return ScalarType.createDecimalV3Type(precision, scale); - } else { - return Type.STRING; - } + return handleFixedOrBytesType(logicalType); case ARRAY: - Type innerType = fromAvroHudiTypeToDorisType(avroSchema.getElementType()); - return ArrayType.create(innerType, true); + return handleArrayType(avroSchema); case RECORD: - ArrayList fields = new ArrayList<>(); - avroSchema.getFields().forEach( - f -> fields.add(new StructField(f.name(), fromAvroHudiTypeToDorisType(f.schema())))); - return new StructType(fields); + return handleRecordType(avroSchema); case MAP: - // Hudi map's key must be string - return new MapType(Type.STRING, fromAvroHudiTypeToDorisType(avroSchema.getValueType())); + return handleMapType(avroSchema); case UNION: - List nonNullMembers = avroSchema.getTypes().stream() - .filter(schema -> !Schema.Type.NULL.equals(schema.getType())) - .collect(Collectors.toList()); - // The nullable column in hudi is the union type with schemas [null, real column type] - if (nonNullMembers.size() == 1) { - return fromAvroHudiTypeToDorisType(nonNullMembers.get(0)); - } - break; + return handleUnionType(avroSchema); default: - break; + return Type.UNSUPPORTED; } - return Type.UNSUPPORTED; } - /** - * Convert different query instant time format to the commit time format. - * Currently we support three kinds of instant time format for time travel query: - * 1、yyyy-MM-dd HH:mm:ss - * 2、yyyy-MM-dd - * This will convert to 'yyyyMMdd000000'. - * 3、yyyyMMddHHmmss - */ - public static String formatQueryInstant(String queryInstant) throws ParseException { - int instantLength = queryInstant.length(); - if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS] - if (instantLength == 19) { - queryInstant += ".000"; - } - return HoodieInstantTimeGenerator.getInstantForDateString(queryInstant); - } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH - || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS] - HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); // validate the format - return queryInstant; - } else if (instantLength == 10) { // for yyyy-MM-dd - return HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant)); - } else { - throw new IllegalArgumentException("Unsupported query instant time format: " + queryInstant - + ", Supported time format are: 'yyyy-MM-dd HH:mm:ss[.SSS]' " - + "or 'yyyy-MM-dd' or 'yyyyMMddHHmmss[SSS]'"); + private static Type handleIntType(LogicalType logicalType) { + if (logicalType instanceof LogicalTypes.Date) { + return ScalarType.createDateV2Type(); + } + if (logicalType instanceof LogicalTypes.TimeMillis) { + return ScalarType.createTimeV2Type(3); + } + return Type.INT; + } + + private static Type handleLongType(LogicalType logicalType) { + if (logicalType instanceof LogicalTypes.TimeMicros) { + return ScalarType.createTimeV2Type(6); + } + if (logicalType instanceof LogicalTypes.TimestampMillis) { + return ScalarType.createDatetimeV2Type(3); } + if (logicalType instanceof LogicalTypes.TimestampMicros) { + return ScalarType.createDatetimeV2Type(6); + } + return Type.BIGINT; + } + + private static Type handleFixedOrBytesType(LogicalType logicalType) { + if (logicalType instanceof LogicalTypes.Decimal) { + int precision = ((LogicalTypes.Decimal) logicalType).getPrecision(); + int scale = ((LogicalTypes.Decimal) logicalType).getScale(); + return ScalarType.createDecimalV3Type(precision, scale); + } + return Type.STRING; + } + + private static Type handleArrayType(Schema avroSchema) { + Type innerType = fromAvroHudiTypeToDorisType(avroSchema.getElementType()); + return ArrayType.create(innerType, true); + } + + private static Type handleRecordType(Schema avroSchema) { + ArrayList fields = new ArrayList<>(); + avroSchema.getFields().forEach( + f -> fields.add(new StructField(f.name(), fromAvroHudiTypeToDorisType(f.schema())))); + return new StructType(fields); + } + + private static Type handleMapType(Schema avroSchema) { + return new MapType(Type.STRING, fromAvroHudiTypeToDorisType(avroSchema.getValueType())); + } + + private static Type handleUnionType(Schema avroSchema) { + List nonNullMembers = avroSchema.getTypes().stream() + .filter(schema -> !Schema.Type.NULL.equals(schema.getType())) + .collect(Collectors.toList()); + if (nonNullMembers.size() == 1) { + return fromAvroHudiTypeToDorisType(nonNullMembers.get(0)); + } + return Type.UNSUPPORTED; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index abd5a377f5a9cf..a8f2a362bfde8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -176,7 +176,7 @@ protected void doInitialize() throws UserException { } for (Schema.Field hudiField : hudiSchema.getFields()) { columnNames.add(hudiField.name().toLowerCase(Locale.ROOT)); - String columnType = HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema()); + String columnType = HudiUtils.convertAvroToHiveType(hudiField.schema()); columnTypes.add(columnType); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java index f8e70ebd3f52b9..23b814c13b8085 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java @@ -41,22 +41,32 @@ public class DLFCachedClientPool implements ClientPool properties) { this.conf = conf; this.endpoint = conf.get("", ""); - this.clientPoolSize = - PropertyUtil.propertyAsInt( + this.clientPoolSize = getClientPoolSize(properties); + this.evictionInterval = getEvictionInterval(properties); + initializeClientPoolCache(); + } + + private int getClientPoolSize(Map properties) { + return PropertyUtil.propertyAsInt( properties, CatalogProperties.CLIENT_POOL_SIZE, - CatalogProperties.CLIENT_POOL_SIZE_DEFAULT); - this.evictionInterval = - PropertyUtil.propertyAsLong( + CatalogProperties.CLIENT_POOL_SIZE_DEFAULT + ); + } + + private long getEvictionInterval(Map properties) { + return PropertyUtil.propertyAsLong( properties, CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, - CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT); + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT + ); + } + private void initializeClientPoolCache() { if (clientPoolCache == null) { synchronized (clientPoolCacheLock) { if (clientPoolCache == null) { - clientPoolCache = - Caffeine.newBuilder() + clientPoolCache = Caffeine.newBuilder() .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS) .removalListener((key, value, cause) -> ((DLFClientPool) value).close()) .build(); @@ -80,3 +90,4 @@ public R run(Action action, boolean retry) return clientPool().run(action, retry); } } +