diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 43a426a89593d5..828ad1b519b060 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -82,6 +82,10 @@ CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60"); // log a warning if a recycle task takes longer than this duration CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h +// Force the recycler to recycle all useless objects immediately, bypassing expiration/retention/timeout checks. +// **just for TEST** +CONF_Bool(force_immediate_recycle, "false"); + CONF_String(test_s3_ak, ""); CONF_String(test_s3_sk, ""); CONF_String(test_s3_endpoint, ""); diff --git a/cloud/src/recycler/meta_checker.cpp b/cloud/src/recycler/meta_checker.cpp index 522015555de825..f1223068d4be6e 100644 --- a/cloud/src/recycler/meta_checker.cpp +++ b/cloud/src/recycler/meta_checker.cpp @@ -25,6 +25,7 @@ #include #include +#include "common/logging.h" #include "common/util.h" #include "meta-service/keys.h" #include "meta-service/txn_kv.h" @@ -54,6 +55,7 @@ struct PartitionInfo { int64_t db_id; int64_t table_id; int64_t partition_id; + int64_t tablet_id; int64_t visible_version; }; @@ -173,6 +175,9 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) { MYSQL_ROW row = mysql_fetch_row(result); TabletInfo tablet_info = {0}; tablet_info.tablet_id = atoll(row[0]); + VLOG_DEBUG << "get tablet info log" + << ", db name" << elem.first << ", table name" << table + << ",tablet id" << tablet_info.tablet_id; tablet_info.schema_version = atoll(row[4]); tablets.push_back(std::move(tablet_info)); } @@ -201,6 +206,13 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) { partition_info.db_id = atoll(row[4]); partition_info.table_id = atoll(row[5]); partition_info.partition_id = atoll(row[6]); + partition_info.tablet_id = tablet_info.tablet_id; + VLOG_DEBUG << "get partition info log" + << ", db id" << partition_info.db_id << ", table id" + << partition_info.table_id << ", partition id" + << partition_info.partition_id << ", tablet id" + << partition_info.tablet_id; 
+ partitions.insert({partition_info.partition_id, std::move(partition_info)}); } } @@ -217,9 +229,16 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) { int num_row = mysql_num_rows(result); for (int i = 0; i < num_row; ++i) { MYSQL_ROW row = mysql_fetch_row(result); - int partition_id = atoll(row[0]); - int visible_version = atoll(row[2]); + int64_t partition_id = atoll(row[0]); + int64_t visible_version = atoll(row[2]); partitions[partition_id].visible_version = visible_version; + VLOG_DEBUG << "get partition version log" + << ", db name" << elem.first << ", table name" << table + << ", raw partition id" << row[0] << ", first partition id" + << partition_id << ", db id" << partitions[partition_id].db_id + << ", table id" << partitions[partition_id].table_id + << ", second partition id" << partitions[partition_id].partition_id + << ", tablet id" << partitions[partition_id].tablet_id; } } mysql_free_result(result); @@ -354,14 +373,23 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) { int64_t db_id = elem.second.db_id; int64_t table_id = elem.second.table_id; int64_t partition_id = elem.second.partition_id; + int64_t tablet_id = elem.second.tablet_id; std::string ver_key = partition_version_key({instance_id_, db_id, table_id, partition_id}); std::string ver_val; err = txn->get(ver_key, &ver_val); if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { - LOG(WARNING) << "version key not found, partition id: " << partition_id; + LOG_WARNING("version key not found.") + .tag("db id", db_id) + .tag("table id", table_id) + .tag("partition id", partition_id) + .tag("tablet id", tablet_id); return false; } else if (err != TxnErrorCode::TXN_OK) { - LOG(WARNING) << "failed to get version: " << partition_id; + LOG_WARNING("failed to get version.") + .tag("db id", db_id) + .tag("table id", table_id) + .tag("partition id", partition_id) + .tag("tablet id", tablet_id); return false; } diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 
1b21ec68916be6..62fe9eae1a16bf 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -743,7 +744,10 @@ int InstanceRecycler::recycle_indexes() { .tag("num_recycled", num_recycled); }); - auto calc_expiration = [](const RecycleIndexPB& index) { + auto calc_expiration = [](const RecycleIndexPB& index) -> int64_t { + if (config::force_immediate_recycle) { + return 0; + } int64_t expiration = index.expiration() > 0 ? index.expiration() : index.creation_time(); int64_t retention_seconds = config::retention_seconds; if (index.state() == RecycleIndexPB::DROPPED) { @@ -938,7 +942,10 @@ int InstanceRecycler::recycle_partitions() { .tag("num_recycled", num_recycled); }); - auto calc_expiration = [](const RecyclePartitionPB& partition) { + auto calc_expiration = [](const RecyclePartitionPB& partition) -> int64_t { + if (config::force_immediate_recycle) { + return 0; + } int64_t expiration = partition.expiration() > 0 ? partition.expiration() : partition.creation_time(); int64_t retention_seconds = config::retention_seconds; @@ -1680,7 +1687,10 @@ int InstanceRecycler::recycle_rowsets() { return 0; }; - auto calc_expiration = [](const RecycleRowsetPB& rs) { + auto calc_expiration = [](const RecycleRowsetPB& rs) -> int64_t { + if (config::force_immediate_recycle) { + return 0; + } // RecycleRowsetPB created by compacted or dropped rowset has no expiration time, and will be recycled when exceed retention time int64_t expiration = rs.expiration() > 0 ? rs.expiration() : rs.creation_time(); int64_t retention_seconds = config::retention_seconds; @@ -1917,8 +1927,9 @@ int InstanceRecycler::recycle_tmp_rowsets() { // ATTN: `txn_expiration` should > 0, however we use `creation_time` + a large `retention_time` (> 1 day in production environment) // when `txn_expiration` <= 0 in some unexpected situation (usually when there are bugs). 
This is usually safe, coz loading // duration or timeout always < `retention_time` in practice. - int64_t expiration = - rowset.txn_expiration() > 0 ? rowset.txn_expiration() : rowset.creation_time(); + int64_t expiration = config::force_immediate_recycle ? 0 + : rowset.txn_expiration() > 0 ? rowset.txn_expiration() + : rowset.creation_time(); VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned << " num_expired=" << num_expired << " expiration=" << expiration << " txn_expiration=" << rowset.txn_expiration() @@ -2100,7 +2111,7 @@ int InstanceRecycler::abort_timeout_txn() { LOG_WARNING("malformed txn_running_pb").tag("key", hex(k)); return -1; } - if (txn_running_pb.timeout_time() > current_time) { + if (!config::force_immediate_recycle && txn_running_pb.timeout_time() > current_time) { return 0; } ++num_timeout; @@ -2190,7 +2201,8 @@ int InstanceRecycler::recycle_expired_txn_label() { LOG_WARNING("malformed txn_running_pb").tag("key", hex(k)); return -1; } - if ((recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) || + if ((config::force_immediate_recycle) || + (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) || (recycle_txn_pb.creation_time() + config::label_keep_max_second * 1000L <= current_time)) { LOG_INFO("found recycle txn").tag("key", hex(k)); @@ -2486,14 +2498,16 @@ int InstanceRecycler::recycle_copy_jobs() { int64_t current_time = duration_cast(system_clock::now().time_since_epoch()).count(); if (copy_job.finish_time_ms() > 0) { - if (current_time < - copy_job.finish_time_ms() + config::copy_job_max_retention_second * 1000) { + if (!config::force_immediate_recycle && + current_time < copy_job.finish_time_ms() + + config::copy_job_max_retention_second * 1000) { return 0; } } else { // For compatibility, copy job does not contain finish time before 2.2.2, use start time - if (current_time < - copy_job.start_time_ms() + config::copy_job_max_retention_second * 1000) { + if 
(!config::force_immediate_recycle && + current_time < copy_job.start_time_ms() + + config::copy_job_max_retention_second * 1000) { return 0; } } @@ -2502,7 +2516,7 @@ int InstanceRecycler::recycle_copy_jobs() { int64_t current_time = duration_cast(system_clock::now().time_since_epoch()).count(); // if copy job is timeout: delete all copy file kvs and copy job kv - if (current_time <= copy_job.timeout_time_ms()) { + if (!config::force_immediate_recycle && current_time <= copy_job.timeout_time_ms()) { return 0; } ++num_expired; @@ -2790,6 +2804,9 @@ int InstanceRecycler::recycle_expired_stage_objects() { int64_t expiration_time = duration_cast(system_clock::now().time_since_epoch()).count() - config::internal_stage_objects_expire_time_second; + if (config::force_immediate_recycle) { + expiration_time = INT64_MAX; + } ret1 = accessor->delete_all(expiration_time); if (ret1 != 0) { LOG(WARNING) << "failed to recycle expired stage objects, instance_id=" << instance_id_