Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] (inverted index) fix index file in gc binlogs #44994

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,17 @@ void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req,
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
segment_index_file_path =
tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
auto segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
if (tablet->tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
std::stoll(segment_index_id), "");
} else {
DCHECK(segment_index_id == "-1");
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
}
is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,10 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
InvertedIndexStorageFormatPB::V1) {
for (const auto& index : tablet_schema.inverted_indexes()) {
auto index_id = index->index_id();
auto index_file = ref_tablet->get_segment_index_filepath(
rowset_id, segment_index, index_id);
auto index_file = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(
segment_file_path),
index_id, index->get_index_suffix());
auto snapshot_segment_index_file_path =
fmt::format("{}/{}_{}_{}.binlog-index", schema_full_path, rowset_id,
segment_index, index_id);
Expand Down
44 changes: 16 additions & 28 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2595,30 +2595,6 @@ std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index);
}

std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
std::string_view segment_index,
std::string_view index_id) const {
auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format();
if (format == doris::InvertedIndexStorageFormatPB::V1) {
return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index,
index_id);
} else {
return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, segment_index);
}
}

std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
int64_t index_id) const {
auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format();
if (format == doris::InvertedIndexStorageFormatPB::V1) {
return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index,
index_id);
} else {
DCHECK(index_id == -1);
return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, segment_index);
}
}

std::vector<std::string> Tablet::get_binlog_filepath(std::string_view binlog_version) const {
const auto& [rowset_id, num_segments] = get_binlog_info(binlog_version);
std::vector<std::string> binlog_filepath;
Expand Down Expand Up @@ -2663,10 +2639,22 @@ void Tablet::gc_binlogs(int64_t version) {

// add binlog segment files and index files
for (int64_t i = 0; i < num_segments; ++i) {
wait_for_deleted_binlog_files.emplace_back(get_segment_filepath(rowset_id, i));
for (const auto& index : this->tablet_schema()->inverted_indexes()) {
wait_for_deleted_binlog_files.emplace_back(
get_segment_index_filepath(rowset_id, i, index->index_id()));
auto segment_file_path = get_segment_filepath(rowset_id, i);
wait_for_deleted_binlog_files.emplace_back(segment_file_path);

// index files
if (tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
for (const auto& index : tablet_schema()->inverted_indexes()) {
auto index_file = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
index->index_id(), index->get_index_suffix());
wait_for_deleted_binlog_files.emplace_back(index_file);
}
} else if (tablet_schema()->has_inverted_index()) {
auto index_file = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
wait_for_deleted_binlog_files.emplace_back(index_file);
}
}
};
Expand Down
5 changes: 0 additions & 5 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,6 @@ class Tablet final : public BaseTablet {
std::string get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const;
std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const;
std::string get_segment_index_filepath(std::string_view rowset_id,
std::string_view segment_index,
std::string_view index_id) const;
std::string get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
int64_t index_id) const;
bool can_add_binlog(uint64_t total_binlog_size) const;
void gc_binlogs(int64_t version);
Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb);
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,9 @@ Status EngineStorageMigrationTask::_copy_index_and_data_files(
InvertedIndexStorageFormatPB::V1) {
for (const auto& index : tablet_schema.inverted_indexes()) {
auto index_id = index->index_id();
auto index_file =
_tablet->get_segment_index_filepath(rowset_id, segment_index, index_id);
auto index_file = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
index_id, "");
csun5285 marked this conversation as resolved.
Show resolved Hide resolved
auto snapshot_segment_index_file_path =
fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id,
segment_index, index_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>
csun5285 marked this conversation as resolved.
Show resolved Hide resolved

#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/storage_engine.h"

namespace doris {

using namespace doris::vectorized;

constexpr static uint32_t MAX_PATH_LEN = 1024;
constexpr static std::string_view dest_dir = "./ut_dir/inverted_index_test";
constexpr static std::string_view tmp_dir = "./ut_dir/tmp";
static int64_t inc_id = 0;

class IndexGcBinglogsTest : public ::testing::Test {
protected:
void SetUp() override {
// absolute dir
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
_curreent_dir = std::string(buffer);
_absolute_dir = _curreent_dir + std::string(dest_dir);
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_absolute_dir).ok());

// tmp dir
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
std::vector<StorePath> paths;
paths.emplace_back(std::string(tmp_dir), 1024000000);
auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
EXPECT_TRUE(tmp_file_dirs->init().ok());
ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));

