Skip to content

Commit

Permalink
[opt](inverted index) Optimize the codes exception handling process #…
Browse files Browse the repository at this point in the history
…44205 #44601 (#44862)

Co-authored-by: Sun Chenyang <[email protected]>
  • Loading branch information
zzzxl1993 and csun5285 authored Dec 9, 2024
1 parent 860f7d0 commit 46bd816
Show file tree
Hide file tree
Showing 6 changed files with 595 additions and 79 deletions.
103 changes: 103 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <CLucene.h> // IWYU pragma: keep

#include <memory>

#include "common/logging.h"

namespace lucene::store {
class Directory;
} // namespace lucene::store

namespace doris::segment_v2 {

struct DirectoryDeleter {
void operator()(lucene::store::Directory* ptr) const { _CLDECDELETE(ptr); }
};

struct ErrorContext {
std::string err_msg;
std::exception_ptr eptr;
};

template <typename T>
concept HasClose = requires(T t) {
{ t->close() };
};

template <typename PtrType>
requires HasClose<PtrType>
void finally_close(PtrType& resource, ErrorContext& error_context) {
if (resource) {
try {
resource->close();
} catch (CLuceneError& err) {
error_context.eptr = std::current_exception();
error_context.err_msg.append("Error occurred while closing resource: ");
error_context.err_msg.append(err.what());
LOG(ERROR) << error_context.err_msg;
} catch (...) {
error_context.eptr = std::current_exception();
error_context.err_msg.append("Error occurred while closing resource");
LOG(ERROR) << error_context.err_msg;
}
}
}

#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-macros"
#endif

#define FINALLY_CLOSE(resource) \
{ \
static_assert(sizeof(error_context) > 0, \
"error_context must be defined before using FINALLY macro!"); \
finally_close(resource, error_context); \
}

// Return ERROR after finally
#define FINALLY(finally_block) \
{ \
static_assert(sizeof(error_context) > 0, \
"error_context must be defined before using FINALLY macro!"); \
finally_block; \
if (error_context.eptr) { \
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(error_context.err_msg); \
} \
}

// Re-throw the exception after finally
#define FINALLY_EXCEPTION(finally_block) \
{ \
static_assert(sizeof(error_context) > 0, \
"error_context must be defined before using FINALLY macro!"); \
finally_block; \
if (error_context.eptr) { \
std::rethrow_exception(error_context.eptr); \
} \
}

#if defined(__clang__)
#pragma clang diagnostic pop
#endif

} // namespace doris::segment_v2
86 changes: 38 additions & 48 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,9 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire

