Skip to content

Commit

Permalink
Merge branch 'master' into enhancement_nereids_kill-query-support-union
Browse files Browse the repository at this point in the history
  • Loading branch information
Yao-MR authored Nov 22, 2024
2 parents 4e5b731 + 127e597 commit d5c08b0
Show file tree
Hide file tree
Showing 92 changed files with 3,209 additions and 1,060 deletions.
34 changes: 23 additions & 11 deletions be/src/io/fs/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,70 @@ namespace io {

Status FileSystem::create_file(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(create_file_impl(path, writer, opts));
}

Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(open_file_impl(path, reader, opts));
}

Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(create_directory_impl(path, failed_if_exists));
}

Status FileSystem::delete_file(const Path& file) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(delete_file_impl(path));
}

Status FileSystem::delete_directory(const Path& dir) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(delete_directory_impl(path));
}

Status FileSystem::batch_delete(const std::vector<Path>& files) {
std::vector<Path> abs_files;
for (auto& file : files) {
abs_files.push_back(absolute_path(file));
Path abs_file;
RETURN_IF_ERROR(absolute_path(file, abs_file));
abs_files.push_back(abs_file);
}
FILESYSTEM_M(batch_delete_impl(abs_files));
}

Status FileSystem::exists(const Path& path, bool* res) const {
auto fs_path = absolute_path(path);
Path fs_path;
RETURN_IF_ERROR(absolute_path(path, fs_path));
FILESYSTEM_M(exists_impl(fs_path, res));
}

Status FileSystem::file_size(const Path& file, int64_t* file_size) const {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(file_size_impl(path, file_size));
}

Status FileSystem::list(const Path& dir, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(list_impl(path, only_file, files, exists));
}

