Skip to content

Commit

Permalink
[fix][store] Fix issues with manually triggered vector index rebuilds.
Browse files (browse the repository at this point in the history)
  • Loading branch information
rock-git authored and ketor committed Mar 29, 2024
1 parent 5ce8d0c commit eb8d31d
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 46 deletions.
3 changes: 1 addition & 2 deletions src/common/threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ ThreadPool::ThreadPool(const std::string &thread_name, uint32_t thread_num, std:
{
std::unique_lock<std::mutex> lock(this->task_mutex_);

this->task_condition_.wait_for(lock, std::chrono::milliseconds(10),
[this] { return this->stop_ || !this->tasks_.empty(); });
this->task_condition_.wait(lock, [this] { return this->stop_ || !this->tasks_.empty(); });

if (this->stop_ && this->tasks_.empty()) {
return;
Expand Down
19 changes: 12 additions & 7 deletions src/handler/raft_apply_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ bool HandlePreCreateRegionSplit(const pb::raft::SplitRequest &request, store::Re
ADD_REGION_CHANGE_RECORD_TIMEPOINT(request.job_id(), "Launch rebuild vector index");
// Rebuild vector index
if (Server::GetInstance().IsLeader(to_region->Id())) {
VectorIndexManager::LaunchRebuildVectorIndex(to_region->VectorIndexWrapper(), request.job_id(), "child split");
VectorIndexManager::LaunchRebuildVectorIndex(to_region->VectorIndexWrapper(), request.job_id(), false, false,
true, "child split");
} else {
DINGO_LOG(INFO) << fmt::format(
"[split.spliting][job_id({}).region({}->{})] child follower not need rebuild vector index.", request.job_id(),
Expand All @@ -367,7 +368,8 @@ bool HandlePreCreateRegionSplit(const pb::raft::SplitRequest &request, store::Re
}

if (Server::GetInstance().IsLeader(from_region->Id())) {
VectorIndexManager::LaunchRebuildVectorIndex(from_region->VectorIndexWrapper(), request.job_id(), "parent split");
VectorIndexManager::LaunchRebuildVectorIndex(from_region->VectorIndexWrapper(), request.job_id(), false, false,
true, "parent split");
} else {
DINGO_LOG(INFO) << fmt::format(
"[split.spliting][job_id({}).region({}->{})] parent follower not need rebuild vector index.",
Expand Down Expand Up @@ -558,7 +560,8 @@ bool HandlePostCreateRegionSplit(const pb::raft::SplitRequest &request, store::R
ADD_REGION_CHANGE_RECORD_TIMEPOINT(request.job_id(), "Launch rebuild vector index");
// Rebuild vector index
if (Server::GetInstance().IsLeader(child_region->Id())) {
VectorIndexManager::LaunchRebuildVectorIndex(child_region->VectorIndexWrapper(), request.job_id(), "child split");
VectorIndexManager::LaunchRebuildVectorIndex(child_region->VectorIndexWrapper(), request.job_id(), false, false,
true, "child split");
} else {
DINGO_LOG(INFO) << fmt::format(
"[split.spliting][job_id({}).region({}->{})] child follower not need rebuild vector index.", request.job_id(),
Expand All @@ -576,8 +579,8 @@ bool HandlePostCreateRegionSplit(const pb::raft::SplitRequest &request, store::R
}

if (Server::GetInstance().IsLeader(parent_region->Id())) {
VectorIndexManager::LaunchRebuildVectorIndex(parent_region->VectorIndexWrapper(), request.job_id(),
"parent split");
VectorIndexManager::LaunchRebuildVectorIndex(parent_region->VectorIndexWrapper(), request.job_id(), false, false,
true, "parent split");
} else {
DINGO_LOG(INFO) << fmt::format(
"[split.spliting][job_id({}).region({}->{})] parent follower not need rebuild vector index.",
Expand Down Expand Up @@ -905,7 +908,8 @@ int CommitMergeHandler::Handle(std::shared_ptr<Context>, store::RegionPtr target
ADD_REGION_CHANGE_RECORD_TIMEPOINT(request.job_id(), "Launch rebuild vector index");
// Rebuild vector index
if (Server::GetInstance().IsLeader(target_region->Id())) {
VectorIndexManager::LaunchRebuildVectorIndex(target_region->VectorIndexWrapper(), request.job_id(), "merge");
VectorIndexManager::LaunchRebuildVectorIndex(target_region->VectorIndexWrapper(), request.job_id(), false, false,
true, "merge");
} else {
DINGO_LOG(WARNING) << fmt::format(
"[merge.merging][job_id({}).region({}/{})] target follower not need rebuild vector index.", request.job_id(),
Expand Down Expand Up @@ -1220,7 +1224,8 @@ int RebuildVectorIndexHandler::Handle(std::shared_ptr<Context>, store::RegionPtr
if (vector_index_wrapper != nullptr) {
vector_index_wrapper->SaveApplyLogId(log_id);

VectorIndexManager::LaunchRebuildVectorIndex(vector_index_wrapper, request.cmd_id(), "from raft");
VectorIndexManager::LaunchRebuildVectorIndex(vector_index_wrapper, request.cmd_id(), false, false, true,
"from raft");
}

return 0;
Expand Down
3 changes: 2 additions & 1 deletion src/server/debug_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ void DebugServiceImpl::TriggerRebuildVectorIndex(google::protobuf::RpcController
return;
}

VectorIndexManager::LaunchRebuildVectorIndex(vector_index_wrapper, Helper::TimestampMs(), "from debug");
VectorIndexManager::LaunchRebuildVectorIndex(vector_index_wrapper, Helper::TimestampMs(), false, true, true,
"from debug");
}

void DebugServiceImpl::TriggerSaveVectorIndex(google::protobuf::RpcController* controller,
Expand Down
5 changes: 3 additions & 2 deletions src/vector/vector_index_hnsw.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ DECLARE_int64(vector_max_batch_count);

DECLARE_uint32(vector_write_batch_size_per_task);
DECLARE_uint32(vector_read_batch_size_per_task);
DECLARE_uint32(parallel_log_threshold_time_ms);

bvar::LatencyRecorder g_hnsw_upsert_latency("dingo_hnsw_upsert_latency");
bvar::LatencyRecorder g_hnsw_search_latency("dingo_hnsw_search_latency");
Expand Down Expand Up @@ -123,8 +124,8 @@ inline void ParallelFor(ThreadPoolPtr thread_pool, size_t start, size_t end, uin
}

int64_t elapsed_time = Helper::TimestampMs() - start_time;
LOG_IF(INFO, elapsed_time > 10000) << fmt::format("ParallelFor vector count({}) is_priority({}) elapsed time: {}",
end - start, is_priority, elapsed_time);
LOG_IF(INFO, elapsed_time > FLAGS_parallel_log_threshold_time_ms) << fmt::format(
"ParallelFor vector count({}) is_priority({}) elapsed time: {}", end - start, is_priority, elapsed_time);
}

VectorIndexHnsw::VectorIndexHnsw(int64_t id, const pb::common::VectorIndexParameter& vector_index_parameter,
Expand Down
19 changes: 6 additions & 13 deletions src/vector/vector_index_ivf_flat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -455,13 +455,13 @@ butil::Status VectorIndexIvfFlat::Train(std::vector<float>& train_datas) {

faiss::ClusteringParameters clustering_parameters;
if (BAIDU_UNLIKELY(data_size < (clustering_parameters.min_points_per_centroid * nlist_))) {
std::string s = fmt::format("train_datas size : {} not enough. suggest at least : {}. ignore", data_size,
clustering_parameters.min_points_per_centroid * nlist_);
std::string s = fmt::format("[vector_index.ivf_flat][id({})] train_datas size({}) not enough. suggest at least {}.",
Id(), data_size, clustering_parameters.min_points_per_centroid * nlist_);
DINGO_LOG(WARNING) << s;
} else if (BAIDU_UNLIKELY(data_size >= clustering_parameters.min_points_per_centroid * nlist_ &&
data_size < clustering_parameters.max_points_per_centroid * nlist_)) {
std::string s = fmt::format("train_datas size : {} not enough. suggest at least : {}. ignore", data_size,
clustering_parameters.max_points_per_centroid * nlist_);
std::string s = fmt::format("[vector_index.ivf_flat][id({})] train_datas size({}) not enough. suggest at least {}.",
Id(), data_size, clustering_parameters.max_points_per_centroid * nlist_);
DINGO_LOG(WARNING) << s;
}

Expand Down Expand Up @@ -527,8 +527,6 @@ bool VectorIndexIvfFlat::NeedToRebuild() {
RWLockReadGuard guard(&rw_lock_);

if (BAIDU_UNLIKELY(!IsTrainedImpl())) {
std::string s = fmt::format("not trained");
DINGO_LOG(WARNING) << s;
return false;
}

Expand Down Expand Up @@ -559,17 +557,12 @@ bool VectorIndexIvfFlat::IsTrained() {

bool VectorIndexIvfFlat::NeedToSave(int64_t last_save_log_behind) {
RWLockReadGuard guard(&rw_lock_);

if (BAIDU_UNLIKELY(!IsTrainedImpl())) {
std::string s = fmt::format("not trained. train first");
DINGO_LOG(WARNING) << s;
return false;
}

int64_t element_count = 0;

element_count = index_->ntotal;

if (element_count == 0) {
if (index_->ntotal == 0) {
return false;
}

Expand Down
4 changes: 2 additions & 2 deletions src/vector/vector_index_ivf_pq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ butil::Status VectorIndexIvfPq::Load(const std::string& path) {

BvarLatencyGuard bvar_guard(&g_ivf_pq_load_latency);

// The outside has been locked. Remove the locking operation here.

// first ivf pq
auto internal_index_raw_ivf_pq =
std::make_unique<VectorIndexRawIvfPq>(id, vector_index_parameter, epoch, range, thread_pool);
Expand Down Expand Up @@ -486,9 +484,11 @@ bool VectorIndexIvfPq::IsTrainedImpl() {
switch (inner_index_type_) {
case IndexTypeInIvfPq::kFlat: {
is_trained = index_flat_->IsTrained();
break;
}
case IndexTypeInIvfPq::kIvfPq: {
is_trained = index_raw_ivf_pq_->IsTrained();
break;
}
case IndexTypeInIvfPq::kUnknow:
[[fallthrough]];
Expand Down
15 changes: 9 additions & 6 deletions src/vector/vector_index_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void RebuildVectorIndexTask::Run() {
return;
}

if (!force_) {
if (is_double_check_) {
if (!vector_index_wrapper_->IsOwnReady()) {
DINGO_LOG(INFO) << fmt::format(
"[vector_index.rebuild][index_id({})][trace({})] vector index is not ready, gave up rebuild.",
Expand All @@ -123,7 +123,9 @@ void RebuildVectorIndexTask::Run() {
vector_index_wrapper_->Id(), trace_);
return;
}
} else {
}

if (!is_force_) {
// Compare vector index snapshot epoch and region epoch.
auto snapshot_set = vector_index_wrapper_->SnapshotSet();
if (snapshot_set != nullptr) {
Expand Down Expand Up @@ -164,7 +166,7 @@ void RebuildVectorIndexTask::Run() {
vector_index_wrapper_->SetIsTempHoldVectorIndex(false);
ADD_REGION_CHANGE_RECORD_TIMEPOINT(job_id_, fmt::format("Saved vector index {}", region->Id()));

if (force_) {
if (is_clear_) {
if (!VectorIndexWrapper::IsPermanentHoldVectorIndex(vector_index_wrapper_->Id())) {
vector_index_wrapper_->ClearVectorIndex(trace_);
}
Expand Down Expand Up @@ -1222,6 +1224,7 @@ VectorIndexPtr VectorIndexManager::BuildVectorIndex(VectorIndexWrapperPtr vector
}

void VectorIndexManager::LaunchRebuildVectorIndex(VectorIndexWrapperPtr vector_index_wrapper, int64_t job_id,
bool is_double_check, bool is_force, bool is_clear,
const std::string& trace) {
assert(vector_index_wrapper != nullptr);

Expand All @@ -1231,8 +1234,8 @@ void VectorIndexManager::LaunchRebuildVectorIndex(VectorIndexWrapperPtr vector_i
vector_index_wrapper->Id(), vector_index_wrapper->RebuildingNum(), vector_index_wrapper->PendingTaskNum(),
GetVectorIndexTaskRunningNum(), trace);

auto task =
std::make_shared<RebuildVectorIndexTask>(vector_index_wrapper, job_id, fmt::format("{}-{}", job_id, trace));
auto task = std::make_shared<RebuildVectorIndexTask>(vector_index_wrapper, job_id, is_double_check, is_force,
is_clear, fmt::format("{}-{}", job_id, trace));
if (!Server::GetInstance().GetVectorIndexManager()->ExecuteTask(vector_index_wrapper->Id(), task)) {
DINGO_LOG(ERROR) << fmt::format("[vector_index.launch][index_id({})][trace({})] Launch rebuild vector index failed",
vector_index_wrapper->Id(), job_id);
Expand Down Expand Up @@ -1530,7 +1533,7 @@ butil::Status VectorIndexManager::ScrubVectorIndex() {
if (need_rebuild && vector_index_wrapper->RebuildingNum() == 0) {
DINGO_LOG(INFO) << fmt::format("[vector_index.scrub][index_id({})] need rebuild, do rebuild vector index.",
vector_index_id);
LaunchRebuildVectorIndex(vector_index_wrapper, 0, "from scrub");
LaunchRebuildVectorIndex(vector_index_wrapper, 0, true, false, false, "from scrub");
continue;
}

Expand Down
20 changes: 15 additions & 5 deletions src/vector/vector_index_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ namespace dingodb {
// Rebuild vector index task
class RebuildVectorIndexTask : public TaskRunnable {
public:
RebuildVectorIndexTask(VectorIndexWrapperPtr vector_index_wrapper, int64_t job_id, const std::string& trace)
: vector_index_wrapper_(vector_index_wrapper), force_(job_id > 0), job_id_(job_id), trace_(trace) {
RebuildVectorIndexTask(VectorIndexWrapperPtr vector_index_wrapper, int64_t job_id, bool is_double_check,
bool is_force, bool is_clear, const std::string& trace)
: vector_index_wrapper_(vector_index_wrapper),
is_double_check_(is_double_check),
is_force_(is_force),
is_clear_(is_clear),
job_id_(job_id),
trace_(trace) {
start_time_ = Helper::TimestampMs();
}
~RebuildVectorIndexTask() override = default;
Expand All @@ -47,7 +53,11 @@ class RebuildVectorIndexTask : public TaskRunnable {

private:
VectorIndexWrapperPtr vector_index_wrapper_;
bool force_;

bool is_double_check_;
bool is_force_;
bool is_clear_;

int64_t job_id_{0};
std::string trace_;
int64_t start_time_;
Expand Down Expand Up @@ -196,8 +206,8 @@ class VectorIndexManager {
// Invoke when server running.
static butil::Status RebuildVectorIndex(VectorIndexWrapperPtr vector_index_wrapper, const std::string& trace);
// Launch rebuild vector index at execute queue.
static void LaunchRebuildVectorIndex(VectorIndexWrapperPtr vector_index_wrapper, int64_t job_id,
const std::string& trace);
static void LaunchRebuildVectorIndex(VectorIndexWrapperPtr vector_index_wrapper, int64_t job_id, bool is_double_check,
bool is_force, bool is_clear, const std::string& trace);
static void LaunchBuildVectorIndex(VectorIndexWrapperPtr vector_index_wrapper, bool is_temp_hold_vector_index,
bool is_fast_build, int64_t job_id, const std::string& trace);

Expand Down
10 changes: 2 additions & 8 deletions src/vector/vector_index_raw_ivf_pq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ butil::Status VectorIndexRawIvfPq::Train(std::vector<float>& train_datas) {

if (normalize_) {
for (size_t i = 0; i < data_size; i++) {
VectorIndexUtils::NormalizeVectorForFaiss(const_cast<float*>(train_datas.data()) + i * dimension_, dimension_);
VectorIndexUtils::NormalizeVectorForFaiss(train_datas.data() + i * dimension_, dimension_);
}
}

Expand Down Expand Up @@ -516,8 +516,6 @@ bool VectorIndexRawIvfPq::NeedToRebuild() {
RWLockReadGuard guard(&rw_lock_);

if (BAIDU_UNLIKELY(!IsTrainedImpl())) {
std::string s = fmt::format("not trained");
DINGO_LOG(WARNING) << s;
return false;
}

Expand All @@ -536,11 +534,7 @@ bool VectorIndexRawIvfPq::NeedToSave(int64_t last_save_log_behind) {
return false;
}

int64_t element_count = 0;

element_count = index_->ntotal;

if (element_count == 0) {
if (index_->ntotal == 0) {
return false;
}

Expand Down

0 comments on commit eb8d31d

Please sign in to comment.