Status InvertedIndexFileWriter::write_v1() {
int64_t total_size = 0;
std::string err_msg;
lucene::store::Directory* out_dir = nullptr;
std::exception_ptr eptr;
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir = nullptr;
std::unique_ptr<lucene::store::IndexOutput> output = nullptr;
ErrorContext error_context;
for (const auto& entry : _indices_dirs) {
const int64_t index_id = entry.first.first;
const auto& index_suffix = entry.first.second;
Expand All @@ -262,7 +261,7 @@ Status InvertedIndexFileWriter::write_v1() {

// Create output stream
auto result = create_output_stream_v1(index_id, index_suffix);
out_dir = result.first;
out_dir = std::move(result.first);
output = std::move(result.second);

size_t start = output->getFilePointer();
Expand All @@ -275,34 +274,29 @@ Status InvertedIndexFileWriter::write_v1() {
total_size += compound_file_size;
add_index_info(index_id, index_suffix, compound_file_size);
} catch (CLuceneError& err) {
eptr = std::current_exception();
error_context.eptr = std::current_exception();
auto index_path = InvertedIndexDescriptor::get_index_file_path_v1(
_index_path_prefix, index_id, index_suffix);
err_msg = "CLuceneError occur when write_v1 idx file " + index_path +
" error msg: " + err.what();
}

// Close and clean up
finalize_output_dir(out_dir);
if (output) {
output->close();
}

if (eptr) {
LOG(ERROR) << err_msg;
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(err_msg);
error_context.err_msg.append("CLuceneError occur when write_v1 idx file: ");
error_context.err_msg.append(index_path);
error_context.err_msg.append(", error msg: ");
error_context.err_msg.append(err.what());
LOG(ERROR) << error_context.err_msg;
}
FINALLY({
FINALLY_CLOSE(output);
FINALLY_CLOSE(out_dir);
})
}

_total_file_size = total_size;
return Status::OK();
}

Status InvertedIndexFileWriter::write_v2() {
std::string err_msg;
lucene::store::Directory* out_dir = nullptr;
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir = nullptr;
std::unique_ptr<lucene::store::IndexOutput> compound_file_output = nullptr;
std::exception_ptr eptr;
ErrorContext error_context;
try {
// Calculate header length and initialize offset
int64_t current_offset = headerLength();
Expand All @@ -311,7 +305,7 @@ Status InvertedIndexFileWriter::write_v2() {

// Create output stream
auto result = create_output_stream_v2();
out_dir = result.first;
out_dir = std::move(result.first);
compound_file_output = std::move(result.second);

// Write version and number of indices
Expand All @@ -326,22 +320,18 @@ Status InvertedIndexFileWriter::write_v2() {
_total_file_size = compound_file_output->getFilePointer();
_file_info.set_index_size(_total_file_size);
} catch (CLuceneError& err) {
eptr = std::current_exception();
error_context.eptr = std::current_exception();
auto index_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
err_msg = "CLuceneError occur when close idx file " + index_path +
" error msg: " + err.what();
}

// Close and clean up
finalize_output_dir(out_dir);
if (compound_file_output) {
compound_file_output->close();
}

if (eptr) {
LOG(ERROR) << err_msg;
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(err_msg);
error_context.err_msg.append("CLuceneError occur when close idx file: ");
error_context.err_msg.append(index_path);
error_context.err_msg.append(", error msg: ");
error_context.err_msg.append(err.what());
LOG(ERROR) << error_context.err_msg;
}
FINALLY({
FINALLY_CLOSE(compound_file_output);
FINALLY_CLOSE(out_dir);
})

return Status::OK();
}
Expand Down Expand Up @@ -369,13 +359,6 @@ std::vector<FileInfo> InvertedIndexFileWriter::prepare_sorted_files(
return sorted_files;
}

void InvertedIndexFileWriter::finalize_output_dir(lucene::store::Directory* out_dir) {
if (out_dir != nullptr) {
out_dir->close();
_CLDECDELETE(out_dir)
}
}

void InvertedIndexFileWriter::add_index_info(int64_t index_id, const std::string& index_suffix,
int64_t compound_file_size) {
InvertedIndexFileInfo_IndexInfo index_info;
Expand Down Expand Up @@ -424,7 +407,8 @@ std::pair<int64_t, int32_t> InvertedIndexFileWriter::calculate_header_length(
return {header_length, header_file_count};
}

std::pair<lucene::store::Directory*, std::unique_ptr<lucene::store::IndexOutput>>
std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
std::unique_ptr<lucene::store::IndexOutput>>
InvertedIndexFileWriter::create_output_stream_v1(int64_t index_id,
const std::string& index_suffix) {
io::Path cfs_path(InvertedIndexDescriptor::get_index_file_path_v1(_index_path_prefix, index_id,
Expand All @@ -434,6 +418,7 @@ InvertedIndexFileWriter::create_output_stream_v1(int64_t index_id,

auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, idx_path.c_str());
out_dir->set_file_writer_opts(_opts);
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir_ptr(out_dir);

auto* out = out_dir->createOutput(idx_name.c_str());
DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr",
Expand All @@ -443,9 +428,9 @@ InvertedIndexFileWriter::create_output_stream_v1(int64_t index_id,
"output is nullptr.";
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
}

std::unique_ptr<lucene::store::IndexOutput> output(out);
return {out_dir, std::move(output)};

return {std::move(out_dir_ptr), std::move(output)};
}

void InvertedIndexFileWriter::write_header_and_data_v1(lucene::store::IndexOutput* output,
Expand Down Expand Up @@ -483,15 +468,20 @@ void InvertedIndexFileWriter::write_header_and_data_v1(lucene::store::IndexOutpu
}
}

std::pair<lucene::store::Directory*, std::unique_ptr<lucene::store::IndexOutput>>
std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
std::unique_ptr<lucene::store::IndexOutput>>
InvertedIndexFileWriter::create_output_stream_v2() {
io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};

auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str());
out_dir->set_file_writer_opts(_opts);
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir_ptr(out_dir);

DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is nullptr";
auto compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
out_dir->createOutputV2(_idx_v2_writer.get()));
return std::make_pair(out_dir, std::move(compound_file_output));

return {std::move(out_dir_ptr), std::move(compound_file_output)};
}

void InvertedIndexFileWriter::write_version_and_indices_count(lucene::store::IndexOutput* output) {
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/rowset/segment_v2/inverted_index_common.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "runtime/exec_env.h"

Expand Down Expand Up @@ -105,21 +106,22 @@ class InvertedIndexFileWriter {
void sort_files(std::vector<FileInfo>& file_infos);
void copyFile(const char* fileName, lucene::store::Directory* dir,
lucene::store::IndexOutput* output, uint8_t* buffer, int64_t bufferLength);
void finalize_output_dir(lucene::store::Directory* out_dir);
void add_index_info(int64_t index_id, const std::string& index_suffix,
int64_t compound_file_size);
int64_t headerLength();
// Helper functions specific to write_v1
std::pair<int64_t, int32_t> calculate_header_length(const std::vector<FileInfo>& sorted_files,
lucene::store::Directory* directory);
std::pair<lucene::store::Directory*, std::unique_ptr<lucene::store::IndexOutput>>
virtual std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
std::unique_ptr<lucene::store::IndexOutput>>
create_output_stream_v1(int64_t index_id, const std::string& index_suffix);
virtual void write_header_and_data_v1(lucene::store::IndexOutput* output,
const std::vector<FileInfo>& sorted_files,
lucene::store::Directory* directory,
int64_t header_length, int32_t header_file_count);
// Helper functions specific to write_v2
std::pair<lucene::store::Directory*, std::unique_ptr<lucene::store::IndexOutput>>
virtual std::pair<std::unique_ptr<lucene::store::Directory, DirectoryDeleter>,
std::unique_ptr<lucene::store::IndexOutput>>
create_output_stream_v2();
void write_version_and_indices_count(lucene::store::IndexOutput* output);
struct FileMetadata {
Expand Down
Loading

0 comments on commit 46bd816

Please sign in to comment.