Skip to content

Commit

Permalink
[feture](inverted index) Adding Storage Format V3 for Inverted Index
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzxl1993 committed Nov 29, 2024
1 parent c4288a0 commit b2545f8
Show file tree
Hide file tree
Showing 23 changed files with 365 additions and 33 deletions.
2 changes: 1 addition & 1 deletion be/src/clucene
Submodule clucene updated 642 files
9 changes: 9 additions & 0 deletions be/src/olap/inverted_index_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,13 @@ std::string get_parser_stopwords_from_properties(
}
}

std::string get_parser_dict_compression_from_properties(
const std::map<std::string, std::string>& properties) {
if (properties.find(INVERTED_INDEX_PARSER_DICT_COMPRESSION_KEY) != properties.end()) {
return properties.at(INVERTED_INDEX_PARSER_DICT_COMPRESSION_KEY);
} else {
return "";
}
}

} // namespace doris
5 changes: 5 additions & 0 deletions be/src/olap/inverted_index_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ const std::string INVERTED_INDEX_PARSER_LOWERCASE_KEY = "lower_case";

const std::string INVERTED_INDEX_PARSER_STOPWORDS_KEY = "stopwords";

const std::string INVERTED_INDEX_PARSER_DICT_COMPRESSION_KEY = "dict_compression";

std::string inverted_index_parser_type_to_string(InvertedIndexParserType parser_type);

