diff --git a/ydb/core/fq/libs/actors/result_writer.cpp b/ydb/core/fq/libs/actors/result_writer.cpp index 51de0958142e..eabc110303c0 100644 --- a/ydb/core/fq/libs/actors/result_writer.cpp +++ b/ydb/core/fq/libs/actors/result_writer.cpp @@ -111,7 +111,10 @@ class TResultWriter : public NActors::TActorBootstrapped { Finished = true; NYql::NDqProto::TQueryResponse queryResult(ev->Get()->Record); - *queryResult.MutableYson() = ResultBuilder->BuildYson(std::move(Head)); + for (const auto& x : Head) { + queryResult.AddSample()->CopyFrom(x.Proto); + } + Head.clear(); if (!Issues.Empty()) { IssuesToMessage(Issues, queryResult.MutableIssues()); diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index f52beadd6735..82b984a27ead 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -1264,7 +1264,21 @@ class TRunActor : public NActors::TActorBootstrapped { << ". " << it->second.Index << " response. Issues count: " << result.IssuesSize() << ". Rows count: " << result.GetRowsCount()); - queryResult.Data = result.yson(); + TVector rows; + for (const auto& s : result.GetSample()) { + NDq::TDqSerializedBatch batch; + batch.Proto = s; + rows.emplace_back(std::move(batch)); + } + + TProtoBuilder protoBuilder(ResultFormatSettings->ResultType, ResultFormatSettings->Columns); + + bool ysonTruncated = false; + queryResult.Data = protoBuilder.BuildYson(std::move(rows), ResultFormatSettings->SizeLimit.GetOrElse(Max()), + ResultFormatSettings->RowsLimit.GetOrElse(Max()), &ysonTruncated); + + queryResult.RowsCount = result.GetRowsCount(); + queryResult.Truncated = result.GetTruncated() || ysonTruncated; TIssues issues; IssuesFromMessage(result.GetIssues(), issues); @@ -1294,8 +1308,6 @@ class TRunActor : public NActors::TActorBootstrapped { } queryResult.AddIssues(issues); - queryResult.Truncated = result.GetTruncated(); - queryResult.RowsCount = result.GetRowsCount(); it->second.Result.SetValue(queryResult); EvalInfos.erase(it); } @@ -1515,6 +1527,7 @@ class TRunActor : public NActors::TActorBootstrapped { *request.MutableSettings() = dqGraphParams.GetSettings(); *request.MutableSecureParams() = dqGraphParams.GetSecureParams(); *request.MutableColumns() = dqGraphParams.GetColumns(); + PrepareResultFormatSettings(dqGraphParams, *dqConfiguration); NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram()); Send(info.ExecuterId, new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId)); LOG_D("Evaluation Executer: " << info.ExecuterId << ", Controller: " << info.ControlId << ", ResultActor: " << info.ResultId); @@ -1552,9 +1565,12 @@ class TRunActor : public NActors::TActorBootstrapped { CreateResultWriter( ExecuterId, dqGraphParams.GetResultType(), writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ResultBytesLimit)); + + PrepareResultFormatSettings(dqGraphParams, *dqConfiguration); } else { LOG_D("ResultWriter was NOT CREATED since ResultType is empty"); resultId = ExecuterId; + ClearResultFormatSettings(); } if (enableCheckpointCoordinator) { @@ -1604,6 +1620,21 @@ class TRunActor : public NActors::TActorBootstrapped { LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId); } + void PrepareResultFormatSettings(NFq::NProto::TGraphParams& dqGraphParams, const TDqConfiguration& dqConfiguration) { + ResultFormatSettings.ConstructInPlace(); + for (const auto& c : dqGraphParams.GetColumns()) { + ResultFormatSettings->Columns.push_back(c); + } + + ResultFormatSettings->ResultType = dqGraphParams.GetResultType(); + ResultFormatSettings->SizeLimit = dqConfiguration._AllResultsBytesLimit.Get(); + ResultFormatSettings->RowsLimit = dqConfiguration._RowsLimitPerWrite.Get(); + } + + void ClearResultFormatSettings() { + ResultFormatSettings.Clear(); + } + void SetupYqlCore(NYql::TYqlCoreConfig& yqlCore) const { auto flags = yqlCore.MutableFlags(); *flags = Params.Config.GetGateways().GetYqlCore().GetFlags(); @@ -2256,6 +2287,8 @@ class TRunActor : public NActors::TActorBootstrapped { NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; TMap Statistics; + TMaybe ResultFormatSettings; + // Consumers creation NActors::TActorId ReadRulesCreatorId; diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.cpp b/ydb/library/yql/dq/common/dq_serialized_batch.cpp index d96a7fee6d91..c54313b29e94 100644 --- a/ydb/library/yql/dq/common/dq_serialized_batch.cpp +++ b/ydb/library/yql/dq/common/dq_serialized_batch.cpp @@ -39,7 +39,27 @@ void TDqSerializedBatch::SetPayload(TRope&& payload) { payload.Erase(it, it + it.ContiguousSize()); } } - } +} + +void TDqSerializedBatch::ConvertToNoOOB() { + if (!IsOOB()) { + return; + } + + YQL_ENSURE(Proto.GetRaw().empty()); + Proto.SetRaw(Payload.ConvertToString()); + Payload.clear(); + switch ((NDqProto::EDataTransportVersion)Proto.GetTransportVersion()) { + case NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0: + Proto.SetTransportVersion(NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0); + break; + case NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0: + Proto.SetTransportVersion(NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0); + break; + default: + YQL_ENSURE(false, "Unexpected transport version" << Proto.GetTransportVersion()); + } +} TRope SaveForSpilling(TDqSerializedBatch&& batch) { TRope result; diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h index 27c3a814728b..948af18024d4 100644 --- a/ydb/library/yql/dq/common/dq_serialized_batch.h +++ b/ydb/library/yql/dq/common/dq_serialized_batch.h @@ -50,6 +50,8 @@ struct TDqSerializedBatch { Clear(); return result; } + + void ConvertToNoOOB(); }; TRope SaveForSpilling(TDqSerializedBatch&& batch); diff --git a/ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.cpp b/ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.cpp index e033b27df33b..99ea5b81b575 100644 --- a/ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.cpp +++ b/ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.cpp @@ -221,7 +221,7 @@ struct TRuntimeTypeLoader { } TMaybe LoadPgType(const TString& pgType, ui32 /*level*/) { - auto typeId = NYql::NPg::LookupType(pgType).TypeId; + auto typeId = NYql::NPg::HasType(pgType) ? NYql::NPg::LookupType(pgType).TypeId : Max(); return Builder.NewPgType(typeId); } diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.cpp b/ydb/library/yql/providers/dq/actors/proto_builder.cpp index b4a9dc6cbc4b..cc42d445a327 100644 --- a/ydb/library/yql/providers/dq/actors/proto_builder.cpp +++ b/ydb/library/yql/providers/dq/actors/proto_builder.cpp @@ -74,12 +74,19 @@ TString TProtoBuilder::BuildYson(TVector&& rows, writer.OnBeginList(); auto full = WriteData(std::move(rows), [&](const NYql::NUdf::TUnboxedValuePod& value) { - auto rowYson = NCommon::WriteYsonValue(value, ResultType, ColumnOrder.empty() ? nullptr : &ColumnOrder); - writer.OnListItem(); - writer.OnRaw(rowYson); - size += rowYson.size(); - ++count; - return size <= maxBytesLimit && count <= maxRowsLimit; + bool ret = (size <= maxBytesLimit && count <= maxRowsLimit); + if (ret) { + auto rowYson = NCommon::WriteYsonValue(value, ResultType, ColumnOrder.empty() ? nullptr : &ColumnOrder); + size += rowYson.size(); + ++count; + ret = (size <= maxBytesLimit && count <= maxRowsLimit); + if (ret) { + writer.OnListItem(); + writer.OnRaw(rowYson); + } + } + + return ret; }); if (!full) { diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h index 7dd10c676384..339f67b25781 100644 --- a/ydb/library/yql/providers/dq/actors/result_actor_base.h +++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h @@ -106,14 +106,12 @@ struct TWriteQueue { , Truncated(false) , FullResultWriterID() , ResultBuilder(resultType ? MakeHolder(resultType, columns) : nullptr) - , ResultYson() - , ResultYsonOut(new THoldingStream(MakeHolder(ResultYson))) - , ResultYsonWriter(MakeHolder(ResultYsonOut.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true)) + , ResultSampleDataSize(0) + , ResultSampleData() , Issues() , BlockingActors() , QueryResponse() , WaitingAckFromFRW(false) { - ResultYsonWriter->OnBeginList(); YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << SizeLimit; YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing"); } @@ -141,19 +139,21 @@ struct TWriteQueue { bool exceedRows = false; try { TFailureInjector::Reach("result_actor_base_fail_on_response_write", [] { throw yexception() << "result_actor_base_fail_on_response_write"; }); - NDq::TDqSerializedBatch dataCopy = WriteQueue.back().Data; - full = ResultBuilder->WriteYsonData(std::move(dataCopy), [this, &exceedRows](const TString& rawYson) { - if (RowsLimit && Rows + 1 > *RowsLimit) { + if (!Truncated) { + NDq::TDqSerializedBatch dataCopy = WriteQueue.back().Data; + dataCopy.ConvertToNoOOB(); + Rows += dataCopy.RowCount(); + ResultSampleDataSize += dataCopy.Size(); + + if (RowsLimit && Rows > *RowsLimit) { exceedRows = true; - return false; - } else if (ResultYsonOut->Counter() + rawYson.size() > SizeLimit) { - return false; + full = false; + } else if (ResultSampleDataSize > SizeLimit) { + full = false; } - ResultYsonWriter->OnListItem(); - ResultYsonWriter->OnRaw(rawYson); - ++Rows; - return true; - }); + + ResultSampleData.emplace_back(std::move(dataCopy.Proto)); + } } catch (...) { OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, CurrentExceptionMessage()); return; @@ -245,7 +245,7 @@ struct TWriteQueue { private: void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); - YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); + YQL_ENSURE(!ev->Get()->Record.SampleSize()); YQL_CLOG(DEBUG, ProviderDq) << "Shutting down TResultAggregator"; BlockingActors.clear(); @@ -361,17 +361,14 @@ struct TWriteQueue { YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; NDqProto::TQueryResponse result = QueryResponse->Record; - YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty()); + YQL_ENSURE(!result.SampleSize()); FlushCounters(result); - if (ResultYsonWriter) { - ResultYsonWriter->OnEndList(); - ResultYsonWriter.Destroy(); + for (const auto& x : ResultSampleData) { + result.AddSample()->CopyFrom(x); } - ResultYsonOut.Destroy(); - - *result.MutableYson() = ResultYson; + ResultSampleData.clear(); if (!Issues.Empty()) { NYql::IssuesToMessage(Issues, result.MutableIssues()); } @@ -429,9 +426,8 @@ struct TWriteQueue { bool Truncated; NActors::TActorId FullResultWriterID; THolder ResultBuilder; - TString ResultYson; - THolder ResultYsonOut; - THolder ResultYsonWriter; + ui64 ResultSampleDataSize; + TVector ResultSampleData; TIssues Issues; THashSet BlockingActors; THolder QueryResponse; diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp index c31d6237011c..1a7e321e9e86 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp @@ -204,45 +204,6 @@ class TResultAggregator: public TResultActorBase { bool Continue; }; -class TResultPrinter: public TActor { -public: - static constexpr char ActorName[] = "YQL_DQ_RESULT_PRINTER"; - - TResultPrinter(IOutputStream& output, NThreading::TPromise& promise) - : TActor(&TResultPrinter::Handler) - , Output(output) - , Promise(promise) - { - } - -private: - STRICT_STFUNC(Handler, { HFunc(TEvQueryResponse, OnQueryResult); }) - - void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) { - if (!ev->Get()->Record.HasResultSet()&&ev->Get()->Record.GetYson().empty()) { - NYql::TIssues issues; - NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); - Cerr << issues.ToString() << Endl; - } else { - auto ysonString = !ev->Get()->Record.GetYson().empty() - ? ev->Get()->Record.GetYson() - : NYdb::FormatResultSetYson(ev->Get()->Record.GetResultSet(), NYson::EYsonFormat::Binary); - auto ysonNode = NYT::NodeFromYsonString(ysonString, NYson::EYsonType::Node); - YQL_ENSURE(ysonNode.GetType() == NYT::TNode::EType::List); - for (const auto& row : ysonNode.AsList()) { - Output << NYT::NodeToYsonString(row) << "\n"; - } - } - - Promise.SetValue(); - PassAway(); - } - -private: - IOutputStream& Output; - NThreading::TPromise& Promise; -}; - } // unnamed THolder MakeResultAggregator( diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h index 38c9412a0d22..4bb2b8d8b1fe 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h +++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h @@ -600,7 +600,7 @@ class TTaskControllerImpl: public NActors::TActor { public: void OnQueryResult(TEvQueryResponse::TPtr& ev) { - YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); + YQL_ENSURE(!ev->Get()->Record.SampleSize()); FinalStat().FlushCounters(ev->Get()->Record); if (!Issues.Empty()) { IssuesToMessage(Issues.ToIssues(), ev->Get()->Record.MutableIssues()); diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 37a7449d305b..e04d943aae8c 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -175,13 +175,14 @@ message TPullResponse { message TQueryResponse { reserved 4, 6; - Ydb.ResultSet ResultSet = 1; + reserved 1; repeated Ydb.Issue.IssueMessage Issues = 2; - bytes Yson = 3; + reserved 3; repeated TMetric Metric = 5; bool Truncated = 7; uint64 RowsCount = 8; NYql.NDqProto.StatusIds.StatusCode StatusCode = 9; + repeated NDqProto.TData Sample = 10; } message TDqFailure { diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto index d70d7b2c6f58..1ed427ddb116 100644 --- a/ydb/library/yql/providers/dq/api/protos/service.proto +++ b/ydb/library/yql/providers/dq/api/protos/service.proto @@ -5,6 +5,7 @@ import "ydb/public/api/protos/ydb_operation.proto"; import "ydb/public/api/protos/ydb_value.proto"; import "ydb/library/yql/dq/actors/protos/dq_stats.proto"; import "ydb/library/yql/dq/proto/dq_tasks.proto"; +import "ydb/library/yql/dq/proto/dq_transport.proto"; package Yql.DqsProto; @@ -25,6 +26,7 @@ message ResponseMetric { message ExecuteQueryResult { Ydb.ResultSet result = 1; bytes yson = 2; + repeated NYql.NDqProto.TData sample = 3; } message TFile { diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.h b/ydb/library/yql/providers/dq/common/yql_dq_common.h index 8a0f582a19f1..b7ca4a955b1d 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_common.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_common.h @@ -10,6 +10,13 @@ namespace NYql { namespace NCommon { +struct TResultFormatSettings { + TString ResultType; + TVector Columns; + TMaybe SizeLimit; + TMaybe RowsLimit; +}; + TMaybe SqlToSExpr(const TString& query); TString GetSerializedTypeAnnotation(const NYql::TTypeAnnotationNode* typeAnn); diff --git a/ydb/library/yql/providers/dq/provider/ya.make b/ydb/library/yql/providers/dq/provider/ya.make index 0d9ec4dae573..c3580fa1eaa9 100644 --- a/ydb/library/yql/providers/dq/provider/ya.make +++ b/ydb/library/yql/providers/dq/provider/ya.make @@ -57,6 +57,7 @@ PEERDIR( ydb/library/yql/providers/dq/expr_nodes ydb/library/yql/providers/dq/opt ydb/library/yql/providers/dq/planner + ydb/library/yql/providers/dq/actors ydb/library/yql/providers/result/expr_nodes ydb/library/yql/minikql ) diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index 2fb56799ab8f..21c09574ba97 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -167,7 +168,7 @@ class TDqGatewaySession: public std::enable_shared_from_this } template - void OnResponse(TPromise promise, NYdbGrpc::TGrpcStatus&& status, RespType&& resp, const THashMap& modulesMapping, bool alwaysFallback = false) { + void OnResponse(TPromise promise, NYdbGrpc::TGrpcStatus&& status, RespType&& resp, const NCommon::TResultFormatSettings& resultFormatSettings, const THashMap& modulesMapping, bool alwaysFallback = false) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(SessionId); YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::callback"; @@ -228,9 +229,20 @@ class TDqGatewaySession: public std::enable_shared_from_this if (!error) { Yql::DqsProto::ExecuteQueryResult queryResult; resp.operation().result().UnpackTo(&queryResult); - result.Data = queryResult.yson().empty() - ? NYdb::FormatResultSetYson(queryResult.result(), NYson::EYsonFormat::Binary) - : queryResult.yson(); + TVector rows; + for (const auto& s : queryResult.Getsample()) { + NDq::TDqSerializedBatch batch; + batch.Proto = s; + rows.emplace_back(std::move(batch)); + } + + NYql::NDqs::TProtoBuilder protoBuilder(resultFormatSettings.ResultType, resultFormatSettings.Columns); + + bool ysonTruncated = false; + result.Data = protoBuilder.BuildYson(std::move(rows), resultFormatSettings.SizeLimit.GetOrElse(Max()), + resultFormatSettings.RowsLimit.GetOrElse(Max()), &ysonTruncated); + + result.Truncated = result.Truncated || ysonTruncated; result.AddIssues(issues); result.SetSuccess(); } else { @@ -277,6 +289,7 @@ class TDqGatewaySession: public std::enable_shared_from_this TStub stub, int retry, const TDqSettings::TPtr& settings, + const NCommon::TResultFormatSettings& resultFormatSettings, const THashMap& modulesMapping, const TDqProgressWriter& progressWriter ) { @@ -285,7 +298,8 @@ class TDqGatewaySession: public std::enable_shared_from_this const auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default); const auto alwaysFallback = EFallbackPolicy::Always == fallbackPolicy; auto self = weak_from_this(); - auto callback = [self, promise, sessionId = SessionId, alwaysFallback, modulesMapping](NYdbGrpc::TGrpcStatus&& status, TResponse&& resp) mutable { + auto callback = [self, promise, sessionId = SessionId, alwaysFallback, resultFormatSettings, modulesMapping]( + NYdbGrpc::TGrpcStatus&& status, TResponse&& resp) mutable { auto this_ = self.lock(); if (!this_) { YQL_CLOG(DEBUG, ProviderDq) << "Session was closed: " << sessionId; @@ -293,7 +307,8 @@ class TDqGatewaySession: public std::enable_shared_from_this return; } - this_->OnResponse(std::move(promise), std::move(status), std::move(resp), modulesMapping, alwaysFallback); + this_->OnResponse(std::move(promise), std::move(status), std::move(resp), resultFormatSettings, + modulesMapping, alwaysFallback); }; Service.DoRequest(queryPB, callback, stub); @@ -323,7 +338,8 @@ class TDqGatewaySession: public std::enable_shared_from_this } catch (...) { return MakeErrorFuture(std::current_exception()); } - return this_->WithRetry(queryPB, stub, retry - 1, settings, modulesMapping, progressWriter); + return this_->WithRetry(queryPB, stub, retry - 1, settings, resultFormatSettings, + modulesMapping, progressWriter); }); }); } @@ -358,6 +374,12 @@ class TDqGatewaySession: public std::enable_shared_from_this } settings->Save(queryPB); + NCommon::TResultFormatSettings resultFormatSettings; + resultFormatSettings.Columns = columns; + resultFormatSettings.ResultType = plan.ResultType; + resultFormatSettings.SizeLimit = settings->_AllResultsBytesLimit.Get(); + resultFormatSettings.RowsLimit = settings->_RowsLimitPerWrite.Get(); + YQL_CLOG(TRACE, ProviderDq) << TPlanPrinter().Print(plan); { @@ -382,7 +404,8 @@ class TDqGatewaySession: public std::enable_shared_from_this YQL_CLOG(DEBUG, ProviderDq) << "Send query of size " << queryPB.ByteSizeLong(); auto self = weak_from_this(); - return OpenSessionFuture.Apply([self, sessionId = SessionId, queryPB, retry, settings, modulesMapping, progressWriter](const TFuture& f) { + return OpenSessionFuture.Apply([self, sessionId = SessionId, queryPB, retry, settings, resultFormatSettings, modulesMapping, + progressWriter](const TFuture& f) { f.TryRethrow(); auto this_ = self.lock(); if (!this_) { @@ -395,6 +418,7 @@ class TDqGatewaySession: public std::enable_shared_from_this &Yql::DqsProto::DqService::Stub::AsyncExecuteGraph, retry, settings, + resultFormatSettings, modulesMapping, progressWriter); }); diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 0d8b487446d1..191c130937b4 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -134,8 +134,7 @@ namespace NYql::NDqs { { auto& result = ev->Get()->Record; Yql::DqsProto::ExecuteQueryResult queryResult; - queryResult.Mutableresult()->CopyFrom(result.resultset()); - queryResult.set_yson(result.yson()); + queryResult.Mutablesample()->CopyFrom(result.sample()); auto statusCode = result.GetStatusCode(); // this code guarantees that query will be considered failed unless the status is SUCCESS