Skip to content

Commit

Permalink
upp
Browse files Browse the repository at this point in the history
Signed-off-by: Seaven <[email protected]>
  • Loading branch information
Seaven committed Jul 18, 2024
1 parent efba393 commit 4e96819
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 71 deletions.
65 changes: 46 additions & 19 deletions be/src/column/column_access_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "column/column_access_path.h"

#include <cstddef>
#include <utility>
#include <vector>

#include "column/column.h"
Expand All @@ -23,8 +24,10 @@
#include "column/vectorized_fwd.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "common/statusor.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "types/logical_type.h"
Expand Down Expand Up @@ -73,15 +76,6 @@ Status ColumnAccessPath::init(const std::string& parent_path, const TColumnAcces
return Status::OK();
}

// Direct initializer that copies the caller-supplied type, path and column
// index into this node. The header marks this overload as "for test".
// Always returns Status::OK().
Status ColumnAccessPath::init(TAccessPathType::type type, const std::string& path, uint32_t index) {
    // The relative path and the absolute path are both set to `path`.
    _path = path;
    _absolute_path = path;
    _type = type;
    _column_index = index;
    // The value type is fixed to JSON regardless of the arguments.
    _value_type = TypeDescriptor(LogicalType::TYPE_JSON);
    return Status::OK();
}

ColumnAccessPath* ColumnAccessPath::get_child(const std::string& path) {
for (const auto& child : _children) {
if (child->_path == path) {
Expand Down Expand Up @@ -201,18 +195,51 @@ StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(const TColu
// Allocates a ColumnAccessPath with std::make_unique and initializes it through
// init(type, path, index). The init() status is passed through RETURN_IF_ERROR
// (presumably an early return on a non-OK status — confirm against the macro);
// otherwise the new object is returned to the caller.
StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(const TAccessPathType::type& type,
const std::string& path, uint32_t index) {
auto p = std::make_unique<ColumnAccessPath>();
RETURN_IF_ERROR(p->init(type, path, index));
return p;
}

// NOTE(review): this span reads like the old and new sides of a diff hunk merged
// without +/- markers: `_type`, `_column_index` and `_value_type` are each assigned
// twice, and everything after the first `return p;` can never execute. Confirm the
// intended body against the repository before relying on it.
StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(LogicalType type, const std::string& path) {
auto p = std::make_unique<ColumnAccessPath>();
p->_type = TAccessPathType::type::FIELD;
// NOTE(review): `type` is a LogicalType in this signature; this second assignment
// looks like it belongs to the (TAccessPathType, path, index) overload — confirm.
p->_type = type;
p->_path = path;
p->_column_index = 0;
// NOTE(review): this signature has no `index` parameter — confirm.
p->_column_index = index;
p->_absolute_path = path;
p->_value_type = TypeDescriptor(type);
return p;
// Unreachable as written (follows the return above).
p->_value_type = TypeDescriptor(LogicalType::TYPE_JSON);
p->_children.clear();
return std::move(p);
}

// Walks `path` one '.'-separated segment at a time starting from `root`, creating
// any child node that does not exist yet, and returns the node for the last
// segment. An empty `path` returns `root` unchanged.
//
// A segment that begins with a double quote is kept intact: the search for the
// separating '.' starts at the closing quote, so dots inside the quotes do not
// split the key. The quotes themselves remain part of the key.
//
// Missing children are created through ColumnAccessPath::create() as FIELD paths
// with column index 0.
ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) {
    ColumnAccessPath* node = root;
    std::string remaining = path;

    while (!remaining.empty()) {
        // Where to start looking for the segment separator.
        size_t scan_from = 0;
        if (remaining.starts_with("\"")) {
            scan_from = remaining.find('\"', 1);
            DCHECK(scan_from != std::string::npos);
        }

        const size_t dot = remaining.find('.', scan_from);
        const bool is_last_segment = (dot == std::string::npos);
        std::string key = is_last_segment ? remaining : remaining.substr(0, dot);
        std::string rest = is_last_segment ? std::string() : remaining.substr(dot + 1);

        ColumnAccessPath* child = node->get_child(key);
        if (child == nullptr) {
            auto created = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0);
            DCHECK(created.ok());
            node->children().emplace_back(std::move(created.value()));
            child = node->children().back().get();
        }

        // Descend and continue with whatever follows the separator.
        node = child;
        remaining = std::move(rest);
    }

    return node;
}

// Makes sure every segment of `path` exists beneath `root` (via
// insert_json_path_impl) and then overwrites the resulting node's fields:
// FIELD access type, column index 0, absolute path equal to `path`, and a value
// type built from `type`.
void ColumnAccessPath::insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path) {
    ColumnAccessPath* target = insert_json_path_impl(path, root);
    target->_value_type = TypeDescriptor(type);
    target->_absolute_path = path;
    target->_column_index = 0;
    target->_type = TAccessPathType::type::FIELD;
}

} // namespace starrocks
12 changes: 5 additions & 7 deletions be/src/column/column_access_path.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <string>
#include <vector>