InvertedIndexParserType get_inverted_index_parser_type_from_string(const std::string& parser_str);
Expand Down Expand Up @@ -119,4 +121,7 @@ std::string get_parser_lowercase_from_properties(
std::string get_parser_stopwords_from_properties(
const std::map<std::string, std::string>& properties);

std::string get_parser_dict_compression_from_properties(
const std::map<std::string, std::string>& properties);

} // namespace doris
18 changes: 16 additions & 2 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,10 +703,24 @@ Status BetaRowset::show_nested_index_file(rapidjson::Value* rowset_value,
rapidjson::Document::AllocatorType& allocator) {
const auto& fs = _rowset_meta->fs();
auto storage_format = _schema->get_inverted_index_storage_format();
auto format_str = storage_format == InvertedIndexStorageFormatPB::V1 ? "V1" : "V2";
std::string format_str;
switch (storage_format) {
case InvertedIndexStorageFormatPB::V1:
format_str = "V1";
break;
case InvertedIndexStorageFormatPB::V2:
format_str = "V2";
break;
case InvertedIndexStorageFormatPB::V3:
format_str = "V3";
break;
default:
return Status::InternalError("inverted index storage format error");
break;
}
auto rs_id = rowset_id().to_string();
rowset_value->AddMember("rowset_id", rapidjson::Value(rs_id.c_str(), allocator), allocator);
rowset_value->AddMember("index_storage_format", rapidjson::Value(format_str, allocator),
rowset_value->AddMember("index_storage_format", rapidjson::Value(format_str.c_str(), allocator),
allocator);
rapidjson::Value segments(rapidjson::kArrayType);
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#pragma once

#include <CLucene.h>
#include <CLucene.h> // IWYU pragma: keep
#include <CLucene/analysis/CharFilter.h>

#include <bitset>
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ namespace doris::segment_v2 {
Status InvertedIndexFileReader::init(int32_t read_buffer_size) {
if (!_inited) {
_read_buffer_size = read_buffer_size;
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
auto st = _init_from_v2(read_buffer_size);
if (_storage_format >= InvertedIndexStorageFormatPB::V2) {
auto st = _init_from(read_buffer_size);
if (!st.ok()) {
return st;
}
Expand All @@ -41,7 +41,7 @@ Status InvertedIndexFileReader::init(int32_t read_buffer_size) {
return Status::OK();
}

Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) {
Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size) {
auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);

std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
Expand Down Expand Up @@ -79,7 +79,7 @@ Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) {

// 3. read file
int32_t version = _stream->readInt(); // Read version number
if (version == InvertedIndexStorageFormatPB::V2) {
if (version >= InvertedIndexStorageFormatPB::V2) {
DCHECK(version == _storage_format);
int32_t numIndices = _stream->readInt(); // Read number of indices
ReaderFileEntry* entry = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class InvertedIndexFileReader {
int64_t get_inverted_file_size() const { return _stream == nullptr ? 0 : _stream->length(); }

private:
Status _init_from_v2(int32_t read_buffer_size);
Status _init_from(int32_t read_buffer_size);
Result<std::unique_ptr<DorisCompoundReader>> _open(int64_t index_id,
const std::string& index_suffix) const;

Expand Down
22 changes: 11 additions & 11 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ Status InvertedIndexFileWriter::close() {
}
} else {
try {
RETURN_IF_ERROR(write_v2());
RETURN_IF_ERROR(write());
for (const auto& entry : _indices_dirs) {
const auto& dir = entry.second;
// delete index path, which contains separated inverted index files
Expand Down Expand Up @@ -293,18 +293,18 @@ Status InvertedIndexFileWriter::write_v1() {
return Status::OK();
}

Status InvertedIndexFileWriter::write_v2() {
Status InvertedIndexFileWriter::write() {
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir = nullptr;
std::unique_ptr<lucene::store::IndexOutput> compound_file_output = nullptr;
ErrorContext error_context;
try {
// Calculate header length and initialize offset
int64_t current_offset = headerLength();
// Prepare file metadata
auto file_metadata = prepare_file_metadata_v2(current_offset);
auto file_metadata = prepare_file_metadata(current_offset);

// Create output stream
auto result = create_output_stream_v2();
auto result = create_output_stream();
out_dir = std::move(result.first);
compound_file_output = std::move(result.second);

Expand All @@ -315,7 +315,7 @@ Status InvertedIndexFileWriter::write_v2() {
write_index_headers_and_metadata(compound_file_output.get(), file_metadata);

// Copy file data
copy_files_data_v2(compound_file_output.get(), file_metadata);
copy_files_data(compound_file_output.get(), file_metadata);

_total_file_size = compound_file_output->getFilePointer();
_file_info.set_index_size(_total_file_size);
Expand Down Expand Up @@ -470,7 +470,7 @@ void InvertedIndexFileWriter::write_header_and_data_v1(lucene::store::IndexOutpu

std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
std::unique_ptr<lucene::store::IndexOutput>>
InvertedIndexFileWriter::create_output_stream_v2() {
InvertedIndexFileWriter::create_output_stream() {
io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};

auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str());
Expand All @@ -486,15 +486,15 @@ InvertedIndexFileWriter::create_output_stream_v2() {

void InvertedIndexFileWriter::write_version_and_indices_count(lucene::store::IndexOutput* output) {
// Write the version number
output->writeInt(InvertedIndexStorageFormatPB::V2);
output->writeInt(_storage_format);

// Write the number of indices
const auto num_indices = static_cast<uint32_t>(_indices_dirs.size());
output->writeInt(num_indices);
}

std::vector<InvertedIndexFileWriter::FileMetadata>
InvertedIndexFileWriter::prepare_file_metadata_v2(int64_t& current_offset) {
std::vector<InvertedIndexFileWriter::FileMetadata> InvertedIndexFileWriter::prepare_file_metadata(
int64_t& current_offset) {
std::vector<FileMetadata> file_metadata;

for (const auto& entry : _indices_dirs) {
Expand Down Expand Up @@ -546,8 +546,8 @@ void InvertedIndexFileWriter::write_index_headers_and_metadata(
}
}

void InvertedIndexFileWriter::copy_files_data_v2(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata) {
void InvertedIndexFileWriter::copy_files_data(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata) {
const int64_t buffer_length = 16384;
uint8_t buffer[buffer_length];

Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class InvertedIndexFileWriter {
Status delete_index(const TabletIndex* index_meta);
Status initialize(InvertedIndexDirectoryMap& indices_dirs);
virtual ~InvertedIndexFileWriter() = default;
Status write_v2();
Status write();
Status write_v1();
Status close();
const InvertedIndexFileInfo* get_index_file_info() const {
Expand Down Expand Up @@ -122,7 +122,7 @@ class InvertedIndexFileWriter {
// Helper functions specific to write_v2
virtual std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
std::unique_ptr<lucene::store::IndexOutput>>
create_output_stream_v2();
create_output_stream();
void write_version_and_indices_count(lucene::store::IndexOutput* output);
struct FileMetadata {
int64_t index_id;
Expand All @@ -141,11 +141,11 @@ class InvertedIndexFileWriter {
length(len),
directory(dir) {}
};
std::vector<FileMetadata> prepare_file_metadata_v2(int64_t& current_offset);
std::vector<FileMetadata> prepare_file_metadata(int64_t& current_offset);
virtual void write_index_headers_and_metadata(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);
void copy_files_data_v2(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);
void copy_files_data(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);
Status _insert_directory_into_map(int64_t index_id, const std::string& index_suffix,
std::shared_ptr<DorisFSDirectory> dir);
// Member variables...
Expand Down
22 changes: 22 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,28 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
(*field)->setOmitTermFreqAndPositions(
!(get_parser_phrase_support_string_from_properties(_index_meta->properties()) ==
INVERTED_INDEX_PARSER_PHRASE_SUPPORT_YES));
DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::create_field_v3", {
if (_index_file_writer->get_storage_format() != InvertedIndexStorageFormatPB::V3) {
return Status::Error<doris::ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: InvertedIndexColumnWriterImpl::create_field_v3 error");
}
})
if (_index_file_writer->get_storage_format() >= InvertedIndexStorageFormatPB::V3) {
(*field)->setIndexVersion(IndexVersion::kV3);
// Only effective in v3
std::string dict_compression =
get_parser_phrase_support_string_from_properties(_index_meta->properties());
DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::create_field_dic_compression", {
if (dict_compression != INVERTED_INDEX_PARSER_TRUE) {
return Status::Error<doris::ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: "
"InvertedIndexColumnWriterImpl::create_field_dic_compression error");
}
})
if (dict_compression == INVERTED_INDEX_PARSER_TRUE) {
(*field)->updateFlag(FlagBits::DICT_COMPRESS);
}
}
return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
case TInvertedIndexFileStorageFormat::V2:
schema->set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);
break;
case TInvertedIndexFileStorageFormat::V3:
schema->set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V3);
break;
default:
schema->set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ TEST_F(PhraseQueryTest, test_parser_info) {
EXPECT_EQ(query_info.slop, res3);
EXPECT_EQ(query_info.ordered, res4);
EXPECT_EQ(query_info.additional_terms.size(), res5);
std::cout << "--- 1 ---: " << query_info.to_string() << std::endl;
};

// "english/history off.gif ~20+" sequential_opt = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ TEST_F(InvertedIndexFileWriterTest, WriteV2ExceptionHandlingTest) {
EXPECT_CALL(writer_mock, write_index_headers_and_metadata(::testing::_, ::testing::_))
.WillOnce(::testing::Throw(CLuceneError(CL_ERR_IO, "Simulated exception", false)));

Status status = writer_mock.write_v2();
Status status = writer_mock.write();
ASSERT_FALSE(status.ok());
ASSERT_EQ(status.code(), ErrorCode::INVERTED_INDEX_CLUCENE_ERROR);
}
Expand All @@ -523,7 +523,7 @@ class InvertedIndexFileWriterMockCreateOutputStreamV2 : public InvertedIndexFile

MOCK_METHOD((std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
std::unique_ptr<lucene::store::IndexOutput>>),
create_output_stream_v2, (), (override));
create_output_stream, (), (override));
};

class InvertedIndexFileWriterMockCreateOutputStreamV1 : public InvertedIndexFileWriter {
Expand Down Expand Up @@ -622,7 +622,7 @@ TEST_F(InvertedIndexFileWriterTest, WriteV2OutputTest) {
auto compound_file_output = std::unique_ptr<DorisFSDirectory::FSIndexOutputV2>(mock_output_v2);
compound_file_output->init(file_writer.get());

EXPECT_CALL(writer_mock, create_output_stream_v2())
EXPECT_CALL(writer_mock, create_output_stream())
.WillOnce(::testing::Invoke(
[&]() -> std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
std::unique_ptr<lucene::store::IndexOutput>> {
Expand Down Expand Up @@ -680,7 +680,7 @@ TEST_F(InvertedIndexFileWriterTest, WriteV2OutputCloseErrorTest) {
auto compound_file_output = std::unique_ptr<DorisFSDirectory::FSIndexOutputV2>(mock_output_v2);
compound_file_output->init(file_writer.get());

EXPECT_CALL(writer_mock, create_output_stream_v2())
EXPECT_CALL(writer_mock, create_output_stream())
.WillOnce(::testing::Invoke(
[&]() -> std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
std::unique_ptr<lucene::store::IndexOutput>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class InvertedIndexUtil {

public static String INVERTED_INDEX_PARSER_STOPWORDS_KEY = "stopwords";

public static String INVERTED_INDEX_DICT_COMPRESSION_KEY = "dict_compression";

public static String getInvertedIndexParser(Map<String, String> properties) {
String parser = properties == null ? null : properties.get(INVERTED_INDEX_PARSER_KEY);
// default is "none" if not set
Expand Down Expand Up @@ -157,7 +159,8 @@ public static void checkInvertedIndexProperties(Map<String, String> properties)
INVERTED_INDEX_PARSER_CHAR_FILTER_REPLACEMENT,
INVERTED_INDEX_PARSER_IGNORE_ABOVE_KEY,
INVERTED_INDEX_PARSER_LOWERCASE_KEY,
INVERTED_INDEX_PARSER_STOPWORDS_KEY
INVERTED_INDEX_PARSER_STOPWORDS_KEY,
INVERTED_INDEX_DICT_COMPRESSION_KEY
));

for (String key : properties.keySet()) {
Expand All @@ -174,6 +177,7 @@ public static void checkInvertedIndexProperties(Map<String, String> properties)
String ignoreAbove = properties.get(INVERTED_INDEX_PARSER_IGNORE_ABOVE_KEY);
String lowerCase = properties.get(INVERTED_INDEX_PARSER_LOWERCASE_KEY);
String stopWords = properties.get(INVERTED_INDEX_PARSER_STOPWORDS_KEY);
String dictCompression = properties.get(INVERTED_INDEX_DICT_COMPRESSION_KEY);

if (parser != null && !parser.matches("none|english|unicode|chinese|standard")) {
throw new AnalysisException("Invalid inverted index 'parser' value: " + parser
Expand Down Expand Up @@ -221,5 +225,11 @@ public static void checkInvertedIndexProperties(Map<String, String> properties)
throw new AnalysisException("Invalid inverted index 'stopWords' value: " + stopWords
+ ", stopWords must be none");
}

if (dictCompression != null && !dictCompression.matches("true|false")) {
throw new AnalysisException(
"Invalid inverted index 'dict_compression' value: "
+ dictCompression + ", dict_compression must be true or false");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,12 @@ public OlapFile.TabletMetaCloudPB.Builder createTabletMetaBuilder(long tableId,
if (invertedIndexFileStorageFormat != null) {
if (invertedIndexFileStorageFormat == TInvertedIndexFileStorageFormat.V1) {
schemaBuilder.setInvertedIndexStorageFormat(OlapFile.InvertedIndexStorageFormatPB.V1);
} else {
} else if (invertedIndexFileStorageFormat == TInvertedIndexFileStorageFormat.V2) {
schemaBuilder.setInvertedIndexStorageFormat(OlapFile.InvertedIndexStorageFormatPB.V2);
} else if (invertedIndexFileStorageFormat == TInvertedIndexFileStorageFormat.V3) {
schemaBuilder.setInvertedIndexStorageFormat(OlapFile.InvertedIndexStorageFormatPB.V3);
} else {
throw new DdlException("invalid inverted index storage format");
}
}
schemaBuilder.setRowStorePageSize(pageSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,8 @@ public static TInvertedIndexFileStorageFormat analyzeInvertedIndexFileStorageFor
return TInvertedIndexFileStorageFormat.V1;
} else if (invertedIndexFileStorageFormat.equalsIgnoreCase("v2")) {
return TInvertedIndexFileStorageFormat.V2;
} else if (invertedIndexFileStorageFormat.equalsIgnoreCase("v3")) {
return TInvertedIndexFileStorageFormat.V3;
} else if (invertedIndexFileStorageFormat.equalsIgnoreCase("default")) {
if (Config.inverted_index_storage_format.equalsIgnoreCase("V1")) {
return TInvertedIndexFileStorageFormat.V1;
Expand Down
Loading

0 comments on commit b2545f8

Please sign in to comment.