branch-3.0: [Enhancement](Test) Add test config to recycler #44761 (#45368)

Cherry-picked from #44761

Co-authored-by: abmdocrt <[email protected]>
github-actions[bot] and Yukang-Lian authored Dec 13, 2024
1 parent f53bb5d commit 45bcd1d
Showing 3 changed files with 65 additions and 16 deletions.
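The single functional change here is a new test-only flag, config::force_immediate_recycle (declared in cloud/src/common/config.h below), which is threaded through the recycler's expiration and timeout checks so that every candidate object becomes recyclable immediately. A minimal sketch of how a recycler test might flip the flag; the helper name, include paths, and namespace are assumptions for illustration, not part of this commit:

#include "common/config.h"     // declares config::force_immediate_recycle after this commit
#include "recycler/recycler.h" // InstanceRecycler

// Hypothetical test helper: with the flag on, the calc_expiration lambdas below
// return 0 and the copy-job/txn timeout guards are skipped, so the test does not
// have to wait for retention_seconds to elapse.
static void recycle_everything_now(doris::cloud::InstanceRecycler& recycler) {
    doris::cloud::config::force_immediate_recycle = true;
    recycler.recycle_indexes();
    recycler.recycle_partitions();
    recycler.recycle_rowsets();
    doris::cloud::config::force_immediate_recycle = false; // restore the default
}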
4 changes: 4 additions & 0 deletions cloud/src/common/config.h
@@ -87,6 +87,10 @@ CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
// log a warning if a recycle task takes longer than this duration
CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h

// force recycler to recycle all useless objects.
// **just for TEST**
CONF_Bool(force_immediate_recycle, "false");

CONF_String(test_s3_ak, "");
CONF_String(test_s3_sk, "");
CONF_String(test_s3_endpoint, "");
36 changes: 32 additions & 4 deletions cloud/src/recycler/meta_checker.cpp
@@ -25,6 +25,7 @@
#include <chrono>
#include <set>

#include "common/logging.h"
#include "common/util.h"
#include "meta-service/keys.h"
#include "meta-service/txn_kv.h"
@@ -54,6 +55,7 @@ struct PartitionInfo {
int64_t db_id;
int64_t table_id;
int64_t partition_id;
int64_t tablet_id;
int64_t visible_version;
};

@@ -173,6 +175,9 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
MYSQL_ROW row = mysql_fetch_row(result);
TabletInfo tablet_info = {0};
tablet_info.tablet_id = atoll(row[0]);
VLOG_DEBUG << "get tablet info log"
<< ", db name" << elem.first << ", table name" << table
<< ",tablet id" << tablet_info.tablet_id;
tablet_info.schema_version = atoll(row[4]);
tablets.push_back(std::move(tablet_info));
}
@@ -201,6 +206,13 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
partition_info.db_id = atoll(row[4]);
partition_info.table_id = atoll(row[5]);
partition_info.partition_id = atoll(row[6]);
partition_info.tablet_id = tablet_info.tablet_id;
VLOG_DEBUG << "get partition info log"
<< ", db id" << partition_info.db_id << ", table id"
<< partition_info.table_id << ", partition id"
<< partition_info.partition_id << ", tablet id"
<< partition_info.tablet_id;

partitions.insert({partition_info.partition_id, std::move(partition_info)});
}
}
@@ -217,9 +229,16 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
int num_row = mysql_num_rows(result);
for (int i = 0; i < num_row; ++i) {
MYSQL_ROW row = mysql_fetch_row(result);
int partition_id = atoll(row[0]);
int visible_version = atoll(row[2]);
int64_t partition_id = atoll(row[0]);
int64_t visible_version = atoll(row[2]);
partitions[partition_id].visible_version = visible_version;
VLOG_DEBUG << "get partition version log"
<< ", db name" << elem.first << ", table name" << table
<< ", raw partition id" << row[0] << ", first partition id"
<< partition_id << ", db id" << partitions[partition_id].db_id
<< ", table id" << partitions[partition_id].table_id
<< ", second partition id" << partitions[partition_id].partition_id
<< ", tablet id" << partitions[partition_id].tablet_id;
}
}
mysql_free_result(result);
@@ -354,14 +373,23 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
int64_t db_id = elem.second.db_id;
int64_t table_id = elem.second.table_id;
int64_t partition_id = elem.second.partition_id;
int64_t tablet_id = elem.second.tablet_id;
std::string ver_key = partition_version_key({instance_id_, db_id, table_id, partition_id});
std::string ver_val;
err = txn->get(ver_key, &ver_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
LOG(WARNING) << "version key not found, partition id: " << partition_id;
LOG_WARNING("version key not found.")
.tag("db id", db_id)
.tag("table id", table_id)
.tag("partition id", partition_id)
.tag("tablet id", tablet_id);
return false;
} else if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to get version: " << partition_id;
LOG_WARNING("failed to get version.")
.tag("db id", db_id)
.tag("table id", table_id)
.tag("partition id", partition_id)
.tag("tablet id", tablet_id);
return false;
}

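The meta_checker.cpp changes add a tablet_id field to PartitionInfo, add VLOG_DEBUG tracing, and move the two failure paths from stream-style LOG(WARNING) to the chained LOG_WARNING(...).tag(...) form already visible in the recycler.cpp context lines. A standalone sketch of that chained-tag pattern, purely to illustrate why it reads better than one long stream expression; this is not the Doris logging implementation:

#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>

// Each .tag(key, value) appends "key=value" to the message; the full line is
// emitted when the temporary goes out of scope at the end of the statement.
class TaggedWarning {
public:
    explicit TaggedWarning(std::string msg) : msg_(std::move(msg)) {}
    ~TaggedWarning() { std::cerr << "W " << msg_ << tags_.str() << '\n'; }

    template <typename T>
    TaggedWarning& tag(const std::string& key, const T& value) {
        tags_ << ' ' << key << '=' << value;
        return *this;
    }

private:
    std::string msg_;
    std::ostringstream tags_;
};

int main() {
    int64_t db_id = 10, table_id = 20, partition_id = 30, tablet_id = 40;
    TaggedWarning("version key not found.")
            .tag("db id", db_id)
            .tag("table id", table_id)
            .tag("partition id", partition_id)
            .tag("tablet id", tablet_id);
    return 0;
}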
41 changes: 29 additions & 12 deletions cloud/src/recycler/recycler.cpp
@@ -24,6 +24,7 @@

#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <string>
#include <string_view>
@@ -747,7 +748,10 @@ int InstanceRecycler::recycle_indexes() {
.tag("num_recycled", num_recycled);
});

auto calc_expiration = [](const RecycleIndexPB& index) {
auto calc_expiration = [](const RecycleIndexPB& index) -> int64_t {
if (config::force_immediate_recycle) {
return 0;
}
int64_t expiration = index.expiration() > 0 ? index.expiration() : index.creation_time();
int64_t retention_seconds = config::retention_seconds;
if (index.state() == RecycleIndexPB::DROPPED) {
@@ -942,7 +946,10 @@ int InstanceRecycler::recycle_partitions() {
.tag("num_recycled", num_recycled);
});

auto calc_expiration = [](const RecyclePartitionPB& partition) {
auto calc_expiration = [](const RecyclePartitionPB& partition) -> int64_t {
if (config::force_immediate_recycle) {
return 0;
}
int64_t expiration =
partition.expiration() > 0 ? partition.expiration() : partition.creation_time();
int64_t retention_seconds = config::retention_seconds;
@@ -1686,7 +1693,10 @@ int InstanceRecycler::recycle_rowsets() {
return 0;
};

auto calc_expiration = [](const RecycleRowsetPB& rs) {
auto calc_expiration = [](const RecycleRowsetPB& rs) -> int64_t {
if (config::force_immediate_recycle) {
return 0;
}
// RecycleRowsetPB created by compacted or dropped rowset has no expiration time, and will be recycled when exceed retention time
int64_t expiration = rs.expiration() > 0 ? rs.expiration() : rs.creation_time();
int64_t retention_seconds = config::retention_seconds;
@@ -1923,8 +1933,9 @@ int InstanceRecycler::recycle_tmp_rowsets() {
// ATTN: `txn_expiration` should > 0, however we use `creation_time` + a large `retention_time` (> 1 day in production environment)
// when `txn_expiration` <= 0 in some unexpected situation (usually when there are bugs). This is usually safe, coz loading
// duration or timeout always < `retention_time` in practice.
int64_t expiration =
rowset.txn_expiration() > 0 ? rowset.txn_expiration() : rowset.creation_time();
int64_t expiration = config::force_immediate_recycle ? 0
: rowset.txn_expiration() > 0 ? rowset.txn_expiration()
: rowset.creation_time();
VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned
<< " num_expired=" << num_expired << " expiration=" << expiration
<< " txn_expiration=" << rowset.txn_expiration()
@@ -2106,7 +2117,7 @@ int InstanceRecycler::abort_timeout_txn() {
LOG_WARNING("malformed txn_running_pb").tag("key", hex(k));
return -1;
}
if (txn_running_pb.timeout_time() > current_time) {
if (!config::force_immediate_recycle && txn_running_pb.timeout_time() > current_time) {
return 0;
}
++num_timeout;
@@ -2196,7 +2207,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
LOG_WARNING("malformed txn_running_pb").tag("key", hex(k));
return -1;
}
if ((recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
if ((config::force_immediate_recycle) ||
(recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
(recycle_txn_pb.creation_time() + config::label_keep_max_second * 1000L <=
current_time)) {
LOG_INFO("found recycle txn").tag("key", hex(k));
@@ -2492,14 +2504,16 @@ int InstanceRecycler::recycle_copy_jobs() {
int64_t current_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (copy_job.finish_time_ms() > 0) {
if (current_time <
copy_job.finish_time_ms() + config::copy_job_max_retention_second * 1000) {
if (!config::force_immediate_recycle &&
current_time < copy_job.finish_time_ms() +
config::copy_job_max_retention_second * 1000) {
return 0;
}
} else {
// For compatibility, copy job does not contain finish time before 2.2.2, use start time
if (current_time <
copy_job.start_time_ms() + config::copy_job_max_retention_second * 1000) {
if (!config::force_immediate_recycle &&
current_time < copy_job.start_time_ms() +
config::copy_job_max_retention_second * 1000) {
return 0;
}
}
@@ -2508,7 +2522,7 @@
int64_t current_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
// if copy job is timeout: delete all copy file kvs and copy job kv
if (current_time <= copy_job.timeout_time_ms()) {
if (!config::force_immediate_recycle && current_time <= copy_job.timeout_time_ms()) {
return 0;
}
++num_expired;
@@ -2796,6 +2810,9 @@ int InstanceRecycler::recycle_expired_stage_objects() {
int64_t expiration_time =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
config::internal_stage_objects_expire_time_second;
if (config::force_immediate_recycle) {
expiration_time = INT64_MAX;
}
ret1 = accessor->delete_all(expiration_time);
if (ret1 != 0) {
LOG(WARNING) << "failed to recycle expired stage objects, instance_id=" << instance_id_
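Every hunk in recycler.cpp follows the same idea: when config::force_immediate_recycle is set, the computed expiration collapses to 0 (or the timeout guard is bypassed), so the scan loops treat every record as already past retention. A self-contained sketch of that decision logic; the struct and the default values below are illustrative stand-ins, not the actual recycler types:

#include <cstdint>
#include <ctime>

// Illustrative stand-in; the real code reads RecycleIndexPB / RecyclePartitionPB /
// RecycleRowsetPB protobuf values instead.
struct FakeRecycleItem {
    int64_t expiration;     // seconds since epoch, 0 if unset
    int64_t creation_time;  // seconds since epoch
};

namespace config {
bool force_immediate_recycle = false;      // mirrors CONF_Bool(force_immediate_recycle, "false")
int64_t retention_seconds = 3 * 24 * 3600; // example value, not the Doris default
}  // namespace config

// Same shape as the calc_expiration lambdas in the diff: with the test flag on,
// the returned deadline is 0, so any positive current time is already past it.
int64_t calc_expiration(const FakeRecycleItem& item) {
    if (config::force_immediate_recycle) {
        return 0;
    }
    int64_t expiration = item.expiration > 0 ? item.expiration : item.creation_time;
    return expiration + config::retention_seconds;
}

bool should_recycle(const FakeRecycleItem& item) {
    int64_t now = static_cast<int64_t>(std::time(nullptr));
    return calc_expiration(item) <= now;
}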
