From 6a5c1fc493d44e9e002f603086b85299563367df Mon Sep 17 00:00:00 2001 From: fuchanghai Date: Sat, 27 Jan 2024 18:00:22 +0800 Subject: [PATCH] [improvement] replace `lable` to `label` (#3515) --- .../flink/connector/doris/internal/DorisSinkWriter.java | 2 +- .../apache/streampark/connector/doris/conf/DorisConfig.scala | 2 +- .../connector/doris/conf/DorisSinkConfigOption.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java index a6f0d5317d..160b320478 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java @@ -166,7 +166,7 @@ public final synchronized void writeRecords(String database, String table, Strin final String bufferKey = String.format("%s.%s", database, table); final DorisSinkBufferEntry bufferEntity = bufferMap.computeIfAbsent( - bufferKey, k -> new DorisSinkBufferEntry(database, table, dorisConfig.lablePrefix())); + bufferKey, k -> new DorisSinkBufferEntry(database, table, dorisConfig.labelPrefix())); for (String record : records) { byte[] bts = record.getBytes(StandardCharsets.UTF_8); bufferEntity.addToBuffer(bts); diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala index df09056c1a..5407886e84 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala @@ -58,7 +58,7 @@ class DorisConfig(parameters: Properties) { val sinkOfferTimeout: Long = sinkOption.sinkOfferTimeout.get() - val lablePrefix: String = sinkOption.lablePrefix.get() + val labelPrefix: String = sinkOption.labelPrefix.get() val semantic: String = sinkOption.semantic.get() diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala index 2af63c3ec9..933252d589 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala @@ -126,8 +126,8 @@ class DorisSinkConfigOption(prefixStr: String, properties: Properties) extends S val maxRetries: ConfigOption[Int] = ConfigOption(key = "maxRetries", required = false, defaultValue = 1, classType = classOf[Int]) - val lablePrefix: ConfigOption[String] = ConfigOption( - key = "lablePrefix", + val labelPrefix: ConfigOption[String] = ConfigOption( + key = "labelPrefix", required = false, defaultValue = "doris", classType = classOf[String])