Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3722 add simdjson parser into RD #10204

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
TVector<NYT::TNode> Schemas;
};

class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString>>> {
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>> {
public:
TFilterInputConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -91,28 +91,32 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, T
}
}

void OnObject(std::pair<ui64, TList<TString>> value) override {
void OnObject(std::pair<ui64, const TVector<TVector<std::string_view>>&> values) override {
Y_ENSURE(FieldsPositions.size() == values.second.size());

NKikimr::NMiniKQL::TThrowingBindTerminator bind;

with_lock (Worker->GetScopedAlloc()) {
auto& holderFactory = Worker->GetGraph().GetHolderFactory();
NYql::NUdf::TUnboxedValue* items = nullptr;

NYql::NUdf::TUnboxedValue result = Cache.NewArray(
holderFactory,
static_cast<ui32>(value.second.size() + 1),
items);

items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(value.first);
// TODO: use blocks here
for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) {
NYql::NUdf::TUnboxedValue* items = nullptr;

NYql::NUdf::TUnboxedValue result = Cache.NewArray(
holderFactory,
static_cast<ui32>(values.second.size() + 1),
items);

Y_ENSURE(FieldsPositions.size() == value.second.size());
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first++);

size_t i = 0;
for (const auto& v : value.second) {
NYql::NUdf::TStringValue str(v);
items[FieldsPositions[i++]] = NYql::NUdf::TUnboxedValuePod(std::move(str));
size_t fieldId = 0;
for (const auto& column : values.second) {
NYql::NUdf::TStringValue str(column[rowId]);
items[FieldsPositions[fieldId++]] = NYql::NUdf::TUnboxedValuePod(std::move(str));
}

Worker->Push(std::move(result));
}
Worker->Push(std::move(result));
}
}

Expand Down Expand Up @@ -196,7 +200,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
static constexpr bool IsPartial = false;
static constexpr bool SupportPushStreamMode = true;

using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString>>>>;
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>>>;

static TConsumerType MakeConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -238,8 +242,9 @@ class TJsonFilter::TImpl {
LOG_ROW_DISPATCHER_DEBUG("Program created");
}

void Push(ui64 offset, const TList<TString>& value) {
InputConsumer->OnObject(std::make_pair(offset, value));
void Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
Y_ENSURE(values, "Expected non empty schema");
InputConsumer->OnObject(std::make_pair(offset, values));
}

TString GetSql() const {
Expand All @@ -266,7 +271,7 @@ class TJsonFilter::TImpl {

private:
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString>>>> InputConsumer;
THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>>> InputConsumer;
const TString Sql;
};

Expand All @@ -280,9 +285,9 @@ TJsonFilter::TJsonFilter(

TJsonFilter::~TJsonFilter() {
}
void TJsonFilter::Push(ui64 offset, const TList<TString>& value) {
Impl->Push(offset, value);

void TJsonFilter::Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
Impl->Push(offset, values);
}

TString TJsonFilter::GetSql() {
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@

#pragma once

namespace NFq {

#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/public/udf/udf_value.h>

namespace NFq {

class TJsonFilter {
public:
using TCallback = std::function<void(ui64, const TString&)>;

public:
TJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback);

~TJsonFilter();
void Push(ui64 offset, const TList<TString>& value);

void Push(ui64 offset, const TVector<TVector<std::string_view>>& values);
TString GetSql();

private:
Expand Down
Loading
Loading