Skip to content

Commit

Permalink
Merge branch 'master' into drop_workload_policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Nov 24, 2024
2 parents 448d750 + 8ddf4c7 commit be11a21
Show file tree
Hide file tree
Showing 41 changed files with 401 additions and 85 deletions.
12 changes: 12 additions & 0 deletions be/src/common/cast_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ void cast_set(T& a, U b) {
a = static_cast<T>(b);
}

// Assign an integral value `b` to a floating point target `a`.
// Unlike the integral->integral overload above, no range check is needed:
// an integer can never overflow a float/double, though very large 64-bit
// values may lose precision (double keeps only 53 mantissa bits).
// Use `&&` (not the alternative token `and`) to stay consistent with the
// integral overload's requires-clause in this header.
template <typename T, typename U>
    requires std::is_floating_point_v<T> && std::is_integral_v<U>
void cast_set(T& a, U b) {
    a = static_cast<T>(b);
}

template <typename T, typename U, bool need_check_value = true>
requires std::is_integral_v<T> && std::is_integral_v<U>
T cast_set(U b) {
Expand All @@ -70,4 +76,10 @@ T cast_set(U b) {
return static_cast<T>(b);
}

// Value-returning overload: convert an integral value `b` to the requested
// floating point type T and return it. No range check is required because
// an integer cannot overflow a floating point type; precision may be lost
// for integers wider than the mantissa.
// Use `&&` (not the alternative token `and`) to stay consistent with the
// integral overload's requires-clause in this header.
template <typename T, typename U>
    requires std::is_floating_point_v<T> && std::is_integral_v<U>
T cast_set(U b) {
    return static_cast<T>(b);
}

} // namespace doris
6 changes: 4 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,9 @@ DEFINE_mInt64(base_compaction_max_compaction_score, "20");
DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");

DEFINE_Bool(enable_skip_tablet_compaction, "false");
DEFINE_Bool(enable_skip_tablet_compaction, "true");
DEFINE_mInt32(skip_tablet_compaction_second, "10");

// output rowset of cumulative compaction total disk size exceed this config size,
// this rowset will be given to base compaction, unit is m byte.
DEFINE_mInt64(compaction_promotion_size_mbytes, "1024");
Expand Down Expand Up @@ -454,7 +456,7 @@ DEFINE_mInt32(multi_get_max_threads, "10");
DEFINE_mInt64(total_permits_for_compaction_score, "10000");

// sleep interval in ms after generated compaction tasks
DEFINE_mInt32(generate_compaction_tasks_interval_ms, "10");
DEFINE_mInt32(generate_compaction_tasks_interval_ms, "100");

// sleep interval in second after update replica infos
DEFINE_mInt32(update_replica_infos_interval_seconds, "60");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ DECLARE_mDouble(base_compaction_min_data_ratio);
DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);

