Skip to content

Commit

Permalink
upxxx
Browse files Browse the repository at this point in the history
Signed-off-by: Seaven <[email protected]>
  • Loading branch information
Seaven committed Jul 18, 2024
1 parent fb86d7c commit cf89927
Show file tree
Hide file tree
Showing 38 changed files with 3,584 additions and 883 deletions.
73 changes: 60 additions & 13 deletions be/src/column/column_access_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "column/column_access_path.h"

#include <cstddef>
#include <utility>
#include <vector>

#include "column/column.h"
Expand All @@ -23,8 +24,10 @@
#include "column/vectorized_fwd.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "common/statusor.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "types/logical_type.h"
Expand Down Expand Up @@ -66,22 +69,13 @@ Status ColumnAccessPath::init(const std::string& parent_path, const TColumnAcces

for (const auto& child : column_path.children) {
ColumnAccessPathPtr child_path = std::make_unique<ColumnAccessPath>();
RETURN_IF_ERROR(child_path->init(_absolute_path + "/", child, state, pool));
RETURN_IF_ERROR(child_path->init(_absolute_path + ".", child, state, pool));
_children.emplace_back(std::move(child_path));
}

return Status::OK();
}

Status ColumnAccessPath::init(TAccessPathType::type type, const std::string& path, uint32_t index) {
_type = type;
_path = path;
_column_index = index;
_absolute_path = path;
_value_type = TypeDescriptor(LogicalType::TYPE_JSON);
return Status::OK();
}