Status FileSystem::rename(const Path& orig_name, const Path& new_name) {
auto orig_path = absolute_path(orig_name);
auto new_path = absolute_path(new_name);
Path orig_path;
RETURN_IF_ERROR(absolute_path(orig_name, orig_path));
Path new_path;
RETURN_IF_ERROR(absolute_path(new_name, new_path));
FILESYSTEM_M(rename_impl(orig_path, new_path));
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class FileSystem {

// FIXME(plat1ko): The implementation and semantics of this function are not completely
// consistent across file systems, which is confusing.
virtual Path absolute_path(const Path& path) const = 0;
virtual Status absolute_path(const Path& path, Path& abs_path) const = 0;

FileSystem(std::string id, FileSystemType type) : _id(std::move(id)), _type(type) {}

Expand Down
50 changes: 50 additions & 0 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,4 +471,54 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
return Status::OK();
}

// Strips an optional `file` URI prefix (scheme and authority) from a local path.
//
// Accepted inputs and the value written to `abs_path`:
//   abc/def                     -> abc/def   (no scheme: returned unchanged)
//   /abc/def                    -> /abc/def
//   file:/abc/def               -> /abc/def
//   file://<authority>/abc/def  -> /abc/def
//
// Returns Status::InternalError when the input carries a scheme other than
// `file`, or when it has an authority but no path after it. Otherwise
// returns Status::OK().
Status LocalFileSystem::convert_to_abs_path(const Path& input_path_str, Path& abs_path) {
    const std::string raw = input_path_str;
    const size_t first_slash = raw.find('/');
    if (first_slash == 0) {
        // Input begins with '/': hand it back untouched.
        abs_path = input_path_str;
        return Status::OK();
    }

    // Offset of the first character that follows the scheme (0 when there is none).
    size_t path_begin = 0;

    // A ':' only delimits a scheme when it appears before the first '/'.
    const size_t scheme_end = raw.find(':');
    const bool has_scheme = scheme_end != std::string::npos &&
                            (first_slash == std::string::npos || scheme_end < first_slash);
    if (has_scheme) {
        if (raw.compare(0, scheme_end, "file") != 0) {
            return Status::InternalError(
                    "Only supports `file` type scheme, like 'file:///path', 'file:/path'.");
        }
        path_begin = scheme_end + 1;
    }

    // An authority is introduced by "//" followed by at least one more character.
    // Example: raw = "file://authority/abc/def" gives path_begin = 5 here.
    const bool has_authority =
            raw.length() - path_begin > 2 && raw.compare(path_begin, 2, "//") == 0;
    if (has_authority) {
        // Find the '/' that ends the authority (index 16 in the example above).
        const size_t path_slash = raw.find('/', path_begin + 2);
        if (path_slash == std::string::npos) {
            return Status::InternalError(
                    "This input string only has authority, but has no path information");
        }
        // Skip the authority; the path starts at that '/'.
        path_begin = path_slash;
    }

    // Whatever remains is the path component.
    abs_path = raw.substr(path_begin);
    return Status::OK();
}

} // namespace doris::io
6 changes: 5 additions & 1 deletion be/src/io/fs/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class LocalFileSystem final : public FileSystem {
public:
~LocalFileSystem() override;

static Status convert_to_abs_path(const Path& path, Path& abs_path);

/// hard link dest file to src file
Status link_file(const Path& src, const Path& dest);

Expand Down Expand Up @@ -104,7 +106,9 @@ class LocalFileSystem final : public FileSystem {

// `LocalFileSystem` always uses absolute paths as arguments
// FIXME(plat1ko): Eliminate this method
Path absolute_path(const Path& path) const override { return path; }
Status absolute_path(const Path& path, Path& abs_path) const override {
return convert_to_abs_path(path, abs_path);
}

friend const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
};
Expand Down
10 changes: 7 additions & 3 deletions be/src/io/fs/remote_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,25 @@
namespace doris::io {

Status RemoteFileSystem::upload(const Path& local_file, const Path& dest_file) {
auto dest_path = absolute_path(dest_file);
Path dest_path;
RETURN_IF_ERROR(absolute_path(dest_file, dest_path));
FILESYSTEM_M(upload_impl(local_file, dest_path));
}

Status RemoteFileSystem::batch_upload(const std::vector<Path>& local_files,
const std::vector<Path>& remote_files) {
std::vector<Path> remote_paths;
for (auto& path : remote_files) {
remote_paths.push_back(absolute_path(path));
Path abs_path;
RETURN_IF_ERROR(absolute_path(path, abs_path));
remote_paths.push_back(abs_path);
}
FILESYSTEM_M(batch_upload_impl(local_files, remote_paths));
}

Status RemoteFileSystem::download(const Path& remote_file, const Path& local) {
auto remote_path = absolute_path(remote_file);
Path remote_path;
RETURN_IF_ERROR(absolute_path(remote_file, remote_path));
FILESYSTEM_M(download_impl(remote_path, local));
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/io/fs/remote_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ class RemoteFileSystem : public FileSystem {
virtual Status open_file_internal(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions& opts) = 0;

Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.is_absolute()) {
return path;
abs_path = path;
} else {
abs_path = _root_path / path;
}
return _root_path / path;
return Status::OK();
}

Path _root_path;
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,17 @@ class S3FileSystem final : public RemoteFileSystem {
const std::vector<Path>& remote_files) override;
Status download_impl(const Path& remote_file, const Path& local_file) override;

Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.string().find("://") != std::string::npos) {
// the path is with schema, which means this is a full path like:
// s3://bucket/path/to/file.txt
// so no need to concat with prefix
return path;
abs_path = path;
} else {
// path with no schema
return _root_path / path;
abs_path = _root_path / path;
}
return Status::OK();
}

private:
Expand Down
20 changes: 11 additions & 9 deletions be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "cloud/config.h"
#include "common/status.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/segment_loader.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"

Expand Down Expand Up @@ -63,21 +64,18 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
auto rowset = reader->rowset();
const auto rowset_id = rowset->rowset_id();

DCHECK(_segment_cache_handles.contains(rowset_id));
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
const auto& segments_rows = _all_segments_rows[rowset_id];

if (rowset->num_rows() == 0) {
continue;
}

