Skip to content

Commit

Permalink
[fix][index] Fix DINGODB-2346 issue. Fixed the crash problem when
Browse files Browse the repository at this point in the history
calling try load atomic variables in index process.
The reason is that asynchronous thread is used to access the destructor
object members. Change to shared_ptr mode for passing.
Other similar problems have also been fixed.
  • Loading branch information
yuhaijun999 authored and ketor committed Nov 11, 2024
1 parent 0d4c576 commit a91794c
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
13 changes: 9 additions & 4 deletions src/diskann/diskann_item.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ DiskANNItem::~DiskANNItem() {
#endif
}

std::shared_ptr<DiskANNItem> DiskANNItem::GetSelf() { return shared_from_this(); }

butil::Status DiskANNItem::Import(std::shared_ptr<Context> ctx, const std::vector<pb::common::Vector>& vectors,
const std::vector<int64_t>& vector_ids, bool has_more,
bool /*force_to_load_data_if_exist*/, int64_t already_send_vector_count, int64_t ts,
Expand Down Expand Up @@ -876,8 +878,9 @@ butil::Status DiskANNItem::DoSyncBuild(std::shared_ptr<Context> ctx, bool force_
}

butil::Status DiskANNItem::DoAsyncBuild(std::shared_ptr<Context> ctx, bool force_to_build, DiskANNCoreState old_state) {
auto lambda_call = [this, force_to_build, old_state, ctx]() {
this->DoBuildInternal(ctx, force_to_build, old_state);
std::shared_ptr<DiskANNItem> self = GetSelf();
auto lambda_call = [self, force_to_build, old_state, ctx]() {
self->DoBuildInternal(ctx, force_to_build, old_state);
};

#if defined(ENABLE_DISKANN_ITEM_PTHREAD)
Expand Down Expand Up @@ -966,7 +969,8 @@ butil::Status DiskANNItem::DoSyncLoad(std::shared_ptr<Context> ctx, const pb::co

butil::Status DiskANNItem::DoAsyncLoad(std::shared_ptr<Context> ctx, const pb::common::LoadDiskAnnParam& load_param,
DiskANNCoreState old_state) {
auto lambda_call = [this, &load_param, old_state, ctx]() { this->DoLoadInternal(ctx, load_param, old_state); };
std::shared_ptr<DiskANNItem> self = GetSelf();
auto lambda_call = [self, &load_param, old_state, ctx]() { self->DoLoadInternal(ctx, load_param, old_state); };

#if defined(ENABLE_DISKANN_ITEM_PTHREAD)
std::thread th(lambda_call);
Expand Down Expand Up @@ -1032,7 +1036,8 @@ butil::Status DiskANNItem::DoSyncTryLoad(std::shared_ptr<Context> ctx, const pb:

butil::Status DiskANNItem::DoAsyncTryLoad(std::shared_ptr<Context> ctx, const pb::common::LoadDiskAnnParam& load_param,
DiskANNCoreState old_state) {
auto lambda_call = [this, &load_param, old_state, ctx]() { this->DoTryLoadInternal(ctx, load_param, old_state); };
std::shared_ptr<DiskANNItem> self = GetSelf();
auto lambda_call = [self, &load_param, old_state, ctx]() { self->DoTryLoadInternal(ctx, load_param, old_state); };

#if defined(ENABLE_DISKANN_ITEM_PTHREAD)
std::thread th(lambda_call);
Expand Down
4 changes: 3 additions & 1 deletion src/diskann/diskann_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace dingodb {

// #undef ENABLE_DISKANN_ID_MAPPING

class DiskANNItem {
class DiskANNItem : public std::enable_shared_from_this<DiskANNItem> {
public:
explicit DiskANNItem(std::shared_ptr<Context> ctx, int64_t vector_index_id,
const pb::common::VectorIndexParameter& vector_index_parameter, u_int32_t num_threads,
Expand All @@ -53,6 +53,8 @@ class DiskANNItem {
DiskANNItem(DiskANNItem&& rhs) = delete;
DiskANNItem& operator=(DiskANNItem&& rhs) = delete;

std::shared_ptr<DiskANNItem> GetSelf();

butil::Status Import(std::shared_ptr<Context> ctx, const std::vector<pb::common::Vector>& vectors,
const std::vector<int64_t>& vector_ids, bool has_more, bool force_to_load_data_if_exist,
int64_t already_send_vector_count, int64_t ts, int64_t tso,
Expand Down
14 changes: 9 additions & 5 deletions src/vector/vector_index_diskann.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,10 @@ butil::Status VectorIndexDiskANN::Build(const pb::common::Range& region_range, m

if (!FLAGS_diskann_build_sync_internal) {
internal_state = pb::common::DiskANNCoreState::BUILDING;
auto task = std::make_shared<ServiceTask>([this, region_range, reader, parameter, ts]() {
std::shared_ptr<VectorIndexDiskANN> self = GetSelf();
auto task = std::make_shared<ServiceTask>([self, region_range, reader, parameter, ts]() {
pb::common::DiskANNCoreState state;
auto status = DoBuild(region_range, reader, parameter, ts, state);
auto status = self->DoBuild(region_range, reader, parameter, ts, state);
(void)status;
});

Expand Down Expand Up @@ -502,14 +503,15 @@ butil::Status VectorIndexDiskANN::Load(const pb::common::VectorLoadParameter& pa

if (!FLAGS_diskann_load_sync_internal) {
internal_state = pb::common::DiskANNCoreState::LOADING;
auto task = std::make_shared<ServiceTask>([this, internal_parameter]() {
std::shared_ptr<VectorIndexDiskANN> self = GetSelf();
auto task = std::make_shared<ServiceTask>([self, internal_parameter]() {
pb::common::DiskANNCoreState state;
butil::Status status;
// load index rpc
if (internal_parameter.diskann().direct_load_without_build()) {
status = SendVectorTryLoadRequestWrapper(internal_parameter, state);
status = self->SendVectorTryLoadRequestWrapper(internal_parameter, state);
} else {
status = SendVectorLoadRequestWrapper(internal_parameter, state);
status = self->SendVectorLoadRequestWrapper(internal_parameter, state);
}
if (!status.ok()) {
LOG(ERROR) << "[" << __PRETTY_FUNCTION__ << "] " << status.error_cstr();
Expand Down Expand Up @@ -688,6 +690,8 @@ butil::Status VectorIndexDiskANN::Dump(bool dump_all, std::vector<std::string>&
}
}

std::shared_ptr<VectorIndexDiskANN> VectorIndexDiskANN::GetSelf() { return shared_from_this(); }

butil::Status VectorIndexDiskANN::Save(const std::string& /*path*/) {
return butil::Status(pb::error::Errno::EVECTOR_NOT_SUPPORT, "not support in DiskANN!!!");
}
Expand Down
4 changes: 3 additions & 1 deletion src/vector/vector_index_diskann.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace dingodb {

#undef TEST_VECTOR_INDEX_DISKANN_MOCK

class VectorIndexDiskANN : public VectorIndex {
class VectorIndexDiskANN : public VectorIndex, public std::enable_shared_from_this<VectorIndexDiskANN> {
public:
explicit VectorIndexDiskANN(int64_t id, const pb::common::VectorIndexParameter& vector_index_parameter,
const pb::common::RegionEpoch& epoch, const pb::common::Range& range,
Expand All @@ -51,6 +51,8 @@ class VectorIndexDiskANN : public VectorIndex {

static void Init();

std::shared_ptr<VectorIndexDiskANN> GetSelf();

butil::Status Save(const std::string& path) override;
butil::Status Load(const std::string& path) override;

Expand Down

0 comments on commit a91794c

Please sign in to comment.