DECLARE_Bool(enable_skip_tablet_compaction);
DECLARE_mInt32(skip_tablet_compaction_second);
// output rowset of cumulative compaction total disk size exceed this config size,
// this rowset will be given to base compaction, unit is m byte.
DECLARE_mInt64(compaction_promotion_size_mbytes);
Expand Down
4 changes: 2 additions & 2 deletions be/src/http/action/calc_file_crc_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ CalcFileCrcAction::CalcFileCrcAction(ExecEnv* exec_env, BaseStorageEngine& engin
// calculate the crc value of the files in the tablet
Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value,
int64_t* start_version, int64_t* end_version,
int32_t* rowset_count, int64_t* file_count) {
uint32_t* rowset_count, int64_t* file_count) {
uint64_t tablet_id = 0;
const auto& req_tablet_id = req->param(TABLET_ID_KEY);
if (req_tablet_id.empty()) {
Expand Down Expand Up @@ -110,7 +110,7 @@ void CalcFileCrcAction::handle(HttpRequest* req) {
uint32_t crc_value = 0;
int64_t start_version = 0;
int64_t end_version = 0;
int32_t rowset_count = 0;
uint32_t rowset_count = 0;
int64_t file_count = 0;

MonotonicStopWatch timer;
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/calc_file_crc_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class CalcFileCrcAction : public HttpHandlerWithAuth {

private:
Status _handle_calc_crc(HttpRequest* req, uint32_t* crc_value, int64_t* start_version,
int64_t* end_version, int32_t* rowset_count, int64_t* file_count);
int64_t* end_version, uint32_t* rowset_count, int64_t* file_count);

private:
BaseStorageEngine& _engine;
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <mutex>
#include <ostream>

#include "common/cast_set.h"
#include "common/config.h"
#include "common/logging.h"
#include "olap/compaction.h"
Expand All @@ -35,6 +36,8 @@
#include "util/trace.h"

namespace doris {
#include "common/compile_check_begin.h"

using namespace ErrorCode;

BaseCompaction::BaseCompaction(StorageEngine& engine, const TabletSharedPtr& tablet)
Expand Down Expand Up @@ -184,7 +187,8 @@ Status BaseCompaction::pick_rowsets_to_compact() {
// set to 1 to void divide by zero
base_size = 1;
}
double cumulative_base_ratio = static_cast<double>(cumulative_total_size) / base_size;
double cumulative_base_ratio =
cast_set<double>(cumulative_total_size) / cast_set<double>(base_size);

if (cumulative_base_ratio > min_data_ratio) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
Expand Down
32 changes: 21 additions & 11 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
#include <fmt/format.h>
#include <rapidjson/prettywriter.h>

#include <cstdint>
#include <iterator>

#include "common/cast_set.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/calc_delete_bitmap_executor.h"
Expand All @@ -45,6 +49,8 @@
#include "vec/jsonb/serialize.h"

namespace doris {
#include "common/compile_check_begin.h"

using namespace ErrorCode;

namespace {
Expand Down Expand Up @@ -462,9 +468,9 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
RowLocation loc;

for (size_t i = 0; i < specified_rowsets.size(); i++) {
auto& rs = specified_rowsets[i];
auto& segments_key_bounds = rs->rowset_meta()->get_segments_key_bounds();
int num_segments = rs->num_segments();
const auto& rs = specified_rowsets[i];
const auto& segments_key_bounds = rs->rowset_meta()->get_segments_key_bounds();
int num_segments = cast_set<int>(rs->num_segments());
DCHECK_EQ(segments_key_bounds.size(), num_segments);
std::vector<uint32_t> picked_segments;
for (int i = num_segments - 1; i >= 0; i--) {
Expand Down Expand Up @@ -671,7 +677,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,

RowsetSharedPtr rowset_find;
auto st = lookup_row_key(key, rowset_schema.get(), true, specified_rowsets, &loc,
dummy_version.first - 1, segment_caches, &rowset_find);
cast_set<uint32_t>(dummy_version.first - 1), segment_caches,
&rowset_find);
bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() || st.is<KEY_ALREADY_EXISTS>();
// It's a defensive DCHECK, we need to exclude some common errors to avoid core-dump
// while stress test
Expand Down Expand Up @@ -1130,7 +1137,7 @@ Status BaseTablet::generate_new_block_for_flexible_partial_update(
const signed char* delete_sign_column_data) {
if (skipped) {
if (delete_sign_column_data != nullptr &&
delete_sign_column_data[read_index_old[idx]] != 0) {
delete_sign_column_data[read_index_old[cast_set<uint32_t>(idx)]] != 0) {
if (tablet_column.has_default_value()) {
new_col->insert_from(default_value_col, 0);
} else if (tablet_column.is_nullable()) {
Expand Down Expand Up @@ -1300,7 +1307,8 @@ Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap
for (const auto& rowset : *rowsets) {
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(),
value.SetString(version_str.c_str(),
cast_set<rapidjson::SizeType>(version_str.length()),
required_rowsets_arr.GetAllocator());
required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator());
}
Expand All @@ -1313,15 +1321,17 @@ Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap
for (const auto& rowset : rowsets) {
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(),
value.SetString(version_str.c_str(),
cast_set<rapidjson::SizeType>(version_str.length()),
required_rowsets_arr.GetAllocator());
required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator());
}
}
for (const auto& missing_rowset_id : missing_ids) {
rapidjson::Value miss_value;
std::string rowset_id_str = missing_rowset_id.to_string();
miss_value.SetString(rowset_id_str.c_str(), rowset_id_str.length(),
miss_value.SetString(rowset_id_str.c_str(),
cast_set<rapidjson::SizeType>(rowset_id_str.length()),
missing_rowsets_arr.GetAllocator());
missing_rowsets_arr.PushBack(miss_value, missing_rowsets_arr.GetAllocator());
}
Expand Down Expand Up @@ -1725,7 +1735,7 @@ std::vector<RowsetSharedPtr> BaseTablet::get_snapshot_rowset(bool include_stale_
void BaseTablet::calc_consecutive_empty_rowsets(
std::vector<RowsetSharedPtr>* empty_rowsets,
const std::vector<RowsetSharedPtr>& candidate_rowsets, int limit) {
int len = candidate_rowsets.size();
int len = cast_set<int>(candidate_rowsets.size());
for (int i = 0; i < len - 1; ++i) {
auto rowset = candidate_rowsets[i];
auto next_rowset = candidate_rowsets[i + 1];
Expand Down Expand Up @@ -1761,7 +1771,7 @@ void BaseTablet::calc_consecutive_empty_rowsets(
}

Status BaseTablet::calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version,
int32_t* rowset_count, int64_t* file_count) {
uint32_t* rowset_count, int64_t* file_count) {
Version v(start_version, end_version);
std::vector<RowsetSharedPtr> rowsets;
traverse_rowsets([&rowsets, &v](const auto& rs) {
Expand All @@ -1771,7 +1781,7 @@ Status BaseTablet::calc_file_crc(uint32_t* crc_value, int64_t start_version, int
}
});
std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
*rowset_count = rowsets.size();
*rowset_count = cast_set<uint32_t>(rowsets.size());

*crc_value = 0;
*file_count = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class BaseTablet {
}

Status calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version,
int32_t* rowset_count, int64_t* file_count);
uint32_t* rowset_count, int64_t* file_count);

Status show_nested_index_file(std::string* json_meta);

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2542,10 +2542,10 @@ void Tablet::set_skip_compaction(bool skip, CompactionType compaction_type, int6

bool Tablet::should_skip_compaction(CompactionType compaction_type, int64_t now) {
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION && _skip_cumu_compaction &&
now < _skip_cumu_compaction_ts + 120) {
now < _skip_cumu_compaction_ts + config::skip_tablet_compaction_second) {
return true;
} else if (compaction_type == CompactionType::BASE_COMPACTION && _skip_base_compaction &&
now < _skip_base_compaction_ts + 120) {
now < _skip_base_compaction_ts + config::skip_tablet_compaction_second) {
return true;
}
return false;
Expand Down
16 changes: 4 additions & 12 deletions cloud/src/recycler/checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,25 +168,17 @@ int Checker::start() {
auto ctime_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
ret = checker->do_check();
int ret1 = checker->do_check();

int ret2 = 0;
if (config::enable_inverted_check) {
if (ret == 0) {
ret = checker->do_inverted_check();
}
}

if (ret < 0) {
// If ret < 0, it means that a temporary error occurred during the check process.
// The check job should not be considered finished, and the next round of check job
// should be retried as soon as possible.
return;
ret2 = checker->do_inverted_check();
}

// If instance checker has been aborted, don't finish this job
if (!checker->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
ip_port_, ret == 0, ctime_ms);
ip_port_, ret1 == 0 && ret2 == 0, ctime_ms);
}
{
std::lock_guard lock(mtx_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2180,7 +2180,7 @@ public class Config extends ConfigBase {
* only for certain test type. E.g. only setting batch_size to small
* value for p0.
*/
@ConfField(mutable = true, masterOnly = false, options = {"p0"})
@ConfField(mutable = true, masterOnly = false, options = {"p0", "daily", "rqg"})
public static String fuzzy_test_type = "";

/**
Expand All @@ -2189,6 +2189,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = false)
public static boolean use_fuzzy_session_variable = false;

/**
* Set config variables randomly to check more issues in github workflow
*/
@ConfField(mutable = true, masterOnly = false)
public static boolean use_fuzzy_conf = false;

/**
* Max num of same name meta information in catalog recycle bin.
* Default is 3.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ statementBase
| supportedRefreshStatement #supportedRefreshStatementAlias
| supportedShowStatement #supportedShowStatementAlias
| supportedRecoverStatement #supportedRecoverStatementAlias
| supportedLoadStatement #supportedLoadfStatementAlias
| unsupportedStatement #unsupported
;

Expand Down Expand Up @@ -235,6 +236,10 @@ supportedShowStatement
tabletIds+=INTEGER_VALUE (COMMA tabletIds+=INTEGER_VALUE)* #showTabletsBelong
;

// Load-related statements handled by the supported (Nereids) statement path.
// SYNC was moved here from unsupportedLoadStatement in this change; the #sync
// label drives the generated visitor method name.
supportedLoadStatement
    : SYNC #sync
    ;

unsupportedOtherStatement
: HELP mark=identifierOrText #help
| INSTALL PLUGIN FROM source=identifierOrText properties=propertyClause? #installPlugin
Expand Down Expand Up @@ -372,7 +377,6 @@ unsupportedLoadStatement
| SHOW ROUTINE LOAD TASK ((FROM | IN) database=identifier)? wildWhere? #showRoutineLoadTask
| SHOW ALL? CREATE ROUTINE LOAD FOR label=multipartIdentifier #showCreateRoutineLoad
| SHOW CREATE LOAD FOR label=multipartIdentifier #showCreateLoad
| SYNC #sync
| importSequenceStatement #importSequenceStatementAlias
| importPrecedingFilterStatement #importPrecedingFilterStatementAlias
| importWhereStatement #importWhereStatementAlias
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.time.LocalDate;
import java.util.concurrent.TimeUnit;

public class DorisFE {
Expand Down Expand Up @@ -162,6 +163,8 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
System.exit(-1);
}

fuzzyConfigs();

LOG.info("Doris FE starting...");

FrontendOptions.init();
Expand Down Expand Up @@ -529,6 +532,16 @@ public static void overwriteConfigs() {
}
}

/**
 * Applies randomized ("fuzzy") FE config values at startup.
 *
 * <p>Does nothing unless {@code Config.use_fuzzy_conf} is enabled. For the
 * "daily" and "rqg" fuzzy test types, flips
 * {@code random_add_cluster_keys_for_mow} based on the day of the month so
 * that both values get exercised over time.
 */
private static void fuzzyConfigs() {
    if (!Config.use_fuzzy_conf) {
        return;
    }
    final String testType = Config.fuzzy_test_type;
    if (testType.equalsIgnoreCase("daily") || testType.equalsIgnoreCase("rqg")) {
        // Alternate the flag day by day so even/odd days cover both settings.
        Config.random_add_cluster_keys_for_mow = LocalDate.now().getDayOfMonth() % 2 == 0;
        LOG.info("fuzzy set random_add_cluster_keys_for_mow={}", Config.random_add_cluster_keys_for_mow);
    }
}

public static class StartupOptions {
public boolean enableHttpServer = true;
public boolean enableQeService = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ private void replayCancelled(IndexChangeJob replayedJob) {
LOG.info("cancel index job {}, err: {}", jobId, errMsg);
}

// Serializes this job to its JSON representation using the shared Gson
// instance from GsonUtils (used for logging/inspection of the job state).
public String toJson() {
    return GsonUtils.GSON.toJson(this);
}

public static IndexChangeJob read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_122) {
IndexChangeJob job = new IndexChangeJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1922,7 +1922,7 @@ public void process(String rawSql, List<AlterClause> alterClauses, Database db,
// index id -> index schema
Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<>();

//for multi add colmuns clauses
//for multi add columns clauses
//index id -> index col_unique_id supplier
Map<Long, IntSupplier> colUniqueIdSupplierMap = new HashMap<>();
for (Map.Entry<Long, List<Column>> entry : olapTable.getIndexIdToSchema(true).entrySet()) {
Expand Down Expand Up @@ -2752,7 +2752,7 @@ private boolean processAddIndex(CreateIndexClause alterClause, OlapTable olapTab
// the column name in CreateIndexClause is not check case sensitivity,
// when send index description to BE, there maybe cannot find column by name,
// so here update column name in CreateIndexClause after checkColumn for indexDef,
// there will use the column name in olapTable insead of the column name in CreateIndexClause.
// there will use the column name in olapTable instead of the column name in CreateIndexClause.
alterIndex.setColumns(indexDef.getColumns());
alterIndex.setColumnUniqueIds(indexDef.getColumnUniqueIds());
newIndexes.add(alterIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ public String toSql() {
sb.append("DROP ROLLUP ");
}
sb.append(((AddRollupClause) op).getRollupName());
} else if (op instanceof CreateIndexClause) {
sb.append(((CreateIndexClause) op).toSql(true));
} else if (op instanceof DropIndexClause) {
sb.append(((DropIndexClause) op).toSql(true));
} else {
sb.append(op.toSql());
}
Expand Down
Loading

0 comments on commit be11a21

Please sign in to comment.