diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 2633b343e18e..92920ca80acc 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -157,6 +157,110 @@ Y_UNIT_TEST_SUITE(KqpIndexMetadata) { } } + void TestNoReadFromMainTableBeforeJoin(bool UseExtractPredicates) { + using namespace NYql; + using namespace NYql::NNodes; + + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetPredicateExtract20(UseExtractPredicates); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + + auto& server = kikimr.GetTestServer(); + auto gateway = GetIcGateway(server); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + const TString createTableSql(R"( + --!syntax_v1 + CREATE TABLE `/Root/tg` ( + id Utf8, b Utf8, am Decimal(22, 9), cur Utf8, pa_id Utf8, system_date Timestamp, status Utf8, type Utf8, product Utf8, + PRIMARY KEY (b, id), + INDEX tg_index GLOBAL SYNC ON (`b`, `pa_id`, `system_date`, `id`) + COVER(status, type, product, am) + );)"); + auto result = session.ExecuteSchemeQuery(createTableSql).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + // core optimizer should inject CoExtractMembert over KqlReadTableIndex with columns set based on ORDER BY + // after that KqlReadTableIndex has all columns present in index and should be rewriten in to index read + // limit must be applied in to this (index read) stage. + // As result we have limited numbers of lookups from main table + + { + const TString query(Q1_(R"( + --!syntax_v1 + + DECLARE $b_1 AS Text; + DECLARE $pa_id_1 AS Text; + DECLARE $b_2 AS Text; + DECLARE $constant_param_1 AS Timestamp; + DECLARE $constant_param_2 AS Text; + DECLARE $type_1 AS List; + DECLARE $status_1 AS Text; + DECLARE $am_1 AS Decimal(22, 9); + + SELECT * + FROM tg + WHERE (`tg`.`b` = $b_1) AND (`tg`.`id` IN ( + SELECT `id` + FROM ( + SELECT * + FROM `/Root/tg` VIEW tg_index AS tg + WHERE (`tg`.`pa_id` = $pa_id_1) + AND (`tg`.`b` = $b_2) + AND ((`tg`.`system_date`, `tg`.`id`) <= ($constant_param_1, $constant_param_2)) + AND (`tg`.`type` NOT IN $type_1) + AND (`tg`.`status` <> $status_1) + AND (`tg`.`am` <> $am_1) + ORDER BY + `tg`.`b` DESC, + `tg`.`pa_id` DESC, + `tg`.`system_date` DESC, + `tg`.`id` DESC + LIMIT 11) + )) + ORDER BY + `tg`.`system_date` DESC, + `tg`.`id` DESC + + )")); + + auto explainResult = session.ExplainDataQuery(query).GetValueSync(); + UNIT_ASSERT_C(explainResult.IsSuccess(), explainResult.GetIssues().ToString()); + + Cerr << explainResult.GetAst() << Endl; + UNIT_ASSERT_C(explainResult.GetAst().Contains("'('\"Reverse\")"), explainResult.GetAst()); + UNIT_ASSERT_C(explainResult.GetAst().Contains("'('\"Sorted\")"), explainResult.GetAst()); + + NJson::TJsonValue plan; + NJson::ReadJsonTree(explainResult.GetPlan(), &plan, true); + UNIT_ASSERT(ValidatePlanNodeIds(plan)); + Cerr << plan << Endl; + auto mainTableAccess = CountPlanNodesByKv(plan, "Table", "tg"); + UNIT_ASSERT_VALUES_EQUAL(mainTableAccess, 1); + + auto indexTableAccess = CountPlanNodesByKv(plan, "Table", "tg/tg_index/indexImplTable"); + UNIT_ASSERT_VALUES_EQUAL(indexTableAccess, 1); + + auto filterOnIndex = CountPlanNodesByKv(plan, "Node Type", "Limit-Filter-TablePointLookup"); + UNIT_ASSERT_VALUES_EQUAL(filterOnIndex, 1); + + auto limitFilterNode = FindPlanNodeByKv(plan, "Node Type", "Limit-Filter-TablePointLookup"); + auto val = FindPlanNodes(limitFilterNode, "Limit"); + UNIT_ASSERT_VALUES_EQUAL(val.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(val[0], "11"); + } + } + + Y_UNIT_TEST_TWIN(TestNoReadFromMainTableBeforeJoin, ExtractPredicate) { + TestNoReadFromMainTableBeforeJoin(ExtractPredicate); + } + Y_UNIT_TEST(HandleWriteOnlyIndex) { using namespace NYql; using namespace NYql::NNodes; diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 325555f6444b..8d8bf4923dc9 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3871,10 +3871,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST_TWIN(ComplexLookupLimit, NewPredicateExtract) { - if (NewPredicateExtract) { - return; - } - TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetPredicateExtract20(NewPredicateExtract); diff --git a/ydb/core/protos/tx.proto b/ydb/core/protos/tx.proto index eaafd0ef31e0..7956786702c4 100644 --- a/ydb/core/protos/tx.proto +++ b/ydb/core/protos/tx.proto @@ -131,6 +131,13 @@ message TEvUpdatedLastStep { optional fixed64 LastStep = 3; } +// Time cast to coordinators +// Notifies coordinator that clients are waiting for some specific steps +message TEvRequirePlanSteps { + optional fixed64 CoordinatorID = 1; + repeated uint64 PlanSteps = 2 [packed = true]; +} + // coordinator to mediator message TCoordinatorTransaction { repeated uint64 AffectedSet = 1; // read and write set - joined and then filtered for concrete mediator diff --git a/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp b/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp index 73a623252322..52909ae7f9a3 100644 --- a/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp +++ b/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp @@ -184,6 +184,19 @@ namespace NKikimr::NFlatTxCoordinator { } } + void TTxCoordinator::Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr& ev) { + auto* msg = ev->Get(); + for (ui64 step : msg->Record.GetPlanSteps()) { + // Note: we could schedule an exact volatile step here in the future + step = AlignPlanStep(step); + // Note: this is not a sibling step, but it behaves similar enough + // so we reuse the same queue here. + if (step > VolatileState.LastPlanned && PendingSiblingSteps.insert(step).second) { + SchedulePlanTickExact(step); + } + } + } + void TTxCoordinator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { auto* msg = ev->Get(); if (auto* state = Siblings.FindPtr(msg->TabletId); state && state->Subscribed) { diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp index 495114a37f32..d8118674f144 100644 --- a/ydb/core/tx/coordinator/coordinator_impl.cpp +++ b/ydb/core/tx/coordinator/coordinator_impl.cpp @@ -272,8 +272,12 @@ void TTxCoordinator::SchedulePlanTickAligned(ui64 next) { return; } + SchedulePlanTickExact(AlignPlanStep(next)); +} + +ui64 TTxCoordinator::AlignPlanStep(ui64 step) { const ui64 resolution = Config.Resolution; - SchedulePlanTickExact((next + resolution - 1) / resolution * resolution); + return ((step + resolution - 1) / resolution * resolution); } void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx) { diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h index 5c0dde89b519..247d918bcbbc 100644 --- a/ydb/core/tx/coordinator/coordinator_impl.h +++ b/ydb/core/tx/coordinator/coordinator_impl.h @@ -651,6 +651,7 @@ class TTxCoordinator : public TActor, public TTabletExecutedFlat void SubscribeToSibling(TSiblingState& state); void UnsubscribeFromSiblings(); void Handle(TEvTxProxy::TEvUpdatedLastStep::TPtr &ev); + void Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr &ev); void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev); void Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx); @@ -674,6 +675,7 @@ class TTxCoordinator : public TActor, public TTabletExecutedFlat void SchedulePlanTick(); void SchedulePlanTickExact(ui64 next); void SchedulePlanTickAligned(ui64 next); + ui64 AlignPlanStep(ui64 step); bool RestoreMediatorInfo(TTabletId mediatorId, TVector> &planned, TTransactionContext &txc, /*TKeyBuilder &kb, */THashMap> &pushToAffected) const; void TryInitMonCounters(const TActorContext &ctx); @@ -737,6 +739,7 @@ class TTxCoordinator : public TActor, public TTabletExecutedFlat HFunc(TEvTabletPipe::TEvServerDisconnected, Handle); HFunc(TEvPrivate::TEvRestoredProcessingParams, Handle); hFunc(TEvTxProxy::TEvUpdatedLastStep, Handle); + hFunc(TEvTxProxy::TEvRequirePlanSteps, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); ) diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 4c680758ab13..bcdf22a1d2e2 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2285,9 +2285,17 @@ void TDataShard::CheckMediatorStateRestored() { // writes before the restart, and conversely don't accidentally read any // data that is definitely not replied yet. if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) { + const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step; const TRowVersion edge(GetMaxObservedStep(), Max()); SnapshotManager.PromoteImmediateWriteEdgeReplied( Min(edge, SnapshotManager.GetImmediateWriteEdge())); + // Try to ensure writes become visible sooner rather than later + if (edge.Step < writeStep) { + if (MediatorTimeCastWaitingSteps.insert(writeStep).second) { + Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), writeStep)); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << writeStep << " from mediator time cast"); + } + } } MediatorStateWaiting = false; diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index cce17c176a3c..8bad05bcb766 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -3773,6 +3773,97 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "{ items { uint32_value: 3 } items { uint32_value: 3 } }"); } + Y_UNIT_TEST(ReadIteratorLocalSnapshotThenWrite) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); + + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(100) + .SetAppConfig(app); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + // Perform a snapshot read, this will persist "reads from snapshots" flag + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-1` + UNION ALL + SELECT key, value + FROM `/Root/table-2` + )")), + ""); + + // Insert rows using a single-shard write + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)")); + + // Wait until mediator goes idle + size_t timecastUpdates = 0; + auto observer = [&](TTestActorRuntimeBase&, TAutoPtr& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvMediatorTimecast::TEvUpdate::EventType: { + ++timecastUpdates; + break; + } + case TEvDataShard::TEvRead::EventType: { + auto* msg = ev->Get(); + msg->Record.SetMaxRowsInResult(1); + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(observer); + + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... failed to wait for " << description); + } + }; + + waitFor([&]{ return timecastUpdates >= 3; }, "at least 3 timecast updates"); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-1` + ORDER BY key + )")), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 3 } }"); + + auto start = runtime.GetCurrentTime(); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33)")); + auto duration = runtime.GetCurrentTime() - start; + UNIT_ASSERT_C(duration <= TDuration::MilliSeconds(200), "UPSERT takes too much time: " << duration); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp index f6e339854622..455777f2030c 100644 --- a/ydb/core/tx/time_cast/time_cast.cpp +++ b/ydb/core/tx/time_cast/time_cast.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -32,6 +33,21 @@ void TMediatorTimecastEntry::Update(ui64 step, ui64 *exemption, ui64 exsz) { } class TMediatorTimecastProxy : public TActor { + struct TEvPrivate { + enum EEv { + EvRetryCoordinator = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvEnd, + }; + + struct TEvRetryCoordinator : public TEventLocal { + ui64 Coordinator; + + TEvRetryCoordinator(ui64 coordinator) + : Coordinator(coordinator) + {} + }; + }; + struct TWaiter { TActorId Sender; ui64 TabletId; @@ -59,6 +75,7 @@ class TMediatorTimecastProxy : public TActor { struct TMediator { const ui32 BucketsSz; TArrayHolder Buckets; + std::vector Coordinators; TActorId PipeClient; @@ -68,6 +85,12 @@ class TMediatorTimecastProxy : public TActor { {} }; + struct TMediatorCoordinator { + std::set WaitingSteps; + bool RetryPending = false; + bool Subscribed = false; + }; + struct TTabletInfo { ui64 MediatorTabletId; TMediator* Mediator; @@ -92,6 +115,7 @@ class TMediatorTimecastProxy : public TActor { }; THashMap Mediators; // mediator tablet -> info + THashMap MediatorCoordinators; // coordinator tablet -> info THashMap Tablets; ui64 LastSeqNo = 0; @@ -103,6 +127,11 @@ class TMediatorTimecastProxy : public TActor { if (!pr.second) { Y_VERIFY(pr.first->second.BucketsSz == processing.GetTimeCastBucketsPerMediator()); } + if (pr.first->second.Coordinators.empty()) { + pr.first->second.Coordinators.assign( + processing.GetCoordinators().begin(), + processing.GetCoordinators().end()); + } return pr.first->second; } @@ -165,6 +194,8 @@ class TMediatorTimecastProxy : public TActor { void Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, const TActorContext &ctx); void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx); + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx); + void Handle(TEvPrivate::TEvRetryCoordinator::TPtr &ev, const TActorContext &ctx); // Client requests for readstep subscriptions void Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx); @@ -200,6 +231,8 @@ class TMediatorTimecastProxy : public TActor { HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + HFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + HFunc(TEvPrivate::TEvRetryCoordinator, Handle); } } }; @@ -278,6 +311,17 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvWaitPlanStep::TPtr & const ui64 currentStep = bucket.Entry->Get(tabletId); if (currentStep < planStep) { bucket.Waiters.emplace(ev->Sender, tabletId, planStep); + for (ui64 coordinatorId : mediator.Coordinators) { + TMediatorCoordinator &coordinator = MediatorCoordinators[coordinatorId]; + if (coordinator.WaitingSteps.insert(planStep).second && !coordinator.RetryPending) { + Send(MakePipePeNodeCacheID(false), + new TEvPipeCache::TEvForward( + new TEvTxProxy::TEvRequirePlanSteps(coordinatorId, planStep), + coordinatorId, + !coordinator.Subscribed)); + coordinator.Subscribed = true; + } + } return; } @@ -301,6 +345,37 @@ void TMediatorTimecastProxy::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, TryResync(msg->ClientId, msg->TabletId, ctx); } +void TMediatorTimecastProxy::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx) { + auto *msg = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString() + << " HANDLE EvDeliveryProblem " << msg->TabletId); + auto it = MediatorCoordinators.find(msg->TabletId); + if (it != MediatorCoordinators.end()) { + Y_VERIFY_DEBUG(!it->second.RetryPending); + Schedule(TDuration::MilliSeconds(5), new TEvPrivate::TEvRetryCoordinator(msg->TabletId)); + it->second.RetryPending = true; + it->second.Subscribed = false; + } +} + +void TMediatorTimecastProxy::Handle(TEvPrivate::TEvRetryCoordinator::TPtr &ev, const TActorContext &ctx) { + auto *msg = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString() + << " HANDLE EvRetryCoordinator " << msg->Coordinator); + auto it = MediatorCoordinators.find(msg->Coordinator); + if (it != MediatorCoordinators.end() && it->second.RetryPending) { + it->second.RetryPending = false; + if (!it->second.WaitingSteps.empty()) { + Send(MakePipePeNodeCacheID(false), + new TEvPipeCache::TEvForward( + new TEvTxProxy::TEvRequirePlanSteps(msg->Coordinator, it->second.WaitingSteps), + msg->Coordinator, + true)); + it->second.Subscribed = true; + } + } +} + void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString() << " HANDLE "<< ev->Get()->ToString()); @@ -314,6 +389,7 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co auto &mediator = it->second; Y_VERIFY(record.GetBucket() < mediator.BucketsSz); auto &bucket = mediator.Buckets[record.GetBucket()]; + const ui64 step = record.GetTimeBarrier(); switch (bucket.Entry.RefCount()) { case 0: break; @@ -322,7 +398,6 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co bucket.Waiters = { }; break; default: { - const ui64 step = record.GetTimeBarrier(); bucket.Entry->Update(step, nullptr, 0); THashSet> processed; // a set of processed tablets while (!bucket.Waiters.empty()) { @@ -338,6 +413,14 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co break; } } + for (ui64 coordinatorId : mediator.Coordinators) { + auto it = MediatorCoordinators.find(coordinatorId); + if (it != MediatorCoordinators.end()) { + while (!it->second.WaitingSteps.empty() && *it->second.WaitingSteps.begin() <= step) { + it->second.WaitingSteps.erase(it->second.WaitingSteps.begin()); + } + } + } } } diff --git a/ydb/core/tx/tx.h b/ydb/core/tx/tx.h index 116c79549d6a..c59b3339e772 100644 --- a/ydb/core/tx/tx.h +++ b/ydb/core/tx/tx.h @@ -18,6 +18,7 @@ struct TEvTxProxy { EvUnsubscribeReadStep, EvSubscribeLastStep, EvUnsubscribeLastStep, + EvRequirePlanSteps, EvProposeTransactionStatus = EvProposeTransaction + 1 * 512, EvAcquireReadStepResult, @@ -162,6 +163,25 @@ struct TEvTxProxy { Record.SetLastStep(lastStep); } }; + + struct TEvRequirePlanSteps + : public TEventPB + { + TEvRequirePlanSteps() = default; + + TEvRequirePlanSteps(ui64 coordinator, ui64 planStep) { + Record.SetCoordinatorID(coordinator); + Record.AddPlanSteps(planStep); + } + + TEvRequirePlanSteps(ui64 coordinator, const std::set& planSteps) { + Record.SetCoordinatorID(coordinator); + Record.MutablePlanSteps()->Reserve(planSteps.size()); + for (ui64 planStep : planSteps) { + Record.AddPlanSteps(planStep); + } + } + }; }; // basic diff --git a/ydb/library/yql/core/common_opt/yql_co_flow1.cpp b/ydb/library/yql/core/common_opt/yql_co_flow1.cpp index 4e27e056944b..fa3e81da7cbd 100644 --- a/ydb/library/yql/core/common_opt/yql_co_flow1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_flow1.cpp @@ -1508,7 +1508,7 @@ void RegisterCoFlowCallables1(TCallableOptimizerMap& map) { if (!optCtx.IsSingleUsage(node->Head()) && !optCtx.IsPersistentNode(node->Head())) { return node; } -/*TODO: Enable later. Providers is not ready right now. + if (node->Head().IsCallable("Sort")) { YQL_CLOG(DEBUG, Core) << "Fuse " << node->Content() << " over " << node->Head().Content(); auto children = node->Head().ChildrenList(); @@ -1516,7 +1516,15 @@ void RegisterCoFlowCallables1(TCallableOptimizerMap& map) { children.emplace(++it, node->TailPtr()); return ctx.NewCallable(node->Pos(), "TopSort", std::move(children)); } -*/ + + if (node->Head().IsCallable("ExtractMembers") && node->Head().Head().IsCallable("Sort") && optCtx.IsSingleUsage(node->Head().Head())) { + YQL_CLOG(DEBUG, Core) << "Fuse " << node->Content() << " over " << node->Head().Content() << " over " << node->Head().Head().Content(); + auto children = node->Head().Head().ChildrenList(); + auto it = children.cbegin(); + children.emplace(++it, node->TailPtr()); + return ctx.ChangeChild(node->Head(), 0U, ctx.NewCallable(node->Pos(), "TopSort", std::move(children))); + } + if (node->Head().IsCallable({"Top", "TopSort"})) { YQL_CLOG(DEBUG, Core) << "Fuse " << node->Content() << " over " << node->Head().Content(); return ctx.ChangeChild(node->Head(), 1U, ctx.NewCallable(node->Pos(), "Min", {node->TailPtr(), node->Head().ChildPtr(1)})); diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 1b9af4fe1e0f..eb80faa1e825 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -2554,8 +2554,19 @@ TExprNode::TPtr OptimizeReorder(const TExprNode::TPtr& node, TExprContext& ctx) } if (count <= 1) { - YQL_CLOG(DEBUG, Core) << node->Content() << " over 0/1 literals"; - return ctx.RenameNode(*node, "AssumeSorted"); + YQL_CLOG(DEBUG, Core) << node->Content() << " over " << count << " literals."; + if constexpr (IsTop) { + return ctx.Builder(node->Pos()) + .Callable("AssumeSorted") + .Callable(0, "Take") + .Add(0, node->HeadPtr()) + .Add(1, node->ChildPtr(1)) + .Seal() + .Add(1, node->ChildPtr(2)) + .Add(2, node->ChildPtr(3)) + .Seal().Build(); + } else + return ctx.RenameNode(*node, "AssumeSorted"); } } diff --git a/ydb/library/yql/core/extract_predicate/extract_predicate_impl.cpp b/ydb/library/yql/core/extract_predicate/extract_predicate_impl.cpp index 1b09e09974a4..f38379da18e4 100644 --- a/ydb/library/yql/core/extract_predicate/extract_predicate_impl.cpp +++ b/ydb/library/yql/core/extract_predicate/extract_predicate_impl.cpp @@ -1207,48 +1207,72 @@ TExprNode::TPtr DoRebuildRangeForIndexKeys(const TStructExprType& rowType, const } TVector rests; - TMap> children; + Sort(toRebuild.begin(), toRebuild.end(), + [&](const auto& fs, const auto& sc) { + return fs.IndexRange < sc.IndexRange; + }); + TMap> byBegin; + if (!toRebuild.empty()) { + byBegin[toRebuild[0].IndexRange.Begin] = {}; + } + for (auto& current : toRebuild) { if (current.IndexRange.IsEmpty()) { YQL_ENSURE(current.Node->IsCallable("RangeRest")); rests.emplace_back(std::move(current)); } else { - children[current.IndexRange].push_back(std::move(current)); + if (byBegin.contains(current.IndexRange.Begin)) { + byBegin[current.IndexRange.Begin].push_back(current); + byBegin[current.IndexRange.End] = {}; + } else { + rests.push_back(std::move(current)); + } } } - TVector> childrenChains; - for (auto it = children.begin(); it != children.end(); ++it) { - if (!commonIndexRange) { - commonIndexRange = it->first; - childrenChains.emplace_back(std::move(it->second)); - continue; - } - if (commonIndexRange->Begin == it->first.Begin) { - YQL_ENSURE(it->first.End > commonIndexRange->End); - for (auto& asRest : childrenChains) { - rests.insert(rests.end(), asRest.begin(), asRest.end()); + TMap builtRange; + for (auto it = byBegin.rbegin(); it != byBegin.rend(); ++it) { + size_t end = 0; + size_t indexRangeEnd = 0; + TVector results; + TVector currents; + auto flush = [&] () { + if (!currents) { + return; } - childrenChains.clear(); - childrenChains.push_back(std::move(it->second)); - commonIndexRange = it->first; - continue; - } - if (commonIndexRange->End == it->first.Begin) { - commonIndexRange->End = it->first.End; - childrenChains.push_back(std::move(it->second)); - } else { - rests.insert(rests.end(), it->second.begin(), it->second.end()); + TExprNode::TPtr toAdd = MakeRangeAnd(range->Pos(), std::move(currents), ctx); + if (auto ptr = builtRange.FindPtr(end)) { + toAdd = MakeRangeAnd(range->Pos(), { toAdd, ptr->Node }, ctx); + indexRangeEnd = std::max(indexRangeEnd, ptr->IndexRange.End); + } + results.push_back(toAdd); + + indexRangeEnd = std::max(end, indexRangeEnd); + currents = {}; + }; + for (auto node : it->second) { + if (end != node.IndexRange.End) { + flush(); + end = node.IndexRange.End; + } + currents.push_back(node.Node); + } + if (currents) { + flush(); + } + if (results) { + auto& built = builtRange[it->first]; + built.Node = MakeRangeAnd(range->Pos(), std::move(results), ctx); + built.IndexRange.Begin = it->first; + built.IndexRange.End = indexRangeEnd; } } - for (auto& chain : childrenChains) { - TExprNodeList chainNodes; - for (auto& entry : chain) { - chainNodes.push_back(entry.Node); - } - rebuilt.push_back(MakeRangeAnd(range->Pos(), std::move(chainNodes), ctx)); + if (builtRange) { + auto& first = *builtRange.begin(); + rebuilt.push_back(first.second.Node); + commonIndexRange = first.second.IndexRange; } if (!rests.empty()) { @@ -1264,7 +1288,7 @@ TExprNode::TPtr DoRebuildRangeForIndexKeys(const TStructExprType& rowType, const } } - TExprNode::TPtr result = ctx.ChangeChildren(*range, std::move(rebuilt)); + TExprNode::TPtr result = rebuilt.size() == 1 ? rebuilt[0] : ctx.ChangeChildren(*range, std::move(rebuilt)); if (commonIndexRange) { resultIndexRange = *commonIndexRange; } else { @@ -2018,6 +2042,7 @@ bool TPredicateRangeExtractor::Prepare(const TExprNode::TPtr& filterLambdaNode, } return node; }, ctx, settings); + return status == IGraphTransformer::TStatus::Ok; }