// storage engine
doris::EngineOptions options;
auto engine = std::make_unique<StorageEngine>(options);
_engine_ref = engine.get();
_data_dir = std::make_unique<DataDir>(*_engine_ref, _absolute_dir);
static_cast<void>(_data_dir->update_capacity());
EXPECT_TRUE(_data_dir->init(true).ok());
ExecEnv::GetInstance()->set_storage_engine(std::move(engine));

// tablet_schema
TabletSchemaPB schema_pb;
schema_pb.set_keys_type(KeysType::DUP_KEYS);
schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);
csun5285 marked this conversation as resolved.
Show resolved Hide resolved

construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, "key_index", 0,
"INT", "key");
construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1,
"STRING", "v1");

_tablet_schema.reset(new TabletSchema);
_tablet_schema->init_from_pb(schema_pb);

// tablet
TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));

tablet_meta->set_tablet_uid(TabletUid(20, 20));
tablet_meta.get()->_tablet_id = 200;
_tablet.reset(new Tablet(*_engine_ref, tablet_meta, _data_dir.get()));
EXPECT_TRUE(_tablet->init().ok());
}
void TearDown() override {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
_engine_ref = nullptr;
ExecEnv::GetInstance()->set_storage_engine(nullptr);
}

void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id,
const std::string& index_name, int32_t col_unique_id,
const std::string& column_type, const std::string& column_name,
bool parser = false) {
column_pb->set_unique_id(col_unique_id);
column_pb->set_name(column_name);
column_pb->set_type(column_type);
column_pb->set_is_key(false);
column_pb->set_is_nullable(true);
tablet_index->set_index_id(index_id);
tablet_index->set_index_name(index_name);
tablet_index->set_index_type(IndexType::INVERTED);
tablet_index->add_col_unique_id(col_unique_id);
if (parser) {
auto* properties = tablet_index->mutable_properties();
(*properties)[INVERTED_INDEX_PARSER_KEY] = INVERTED_INDEX_PARSER_UNICODE;
}
}

RowsetWriterContext rowset_writer_context() {
RowsetWriterContext context;
RowsetId rowset_id;
rowset_id.init(inc_id);
context.rowset_id = rowset_id;
context.rowset_type = BETA_ROWSET;
context.data_dir = _data_dir.get();
context.rowset_state = VISIBLE;
context.tablet_schema = _tablet_schema;
context.tablet_path = _tablet->tablet_path();
context.version = Version(inc_id, inc_id);
context.max_rows_per_segment = 200;
inc_id++;
return context;
}

IndexGcBinglogsTest() = default;
~IndexGcBinglogsTest() override = default;

private:
TabletSchemaSPtr _tablet_schema = nullptr;
StorageEngine* _engine_ref = nullptr;
std::unique_ptr<DataDir> _data_dir = nullptr;
TabletSharedPtr _tablet = nullptr;
std::string _absolute_dir;
std::string _curreent_dir;
};

TEST_F(IndexGcBinglogsTest, gc_binlogs_test) {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());

RowsetSharedPtr rowset;
const auto& res =
RowsetFactory::create_rowset_writer(*_engine_ref, rowset_writer_context(), false);
EXPECT_TRUE(res.has_value()) << res.error();
const auto& rowset_writer = res.value();

Block block = _tablet_schema->create_block();
auto columns = block.mutate_columns();

vectorized::Field key = 10;
vectorized::Field v1 = "v1";
columns[0]->insert(key);
columns[1]->insert(v1);

EXPECT_TRUE(rowset_writer->add_block(&block).ok());
EXPECT_TRUE(rowset_writer->flush().ok());
EXPECT_TRUE(rowset_writer->build(rowset).ok());
EXPECT_TRUE(rowset->add_to_binlog().ok());
EXPECT_TRUE(_tablet->add_rowset(rowset).ok());
EXPECT_TRUE(RowsetMetaManager::save(_data_dir->get_meta(), _tablet->tablet_uid(),
rowset->rowset_id(), rowset->rowset_meta()->get_rowset_pb(),
true)
.ok());
_tablet->save_meta();
_tablet->gc_binlogs(0);
}

} // namespace doris
Loading