From 8222d36ac27901c7686c8f8d93a22d9dffecfa8d Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 8 Oct 2024 10:05:37 +0000 Subject: [PATCH 1/4] Added simdjson parsing --- .../fq/libs/row_dispatcher/json_filter.cpp | 51 ++- ydb/core/fq/libs/row_dispatcher/json_filter.h | 13 +- .../fq/libs/row_dispatcher/json_parser.cpp | 422 +++++++----------- ydb/core/fq/libs/row_dispatcher/json_parser.h | 42 +- .../fq/libs/row_dispatcher/topic_session.cpp | 116 +++-- .../libs/row_dispatcher/ut/json_filter_ut.cpp | 28 +- .../libs/row_dispatcher/ut/json_parser_ut.cpp | 136 +++--- .../row_dispatcher/ut/topic_session_ut.cpp | 14 +- ydb/core/fq/libs/row_dispatcher/ut/ya.make | 1 - ydb/core/fq/libs/row_dispatcher/ya.make | 2 +- 10 files changed, 406 insertions(+), 419 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 8b7d46a690f2..d51527abc297 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -53,7 +53,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase { TVector Schemas; }; -class TFilterInputConsumer : public NYql::NPureCalc::IConsumer>> { +class TFilterInputConsumer : public NYql::NPureCalc::IConsumer>>> { public: TFilterInputConsumer( const TFilterInputSpec& spec, @@ -91,28 +91,32 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer> value) override { + void OnObject(std::pair>> values) override { + Y_ENSURE(FieldsPositions.size() == values.second.size()); + NKikimr::NMiniKQL::TThrowingBindTerminator bind; - with_lock (Worker->GetScopedAlloc()) { auto& holderFactory = Worker->GetGraph().GetHolderFactory(); - NYql::NUdf::TUnboxedValue* items = nullptr; - NYql::NUdf::TUnboxedValue result = Cache.NewArray( - holderFactory, - static_cast(value.second.size() + 1), - items); - - items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(value.first); + // TODO: use blocks here + for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) { + NYql::NUdf::TUnboxedValue* items = nullptr; + + NYql::NUdf::TUnboxedValue result = Cache.NewArray( + holderFactory, + static_cast(values.second.size() + 1), + items); - Y_ENSURE(FieldsPositions.size() == value.second.size()); + items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first++); - size_t i = 0; - for (const auto& v : value.second) { - NYql::NUdf::TStringValue str(v); - items[FieldsPositions[i++]] = NYql::NUdf::TUnboxedValuePod(std::move(str)); + size_t fieldId = 0; + for (const auto& column : values.second) { + NYql::NUdf::TStringValue str(column[rowId]); + items[FieldsPositions[fieldId++]] = NYql::NUdf::TUnboxedValuePod(std::move(str)); + } + + Worker->Push(std::move(result)); } - Worker->Push(std::move(result)); } } @@ -196,7 +200,7 @@ struct NYql::NPureCalc::TInputSpecTraits { static constexpr bool IsPartial = false; static constexpr bool SupportPushStreamMode = true; - using TConsumerType = THolder>>>; + using TConsumerType = THolder>>>>; static TConsumerType MakeConsumer( const TFilterInputSpec& spec, @@ -238,8 +242,9 @@ class TJsonFilter::TImpl { LOG_ROW_DISPATCHER_DEBUG("Program created"); } - void Push(ui64 offset, const TList& value) { - InputConsumer->OnObject(std::make_pair(offset, value)); + void Push(ui64 offset, const TVector>& values) { + Y_ENSURE(values, "Expected non empty schema"); + InputConsumer->OnObject(std::make_pair(offset, values)); } TString GetSql() const { @@ -266,7 +271,7 @@ class TJsonFilter::TImpl { private: THolder> Program; - THolder>>> InputConsumer; + THolder>>>> InputConsumer; const TString Sql; }; @@ -280,9 +285,9 @@ TJsonFilter::TJsonFilter( TJsonFilter::~TJsonFilter() { } - -void TJsonFilter::Push(ui64 offset, const TList& value) { - Impl->Push(offset, value); + +void TJsonFilter::Push(ui64 offset, const TVector>& values) { + Impl->Push(offset, values); } TString TJsonFilter::GetSql() { diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.h b/ydb/core/fq/libs/row_dispatcher/json_filter.h index f1694a277fbb..f9e26afc1dfa 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.h @@ -1,23 +1,24 @@ - #pragma once -namespace NFq { - #include #include +namespace NFq { + class TJsonFilter { public: using TCallback = std::function; - + public: TJsonFilter( - const TVector& columns, + const TVector& columns, const TVector& types, const TString& whereFilter, TCallback callback); + ~TJsonFilter(); - void Push(ui64 offset, const TList& value); + + void Push(ui64 offset, const TVector>& values); TString GetSql(); private: diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 84ca3018b509..6ce35203e12f 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -1,336 +1,212 @@ -#include +#include "json_parser.h" -#include -#include -#include -#include #include +#include + +#include namespace { using TCallback = NFq::TJsonParser::TCallback; -using TInputConsumerArg = std::pair; -const char* OffsetFieldName = "_offset"; TString LogPrefix = "JsonParser: "; +constexpr ui64 MAX_NUMBER_BUFFERS = 1000; -void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) { - node.Add( - NYT::TNode::CreateList() - .Add(fieldName) - .Add(NYT::TNode::CreateList().Add("DataType").Add(fieldType)) - ); -} +} // anonymous namespace -NYT::TNode MakeInputSchema() { - auto structMembers = NYT::TNode::CreateList(); - AddField(structMembers, OffsetFieldName, "Uint64"); - AddField(structMembers, "data", "String"); - return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers)); -} +namespace NFq { -NYT::TNode MakeOutputSchema(const TVector& columns) { - auto structMembers = NYT::TNode::CreateList(); - AddField(structMembers, OffsetFieldName, "Uint64"); - for (const auto& col : columns) { - AddField(structMembers, col, "String"); - } - return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers)); -} +//// TParserBuffer -class TParserInputConsumer : public NYql::NPureCalc::IConsumer { -public: - explicit TParserInputConsumer(NYql::NPureCalc::TWorkerHolder worker) - : Worker(std::move(worker)) { - } +TJsonParserBuffer::TJsonParserBuffer() + : Offset(0) + , NumberValues(0) + , Finished(false) +{} - ~TParserInputConsumer() override { - with_lock(Worker->GetScopedAlloc()) { - Cache.Clear(); - } - } +void TJsonParserBuffer::Reserve(size_t size) { + Y_ENSURE(!Finished, "Cannot reserve finished buffer"); - void OnObject(std::pair value) override { - NKikimr::NMiniKQL::TThrowingBindTerminator bind; - - with_lock (Worker->GetScopedAlloc()) { - auto& holderFactory = Worker->GetGraph().GetHolderFactory(); - NYql::NUdf::TUnboxedValue* items = nullptr; - - NYql::NUdf::TUnboxedValue result = Cache.NewArray( - holderFactory, - static_cast(2), - items); - - items[0] = NYql::NUdf::TUnboxedValuePod(value.first); - NYql::NUdf::TStringValue str(value.second.Size()); - std::memcpy(str.Data(), value.second.Data(), value.second.Size()); - items[1] = NYql::NUdf::TUnboxedValuePod(std::move(str)); - Worker->Push(std::move(result)); - } - } - - void OnFinish() override { - NKikimr::NMiniKQL::TBindTerminator bind(Worker->GetGraph().GetTerminator()); - with_lock(Worker->GetScopedAlloc()) { - Worker->OnFinish(); - } - } - -private: - NYql::NPureCalc::TWorkerHolder Worker; - NKikimr::NMiniKQL::TPlainContainerCache Cache; -}; + Values.reserve(2 * (size + simdjson::SIMDJSON_PADDING)); +} +void TJsonParserBuffer::AddValue(const TString& value) { + Y_ENSURE(!Finished, "Cannot add value into finished buffer"); -class TParserInputSpec : public NYql::NPureCalc::TInputSpecBase { -public: - TParserInputSpec() { - Schemas = {MakeInputSchema()}; - } + NumberValues++; + Values << value; +} - const TVector& GetSchemas() const override { - return Schemas; - } +std::string_view TJsonParserBuffer::AddHolder(std::string_view value) { + Y_ENSURE(Values.size() + value.size() <= Values.capacity(), "Requested too large holders"); -private: - TVector Schemas; -}; + const size_t startPos = Values.size(); + Values << value; + return std::string_view(Values).substr(startPos, value.length()); +} +std::pair TJsonParserBuffer::Finish() { + Y_ENSURE(!Finished, "Cannot finish buffer twice"); + Finished = true; -class TParserOutputConsumer: public NYql::NPureCalc::IConsumer>> { -public: - TParserOutputConsumer(TCallback callback) - : Callback(callback) { - } + Values << TString(simdjson::SIMDJSON_PADDING, ' '); + Values.reserve(2 * Values.size()); + return {Values.data(), Values.size()}; +} - void OnObject(std::pair> value) override { - Callback(value.first, std::move(value.second)); - } +void TJsonParserBuffer::Clear() { + Y_ENSURE(Finished, "Cannot clear not finished buffer"); - void OnFinish() override { - Y_UNREACHABLE(); - } -private: - TCallback Callback; -}; + Offset = 0; + NumberValues = 0; + Finished = false; + Values.clear(); +} -class TParserOutputSpec: public NYql::NPureCalc::TOutputSpecBase { -public: - explicit TParserOutputSpec(const NYT::TNode& schema) - : Schema(schema) - {} +//// TJsonParser +class TJsonParser::TImpl { public: - const NYT::TNode& GetSchema() const override { - return Schema; - } - -private: - NYT::TNode Schema; -}; - -struct TFieldsMapping{ - TVector FieldsPositions; - size_t OffsetPosition; - - TFieldsMapping(const NYT::TNode& schema, const NKikimr::NMiniKQL::TType* outputType) { - THashMap outputPositions; - Y_ENSURE(outputType->IsStruct()); - const auto structType = static_cast(outputType); - const auto count = structType->GetMembersCount(); - - for (ui32 i = 1; i < count; ++i) { // 0 index - OffsetFieldName - const auto name = structType->GetMemberName(i); - outputPositions[name] = i; - } - - const auto& fields = schema[1]; - Y_ENSURE(fields.IsList()); - Y_ENSURE(count == fields.Size()); - for (size_t i = 0; i < fields.Size(); ++i) { - auto name = fields[i][0].AsString(); - if (name == OffsetFieldName) { - OffsetPosition = i; - continue; - } - FieldsPositions.push_back(outputPositions[name]); + TImpl(const TVector& columns, const TVector& types, TCallback callback) + : Callback(callback) + { + Y_UNUSED(types); // TODO: Will be used for UV creation + + Columns.reserve(columns.size()); + for (const auto& column : columns) { + Columns.emplace_back(column); } - } -}; -class TParserPushRelayImpl: public NYql::NPureCalc::IConsumer { -public: - TParserPushRelayImpl(const TParserOutputSpec& outputSpec, NYql::NPureCalc::IPushStreamWorker* worker, THolder>>> underlying) - : Underlying(std::move(underlying)) - , Worker(worker) - , FieldsMapping(outputSpec.GetSchema(), Worker->GetOutputType()) - { } - -public: - void OnObject(const NYql::NUdf::TUnboxedValue* value) override { - auto unguard = Unguard(Worker->GetScopedAlloc()); - TList result; - - Y_ENSURE(value->GetListLength() == FieldsMapping.FieldsPositions.size() + 1); - ui64 offset = value->GetElement(FieldsMapping.OffsetPosition).Get(); - - for (auto pos : FieldsMapping.FieldsPositions) { - const auto& cell = value->GetElement(pos); - - NYql::NUdf::TStringRef strRef(cell.AsStringRef()); - result.emplace_back(strRef.Data(), strRef.Size()); + ColumnsIndex.reserve(columns.size()); + for (size_t i = 0; i < Columns.size(); i++) { + ColumnsIndex.emplace(std::string_view(Columns[i]), i); } - - Underlying->OnObject(std::make_pair(offset, std::move(result))); } - void OnFinish() override { - auto unguard = Unguard(Worker->GetScopedAlloc()); - Underlying->OnFinish(); - } + void Parse() { + Y_ENSURE(UsedBuffers, "Nothing to parse"); -private: - THolder>>> Underlying; - NYql::NPureCalc::IWorker* Worker; - TFieldsMapping FieldsMapping; -}; + TJsonParserBuffer::TPtr buffer = UsedBuffers.begin(); + const auto [values, size] = buffer->Finish(); + LOG_ROW_DISPATCHER_TRACE("Parse values for offset " << buffer->GetOffset() << ":\n" << values); -} + const ui64 numberValues = buffer->GetNumberValues(); + TVector> parsedValues(Columns.size()); + for (auto& parsedColumn : parsedValues) { + parsedColumn.reserve(numberValues); + } -template <> -struct NYql::NPureCalc::TInputSpecTraits { - static constexpr bool IsPartial = false; - static constexpr bool SupportPushStreamMode = true; + static int counter = 0; - using TConsumerType = THolder>; + simdjson::ondemand::parser parser; + parser.threaded = false; - static TConsumerType MakeConsumer( - const TParserInputSpec& spec, - NYql::NPureCalc::TWorkerHolder worker - ) { - Y_UNUSED(spec); - return MakeHolder(std::move(worker)); - } -}; + simdjson::ondemand::document_stream documents = parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE); + for (auto document : documents) { + counter++; + if (counter % 100000 == 0) { + Cerr << "---------------------------------- TJsonParser::Push, nuber buffers: " << FreeBuffers.size() + UsedBuffers.size() << ", counter: " << counter << ", has threads: " << parser.threaded << ", time: " << TInstant::Now() << Endl; + } + for (auto item : document.get_object()) { + const auto it = ColumnsIndex.find(item.escaped_key().value()); + if (it == ColumnsIndex.end()) { + continue; + } + + auto& parsedColumn = parsedValues[it->second]; + if (item.value().is_string()) { + parsedColumn.emplace_back(CreateHolderIfNeeded( + values, size, buffer, item.value().get_string().value() + )); + } else { + parsedColumn.emplace_back(CreateHolderIfNeeded( + values, size, buffer, item.value().raw_json_token().value() + )); + } + } + } -template <> -struct NYql::NPureCalc::TOutputSpecTraits { - static const constexpr bool IsPartial = false; - static const constexpr bool SupportPushStreamMode = true; + if (counter == 2200000) { + Cerr << "---------------------------------- Parsing finished, nuber buffers: " << FreeBuffers.size() + UsedBuffers.size() << ", numberInner: " << NumberInner << ", numberOuter: " << NumberOuter << ", time: " << TInstant::Now() << Endl; + } - static void SetConsumerToWorker(const TParserOutputSpec& outputSpec, NYql::NPureCalc::IPushStreamWorker* worker, THolder>>> consumer) { - worker->SetConsumer(MakeHolder(outputSpec, worker, std::move(consumer))); + Callback(std::move(parsedValues), buffer); } -}; -namespace NFq { - -class TJsonParser::TImpl { -public: - TImpl( - const TVector& columns, - const TVector& types, - TCallback callback) - : Sql(GenerateSql(columns, types)) { - auto options = NYql::NPureCalc::TProgramFactoryOptions(); - auto factory = NYql::NPureCalc::MakeProgramFactory(options); - - LOG_ROW_DISPATCHER_DEBUG("Creating program..."); - Program = factory->MakePushStreamProgram( - TParserInputSpec(), - TParserOutputSpec(MakeOutputSchema(columns)), - Sql, - NYql::NPureCalc::ETranslationMode::SExpr - ); - LOG_ROW_DISPATCHER_DEBUG("Program created"); - InputConsumer = Program->Apply(MakeHolder(callback)); - LOG_ROW_DISPATCHER_DEBUG("InputConsumer created"); - } + TJsonParserBuffer& GetBuffer(ui64 offset) { + if (FreeBuffers) { + UsedBuffers.emplace_front(std::move(FreeBuffers.front())); + FreeBuffers.erase(FreeBuffers.begin()); + } else { + UsedBuffers.emplace_front(); + } - void Push( ui64 offset, const TString& value) { - LOG_ROW_DISPATCHER_TRACE("Push " << value); - InputConsumer->OnObject(std::make_pair(offset, value)); + return UsedBuffers.front().SetOffset(offset); } - TString GetSql() const { - return Sql; + void ReleaseBuffer(TJsonParserBuffer::TPtr buffer) { + buffer->Clear(); + if (FreeBuffers.size() + UsedBuffers.size() <= MAX_NUMBER_BUFFERS) { + FreeBuffers.emplace_back(std::move(*buffer)); + } + UsedBuffers.erase(buffer); } -private: - TString GenerateSql(const TVector& columnNames, const TVector& columnTypes) { - Y_ABORT_UNLESS(columnNames.size() == columnTypes.size(), "Unexpected column types size"); - - TStringStream udfOutputType; - TStringStream resultType; - for (size_t i = 0; i < columnNames.size(); ++i) { - const TString& lastSymbol = i + 1 == columnNames.size() ? "" : " "; - const TString& column = columnNames[i]; - const TString& type = SkipOptional(columnTypes[i]); - - udfOutputType << "'('" << column << " (DataType '" << type << "))" << lastSymbol; - resultType << "'('" << column << " (SafeCast (Member $parsed '" << column << ") $string_type))" << lastSymbol; + TString GetDescription() const { + TStringBuilder description = TStringBuilder() << "Columns: "; + for (const auto& column : Columns) { + description << "'" << column << "' "; } - - TStringStream str; - str << R"( - ( - (let $string_type (DataType 'String)) - - (let $input_type (TupleType $string_type (DataType 'Uint64))) - (let $output_type (TupleType (StructType )" << udfOutputType.Str() << R"() (DataType 'Uint64))) - (let $udf_argument_type (TupleType $input_type (StructType) $output_type)) - (let $udf_callable_type (CallableType '('1) '((StreamType $output_type)) '((StreamType $input_type)) '((OptionalType (DataType 'Utf8))))) - (let $udf (Udf 'ClickHouseClient.ParseFormat (Void) $udf_argument_type 'json_each_row $udf_callable_type (VoidType) '"" '())) - - (return (Map (Apply $udf (Map (Self '0) (lambda '($input) (block '( - (return '((Member $input 'data) (Member $input ')" << OffsetFieldName << R"())) - ))))) (lambda '($output) (block '( - (let $parsed (Nth $output '0)) - (return (AsStruct '(')" << OffsetFieldName << R"( (Nth $output '1)) )" << resultType.Str() << R"()) - ))))) - ) - )"; - LOG_ROW_DISPATCHER_DEBUG("GenerateSql " << str.Str()); - return str.Str(); + return description; } - static TString SkipOptional(TStringBuf type) { - if (type.StartsWith("Optional")) { - Y_ABORT_UNLESS(type.SkipPrefix("Optional<")); - Y_ABORT_UNLESS(type.ChopSuffix(">")); +private: + std::string_view CreateHolderIfNeeded(const char* dataHolder, size_t size, TJsonParserBuffer::TPtr buffer, std::string_view value) { + ptrdiff_t diff = value.data() - dataHolder; + if (0 <= diff && static_cast(diff) < size) { + NumberInner++; + return value; } - return TString(type); + NumberOuter++; + return buffer->AddHolder(value); } private: - THolder> Program; - THolder> InputConsumer; - const TString Sql; + const TCallback Callback; + TVector Columns; + absl::flat_hash_map ColumnsIndex; + + TList UsedBuffers; + TList FreeBuffers; + + inline static int NumberInner = 0; + inline static int NumberOuter = 0; }; -TJsonParser::TJsonParser( - const TVector& columns, - const TVector& types, - TCallback callback) - : Impl(std::make_unique(columns, types, callback)) { -} +TJsonParser::TJsonParser(const TVector& columns, const TVector& types, TCallback callback) + : Impl(std::make_unique(columns, types, callback)) +{} TJsonParser::~TJsonParser() { } - -void TJsonParser::Push(ui64 offset, const TString& value) { - Impl->Push(offset, value); + +TJsonParserBuffer& TJsonParser::GetBuffer(ui64 offset) { + return Impl->GetBuffer(offset); +} + +void TJsonParser::ReleaseBuffer(TJsonParserBuffer::TPtr buffer) { + Impl->ReleaseBuffer(buffer); +} + +void TJsonParser::Parse() { + Impl->Parse(); } -TString TJsonParser::GetSql() { - return Impl->GetSql(); +TString TJsonParser::GetDescription() const { + return Impl->GetDescription(); } -std::unique_ptr NewJsonParser( - const TVector& columns, - const TVector& types, - TCallback callback) { +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, TCallback callback) { return std::unique_ptr(new TJsonParser(columns, types, callback)); } diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.h b/ydb/core/fq/libs/row_dispatcher/json_parser.h index cb5137105e6b..7488488f4b45 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.h @@ -1,23 +1,53 @@ #pragma once -#include +#include +#include -#include +#include namespace NFq { +class TJsonParserBuffer { +public: + using TPtr = TList::iterator; + +public: + TJsonParserBuffer(); + + void Reserve(size_t size); + + void AddValue(const TString& value); + std::string_view AddHolder(std::string_view value); + + std::pair Finish(); + void Clear(); + +private: + YDB_ACCESSOR_DEF(ui64, Offset); + YDB_READONLY_DEF(size_t, NumberValues); + YDB_READONLY_DEF(bool, Finished) + + TStringBuilder Values; +}; + class TJsonParser { public: - using TCallback = std::function&&)>; - + using TCallback = std::function>&&, TJsonParserBuffer::TPtr)>; + public: TJsonParser( const TVector& columns, const TVector& types, TCallback callback); + ~TJsonParser(); - void Push(ui64 offset, const TString& value); - TString GetSql(); + + TJsonParserBuffer& GetBuffer(ui64 offset); + void ReleaseBuffer(TJsonParserBuffer::TPtr buffer); + + void Parse(); + + TString GetDescription() const; private: class TImpl; diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index a1dc73bfef63..0ef7b0aafa20 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -63,20 +63,41 @@ struct TEvPrivate { struct TEvCreateSession : public NActors::TEventLocal {}; struct TEvPrintState : public NActors::TEventLocal {}; struct TEvStatus : public NActors::TEventLocal {}; + struct TEvDataParsed : public NActors::TEventLocal { - TEvDataParsed(ui64 offset, TList&& value) - : Offset(offset) - , Value(std::move(value)) + TEvDataParsed(TVector>&& parsedValues, TJsonParserBuffer::TPtr buffer) + : Offset(buffer->GetOffset()) + , NumberValues(parsedValues.empty() ? 0 : parsedValues.front().size()) + , ParsedValues(std::move(parsedValues)) + , Buffer(buffer) {} - ui64 Offset = 0; - TList Value; + + TString DebugString() const { + TStringBuilder result; + for (const auto& columnResult : ParsedValues) { + result << "Parsed column: "; + for (const auto& value : columnResult) { + result << "'" << value << "' "; + } + result << "\n"; + } + return result; + } + + const ui64 Offset; + const ui64 NumberValues; + TVector> ParsedValues; + TJsonParserBuffer::TPtr Buffer; }; struct TEvDataFiltered : public NActors::TEventLocal { - TEvDataFiltered(ui64 offset) + TEvDataFiltered(ui64 offset, ui64 numberValues) : Offset(offset) + , NumberValues(numberValues) {} - ui64 Offset = 0; + + const ui64 Offset; + const ui64 NumberValues; }; struct TEvDataAfterFilteration : public NActors::TEventLocal { @@ -179,7 +200,7 @@ class TTopicSession : public TActorBootstrapped { void CreateTopicSession(); void CloseTopicSession(); void SubscribeOnNextEvent(); - void SendToParsing(ui64 offset, const TString& message); + void SendToParsing(const TVector& messages); void SendData(ClientsInfo& info); void InitParser(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams); void FatalError(const TString& message, const std::unique_ptr* filter = nullptr); @@ -371,24 +392,27 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) { } void TTopicSession::Handle(NFq::TEvPrivate::TEvDataParsed::TPtr& ev) { - LOG_ROW_DISPATCHER_TRACE("TEvDataParsed, offset " << ev->Get()->Offset); - - for (auto v: ev->Get()->Value) { - LOG_ROW_DISPATCHER_TRACE("v " << v); - } + LOG_ROW_DISPATCHER_TRACE("TEvDataParsed, offset " << ev->Get()->Offset << ", data:\n" << ev->Get()->DebugString()); for (auto& [actorId, info] : Clients) { try { - if (!info.Filter) { - continue; + if (info.Filter) { + info.Filter->Push(ev->Get()->Offset, ev->Get()->ParsedValues); } - info.Filter->Push(ev->Get()->Offset, ev->Get()->Value); } catch (const std::exception& e) { FatalError(e.what(), &info.Filter); } } - auto event = std::make_unique(ev->Get()->Offset); - Send(SelfId(), event.release()); + Parser->ReleaseBuffer(ev->Get()->Buffer); + + Send(SelfId(), new TEvPrivate::TEvDataFiltered(ev->Get()->Offset, ev->Get()->NumberValues)); + + if (counter / 100000 > (counter - ev->Get()->NumberValues) / 100000) { + Cerr << "---------------------------------- TEvDataParsed, counter: " << counter << ", time: " << TInstant::Now() << Endl; + } + if (counter == 2200000) { + Cerr << "---------------------------------- Filtering finished, time: " << TInstant::Now() << Endl; + } } void TTopicSession::Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr& ev) { @@ -422,11 +446,11 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvStatus::TPtr&) { } void TTopicSession::Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr& ev) { - LOG_ROW_DISPATCHER_TRACE("TEvDataFiltered, offset " << ev->Get()->Offset); + LOG_ROW_DISPATCHER_TRACE("TEvDataFiltered, offset " << ev->Get()->Offset << ", number values " << ev->Get()->NumberValues); + const ui64 offset = ev->Get()->Offset + ev->Get()->NumberValues; for (auto& [actorId, info] : Clients) { - if (!info.NextMessageOffset - || *info.NextMessageOffset < ev->Get()->Offset + 1) { - info.NextMessageOffset = ev->Get()->Offset + 1; + if (!info.NextMessageOffset || *info.NextMessageOffset < offset) { + info.NextMessageOffset = offset; } } } @@ -472,14 +496,13 @@ void TTopicSession::CloseTopicSession() { void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) { Self.Metrics.RowsRead->Add(event.GetMessages().size()); for (const auto& message : event.GetMessages()) { - const TString& data = message.GetData(); - Self.IngressStats.Bytes += data.size(); LOG_ROW_DISPATCHER_TRACE("Data received: " << message.DebugString(true)); - TString item = message.GetData(); - Self.SendToParsing(message.GetOffset(), item); + Self.IngressStats.Bytes += message.GetData().size(); Self.LastMessageOffset = message.GetOffset(); } + + Self.SendToParsing(event.GetMessages()); } void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClosedEvent& ev) { @@ -528,25 +551,36 @@ TString TTopicSession::GetSessionId() const { return ReadSession ? ReadSession->GetSessionId() : TString{"empty"}; } -void TTopicSession::SendToParsing(ui64 offset, const TString& message) { - LOG_ROW_DISPATCHER_TRACE("SendToParsing, message " << message); +void TTopicSession::SendToParsing(const TVector& messages) { + size_t valuesSize = 0; + for (const auto& message : messages) { + const auto& data = message.GetData(); + valuesSize += data.size(); - for (auto& readActorId : ClientsWithoutPredicate) { - auto it = Clients.find(readActorId); - Y_ENSURE(it != Clients.end(), "Internal error: unknown client"); - auto& info = it->second; - if (!info.Filter) { - LOG_ROW_DISPATCHER_TRACE("Send message to client without parsing/filtering"); - AddDataToClient(info, offset, message); + for (const auto& readActorId : ClientsWithoutPredicate) { + const auto it = Clients.find(readActorId); + Y_ENSURE(it != Clients.end(), "Internal error: unknown client"); + if (auto& info = it->second; !info.Filter) { + LOG_ROW_DISPATCHER_TRACE("Send message to client without parsing/filtering"); + AddDataToClient(info, message.GetOffset(), data); + } } } - if (ClientsWithoutPredicate.size() == Clients.size()) { + if (ClientsWithoutPredicate.size() == Clients.size() || messages.empty()) { return; } + TJsonParserBuffer& buffer = Parser->GetBuffer(messages.front().GetOffset()); + buffer.Reserve(valuesSize); + for (const auto& message : messages) { + buffer.AddValue(message.GetData()); + } + + LOG_ROW_DISPATCHER_TRACE("SendToParsing, buffer with offset " << buffer.GetOffset() << " and size " << buffer.GetNumberValues()); + try { - Parser->Push(offset, message); + Parser->Parse(); } catch (const std::exception& e) { FatalError(e.what()); } @@ -682,8 +716,8 @@ void TTopicSession::InitParser(const NYql::NPq::NProto::TDqPqTopicSource& source Parser = NewJsonParser( GetVector(sourceParams.GetColumns()), GetVector(sourceParams.GetColumnTypes()), - [actorSystem, selfId = SelfId()](ui64 offset, TList&& value){ - actorSystem->Send(selfId, new NFq::TEvPrivate::TEvDataParsed(offset, std::move(value))); + [actorSystem, selfId = SelfId()](TVector>&& parsedValues, TJsonParserBuffer::TPtr buffer){ + actorSystem->Send(selfId, new NFq::TEvPrivate::TEvDataParsed(std::move(parsedValues), buffer)); }); } catch (const NYql::NPureCalc::TCompileError& e) { FatalError(e.GetIssues()); @@ -694,10 +728,10 @@ void TTopicSession::FatalError(const TString& message, const std::unique_ptrGetSql(); + str << ", parser description:\n" << Parser->GetDescription(); } if (filter) { - str << ", filter sql:" << (*filter)->GetSql(); + str << ", filter sql:\n" << (*filter)->GetSql(); } LOG_ROW_DISPATCHER_ERROR("FatalError: " << str.Str()); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 1645f521051d..61bae7e6afbd 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -56,8 +56,8 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push(5, {"hello1", "99"}); - Filter->Push(6, {"hello2", "101"}); + Filter->Push(5, {{"hello1"}, {"99"}}); + Filter->Push(6, {{"hello2"}, {"101"}}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); } @@ -71,20 +71,34 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push(5, {"99", "hello1"}); - Filter->Push(6, {"101", "hello2"}); + Filter->Push(5, {{"99"}, {"hello1"}}); + Filter->Push(6, {{"101"}, {"hello2"}}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); } - Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { + Y_UNIT_TEST_F(ManyValues, TFixture) { + TMap result; + MakeFilter( + {"a1", "a2"}, + {"String", "UInt64"}, + "where a2 > 100", + [&](ui64 offset, const TString& json) { + result[offset] = json; + }); + Filter->Push(5, {{"hello1", "hello2"}, {"99", "101"}}); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); + } + + Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeFilter( {"a1", "a2"}, {"String", "UInt64"}, "where Unwrap(a2) = 1", [&](ui64, const TString&) { }); - UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push(5, {"99", "hello1"}), yexception, "Failed to unwrap empty optional"); - } + UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push(5, {{"99"}, {"hello1"}}), yexception, "Failed to unwrap empty optional"); + } } } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index a9c389d3900f..2c394887a48f 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -7,10 +7,10 @@ #include #include -#include - #include +#include + namespace { using namespace NKikimr; @@ -24,8 +24,9 @@ class TFixture : public NUnitTest::TBaseFixture { void SetUp(NUnitTest::TTestContext&) override { TAutoPtr app = new TAppPrepare(); + Runtime.SetLogBackend(CreateStderrBackend()); + Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_TRACE); Runtime.Initialize(app->Unwrap()); - Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG); } void TearDown(NUnitTest::TTestContext& /* context */) override { @@ -34,89 +35,116 @@ class TFixture : public NUnitTest::TBaseFixture { } } - void MakeParser(TVector columns, TVector types, NFq::TJsonParser::TCallback callback) { - try { - Parser = NFq::NewJsonParser( - columns, - types, - callback); - } catch (NYql::NPureCalc::TCompileError compileError) { - UNIT_ASSERT_C(false, TStringBuilder() << "Failed to create json parser: " << compileError.what() << "\nQuery text:\n" << compileError.GetYql() << "Reason:\n" << compileError.GetIssues()); - } + void MakeParser(TVector columns, TVector types) { + Parser = NFq::NewJsonParser(columns, types, [this](TVector>&& parsedValues, TJsonParserBuffer::TPtr buffer){ + ResultOffset = buffer->GetOffset(); + ParsedValues = std::move(parsedValues); + ResultNumberValues = ParsedValues.empty() ? 0 : ParsedValues.front().size(); + }); + } + + void MakeParser(TVector columns) { + MakeParser(columns, TVector(columns.size(), "String")); } - void MakeParser(TVector columns, NFq::TJsonParser::TCallback callback) { - MakeParser(columns, TVector(columns.size(), "String"), callback); + void PushToParser(ui64 offset, const TString& data) const { + TJsonParserBuffer& buffer = Parser->GetBuffer(offset); + buffer.Reserve(data.size()); + buffer.AddValue(data); + Parser->Parse(); + } + + TVector GetParsedRow(size_t id) const { + TVector result; + result.reserve(ParsedValues.size()); + for (const auto& columnResult : ParsedValues) { + result.emplace_back(columnResult[id]); + } + return result; } TActorSystemStub actorSystemStub; NActors::TTestActorRuntime Runtime; std::unique_ptr Parser; + + ui64 ResultOffset; + ui64 ResultNumberValues; + TVector> ParsedValues; }; Y_UNIT_TEST_SUITE(TJsonParserTests) { - Y_UNIT_TEST_F(Simple1, TFixture) { - TList result; - ui64 resultOffset; - MakeParser({"a1", "a2"}, {"String", "Optional"}, [&](ui64 offset, TList&& value){ - resultOffset = offset; - result = std::move(value); - }); - Parser->Push(5, R"({"a1": "hello1", "a2": 101, "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); + Y_UNIT_TEST_F(Simple1, TFixture) { + MakeParser({"a1", "a2"}, {"String", "Optional"}); + PushToParser(5, R"({"a1": "hello1", "a2": 101, "event": "event1"})"); + UNIT_ASSERT_VALUES_EQUAL(5, ResultOffset); + UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); + + const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.front()); UNIT_ASSERT_VALUES_EQUAL("101", result.back()); } - Y_UNIT_TEST_F(Simple2, TFixture) { - TList result; - ui64 resultOffset; - MakeParser({"a2", "a1"}, [&](ui64 offset, TList&& value){ - resultOffset = offset; - result = std::move(value); - }); - Parser->Push(5, R"({"a1": "hello1", "a2": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); + Y_UNIT_TEST_F(Simple2, TFixture) { + MakeParser({"a2", "a1"}); + PushToParser(5, R"({"a1": "hello1", "a2": "101", "event": "event1"})"); + UNIT_ASSERT_VALUES_EQUAL(5, ResultOffset); + UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); + + const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("101", result.front()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.back()); } - Y_UNIT_TEST_F(Simple3, TFixture) { - TList result; - ui64 resultOffset; - MakeParser({"a1", "a2"}, [&](ui64 offset, TList&& value){ - resultOffset = offset; - result = std::move(value); - }); - Parser->Push(5, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); + Y_UNIT_TEST_F(Simple3, TFixture) { + MakeParser({"a1", "a2"}); + PushToParser(5, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); + UNIT_ASSERT_VALUES_EQUAL(5, ResultOffset); + UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); + + const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("101", result.front()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.back()); } - Y_UNIT_TEST_F(Simple4, TFixture) { - TList result; - ui64 resultOffset; - MakeParser({"a2", "a1"}, [&](ui64 offset, TList&& value){ - resultOffset = offset; - result = std::move(value); - }); - Parser->Push(5, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); + Y_UNIT_TEST_F(Simple4, TFixture) { + MakeParser({"a2", "a1"}); + PushToParser(5, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); + UNIT_ASSERT_VALUES_EQUAL(5, ResultOffset); + UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); + + const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.front()); UNIT_ASSERT_VALUES_EQUAL("101", result.back()); } - Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { + Y_UNIT_TEST_F(ManyValues, TFixture) { + MakeParser({"a1", "a2"}); + + TJsonParserBuffer& buffer = Parser->GetBuffer(42); + buffer.AddValue(R"({"a1": "hello1", "a2": 101, "event": "event1"})"); + buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event2"})"); + buffer.AddValue(R"({"a2": "101", "a1": "hello1", "event": "event3"})"); + + Parser->Parse(); + UNIT_ASSERT_VALUES_EQUAL(42, ResultOffset); + UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); - MakeParser({"a2", "a1"}, [&](ui64, TList&&){ }); - UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, "DB::ParsingException: Cannot parse input: expected '{' before: 'ydb': (at row 1)"); + for (size_t i = 0; i < ResultNumberValues; ++i) { + const auto& result = GetParsedRow(i); + UNIT_ASSERT_VALUES_EQUAL_C(2, result.size(), i); + UNIT_ASSERT_VALUES_EQUAL_C("hello1", result.front(), i); + UNIT_ASSERT_VALUES_EQUAL_C("101", result.back(), i); + } } -} + Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { + MakeParser({"a2", "a1"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(5, R"(ydb)"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type."); + } } +} diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 65c24fcb85f1..25e9d5a7c810 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -99,20 +99,20 @@ class TFixture : public NUnitTest::TBaseFixture { void ExpectMessageBatch(NActors::TActorId readActorId, const std::vector& expected) { auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->ReadActorId == readActorId); - UNIT_ASSERT(expected.size() == eventHolder->Get()->Record.MessagesSize()); + UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); + UNIT_ASSERT_VALUES_EQUAL(expected.size(), eventHolder->Get()->Record.MessagesSize()); for (size_t i = 0; i < expected.size(); ++i) { NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(i); std::cerr << "message.GetJson() " << message.GetJson() << std::endl; - UNIT_ASSERT(expected[i] == message.GetJson()); + UNIT_ASSERT_VALUES_EQUAL(expected[i], message.GetJson()); } } void ExpectSessionError(NActors::TActorId readActorId, TString message) { auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->ReadActorId == readActorId); - UNIT_ASSERT(TString(eventHolder->Get()->Record.GetMessage()).Contains(message)); + UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); + UNIT_ASSERT_STRING_CONTAINS(TString(eventHolder->Get()->Record.GetMessage()), message); } void ExpectNewDataArrived(TSet readActorIds) { @@ -129,7 +129,7 @@ class TFixture : public NUnitTest::TBaseFixture { Runtime.Send(new IEventHandle(TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch())); auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->ReadActorId == readActorId); + UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); return eventHolder->Get()->Record.MessagesSize(); } @@ -284,7 +284,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { const std::vector data = { "not json", "noch einmal / nicht json" }; PQWrite(data, topicName); - ExpectSessionError(ReadActorId1, "DB::ParsingException: Cannot parse input: expected '{' before: 'not json': (at row 1)"); + ExpectSessionError(ReadActorId1, "INCORRECT_TYPE: The JSON element does not have the requested type."); StopSession(ReadActorId1, source); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/ya.make b/ydb/core/fq/libs/row_dispatcher/ut/ya.make index 0fbe9a1c0b63..9486156aad38 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ut/ya.make @@ -19,7 +19,6 @@ PEERDIR( ydb/library/yql/udfs/common/json2 ydb/library/yql/udfs/common/yson2 ydb/tests/fq/pq_async_io - ydb/library/yql/udfs/common/clickhouse/client ) SIZE(MEDIUM) diff --git a/ydb/core/fq/libs/row_dispatcher/ya.make b/ydb/core/fq/libs/row_dispatcher/ya.make index c0af300978aa..0ec1b5244197 100644 --- a/ydb/core/fq/libs/row_dispatcher/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ya.make @@ -13,6 +13,7 @@ SRCS( PEERDIR( contrib/libs/fmt + contrib/libs/simdjson ydb/core/fq/libs/actors/logging ydb/core/fq/libs/config/protos ydb/core/fq/libs/control_plane_storage @@ -39,4 +40,3 @@ IF(NOT EXPORT_CMAKE) ut ) ENDIF() - From 41d45d6508f54b5155ac71371e854889d5d34415 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 8 Oct 2024 11:00:01 +0000 Subject: [PATCH 2/4] Removed debug counters --- .../fq/libs/row_dispatcher/json_parser.cpp | 22 +------------------ .../fq/libs/row_dispatcher/topic_session.cpp | 7 ------ .../libs/row_dispatcher/ut/json_parser_ut.cpp | 2 +- 3 files changed, 2 insertions(+), 29 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 6ce35203e12f..964cc7720bf8 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -10,7 +10,7 @@ namespace { using TCallback = NFq::TJsonParser::TCallback; TString LogPrefix = "JsonParser: "; -constexpr ui64 MAX_NUMBER_BUFFERS = 1000; +constexpr ui64 MAX_NUMBER_BUFFERS = 10; } // anonymous namespace @@ -26,20 +26,17 @@ TJsonParserBuffer::TJsonParserBuffer() void TJsonParserBuffer::Reserve(size_t size) { Y_ENSURE(!Finished, "Cannot reserve finished buffer"); - Values.reserve(2 * (size + simdjson::SIMDJSON_PADDING)); } void TJsonParserBuffer::AddValue(const TString& value) { Y_ENSURE(!Finished, "Cannot add value into finished buffer"); - NumberValues++; Values << value; } std::string_view TJsonParserBuffer::AddHolder(std::string_view value) { Y_ENSURE(Values.size() + value.size() <= Values.capacity(), "Requested too large holders"); - const size_t startPos = Values.size(); Values << value; return std::string_view(Values).substr(startPos, value.length()); @@ -48,7 +45,6 @@ std::string_view TJsonParserBuffer::AddHolder(std::string_view value) { std::pair TJsonParserBuffer::Finish() { Y_ENSURE(!Finished, "Cannot finish buffer twice"); Finished = true; - Values << TString(simdjson::SIMDJSON_PADDING, ' '); Values.reserve(2 * Values.size()); return {Values.data(), Values.size()}; @@ -56,7 +52,6 @@ std::pair TJsonParserBuffer::Finish() { void TJsonParserBuffer::Clear() { Y_ENSURE(Finished, "Cannot clear not finished buffer"); - Offset = 0; NumberValues = 0; Finished = false; @@ -96,17 +91,11 @@ class TJsonParser::TImpl { parsedColumn.reserve(numberValues); } - static int counter = 0; - simdjson::ondemand::parser parser; parser.threaded = false; simdjson::ondemand::document_stream documents = parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE); for (auto document : documents) { - counter++; - if (counter % 100000 == 0) { - Cerr << "---------------------------------- TJsonParser::Push, nuber buffers: " << FreeBuffers.size() + UsedBuffers.size() << ", counter: " << counter << ", has threads: " << parser.threaded << ", time: " << TInstant::Now() << Endl; - } for (auto item : document.get_object()) { const auto it = ColumnsIndex.find(item.escaped_key().value()); if (it == ColumnsIndex.end()) { @@ -126,10 +115,6 @@ class TJsonParser::TImpl { } } - if (counter == 2200000) { - Cerr << "---------------------------------- Parsing finished, nuber buffers: " << FreeBuffers.size() + UsedBuffers.size() << ", numberInner: " << NumberInner << ", numberOuter: " << NumberOuter << ", time: " << TInstant::Now() << Endl; - } - Callback(std::move(parsedValues), buffer); } @@ -164,10 +149,8 @@ class TJsonParser::TImpl { std::string_view CreateHolderIfNeeded(const char* dataHolder, size_t size, TJsonParserBuffer::TPtr buffer, std::string_view value) { ptrdiff_t diff = value.data() - dataHolder; if (0 <= diff && static_cast(diff) < size) { - NumberInner++; return value; } - NumberOuter++; return buffer->AddHolder(value); } @@ -178,9 +161,6 @@ class TJsonParser::TImpl { TList UsedBuffers; TList FreeBuffers; - - inline static int NumberInner = 0; - inline static int NumberOuter = 0; }; TJsonParser::TJsonParser(const TVector& columns, const TVector& types, TCallback callback) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 0ef7b0aafa20..03460a014cc9 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -406,13 +406,6 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvDataParsed::TPtr& ev) { Parser->ReleaseBuffer(ev->Get()->Buffer); Send(SelfId(), new TEvPrivate::TEvDataFiltered(ev->Get()->Offset, ev->Get()->NumberValues)); - - if (counter / 100000 > (counter - ev->Get()->NumberValues) / 100000) { - Cerr << "---------------------------------- TEvDataParsed, counter: " << counter << ", time: " << TInstant::Now() << Endl; - } - if (counter == 2200000) { - Cerr << "---------------------------------- Filtering finished, time: " << TInstant::Now() << Endl; - } } void TTopicSession::Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr& ev) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 2c394887a48f..e757706a0eff 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -36,7 +36,7 @@ class TFixture : public NUnitTest::TBaseFixture { } void MakeParser(TVector columns, TVector types) { - Parser = NFq::NewJsonParser(columns, types, [this](TVector>&& parsedValues, TJsonParserBuffer::TPtr buffer){ + Parser = NFq::NewJsonParser(columns, types, [this](TVector>&& parsedValues, TJsonParserBuffer::TPtr buffer) { ResultOffset = buffer->GetOffset(); ParsedValues = std::move(parsedValues); ResultNumberValues = ParsedValues.empty() ? 0 : ParsedValues.front().size(); From 145a3c18deecc073fd8c429453efe7f54f7b0d54 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 8 Oct 2024 13:35:45 +0000 Subject: [PATCH 3/4] Removed TEvDataParsed --- .../fq/libs/row_dispatcher/json_parser.cpp | 101 ++++++++---------- ydb/core/fq/libs/row_dispatcher/json_parser.h | 26 +---- .../fq/libs/row_dispatcher/topic_session.cpp | 81 +++++--------- .../libs/row_dispatcher/ut/json_parser_ut.cpp | 38 +++---- 4 files changed, 90 insertions(+), 156 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 964cc7720bf8..f0e9ab7122d6 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -8,9 +8,7 @@ namespace { -using TCallback = NFq::TJsonParser::TCallback; TString LogPrefix = "JsonParser: "; -constexpr ui64 MAX_NUMBER_BUFFERS = 10; } // anonymous namespace @@ -19,8 +17,7 @@ namespace NFq { //// TParserBuffer TJsonParserBuffer::TJsonParserBuffer() - : Offset(0) - , NumberValues(0) + : NumberValues(0) , Finished(false) {} @@ -52,7 +49,6 @@ std::pair TJsonParserBuffer::Finish() { void TJsonParserBuffer::Clear() { Y_ENSURE(Finished, "Cannot clear not finished buffer"); - Offset = 0; NumberValues = 0; Finished = false; Values.clear(); @@ -62,8 +58,8 @@ void TJsonParserBuffer::Clear() { class TJsonParser::TImpl { public: - TImpl(const TVector& columns, const TVector& types, TCallback callback) - : Callback(callback) + TImpl(const TVector& columns, const TVector& types) + : ParsedValues(columns.size()) { Y_UNUSED(types); // TODO: Will be used for UV creation @@ -78,17 +74,13 @@ class TJsonParser::TImpl { } } - void Parse() { - Y_ENSURE(UsedBuffers, "Nothing to parse"); + const TVector>& Parse() { + const auto [values, size] = Buffer.Finish(); + LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values); - TJsonParserBuffer::TPtr buffer = UsedBuffers.begin(); - const auto [values, size] = buffer->Finish(); - LOG_ROW_DISPATCHER_TRACE("Parse values for offset " << buffer->GetOffset() << ":\n" << values); - - const ui64 numberValues = buffer->GetNumberValues(); - TVector> parsedValues(Columns.size()); - for (auto& parsedColumn : parsedValues) { - parsedColumn.reserve(numberValues); + for (auto& parsedColumn : ParsedValues) { + parsedColumn.clear(); + parsedColumn.reserve(Buffer.GetNumberValues()); } simdjson::ondemand::parser parser; @@ -102,39 +94,26 @@ class TJsonParser::TImpl { continue; } - auto& parsedColumn = parsedValues[it->second]; + auto& parsedColumn = ParsedValues[it->second]; if (item.value().is_string()) { parsedColumn.emplace_back(CreateHolderIfNeeded( - values, size, buffer, item.value().get_string().value() + values, size, item.value().get_string().value() )); } else { parsedColumn.emplace_back(CreateHolderIfNeeded( - values, size, buffer, item.value().raw_json_token().value() + values, size, item.value().raw_json_token().value() )); } } } - - Callback(std::move(parsedValues), buffer); + return ParsedValues; } - TJsonParserBuffer& GetBuffer(ui64 offset) { - if (FreeBuffers) { - UsedBuffers.emplace_front(std::move(FreeBuffers.front())); - FreeBuffers.erase(FreeBuffers.begin()); - } else { - UsedBuffers.emplace_front(); + TJsonParserBuffer& GetBuffer() { + if (Buffer.GetFinished()) { + Buffer.Clear(); } - - return UsedBuffers.front().SetOffset(offset); - } - - void ReleaseBuffer(TJsonParserBuffer::TPtr buffer) { - buffer->Clear(); - if (FreeBuffers.size() + UsedBuffers.size() <= MAX_NUMBER_BUFFERS) { - FreeBuffers.emplace_back(std::move(*buffer)); - } - UsedBuffers.erase(buffer); + return Buffer; } TString GetDescription() const { @@ -142,52 +121,64 @@ class TJsonParser::TImpl { for (const auto& column : Columns) { description << "'" << column << "' "; } + description << "\nBuffer size: " << Buffer.GetNumberValues() << ", finished: " << Buffer.GetFinished(); return description; } + TString GetDebugString(const TVector>& parsedValues) const { + TStringBuilder result; + for (size_t i = 0; i < Columns.size(); ++i) { + result << "Parsed column '" << Columns[i] << "': "; + for (const auto& value : parsedValues[i]) { + result << "'" << value << "' "; + } + result << "\n"; + } + return result; + } + private: - std::string_view CreateHolderIfNeeded(const char* dataHolder, size_t size, TJsonParserBuffer::TPtr buffer, std::string_view value) { + std::string_view CreateHolderIfNeeded(const char* dataHolder, size_t size, std::string_view value) { ptrdiff_t diff = value.data() - dataHolder; if (0 <= diff && static_cast(diff) < size) { return value; } - return buffer->AddHolder(value); + return Buffer.AddHolder(value); } private: - const TCallback Callback; TVector Columns; absl::flat_hash_map ColumnsIndex; - TList UsedBuffers; - TList FreeBuffers; + TJsonParserBuffer Buffer; + TVector> ParsedValues; }; -TJsonParser::TJsonParser(const TVector& columns, const TVector& types, TCallback callback) - : Impl(std::make_unique(columns, types, callback)) +TJsonParser::TJsonParser(const TVector& columns, const TVector& types) + : Impl(std::make_unique(columns, types)) {} TJsonParser::~TJsonParser() { } -TJsonParserBuffer& TJsonParser::GetBuffer(ui64 offset) { - return Impl->GetBuffer(offset); +TJsonParserBuffer& TJsonParser::GetBuffer() { + return Impl->GetBuffer(); } -void TJsonParser::ReleaseBuffer(TJsonParserBuffer::TPtr buffer) { - Impl->ReleaseBuffer(buffer); -} - -void TJsonParser::Parse() { - Impl->Parse(); +const TVector>& TJsonParser::Parse() { + return Impl->Parse(); } TString TJsonParser::GetDescription() const { return Impl->GetDescription(); } -std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, TCallback callback) { - return std::unique_ptr(new TJsonParser(columns, types, callback)); +TString TJsonParser::GetDebugString(const TVector>& parsedValues) const { + return Impl->GetDebugString(parsedValues); +} + +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types) { + return std::unique_ptr(new TJsonParser(columns, types)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.h b/ydb/core/fq/libs/row_dispatcher/json_parser.h index 7488488f4b45..fcdf707236b1 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.h @@ -1,6 +1,5 @@ #pragma once -#include #include #include @@ -8,9 +7,6 @@ namespace NFq { class TJsonParserBuffer { -public: - using TPtr = TList::iterator; - public: TJsonParserBuffer(); @@ -23,7 +19,6 @@ class TJsonParserBuffer { void Clear(); private: - YDB_ACCESSOR_DEF(ui64, Offset); YDB_READONLY_DEF(size_t, NumberValues); YDB_READONLY_DEF(bool, Finished) @@ -32,31 +27,20 @@ class TJsonParserBuffer { class TJsonParser { public: - using TCallback = std::function>&&, TJsonParserBuffer::TPtr)>; - -public: - TJsonParser( - const TVector& columns, - const TVector& types, - TCallback callback); - + TJsonParser(const TVector& columns, const TVector& types); ~TJsonParser(); - TJsonParserBuffer& GetBuffer(ui64 offset); - void ReleaseBuffer(TJsonParserBuffer::TPtr buffer); - - void Parse(); + TJsonParserBuffer& GetBuffer(); + const TVector>& Parse(); TString GetDescription() const; + TString GetDebugString(const TVector>& parsedValues) const; private: class TImpl; const std::unique_ptr Impl; }; -std::unique_ptr NewJsonParser( - const TVector& columns, - const TVector& types, - TJsonParser::TCallback callback); +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 03460a014cc9..d1ae9fd1f73c 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -50,7 +50,6 @@ struct TEvPrivate { EvPqEventsReady = EvBegin + 10, EvCreateSession, EvStatus, - EvDataParsed, EvDataAfterFilteration, EvDataFiltered, EvPrintState, @@ -64,32 +63,6 @@ struct TEvPrivate { struct TEvPrintState : public NActors::TEventLocal {}; struct TEvStatus : public NActors::TEventLocal {}; - struct TEvDataParsed : public NActors::TEventLocal { - TEvDataParsed(TVector>&& parsedValues, TJsonParserBuffer::TPtr buffer) - : Offset(buffer->GetOffset()) - , NumberValues(parsedValues.empty() ? 0 : parsedValues.front().size()) - , ParsedValues(std::move(parsedValues)) - , Buffer(buffer) - {} - - TString DebugString() const { - TStringBuilder result; - for (const auto& columnResult : ParsedValues) { - result << "Parsed column: "; - for (const auto& value : columnResult) { - result << "'" << value << "' "; - } - result << "\n"; - } - return result; - } - - const ui64 Offset; - const ui64 NumberValues; - TVector> ParsedValues; - TJsonParserBuffer::TPtr Buffer; - }; - struct TEvDataFiltered : public NActors::TEventLocal { TEvDataFiltered(ui64 offset, ui64 numberValues) : Offset(offset) @@ -201,6 +174,7 @@ class TTopicSession : public TActorBootstrapped { void CloseTopicSession(); void SubscribeOnNextEvent(); void SendToParsing(const TVector& messages); + void SendToFiltering(ui64 offset, const TVector>& parsedValues); void SendData(ClientsInfo& info); void InitParser(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams); void FatalError(const TString& message, const std::unique_ptr* filter = nullptr); @@ -215,7 +189,6 @@ class TTopicSession : public TActorBootstrapped { void Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&); void Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&); - void Handle(NFq::TEvPrivate::TEvDataParsed::TPtr&); void Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr&); void Handle(NFq::TEvPrivate::TEvStatus::TPtr&); void Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr&); @@ -233,7 +206,6 @@ class TTopicSession : public TActorBootstrapped { STRICT_STFUNC_EXC(StateFunc, hFunc(NFq::TEvPrivate::TEvPqEventsReady, Handle); hFunc(NFq::TEvPrivate::TEvCreateSession, Handle); - hFunc(NFq::TEvPrivate::TEvDataParsed, Handle); hFunc(NFq::TEvPrivate::TEvDataAfterFilteration, Handle); hFunc(NFq::TEvPrivate::TEvStatus, Handle); hFunc(NFq::TEvPrivate::TEvDataFiltered, Handle); @@ -249,7 +221,6 @@ class TTopicSession : public TActorBootstrapped { cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway); IgnoreFunc(NFq::TEvPrivate::TEvPqEventsReady); IgnoreFunc(NFq::TEvPrivate::TEvCreateSession); - IgnoreFunc(NFq::TEvPrivate::TEvDataParsed); IgnoreFunc(NFq::TEvPrivate::TEvDataAfterFilteration); IgnoreFunc(NFq::TEvPrivate::TEvStatus); IgnoreFunc(NFq::TEvPrivate::TEvDataFiltered); @@ -391,23 +362,6 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) { CreateTopicSession(); } -void TTopicSession::Handle(NFq::TEvPrivate::TEvDataParsed::TPtr& ev) { - LOG_ROW_DISPATCHER_TRACE("TEvDataParsed, offset " << ev->Get()->Offset << ", data:\n" << ev->Get()->DebugString()); - - for (auto& [actorId, info] : Clients) { - try { - if (info.Filter) { - info.Filter->Push(ev->Get()->Offset, ev->Get()->ParsedValues); - } - } catch (const std::exception& e) { - FatalError(e.what(), &info.Filter); - } - } - Parser->ReleaseBuffer(ev->Get()->Buffer); - - Send(SelfId(), new TEvPrivate::TEvDataFiltered(ev->Get()->Offset, ev->Get()->NumberValues)); -} - void TTopicSession::Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr& ev) { LOG_ROW_DISPATCHER_TRACE("TEvDataAfterFilteration, read actor id " << ev->Get()->ReadActorId.ToString()); auto it = Clients.find(ev->Get()->ReadActorId); @@ -564,21 +518,40 @@ void TTopicSession::SendToParsing(const TVectorGetBuffer(messages.front().GetOffset()); + TJsonParserBuffer& buffer = Parser->GetBuffer(); buffer.Reserve(valuesSize); for (const auto& message : messages) { buffer.AddValue(message.GetData()); } - LOG_ROW_DISPATCHER_TRACE("SendToParsing, buffer with offset " << buffer.GetOffset() << " and size " << buffer.GetNumberValues()); + const ui64 offset = messages.front().GetOffset(); + LOG_ROW_DISPATCHER_TRACE("SendToParsing, buffer with offset " << offset << " and size " << buffer.GetNumberValues()); try { - Parser->Parse(); + const auto& parsedValues = Parser->Parse(); + SendToFiltering(offset, parsedValues); } catch (const std::exception& e) { FatalError(e.what()); } } +void TTopicSession::SendToFiltering(ui64 offset, const TVector>& parsedValues) { + Y_ENSURE(parsedValues, "Expected non empty schema"); + LOG_ROW_DISPATCHER_TRACE("TEvDataParsed, offset " << offset << ", data:\n" << Parser->GetDebugString(parsedValues)); + + for (auto& [actorId, info] : Clients) { + try { + if (info.Filter) { + info.Filter->Push(offset, parsedValues); + } + } catch (const std::exception& e) { + FatalError(e.what(), &info.Filter); + } + } + + Send(SelfId(), new TEvPrivate::TEvDataFiltered(offset, parsedValues.front().size())); +} + void TTopicSession::SendData(ClientsInfo& info) { info.DataArrivedSent = false; if (info.Buffer.empty()) { @@ -705,13 +678,7 @@ void TTopicSession::InitParser(const NYql::NPq::NProto::TDqPqTopicSource& source } try { CurrentParserTypes = std::make_pair(GetVector(sourceParams.GetColumns()), GetVector(sourceParams.GetColumnTypes())); - NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem(); - Parser = NewJsonParser( - GetVector(sourceParams.GetColumns()), - GetVector(sourceParams.GetColumnTypes()), - [actorSystem, selfId = SelfId()](TVector>&& parsedValues, TJsonParserBuffer::TPtr buffer){ - actorSystem->Send(selfId, new NFq::TEvPrivate::TEvDataParsed(std::move(parsedValues), buffer)); - }); + Parser = NewJsonParser(GetVector(sourceParams.GetColumns()), GetVector(sourceParams.GetColumnTypes())); } catch (const NYql::NPureCalc::TCompileError& e) { FatalError(e.GetIssues()); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index e757706a0eff..54fde6580784 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -36,22 +36,20 @@ class TFixture : public NUnitTest::TBaseFixture { } void MakeParser(TVector columns, TVector types) { - Parser = NFq::NewJsonParser(columns, types, [this](TVector>&& parsedValues, TJsonParserBuffer::TPtr buffer) { - ResultOffset = buffer->GetOffset(); - ParsedValues = std::move(parsedValues); - ResultNumberValues = ParsedValues.empty() ? 0 : ParsedValues.front().size(); - }); + Parser = NFq::NewJsonParser(columns, types); } void MakeParser(TVector columns) { MakeParser(columns, TVector(columns.size(), "String")); } - void PushToParser(ui64 offset, const TString& data) const { - TJsonParserBuffer& buffer = Parser->GetBuffer(offset); + void PushToParser(const TString& data) { + TJsonParserBuffer& buffer = Parser->GetBuffer(); buffer.Reserve(data.size()); buffer.AddValue(data); - Parser->Parse(); + + ParsedValues = Parser->Parse(); + ResultNumberValues = ParsedValues ? ParsedValues.front().size() : 0; } TVector GetParsedRow(size_t id) const { @@ -67,16 +65,14 @@ class TFixture : public NUnitTest::TBaseFixture { NActors::TTestActorRuntime Runtime; std::unique_ptr Parser; - ui64 ResultOffset; - ui64 ResultNumberValues; + ui64 ResultNumberValues = 0; TVector> ParsedValues; }; Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple1, TFixture) { MakeParser({"a1", "a2"}, {"String", "Optional"}); - PushToParser(5, R"({"a1": "hello1", "a2": 101, "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, ResultOffset); + PushToParser(R"({"a1": "hello1", "a2": 101, "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); const auto& result = GetParsedRow(0); @@ -87,8 +83,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple2, TFixture) { MakeParser({"a2", "a1"}); - PushToParser(5, R"({"a1": "hello1", "a2": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, ResultOffset); + PushToParser(R"({"a1": "hello1", "a2": "101", "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); const auto& result = GetParsedRow(0); @@ -99,8 +94,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple3, TFixture) { MakeParser({"a1", "a2"}); - PushToParser(5, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, ResultOffset); + PushToParser(R"({"a2": "hello1", "a1": "101", "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); const auto& result = GetParsedRow(0); @@ -111,8 +105,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple4, TFixture) { MakeParser({"a2", "a1"}); - PushToParser(5, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, ResultOffset); + PushToParser(R"({"a2": "hello1", "a1": "101", "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); const auto& result = GetParsedRow(0); @@ -124,15 +117,14 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(ManyValues, TFixture) { MakeParser({"a1", "a2"}); - TJsonParserBuffer& buffer = Parser->GetBuffer(42); + TJsonParserBuffer& buffer = Parser->GetBuffer(); buffer.AddValue(R"({"a1": "hello1", "a2": 101, "event": "event1"})"); buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event2"})"); buffer.AddValue(R"({"a2": "101", "a1": "hello1", "event": "event3"})"); - Parser->Parse(); - UNIT_ASSERT_VALUES_EQUAL(42, ResultOffset); + ParsedValues = Parser->Parse(); + ResultNumberValues = ParsedValues.front().size(); UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); - for (size_t i = 0; i < ResultNumberValues; ++i) { const auto& result = GetParsedRow(i); UNIT_ASSERT_VALUES_EQUAL_C(2, result.size(), i); @@ -143,7 +135,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeParser({"a2", "a1"}); - UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(5, R"(ydb)"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type."); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"(ydb)"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type."); } } From 4c1fcfc09a95e6f3ae924b4e04e56bda80743df0 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 8 Oct 2024 13:57:51 +0000 Subject: [PATCH 4/4] Changed json filter input type --- ydb/core/fq/libs/row_dispatcher/json_filter.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index d51527abc297..b432494d6623 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -53,7 +53,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase { TVector Schemas; }; -class TFilterInputConsumer : public NYql::NPureCalc::IConsumer>>> { +class TFilterInputConsumer : public NYql::NPureCalc::IConsumer>&>> { public: TFilterInputConsumer( const TFilterInputSpec& spec, @@ -91,7 +91,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer>> values) override { + void OnObject(std::pair>&> values) override { Y_ENSURE(FieldsPositions.size() == values.second.size()); NKikimr::NMiniKQL::TThrowingBindTerminator bind; @@ -200,7 +200,7 @@ struct NYql::NPureCalc::TInputSpecTraits { static constexpr bool IsPartial = false; static constexpr bool SupportPushStreamMode = true; - using TConsumerType = THolder>>>>; + using TConsumerType = THolder>&>>>; static TConsumerType MakeConsumer( const TFilterInputSpec& spec, @@ -271,7 +271,7 @@ class TJsonFilter::TImpl { private: THolder> Program; - THolder>>>> InputConsumer; + THolder>&>>> InputConsumer; const TString Sql; };