From c3464c87770abe99735f81def3bf79d922bcb2b5 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Mon, 11 Nov 2024 12:57:37 +0800 Subject: [PATCH 1/4] [improvement](statistics)Remove useless stats validation check. (#43279) (#43558) backport: https://github.com/apache/doris/pull/43279 --- .../java/org/apache/doris/statistics/ColStatsData.java | 5 ----- .../doris/statistics/ColumnStatisticsCacheLoader.java | 8 -------- .../java/org/apache/doris/statistics/StatisticsCache.java | 6 +----- regression-test/suites/statistics/analyze_stats.groovy | 4 +--- 4 files changed, 2 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java index bb8263994583fd..cc79f779f6e872 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java @@ -131,11 +131,6 @@ public String toSQL(boolean roundByParentheses) { } public ColumnStatistic toColumnStatistic() { - // For non-empty table, return UNKNOWN if we can't collect ndv value. - // Because inaccurate ndv is very misleading. 
- if (count > 0 && ndv == 0 && count != nullCount) { - return ColumnStatistic.UNKNOWN; - } try { ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder(); columnStatisticBuilder.setCount(count); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java index 42d5d542359880..1876cacc003f30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java @@ -51,14 +51,6 @@ protected Optional doLoad(StatisticsCacheKey key) { // it will trigger load function again without cache an empty value. return null; } - if (columnStatistic.isPresent()) { - // For non-empty table, return UNKNOWN if we can't collect ndv value. - // Because inaccurate ndv is very misleading. - ColumnStatistic stats = columnStatistic.get(); - if (stats.count > 0 && stats.ndv == 0 && stats.count != stats.numNulls) { - columnStatistic = Optional.of(ColumnStatistic.UNKNOWN); - } - } return columnStatistic; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index af5958d740205e..fae6fd0555fab6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -171,12 +171,8 @@ private void doPreHeat() { long tblId = statsId.tblId; long idxId = statsId.idxId; String colId = statsId.colId; - final StatisticsCacheKey k = - new StatisticsCacheKey(tblId, idxId, colId); + final StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colId); ColumnStatistic c = ColumnStatistic.fromResultRow(r); - if (c.count > 0 && c.ndv == 0 && c.count != c.numNulls) { - c = ColumnStatistic.UNKNOWN; - } putCache(k, c); } catch (Throwable 
t) { LOG.warn("Error when preheating stats cache. reason: [{}]. Row:[{}]", t.getMessage(), r); diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index 993e6f531da8eb..c828274a40357f 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -2706,9 +2706,7 @@ PARTITION `p599` VALUES IN (599) alter_result = sql """show column stats alter_test(id)""" assertEquals(1, alter_result.size()) alter_result = sql """show column cached stats alter_test(id)""" - assertEquals(0, alter_result.size()) - alter_result = sql """show column cached stats alter_test(id)""" - assertEquals(0, alter_result.size()) + assertEquals(1, alter_result.size()) sql """alter table alter_test modify column id set stats ('row_count'='100', 'ndv'='0', 'num_nulls'='100', 'data_size'='2.69975443E8', 'min_value'='1', 'max_value'='2');""" alter_result = sql """show column stats alter_test(id)""" assertEquals(1, alter_result.size()) From 677de3428564f00a0a7b5b338d7b8cd1c97f5bf6 Mon Sep 17 00:00:00 2001 From: walter Date: Mon, 11 Nov 2024 17:03:21 +0800 Subject: [PATCH 2/4] [feat](restore) Support compressed snapshot meta and job info (#43516) (#43568) Cherry-pick #43516 --- fe/fe-common/pom.xml | 4 ++ .../java/org/apache/doris/common/Config.java | 9 ++++ .../org/apache/doris/common/GZIPUtils.java | 48 +++++++++++++++++++ .../doris/service/FrontendServiceImpl.java | 48 ++++++++++++++++--- gensrc/thrift/FrontendService.thrift | 3 ++ 5 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml index 9ca13a628577e0..0f6fca30048fd4 100644 --- a/fe/fe-common/pom.xml +++ b/fe/fe-common/pom.xml @@ -89,6 +89,10 @@ under the License. 
org.aspectj aspectjrt + + commons-io + commons-io + doris-fe-common diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 954dde05005b08..7fbb4745ac73f5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1506,6 +1506,15 @@ public class Config extends ConfigBase { @ConfField(mutable = false) public static boolean backup_job_compressed_serialization = false; + /** + * An internal config that indicates whether to enable the restore snapshot rpc compression. + * + * The ccr syncer depends on this config to decide whether to compress the meta and job + * info of the restore snapshot request. + */ + @ConfField(mutable = false) + public static boolean enable_restore_snapshot_rpc_compression = true; + /** * Control the max num of tablets per backup job involved. */ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java new file mode 100644 index 00000000000000..7408e2888cc3a5 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import org.apache.commons.io.IOUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class GZIPUtils { + public static boolean isGZIPCompressed(byte[] data) { + // From RFC 1952, section 2.3.1: a gzip member starts with the magic bytes ID1 = 0x1F, ID2 = 0x8B. + return data.length >= 2 && data[0] == (byte) 0x1F && data[1] == (byte) 0x8B; + } + + public static byte[] compress(byte[] data) throws IOException { + ByteArrayOutputStream bytesStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) { + gzipStream.write(data); + } + return bytesStream.toByteArray(); + } + + public static byte[] decompress(byte[] data) throws IOException { + ByteArrayInputStream bytesStream = new ByteArrayInputStream(data); + try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) { + return IOUtils.toByteArray(gzipStream); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 25fa5e1524c219..d83ff7e08156d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -49,6 +49,7 @@ import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.GZIPUtils; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; @@ -208,6 +209,7 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; 
+import java.io.IOException; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -2765,8 +2767,9 @@ public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TExcep // getSnapshotImpl private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp) - throws UserException { - // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, snapshot_type + throws UserException, IOException { + // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, + // snapshot_type if (!request.isSetUser()) { throw new UserException("user is not set"); } @@ -2811,10 +2814,22 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST); result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label)); } else { - result.setMeta(snapshot.getMeta()); - result.setJobInfo(snapshot.getJobInfo()); + byte[] meta = snapshot.getMeta(); + byte[] jobInfo = snapshot.getJobInfo(); + LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}", - label, snapshot.getMeta().length, snapshot.getJobInfo().length); + label, meta.length, jobInfo.length); + if (request.isEnableCompress()) { + meta = GZIPUtils.compress(meta); + jobInfo = GZIPUtils.compress(jobInfo); + result.setCompressed(true); + if (LOG.isDebugEnabled()) { + LOG.debug("get snapshot info with compress, snapshot: {}, compressed meta " + + "size {}, compressed job info size {}", label, meta.length, jobInfo.length); + } + } + result.setMeta(meta); + result.setJobInfo(jobInfo); } return result; @@ -2928,8 +2943,27 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque restoreTableRefClause = new AbstractBackupTableRefClause(isExclude, tableRefs); } } - RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, request.getMeta(), - 
request.getJobInfo()); + + byte[] meta = request.getMeta(); + byte[] jobInfo = request.getJobInfo(); + if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) { + if (LOG.isDebugEnabled()) { + LOG.debug("decompress meta and job info, compressed meta size {}, compressed job info size {}", + meta.length, jobInfo.length); + } + try { + meta = GZIPUtils.decompress(meta); + jobInfo = GZIPUtils.decompress(jobInfo); + } catch (Exception e) { + LOG.warn("decompress meta and job info failed", e); + throw new UserException("decompress meta and job info failed", e); + } + } else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) { + throw new UserException("The request is compressed, but the config " + + "`enable_restore_snapshot_rpc_compressed` is not enabled."); + } + + RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo); restoreStmt.setIsBeingSynced(); LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt); try { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 0140eeff5e113a..575643030e11ca 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1077,6 +1077,7 @@ struct TGetSnapshotRequest { 7: optional string label_name 8: optional string snapshot_name 9: optional TSnapshotType snapshot_type + 10: optional bool enable_compress; } struct TGetSnapshotResult { @@ -1084,6 +1085,7 @@ struct TGetSnapshotResult { 2: optional binary meta 3: optional binary job_info 4: optional Types.TNetworkAddress master_address + 5: optional bool compressed; } struct TTableRef { @@ -1107,6 +1109,7 @@ struct TRestoreSnapshotRequest { 13: optional bool clean_tables 14: optional bool clean_partitions 15: optional bool atomic_restore + 16: optional bool compressed; } struct TRestoreSnapshotResult { From 3671b0e4d2f943ee52190427c0b0ac6897bedc5b Mon Sep 17 00:00:00 2001 From: walter Date: Mon, 11 Nov 
2024 19:31:12 +0800 Subject: [PATCH 3/4] [chore](restore) reduce logged unfinished snapshoting task #43076 (#43606) cherry pick from #43076 --- .../src/main/java/org/apache/doris/backup/RestoreJob.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 69bcb3f7941f9d..1db289dbaa9cb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1575,8 +1575,7 @@ private void waitingAllSnapshotsFinished() { return; } - LOG.info("waiting {} replicas to make snapshot: [{}]. {}", - unfinishedSignatureToId.size(), unfinishedSignatureToId, this); + LOG.info("waiting {} replicas to make snapshot. {}", unfinishedSignatureToId.size(), this); return; } From 81f43a13abc9b67d5de7a993aa0fb77eeec47fd9 Mon Sep 17 00:00:00 2001 From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com> Date: Mon, 11 Nov 2024 20:25:18 +0800 Subject: [PATCH 4/4] [fix](Outfile) forbid parallel outfile if pipeline engine enabled. (#43439) cherry-pick #43437 --- .../org/apache/doris/planner/OriginalPlanner.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index a4d5762841f281..a4c629333b02bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -349,16 +349,11 @@ private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root) * The top plan fragment will only summarize the status of the exported result set and return it to fe. 
*/ private void pushDownResultFileSink(Analyzer analyzer) { - if (fragments.size() < 1) { - return; - } - if (!(fragments.get(0).getSink() instanceof ResultFileSink)) { - return; - } - if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) { - return; - } - if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) { + if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile() + || ConnectContext.get().getSessionVariable().getEnablePipelineEngine() + || fragments.size() < 1 + || !(fragments.get(0).getPlanRoot() instanceof ExchangeNode) + || !(fragments.get(0).getSink() instanceof ResultFileSink)) { return; } PlanFragment topPlanFragment = fragments.get(0);