Skip to content

Commit

Permalink
Extracted yson result formatting from DQ service node (#7563)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitstn authored Aug 10, 2024
1 parent 7cf1acd commit f93ea93
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 90 deletions.
5 changes: 4 additions & 1 deletion ydb/core/fq/libs/actors/result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ class TResultWriter : public NActors::TActorBootstrapped<TResultWriter> {
Finished = true;
NYql::NDqProto::TQueryResponse queryResult(ev->Get()->Record);

*queryResult.MutableYson() = ResultBuilder->BuildYson(std::move(Head));
for (const auto& x : Head) {
queryResult.AddSample()->CopyFrom(x.Proto);
}

Head.clear();
if (!Issues.Empty()) {
IssuesToMessage(Issues, queryResult.MutableIssues());
Expand Down
39 changes: 36 additions & 3 deletions ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,21 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
<< ". " << it->second.Index << " response. Issues count: " << result.IssuesSize()
<< ". Rows count: " << result.GetRowsCount());

queryResult.Data = result.yson();
TVector<NDq::TDqSerializedBatch> rows;
for (const auto& s : result.GetSample()) {
NDq::TDqSerializedBatch batch;
batch.Proto = s;
rows.emplace_back(std::move(batch));
}

TProtoBuilder protoBuilder(ResultFormatSettings->ResultType, ResultFormatSettings->Columns);

bool ysonTruncated = false;
queryResult.Data = protoBuilder.BuildYson(std::move(rows), ResultFormatSettings->SizeLimit.GetOrElse(Max<ui64>()),
ResultFormatSettings->RowsLimit.GetOrElse(Max<ui64>()), &ysonTruncated);

queryResult.RowsCount = result.GetRowsCount();
queryResult.Truncated = result.GetTruncated() || ysonTruncated;

TIssues issues;
IssuesFromMessage(result.GetIssues(), issues);
Expand Down Expand Up @@ -1294,8 +1308,6 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
}

queryResult.AddIssues(issues);
queryResult.Truncated = result.GetTruncated();
queryResult.RowsCount = result.GetRowsCount();
it->second.Result.SetValue(queryResult);
EvalInfos.erase(it);
}
Expand Down Expand Up @@ -1515,6 +1527,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
*request.MutableSettings() = dqGraphParams.GetSettings();
*request.MutableSecureParams() = dqGraphParams.GetSecureParams();
*request.MutableColumns() = dqGraphParams.GetColumns();
PrepareResultFormatSettings(dqGraphParams, *dqConfiguration);
NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram());
Send(info.ExecuterId, new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId));
LOG_D("Evaluation Executer: " << info.ExecuterId << ", Controller: " << info.ControlId << ", ResultActor: " << info.ResultId);
Expand Down Expand Up @@ -1552,9 +1565,12 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
CreateResultWriter(
ExecuterId, dqGraphParams.GetResultType(),
writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ResultBytesLimit));

PrepareResultFormatSettings(dqGraphParams, *dqConfiguration);
} else {
LOG_D("ResultWriter was NOT CREATED since ResultType is empty");
resultId = ExecuterId;
ClearResultFormatSettings();
}

if (enableCheckpointCoordinator) {
Expand Down Expand Up @@ -1604,6 +1620,21 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId);
}

// (Re)creates ResultFormatSettings and fills it from the graph parameters
// (result type and columns) and from the DQ configuration (size and rows
// limits). The stored settings are later used to format sampled result data.
void PrepareResultFormatSettings(NFq::NProto::TGraphParams& dqGraphParams, const TDqConfiguration& dqConfiguration) {
    ResultFormatSettings.ConstructInPlace();
    auto& settings = *ResultFormatSettings;

    // Limits come straight from the configuration and may be unset.
    settings.SizeLimit = dqConfiguration._AllResultsBytesLimit.Get();
    settings.RowsLimit = dqConfiguration._RowsLimitPerWrite.Get();
    settings.ResultType = dqGraphParams.GetResultType();

    // Append every column of the graph in its original order.
    const auto& graphColumns = dqGraphParams.GetColumns();
    settings.Columns.insert(settings.Columns.end(), graphColumns.begin(), graphColumns.end());
}