#include "column/column.h"
#include "common/status.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/types.h"
Expand All @@ -42,17 +43,14 @@ class ColumnAccessPath {
static StatusOr<std::unique_ptr<ColumnAccessPath>> create(const TColumnAccessPath& column_path, RuntimeState* state,
ObjectPool* pool);

// for test
static StatusOr<std::unique_ptr<ColumnAccessPath>> create(const TAccessPathType::type& type,
const std::string& path, uint32_t index);

static StatusOr<std::unique_ptr<ColumnAccessPath>> create(LogicalType type, const std::string& path);

Status init(const std::string& parent_path, const TColumnAccessPath& column_path, RuntimeState* state,
ObjectPool* pool);

// for test
Status init(TAccessPathType::type type, const std::string& path, uint32_t index);
static StatusOr<std::unique_ptr<ColumnAccessPath>> create(const TAccessPathType::type& type,
const std::string& path, uint32_t index);
static void insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path);
// end test

const std::string& path() const { return _path; }

Expand Down
9 changes: 6 additions & 3 deletions be/src/storage/rowset/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,16 +728,19 @@ StatusOr<std::unique_ptr<ColumnIterator>> ColumnReader::new_iterator(ColumnAcces
std::string name = rd->name();
// target: b.b2.b3
// source: b.b2
if (target == name) {
if (target == name || target.starts_with(name + ".")) {
ASSIGN_OR_RETURN(auto iter, rd->new_iterator());
source_paths.emplace_back(name);
source_types.emplace_back(rd->column_type());
all_iters.emplace_back(std::move(iter));
break;
} else if (name.starts_with(target + ".") &&
(target_types[k] == TYPE_JSON || is_string_type(target_types[i]))) {
} else if (name.starts_with(target + ".")) {
// target: b.b2
// source: b.b2.b3
if (target_types[k] != TYPE_JSON && !is_string_type(target_types[k])) {
// don't need column and remain
break;
}
need_remain = true;
ASSIGN_OR_RETURN(auto iter, rd->new_iterator());
source_paths.emplace_back(name);
Expand Down
16 changes: 9 additions & 7 deletions be/src/util/json_flattener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ static const std::unordered_map<uint8_t, JsonFlatExtractFunc> JSON_EXTRACT_FUNC
{LogicalType::TYPE_LARGEINT, &extract_number<LogicalType::TYPE_LARGEINT>},
{LogicalType::TYPE_DOUBLE, &extract_number<LogicalType::TYPE_DOUBLE>},
{LogicalType::TYPE_VARCHAR, &extract_string},
{LogicalType::TYPE_CHAR, &extract_string},
{LogicalType::TYPE_JSON, &extract_json},
};

Expand Down Expand Up @@ -905,11 +906,13 @@ void HyperJsonTransformer::init_read_task(const std::vector<std::string>& paths,
}
} else if (!merges.empty()) {
check_dst.emplace(i);
if (_dst_types[i] != TYPE_JSON && !is_string_type(_dst_types[i])) {
continue;
}
auto& mk = _merge_tasks.emplace_back();
mk.is_merge = true;
mk.src_index = merges;
mk.dst_index = i;
DCHECK(_dst_types[i] == TYPE_JSON || is_string_type(_dst_types[i]));
if (_dst_types[i] != TYPE_JSON) {
// must be to string, merge result must be string
mk.need_cast = true;
Expand Down Expand Up @@ -1103,11 +1106,11 @@ Status HyperJsonTransformer::trans(std::vector<ColumnPtr>& columns) {

size_t rows = columns[0]->size();
for (size_t i = 0; i < _dst_columns.size() - 1; i++) {
// if (_dst_columns[i]->size() == 0) {
// _dst_columns[i]->resize(rows);
// } else {
DCHECK_EQ(rows, _dst_columns[i]->size());
// }
if (_dst_columns[i]->size() == 0) {
_dst_columns[i]->resize(rows);
} else {
DCHECK_EQ(rows, _dst_columns[i]->size());
}
}
return Status::OK();
}
Expand All @@ -1123,7 +1126,6 @@ Status HyperJsonTransformer::_equals(const MergeTask& task, std::vector<ColumnPt
}

Status HyperJsonTransformer::_cast(const MergeTask& task, ColumnPtr& col) {
DCHECK(task.src_index.size() == 1);
DCHECK(task.need_cast);
Chunk chunk;
chunk.append_column(col, task.dst_index);
Expand Down
Loading

0 comments on commit 4e96819

Please sign in to comment.