ColumnAccessPath* ColumnAccessPath::get_child(const std::string& path) {
for (const auto& child : _children) {
if (child->_path == path) {
Expand Down Expand Up @@ -175,6 +169,16 @@ size_t ColumnAccessPath::leaf_size() const {
return size;
}

void ColumnAccessPath::get_all_leafs(std::vector<ColumnAccessPath*>* result) {
if (_children.empty()) {
result->emplace_back(this);
return;
}
for (const auto& child : _children) {
child->get_all_leafs(result);
}
}

const std::string ColumnAccessPath::to_string() const {
std::stringstream ss;
ss << _path << "(" << _type << ")";
Expand All @@ -184,15 +188,58 @@ const std::string ColumnAccessPath::to_string() const {
StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(const TColumnAccessPath& column_path,
RuntimeState* state, ObjectPool* pool) {
auto path = std::make_unique<ColumnAccessPath>();
RETURN_IF_ERROR(path->init("/", column_path, state, pool));
RETURN_IF_ERROR(path->init("", column_path, state, pool));
return path;
}

StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(const TAccessPathType::type& type,
const std::string& path, uint32_t index) {
auto p = std::make_unique<ColumnAccessPath>();
RETURN_IF_ERROR(p->init(type, path, index));
return p;
p->_type = type;
p->_path = path;
p->_column_index = index;
p->_absolute_path = path;
p->_value_type = TypeDescriptor(LogicalType::TYPE_JSON);
p->_children.clear();
return std::move(p);
}

ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) {
if (path.empty()) {
return root;
}

size_t pos = 0;
if (path.starts_with("\"")) {
pos = path.find('\"', 1);
DCHECK(pos != std::string::npos);
}
pos = path.find('.', pos);
std::string key;
std::string next;
if (pos == std::string::npos) {
key = path;
} else {
key = path.substr(0, pos);
next = path.substr(pos + 1);
}

auto child = root->get_child(key);
if (child == nullptr) {
auto n = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0);
DCHECK(n.ok());
root->children().emplace_back(std::move(n.value()));
child = root->children().back().get();
}
return insert_json_path_impl(next, child);
}

void ColumnAccessPath::insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path) {
auto leaf = insert_json_path_impl(path, root);
leaf->_type = TAccessPathType::type::FIELD;
leaf->_column_index = 0;
leaf->_absolute_path = path;
leaf->_value_type = TypeDescriptor(type);
}

} // namespace starrocks
13 changes: 8 additions & 5 deletions be/src/column/column_access_path.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
#include <string>
#include <vector>

#include "column/column.h"
#include "common/status.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/types.h"
#include "types/logical_type.h"

namespace starrocks {

Expand All @@ -41,15 +43,14 @@ class ColumnAccessPath {
static StatusOr<std::unique_ptr<ColumnAccessPath>> create(const TColumnAccessPath& column_path, RuntimeState* state,
ObjectPool* pool);

// for test
static StatusOr<std::unique_ptr<ColumnAccessPath>> create(const TAccessPathType::type& type,
const std::string& path, uint32_t index);

Status init(const std::string& parent_path, const TColumnAccessPath& column_path, RuntimeState* state,
ObjectPool* pool);

// for test
Status init(TAccessPathType::type type, const std::string& path, uint32_t index);
static StatusOr<std::unique_ptr<ColumnAccessPath>> create(const TAccessPathType::type& type,
const std::string& path, uint32_t index);
static void insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path);
// end test

const std::string& path() const { return _path; }

Expand Down Expand Up @@ -86,6 +87,8 @@ class ColumnAccessPath {

size_t leaf_size() const;

void get_all_leafs(std::vector<ColumnAccessPath*>* result);

private:
// path type, to mark the path is KEY/OFFSET/FIELD/ALL/INDEX
TAccessPathType::type _type;
Expand Down
69 changes: 27 additions & 42 deletions be/src/column/json_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ std::string JsonColumn::debug_item(size_t idx) const {
std::ostringstream ss;
ss << "{";
size_t i = 0;
for (; i < _flat_column_paths.size() - i; i++) {
// flat json debug is different with normal, lose quota
for (; i < _flat_column_paths.size() - 1; i++) {
ss << _flat_column_paths[i] << ": ";
ss << get_flat_field(i)->debug_item(idx) << ", ";
}
Expand Down Expand Up @@ -163,51 +164,31 @@ const ColumnPtr& JsonColumn::get_flat_field(int index) const {
return _flat_columns[index];
}

ColumnPtr& JsonColumn::get_remain() {
DCHECK(_flat_columns.size() == _flat_column_paths.size() + 1);
return _flat_columns[_flat_columns.size() - 1];
}

const ColumnPtr& JsonColumn::get_remain() const {
DCHECK(_flat_columns.size() == _flat_column_paths.size() + 1);
return _flat_columns[_flat_columns.size() - 1];
}

LogicalType JsonColumn::get_flat_field_type(const std::string& path) const {
DCHECK(_path_to_index.count(path) > 0);
return _flat_column_types[_path_to_index.at(path)];
}

void JsonColumn::init_flat_columns(const std::vector<std::string>& paths) {
if (_flat_column_paths.empty()) {
_flat_column_paths.insert(_flat_column_paths.cbegin(), paths.cbegin(), paths.cend());
_flat_column_types.assign(paths.size(), LogicalType::TYPE_JSON);
for (size_t i = 0; i < _flat_column_paths.size(); i++) {
// nullable column
_flat_columns.emplace_back(NullableColumn::create(JsonColumn::create(), NullColumn::create()));
_path_to_index[_flat_column_paths[i]] = i;
}
} else {
DCHECK(_flat_column_paths.size() == paths.size());
DCHECK(_flat_columns.size() == paths.size());
DCHECK(_flat_column_types.size() == paths.size());
for (size_t i = 0; i < _flat_column_paths.size(); i++) {
DCHECK(_flat_column_paths[i] == paths[i]);
DCHECK(_flat_columns[i]->is_nullable());
DCHECK(_flat_column_types[i] == LogicalType::TYPE_JSON);
}
}
}

void JsonColumn::init_flat_columns(const std::vector<std::string>& paths, const std::vector<LogicalType>& types) {
if (_flat_column_paths.empty()) {
DCHECK_EQ(paths.size(), types.size());
_flat_column_paths.insert(_flat_column_paths.cbegin(), paths.cbegin(), paths.cend());
_flat_column_types.insert(_flat_column_types.cbegin(), types.cbegin(), types.cend());
for (size_t i = 0; i < _flat_column_paths.size(); i++) {
// nullable column
_flat_columns.emplace_back(ColumnHelper::create_column(TypeDescriptor(types[i]), true));
_path_to_index[_flat_column_paths[i]] = i;
}
} else {
DCHECK(_flat_column_paths.size() == paths.size());
DCHECK(_flat_columns.size() == paths.size());
DCHECK(_flat_column_types.size() == paths.size());
for (size_t i = 0; i < _flat_column_paths.size(); i++) {
DCHECK(_flat_column_paths[i] == paths[i]);
DCHECK(_flat_columns[i]->is_nullable());
DCHECK(_flat_column_types[i] == types[i]);
}
void JsonColumn::set_flat_columns(const std::vector<std::string>& paths, const std::vector<LogicalType>& types,
const Columns& flat_columns) {
DCHECK_EQ(paths.size(), types.size());
DCHECK_GE(paths.size(), flat_columns.size());
DCHECK_LE(paths.size(), flat_columns.size() + 1);
_flat_column_paths.insert(_flat_column_paths.cbegin(), paths.cbegin(), paths.cend());
_flat_column_types.insert(_flat_column_types.cbegin(), types.cbegin(), types.cend());
_flat_columns.insert(_flat_columns.cbegin(), flat_columns.cbegin(), flat_columns.cend());
for (size_t i = 0; i < _flat_column_paths.size(); i++) {
_path_to_index[_flat_column_paths[i]] = i;
}
}

Expand Down Expand Up @@ -307,7 +288,11 @@ void JsonColumn::append(const Column& src, size_t offset, size_t count) {
if (other_json->is_flat_json() && !is_flat_json()) {
// only hit in AggregateIterator (Aggregate mode in storage)
DCHECK_EQ(0, this->size());
init_flat_columns(other_json->_flat_column_paths, other_json->_flat_column_types);
std::vector<ColumnPtr> copy;
for (const auto& col : other_json->_flat_columns) {
copy.emplace_back(col->clone_empty());
}
set_flat_columns(other_json->flat_column_paths(), other_json->flat_column_types(), copy);
}

if (is_flat_json()) {
Expand Down
15 changes: 11 additions & 4 deletions be/src/column/json_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,27 +113,34 @@ class JsonColumn final : public ColumnFactory<ObjectColumn<JsonValue>, JsonColum

Columns& get_flat_fields() { return _flat_columns; };

const Columns& get_flat_fields() const { return _flat_columns; };

ColumnPtr& get_flat_field(int index);

const ColumnPtr& get_flat_field(int index) const;

ColumnPtr& get_remain();

const ColumnPtr& get_remain() const;

const std::vector<std::string>& flat_column_paths() const { return _flat_column_paths; }

const std::vector<LogicalType>& flat_column_types() const { return _flat_column_types; }

bool has_flat_column(const std::string& path) const;

void init_flat_columns(const std::vector<std::string>& paths);
bool has_remain() const { return _flat_columns.size() == (_flat_column_paths.size() + 1); }

void init_flat_columns(const std::vector<std::string>& paths, const std::vector<LogicalType>& types);
void set_flat_columns(const std::vector<std::string>& paths, const std::vector<LogicalType>& types,
const Columns& flat_columns);

std::string debug_flat_paths() const;

private:
// flat-columns
// flat-columns[sub_columns, remain_column]
Columns _flat_columns;

// flat-column paths
// flat-column paths, doesn't contains remain column
std::vector<std::string> _flat_column_paths;
std::vector<LogicalType> _flat_column_types;
std::unordered_map<std::string, int> _path_to_index;
Expand Down
7 changes: 2 additions & 5 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1295,16 +1295,13 @@ CONF_mInt32(dictionary_cache_refresh_threadpool_size, "8");
CONF_mBool(enable_json_flat, "true");

// extract flat json column when row_num * null_factor < null_row_num
CONF_mDouble(json_flat_null_factor, "0.3");
CONF_mDouble(json_flat_null_factor, "0.4");

// extract flat json column when row_num * sparsity_factor < hit_row_num
CONF_mDouble(json_flat_sparsity_factor, "0.9");

// only flatten json when the number of sub-field in the JSON exceeds the limit
CONF_mInt32(json_flat_internal_column_min_limit, "5");

// the maximum number of extracted JSON sub-field
CONF_mInt32(json_flat_column_max, "20");
CONF_mInt32(json_flat_column_max, "100");

// Allowable intervals for continuous generation of pk dumps
// Disable when pk_dump_interval_seconds <= 0
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/json_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ StatusOr<ColumnPtr> JsonFunctions::_flat_json_query_impl(FunctionContext* contex
chunk.append_column(flat_column, 0);
return state->cast_expr->evaluate_checked(nullptr, &chunk);
}
return flat_column;
return flat_column->clone();
}
}

Expand Down
1 change: 1 addition & 0 deletions be/src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ add_library(Storage STATIC
rowset/bitshuffle_page.cpp
rowset/bitshuffle_wrapper.cpp
rowset/column_iterator.cpp
rowset/json_column_compactor.cpp
rowset/json_column_iterator.cpp
rowset/json_column_writer.cpp
rowset/cast_column_iterator.cpp
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/compaction_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Status CompactionUtils::construct_output_rowset_writer(Tablet* tablet, uint32_t
context.writer_type =
(algorithm == VERTICAL_COMPACTION ? RowsetWriterType::kVertical : RowsetWriterType::kHorizontal);
context.gtid = gtid;
context.is_compaction = true;
Status st = RowsetFactory::create_rowset_writer(context, output_rowset_writer);
if (!st.ok()) {
std::stringstream ss;
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ Status DeltaWriterImpl::build_schema_and_writer() {
_txn_id, nullptr, false /** no compaction**/);
} else {
_tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(_tablet_manager, _tablet_id, _write_schema,
_txn_id);
_txn_id, false);
}
RETURN_IF_ERROR(_tablet_writer->open());
_mem_table_sink = std::make_unique<TabletWriterSink>(_tablet_writer.get());
Expand Down
11 changes: 7 additions & 4 deletions be/src/storage/lake/general_tablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ namespace starrocks::lake {

HorizontalGeneralTabletWriter::HorizontalGeneralTabletWriter(TabletManager* tablet_mgr, int64_t tablet_id,
std::shared_ptr<const TabletSchema> schema, int64_t txn_id,
ThreadPool* flush_pool)
: TabletWriter(tablet_mgr, tablet_id, std::move(schema), txn_id, flush_pool) {}
bool is_compaction, ThreadPool* flush_pool)
: TabletWriter(tablet_mgr, tablet_id, std::move(schema), txn_id, is_compaction, flush_pool) {}

HorizontalGeneralTabletWriter::~HorizontalGeneralTabletWriter() = default;

Expand Down Expand Up @@ -82,6 +82,7 @@ Status HorizontalGeneralTabletWriter::reset_segment_writer() {
DCHECK(_schema != nullptr);
auto name = gen_segment_filename(_txn_id);
SegmentWriterOptions opts;
opts.is_compaction = _is_compaction;
WritableFileOptions wopts;
if (config::enable_transparent_data_encryption) {
ASSIGN_OR_RETURN(auto pair, KeyCache::instance().create_encryption_meta_pair_using_current_kek());
Expand Down Expand Up @@ -118,8 +119,9 @@ Status HorizontalGeneralTabletWriter::flush_segment_writer(SegmentPB* segment) {

VerticalGeneralTabletWriter::VerticalGeneralTabletWriter(TabletManager* tablet_mgr, int64_t tablet_id,
std::shared_ptr<const TabletSchema> schema, int64_t txn_id,
uint32_t max_rows_per_segment, ThreadPool* flush_pool)
: TabletWriter(tablet_mgr, tablet_id, std::move(schema), txn_id, flush_pool),
uint32_t max_rows_per_segment, bool is_compaction,
ThreadPool* flush_pool)
: TabletWriter(tablet_mgr, tablet_id, std::move(schema), txn_id, _is_compaction, flush_pool),
_max_rows_per_segment(max_rows_per_segment) {}

VerticalGeneralTabletWriter::~VerticalGeneralTabletWriter() {
Expand Down Expand Up @@ -267,6 +269,7 @@ StatusOr<std::shared_ptr<SegmentWriter>> VerticalGeneralTabletWriter::create_seg
DCHECK(_schema != nullptr);
auto name = gen_segment_filename(_txn_id);
SegmentWriterOptions opts;
opts.is_compaction = _is_compaction;
WritableFileOptions wopts;
if (config::enable_transparent_data_encryption) {
ASSIGN_OR_RETURN(auto pair, KeyCache::instance().create_encryption_meta_pair_using_current_kek());
Expand Down
Loading

0 comments on commit cf89927

Please sign in to comment.