// Resets ResultFormatSettings by calling Clear() on it. Used on the path where
// no result writer is created because the graph's ResultType is empty.
void ClearResultFormatSettings() {
ResultFormatSettings.Clear();
}

void SetupYqlCore(NYql::TYqlCoreConfig& yqlCore) const {
auto flags = yqlCore.MutableFlags();
*flags = Params.Config.GetGateways().GetYqlCore().GetFlags();
Expand Down Expand Up @@ -2256,6 +2287,8 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
TMap<TString, TString> Statistics;

TMaybe<NCommon::TResultFormatSettings> ResultFormatSettings;

// Consumers creation
NActors::TActorId ReadRulesCreatorId;

Expand Down
22 changes: 21 additions & 1 deletion ydb/library/yql/dq/common/dq_serialized_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,27 @@ void TDqSerializedBatch::SetPayload(TRope&& payload) {
payload.Erase(it, it + it.ContiguousSize());
}
}
}
}

void TDqSerializedBatch::ConvertToNoOOB() {
if (!IsOOB()) {
return;
}

YQL_ENSURE(Proto.GetRaw().empty());
Proto.SetRaw(Payload.ConvertToString());
Payload.clear();
switch ((NDqProto::EDataTransportVersion)Proto.GetTransportVersion()) {
case NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0:
Proto.SetTransportVersion(NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0);
break;
case NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0:
Proto.SetTransportVersion(NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0);
break;
default:
YQL_ENSURE(false, "Unexpected transport version" << Proto.GetTransportVersion());
}
}

TRope SaveForSpilling(TDqSerializedBatch&& batch) {
TRope result;
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/dq/common/dq_serialized_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ struct TDqSerializedBatch {
Clear();
return result;
}

void ConvertToNoOOB();
};