const auto& segments = segment_cache_handle.get_segments();
int segment_start = 0;
auto split = RowSetSplits(reader->clone());

for (size_t i = 0; i != segments.size(); ++i) {
const auto& segment = segments[i];
for (size_t i = 0; i != segments_rows.size(); ++i) {
const size_t rows_of_segment = segments_rows[i];
RowRanges row_ranges;
const size_t rows_of_segment = segment->num_rows();
int64_t offset_in_segment = 0;

// try to split large segments into RowRanges
Expand Down Expand Up @@ -125,15 +123,15 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
// The non-empty `row_ranges` means there are some rows left in this segment not added into `split`.
if (!row_ranges.is_empty()) {
DCHECK_GT(rows_collected, 0);
DCHECK_EQ(row_ranges.to(), segment->num_rows());
DCHECK_EQ(row_ranges.to(), rows_of_segment);
split.segment_row_ranges.emplace_back(std::move(row_ranges));
}
}

DCHECK_LE(rows_collected, _rows_per_scanner);
if (rows_collected > 0) {
split.segment_offsets.first = segment_start;
split.segment_offsets.second = segments.size();
split.segment_offsets.second = segments_rows.size();
DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
DCHECK_EQ(split.segment_row_ranges.size(),
split.segment_offsets.second - split.segment_offsets.first);
Expand Down Expand Up @@ -181,11 +179,15 @@ Status ParallelScannerBuilder::_load() {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
SegmentCacheHandle segment_cache_handle;

RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle,
enable_segment_cache, false));

for (const auto& segment : segment_cache_handle.get_segments()) {
_all_segments_rows[rowset_id].emplace_back(segment->num_rows());
}
_total_rows += rowset->num_rows();
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/parallel_scanner_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ParallelScannerBuilder {

size_t _rows_per_scanner {_min_rows_per_scanner};

std::map<RowsetId, SegmentCacheHandle> _segment_cache_handles;
std::map<RowsetId, std::vector<size_t>> _all_segments_rows;

std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeState* _state;
Expand Down
55 changes: 28 additions & 27 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,24 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
if (_closed) {
return Status::OK();
}
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (_should_build_hash_table) {
// The build side hash key column maybe no need output, but we need to keep the column in block
// because it is used to compare with probe side hash key column
if (p._should_keep_hash_key_column && _build_col_ids.size() == 1) {
p._should_keep_column_flags[_build_col_ids[0]] = true;
}
if (!_should_build_hash_table) {
return;
}
// The build side hash key column maybe no need output, but we need to keep the column in block
// because it is used to compare with probe side hash key column
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
if (p._should_keep_hash_key_column && _build_col_ids.size() == 1) {
p._should_keep_column_flags[_build_col_ids[0]] = true;
}

if (_shared_state->build_block) {
// release the memory of unused column in probe stage
_shared_state->build_block->clear_column_mem_not_keep(
p._should_keep_column_flags, bool(p._shared_hashtable_controller));
}
if (_shared_state->build_block) {
// release the memory of unused column in probe stage
_shared_state->build_block->clear_column_mem_not_keep(
p._should_keep_column_flags, bool(p._shared_hashtable_controller));
}

if (_should_build_hash_table && p._shared_hashtable_controller) {
if (p._shared_hashtable_controller) {
p._shared_hashtable_controller->signal_finish(p.node_id());
}
}};
Expand All @@ -137,22 +138,22 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
return Base::close(state, exec_status);
}

if (state->get_task()->wake_up_by_downstream()) {
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
if (_should_build_hash_table) {
if (state->get_task()->wake_up_by_downstream()) {
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
}
SCOPED_TIMER(_publish_runtime_filter_timer);
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ class ExecEnv {
static void set_tracking_memory(bool tracking_memory) {
_s_tracking_memory.store(tracking_memory, std::memory_order_release);
}
void set_orc_memory_pool(orc::MemoryPool* pool) { _orc_memory_pool = pool; }
#endif
LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); }

Expand Down
Loading

0 comments on commit d5c08b0

Please sign in to comment.