Skip to content

Commit

Permalink
[fix] (inverted index) fix index file in gc binlogs
Browse files Browse the repository at this point in the history
  • Loading branch information
csun5285 committed Dec 6, 2024
1 parent 4512cb0 commit 7b3245e
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 39 deletions.
13 changes: 11 additions & 2 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,17 @@ void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req,
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
segment_index_file_path =
tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
auto segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
if (tablet->tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
std::stoll(segment_index_id), "");
} else {
DCHECK(segment_index_id == "-1");
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
}
is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,10 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
InvertedIndexStorageFormatPB::V1) {
for (const auto& index : tablet_schema.inverted_indexes()) {
auto index_id = index->index_id();
auto index_file = ref_tablet->get_segment_index_filepath(
rowset_id, segment_index, index_id);
auto index_file = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(
segment_file_path),
index_id, "");
auto snapshot_segment_index_file_path =
fmt::format("{}/{}_{}_{}.binlog-index", schema_full_path, rowset_id,
segment_index, index_id);
Expand Down
44 changes: 16 additions & 28 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2585,30 +2585,6 @@ std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index);
}

std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
std::string_view segment_index,
std::string_view index_id) const {
auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format();
if (format == doris::InvertedIndexStorageFormatPB::V1) {
return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index,
index_id);
} else {
return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, segment_index);
}
}

std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
int64_t index_id) const {
auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format();
if (format == doris::InvertedIndexStorageFormatPB::V1) {
return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index,
index_id);
} else {
DCHECK(index_id == -1);
return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, segment_index);
}
}

std::vector<std::string> Tablet::get_binlog_filepath(std::string_view binlog_version) const {
const auto& [rowset_id, num_segments] = get_binlog_info(binlog_version);
std::vector<std::string> binlog_filepath;
Expand Down Expand Up @@ -2653,10 +2629,22 @@ void Tablet::gc_binlogs(int64_t version) {

// add binlog segment files and index files
for (int64_t i = 0; i < num_segments; ++i) {
wait_for_deleted_binlog_files.emplace_back(get_segment_filepath(rowset_id, i));
for (const auto& index : this->tablet_schema()->inverted_indexes()) {
wait_for_deleted_binlog_files.emplace_back(
get_segment_index_filepath(rowset_id, i, index->index_id()));
auto segment_file_path = get_segment_filepath(rowset_id, i);
wait_for_deleted_binlog_files.emplace_back(segment_file_path);

// index files
if (tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
for (const auto& index : tablet_schema()->inverted_indexes()) {
auto index_file = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
index->index_id(), "");
wait_for_deleted_binlog_files.emplace_back(index_file);
}
} else if (tablet_schema()->has_inverted_index()) {
auto index_file = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
wait_for_deleted_binlog_files.emplace_back(index_file);
}
}
};
Expand Down
5 changes: 0 additions & 5 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,6 @@ class Tablet final : public BaseTablet {
std::string get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const;
std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const;
std::string get_segment_index_filepath(std::string_view rowset_id,
std::string_view segment_index,
std::string_view index_id) const;
std::string get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
int64_t index_id) const;
bool can_add_binlog(uint64_t total_binlog_size) const;
void gc_binlogs(int64_t version);
Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb);
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,9 @@ Status EngineStorageMigrationTask::_copy_index_and_data_files(
InvertedIndexStorageFormatPB::V1) {
for (const auto& index : tablet_schema.inverted_indexes()) {
auto index_id = index->index_id();
auto index_file =
_tablet->get_segment_index_filepath(rowset_id, segment_index, index_id);
auto index_file = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
index_id, "");
auto snapshot_segment_index_file_path =
fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id,
segment_index, index_id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/storage_engine.h"

namespace doris {

using namespace doris::vectorized;

constexpr static uint32_t MAX_PATH_LEN = 1024;
constexpr static std::string_view dest_dir = "./ut_dir/inverted_index_test";
constexpr static std::string_view tmp_dir = "./ut_dir/tmp";
static int64_t inc_id = 0;

class IndexGcBinglogsTest : public ::testing::Test {
protected:
void SetUp() override {
// absolute dir
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
_curreent_dir = std::string(buffer);
_absolute_dir = _curreent_dir + std::string(dest_dir);
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_absolute_dir).ok());

// tmp dir
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
std::vector<StorePath> paths;
paths.emplace_back(std::string(tmp_dir), 1024000000);
auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
EXPECT_TRUE(tmp_file_dirs->init().ok());
ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));