TRope SaveForSpilling(TDqSerializedBatch&& batch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ struct TRuntimeTypeLoader {
}

TMaybe<TType> LoadPgType(const TString& pgType, ui32 /*level*/) {
auto typeId = NYql::NPg::LookupType(pgType).TypeId;
auto typeId = NYql::NPg::HasType(pgType) ? NYql::NPg::LookupType(pgType).TypeId : Max<ui32>();
return Builder.NewPgType(typeId);
}

Expand Down
19 changes: 13 additions & 6 deletions ydb/library/yql/providers/dq/actors/proto_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,19 @@ TString TProtoBuilder::BuildYson(TVector<NYql::NDq::TDqSerializedBatch>&& rows,
writer.OnBeginList();

auto full = WriteData(std::move(rows), [&](const NYql::NUdf::TUnboxedValuePod& value) {
auto rowYson = NCommon::WriteYsonValue(value, ResultType, ColumnOrder.empty() ? nullptr : &ColumnOrder);
writer.OnListItem();
writer.OnRaw(rowYson);
size += rowYson.size();
++count;
return size <= maxBytesLimit && count <= maxRowsLimit;
bool ret = (size <= maxBytesLimit && count <= maxRowsLimit);
if (ret) {
auto rowYson = NCommon::WriteYsonValue(value, ResultType, ColumnOrder.empty() ? nullptr : &ColumnOrder);
size += rowYson.size();
++count;
ret = (size <= maxBytesLimit && count <= maxRowsLimit);
if (ret) {
writer.OnListItem();
writer.OnRaw(rowYson);
}
}

return ret;
});

if (!full) {
Expand Down
48 changes: 22 additions & 26 deletions ydb/library/yql/providers/dq/actors/result_actor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,12 @@ struct TWriteQueue {
, Truncated(false)
, FullResultWriterID()
, ResultBuilder(resultType ? MakeHolder<TProtoBuilder>(resultType, columns) : nullptr)
, ResultYson()
, ResultYsonOut(new THoldingStream<TCountingOutput>(MakeHolder<TStringOutput>(ResultYson)))
, ResultYsonWriter(MakeHolder<NYson::TYsonWriter>(ResultYsonOut.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true))
, ResultSampleDataSize(0)
, ResultSampleData()
, Issues()
, BlockingActors()
, QueryResponse()
, WaitingAckFromFRW(false) {
ResultYsonWriter->OnBeginList();
YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << SizeLimit;
YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << (RowsLimit.Defined() ? ToString(RowsLimit.GetRef()) : "nothing");
}
Expand Down Expand Up @@ -141,19 +139,21 @@ struct TWriteQueue {
bool exceedRows = false;
try {
TFailureInjector::Reach("result_actor_base_fail_on_response_write", [] { throw yexception() << "result_actor_base_fail_on_response_write"; });
NDq::TDqSerializedBatch dataCopy = WriteQueue.back().Data;
full = ResultBuilder->WriteYsonData(std::move(dataCopy), [this, &exceedRows](const TString& rawYson) {
if (RowsLimit && Rows + 1 > *RowsLimit) {
if (!Truncated) {
NDq::TDqSerializedBatch dataCopy = WriteQueue.back().Data;
dataCopy.ConvertToNoOOB();
Rows += dataCopy.RowCount();
ResultSampleDataSize += dataCopy.Size();

if (RowsLimit && Rows > *RowsLimit) {
exceedRows = true;
return false;
} else if (ResultYsonOut->Counter() + rawYson.size() > SizeLimit) {
return false;
full = false;
} else if (ResultSampleDataSize > SizeLimit) {
full = false;
}
ResultYsonWriter->OnListItem();
ResultYsonWriter->OnRaw(rawYson);
++Rows;
return true;
});

ResultSampleData.emplace_back(std::move(dataCopy.Proto));
}
} catch (...) {
OnError(NYql::NDqProto::StatusIds::UNSUPPORTED, CurrentExceptionMessage());
return;
Expand Down Expand Up @@ -245,7 +245,7 @@ struct TWriteQueue {
private:
void OnQueryResult(TEvQueryResponse::TPtr& ev, const NActors::TActorContext&) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
YQL_ENSURE(!ev->Get()->Record.SampleSize());
YQL_CLOG(DEBUG, ProviderDq) << "Shutting down TResultAggregator";

BlockingActors.clear();
Expand Down Expand Up @@ -361,17 +361,14 @@ struct TWriteQueue {
YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
NDqProto::TQueryResponse result = QueryResponse->Record;

YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty());
YQL_ENSURE(!result.SampleSize());
FlushCounters(result);

if (ResultYsonWriter) {
ResultYsonWriter->OnEndList();
ResultYsonWriter.Destroy();
for (const auto& x : ResultSampleData) {
result.AddSample()->CopyFrom(x);
}
ResultYsonOut.Destroy();

*result.MutableYson() = ResultYson;

ResultSampleData.clear();
if (!Issues.Empty()) {
NYql::IssuesToMessage(Issues, result.MutableIssues());
}
Expand Down Expand Up @@ -429,9 +426,8 @@ struct TWriteQueue {
bool Truncated;
NActors::TActorId FullResultWriterID;
THolder<TProtoBuilder> ResultBuilder;
TString ResultYson;
THolder<TCountingOutput> ResultYsonOut;
THolder<NYson::TYsonWriter> ResultYsonWriter;
ui64 ResultSampleDataSize;
TVector<NDqProto::TData> ResultSampleData;
TIssues Issues;
THashSet<NActors::TActorId> BlockingActors;
THolder<TEvQueryResponse> QueryResponse;
Expand Down
39 changes: 0 additions & 39 deletions ydb/library/yql/providers/dq/actors/result_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,45 +204,6 @@ class TResultAggregator: public TResultActorBase<TResultAggregator> {
bool Continue;
};

// Actor that handles a single TEvQueryResponse and prints it.
// If the response has neither a result set nor YSON, its issues are printed to
// Cerr. Otherwise the YSON (taken as is, or produced from the result set) must
// be a list, and each item is written to Output on its own line.
// In both cases the promise is then fulfilled and the actor passes away.
// Output and Promise are stored by reference, so they must outlive the actor.
class TResultPrinter: public TActor<TResultPrinter> {
public:
    static constexpr char ActorName[] = "YQL_DQ_RESULT_PRINTER";

    TResultPrinter(IOutputStream& output, NThreading::TPromise<void>& promise)
        : TActor<TResultPrinter>(&TResultPrinter::Handler)
        , Output(output)
        , Promise(promise)
    {
    }

private:
    STRICT_STFUNC(Handler, { HFunc(TEvQueryResponse, OnQueryResult); })

    void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) {
        const auto& record = ev->Get()->Record;
        const bool hasYson = !record.GetYson().empty();

        if (hasYson || record.HasResultSet()) {
            // Prefer the ready-made YSON; fall back to formatting the result set.
            const auto ysonString = hasYson
                ? record.GetYson()
                : NYdb::FormatResultSetYson(record.GetResultSet(), NYson::EYsonFormat::Binary);
            const auto rowsNode = NYT::NodeFromYsonString(ysonString, NYson::EYsonType::Node);
            YQL_ENSURE(rowsNode.GetType() == NYT::TNode::EType::List);
            for (const auto& item : rowsNode.AsList()) {
                Output << NYT::NodeToYsonString(item) << "\n";
            }
        } else {
            // Nothing to print as data: report the issues instead.
            NYql::TIssues issues;
            NYql::IssuesFromMessage(record.GetIssues(), issues);
            Cerr << issues.ToString() << Endl;
        }

        Promise.SetValue();
        PassAway();
    }

private:
    IOutputStream& Output;
    NThreading::TPromise<void>& Promise;
};

} // unnamed

THolder<NActors::IActor> MakeResultAggregator(
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/actors/task_controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ class TTaskControllerImpl: public NActors::TActor<TDerived> {

public:
void OnQueryResult(TEvQueryResponse::TPtr& ev) {
YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
YQL_ENSURE(!ev->Get()->Record.SampleSize());
FinalStat().FlushCounters(ev->Get()->Record);
if (!Issues.Empty()) {
IssuesToMessage(Issues.ToIssues(), ev->Get()->Record.MutableIssues());
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/yql/providers/dq/api/protos/dqs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,14 @@ message TPullResponse {

message TQueryResponse {
reserved 4, 6;
Ydb.ResultSet ResultSet = 1;
reserved 1;
repeated Ydb.Issue.IssueMessage Issues = 2;
bytes Yson = 3;
reserved 3;
repeated TMetric Metric = 5;
bool Truncated = 7;
uint64 RowsCount = 8;
NYql.NDqProto.StatusIds.StatusCode StatusCode = 9;
repeated NDqProto.TData Sample = 10;
}

message TDqFailure {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/dq/api/protos/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import "ydb/public/api/protos/ydb_operation.proto";
import "ydb/public/api/protos/ydb_value.proto";
import "ydb/library/yql/dq/actors/protos/dq_stats.proto";
import "ydb/library/yql/dq/proto/dq_tasks.proto";
import "ydb/library/yql/dq/proto/dq_transport.proto";

package Yql.DqsProto;

Expand All @@ -25,6 +26,7 @@ message ResponseMetric {
// Result of query execution. NOTE(review): which of the fields below are
// populated together is not visible here - confirm against the producers.
message ExecuteQueryResult {
// Result as a YDB result set.
Ydb.ResultSet result = 1;
// Result as raw YSON bytes.
bytes yson = 2;
// Result sample as NYql.NDqProto.TData items (presumably declared in the
// imported dq_transport.proto - confirm).
repeated NYql.NDqProto.TData sample = 3;
}

message TFile {
Expand Down
7 changes: 7 additions & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
namespace NYql {
namespace NCommon {

// Settings needed to format result data outside of the component that
// produced it: the result type, the columns and optional limits.
struct TResultFormatSettings {
// Result type in the form provided by the graph parameters.
TString ResultType;
// Columns of the result, in the order they were provided.
TVector<TString> Columns;
// Optional limit on the size of the formatted result; empty means not set.
TMaybe<ui64> SizeLimit;
// Optional limit on the number of rows; empty means not set.
TMaybe<ui64> RowsLimit;
};

TMaybe<TString> SqlToSExpr(const TString& query);

TString GetSerializedTypeAnnotation(const NYql::TTypeAnnotationNode* typeAnn);
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/provider/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ PEERDIR(
ydb/library/yql/providers/dq/expr_nodes
ydb/library/yql/providers/dq/opt
ydb/library/yql/providers/dq/planner
ydb/library/yql/providers/dq/actors
ydb/library/yql/providers/result/expr_nodes
ydb/library/yql/minikql
)
Expand Down
Loading

0 comments on commit f93ea93

Please sign in to comment.