// storage engine
doris::EngineOptions options;
auto engine = std::make_unique<StorageEngine>(options);
_engine_ref = engine.get();
_data_dir = std::make_unique<DataDir>(*_engine_ref, _absolute_dir);
static_cast<void>(_data_dir->update_capacity());
EXPECT_TRUE(_data_dir->init(true).ok());
ExecEnv::GetInstance()->set_storage_engine(std::move(engine));

// tablet_schema
TabletSchemaPB schema_pb;
schema_pb.set_keys_type(KeysType::DUP_KEYS);
schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);

construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, "key_index", 0,
"INT", "key");
construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1,
"STRING", "v1");

_tablet_schema.reset(new TabletSchema);
_tablet_schema->init_from_pb(schema_pb);

// tablet
TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));

tablet_meta->set_tablet_uid(TabletUid(20, 20));
tablet_meta.get()->_tablet_id = 200;
_tablet.reset(new Tablet(*_engine_ref, tablet_meta, _data_dir.get()));
EXPECT_TRUE(_tablet->init().ok());
}
void TearDown() override {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
_engine_ref = nullptr;
ExecEnv::GetInstance()->set_storage_engine(nullptr);
}

void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id,
const std::string& index_name, int32_t col_unique_id,
const std::string& column_type, const std::string& column_name,
bool parser = false) {
column_pb->set_unique_id(col_unique_id);
column_pb->set_name(column_name);
column_pb->set_type(column_type);
column_pb->set_is_key(false);
column_pb->set_is_nullable(true);
tablet_index->set_index_id(index_id);
tablet_index->set_index_name(index_name);
tablet_index->set_index_type(IndexType::INVERTED);
tablet_index->add_col_unique_id(col_unique_id);
if (parser) {
auto* properties = tablet_index->mutable_properties();
(*properties)[INVERTED_INDEX_PARSER_KEY] = INVERTED_INDEX_PARSER_UNICODE;
}
}

RowsetWriterContext rowset_writer_context() {
RowsetWriterContext context;
RowsetId rowset_id;
rowset_id.init(inc_id);
context.rowset_id = rowset_id;
context.rowset_type = BETA_ROWSET;
context.data_dir = _data_dir.get();
context.rowset_state = VISIBLE;
context.tablet_schema = _tablet_schema;
context.tablet_path = _tablet->tablet_path();
context.version = Version(inc_id, inc_id);
context.max_rows_per_segment = 200;
inc_id++;
return context;
}

IndexGcBinglogsTest() = default;
~IndexGcBinglogsTest() override = default;

private:
TabletSchemaSPtr _tablet_schema = nullptr;
StorageEngine* _engine_ref = nullptr;
std::unique_ptr<DataDir> _data_dir = nullptr;
TabletSharedPtr _tablet = nullptr;
std::string _absolute_dir;
std::string _curreent_dir;
};

TEST_F(IndexGcBinglogsTest, gc_binlogs_test) {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());

RowsetSharedPtr rowset;
const auto& res =
RowsetFactory::create_rowset_writer(*_engine_ref, rowset_writer_context(), false);
EXPECT_TRUE(res.has_value()) << res.error();
const auto& rowset_writer = res.value();

Block block = _tablet_schema->create_block();
auto columns = block.mutate_columns();

vectorized::Field key = 10;
vectorized::Field v1 = "v1";
columns[0]->insert(key);
columns[1]->insert(v1);

EXPECT_TRUE(rowset_writer->add_block(&block).ok());
EXPECT_TRUE(rowset_writer->flush().ok());
EXPECT_TRUE(rowset_writer->build(rowset).ok());
EXPECT_TRUE(rowset->add_to_binlog().ok());
EXPECT_TRUE(_tablet->add_rowset(rowset).ok());
EXPECT_TRUE(RowsetMetaManager::save(_data_dir->get_meta(), _tablet->tablet_uid(),
rowset->rowset_id(), rowset->rowset_meta()->get_rowset_pb(),
true)
.ok());
_tablet->save_meta();
_tablet->gc_binlogs(0);
}

} // namespace doris

0 comments on commit 7b3245e

Please sign in to comment.