Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate stable-24-3-9-cs-2 into stable-24-3 #9675

Open
wants to merge 6 commits into
base: stable-24-3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
13 changes: 7 additions & 6 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,15 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.*
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL
ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssueInteractiveTx+withSink
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssue+withSink
ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink
ydb/core/kqp/ut/tx KqpSinkTx.InvalidateOnError
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_GenericQuerys
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_StreamGenericQuery
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_UsesGenericQueryOnJoinWithDataShardTable
ydb/core/kqp/ut/scheme KqpOlapScheme.DropTable
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
ydb/core/kqp/ut/scheme [14/50]*
Expand All @@ -31,6 +29,7 @@ ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service [38/50]*
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpUpdate
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpReplace+HasSecondaryIndex
ydb/core/persqueue/ut [37/40] chunk chunk
ydb/core/persqueue/ut [38/40] chunk chunk
ydb/core/persqueue/ut TPQTest.*DirectRead*
Expand Down Expand Up @@ -106,4 +105,6 @@ ydb/tests/functional/tenants test_storage_config.py.TestStorageConfig.*
ydb/tests/functional/tenants test_tenants.py.*
ydb/tests/functional/ydb_cli test_ydb_impex.py.TestImpex.test_big_dataset*
ydb/tests/tools/pq_read/test test_timeout.py.TestTimeout.test_timeout
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
ydb/tests/functional/rename [test_rename.py */10] chunk chunk
2 changes: 2 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ struct TKikimrEvents : TEvents {
ES_TX_BACKGROUND = 4256,
ES_SS_BG_TASKS = 4257,
ES_LIMITER = 4258,
//ES_MEMORY = 4259, NB. exists in main
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
};
};

Expand Down
23 changes: 23 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@
#include <ydb/core/tx/limiter/usage/config.h>
#include <ydb/core/tx/limiter/usage/service.h>

#include <ydb/core/tx/limiter/grouped_memory/usage/config.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>

#include <ydb/core/backup/controller/tablet.h>

#include <ydb/services/ext_index/common/config.h>
Expand Down Expand Up @@ -2180,6 +2183,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
}
}

TGroupedMemoryLimiterInitializer::TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}

void TGroupedMemoryLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
NOlap::NGroupedMemoryManager::TConfig serviceConfig;
Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto(Config.GetGroupedMemoryLimiterConfig()));

if (serviceConfig.IsEnabled()) {
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup = tabletGroup->GetSubgroup("type", "TX_GROUPED_MEMORY_LIMITER");

auto service = NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::CreateService(serviceConfig, countersGroup);

setup->LocalServices.push_back(std::make_pair(
NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(NodeId),
TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
}
}

TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,12 @@ class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
public:
TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TCompConveyorInitializer: public IKikimrServicesInitializer {
public:
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig));
}

if (serviceMask.EnableGroupedMemoryLimiter) {
sil->AddServiceInitializer(new TGroupedMemoryLimiterInitializer(runConfig));
}

if (serviceMask.EnableScanConveyor) {
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/service_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ union TBasicKikimrServicesMask {
bool EnableDatabaseMetadataCache:1;
bool EnableGraphService:1;
bool EnableCompDiskLimiter:1;
bool EnableGroupedMemoryLimiter:1;
};

struct {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ PEERDIR(
ydb/core/tx/coordinator
ydb/core/tx/conveyor/service
ydb/core/tx/limiter/service
ydb/core/tx/limiter/grouped_memory/usage
ydb/core/tx/datashard
ydb/core/tx/long_tx_service
ydb/core/tx/long_tx_service/public
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/formats/arrow/accessor/abstract/constructor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#include "constructor.h"
#include <ydb/core/formats/arrow/accessor/plain/constructor.h>

namespace NKikimr::NArrow::NAccessor {

TConstructorContainer TConstructorContainer::GetDefaultConstructor() {
static std::shared_ptr<IConstructor> result = std::make_shared<NPlain::TConstructor>();
return result;
}

}
75 changes: 75 additions & 0 deletions ydb/core/formats/arrow/accessor/abstract/constructor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#pragma once

#include <ydb/library/formats/arrow/protos/accessor.pb.h>
#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
#include <ydb/library/formats/arrow/accessor/common/chunk_data.h>
#include <ydb/services/bg_tasks/abstract/interface.h>

#include <library/cpp/object_factory/object_factory.h>

namespace NKikimr::NArrow::NAccessor {

class IConstructor {
public:
using TFactory = NObjectFactory::TObjectFactory<IConstructor, TString>;
using TProto = NKikimrArrowAccessorProto::TConstructor;

private:
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const = 0;
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstructDefault(
const TChunkConstructionData& externalInfo) const = 0;
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const = 0;
virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) = 0;
virtual std::shared_ptr<arrow::Schema> DoGetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const = 0;
virtual TString DoDebugString() const {
return "";
}

public:
virtual ~IConstructor() = default;

TString DebugString() const {
return TStringBuilder() << GetClassName() << ":" << DoDebugString();
}

TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> Construct(
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const {
return DoConstruct(originalData, externalInfo);
}

TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> ConstructDefault(const TChunkConstructionData& externalInfo) const {
return DoConstructDefault(externalInfo);
}

bool DeserializeFromProto(const NKikimrArrowAccessorProto::TConstructor& proto) {
return DoDeserializeFromProto(proto);
}

NKikimrArrowAccessorProto::TConstructor SerializeToProto() const {
return DoSerializeToProto();
}

void SerializeToProto(NKikimrArrowAccessorProto::TConstructor& proto) const {
proto = DoSerializeToProto();
}

std::shared_ptr<arrow::Schema> GetExpectedSchema(const std::shared_ptr<arrow::Field>& resultColumn) const {
AFL_VERIFY(resultColumn);
return DoGetExpectedSchema(resultColumn);
}

virtual TString GetClassName() const = 0;
};

class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IConstructor> {
private:
using TBase = NBackgroundTasks::TInterfaceProtoContainer<IConstructor>;

public:
using TBase::TBase;

static TConstructorContainer GetDefaultConstructor();
};

} // namespace NKikimr::NArrow::NAccessor
16 changes: 16 additions & 0 deletions ydb/core/formats/arrow/accessor/abstract/request.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "request.h"

namespace NKikimr::NArrow::NAccessor {

TConclusionStatus TRequestedConstructorContainer::DeserializeFromRequest(NYql::TFeaturesExtractor& features) {
const std::optional<TString> className = features.Extract("DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME");
if (!className) {
return TConclusionStatus::Success();
}
if (!TBase::Initialize(*className)) {
return TConclusionStatus::Fail("don't know anything about class_name=" + *className);
}
return TBase::GetObjectPtr()->DeserializeFromRequest(features);
}

}
57 changes: 57 additions & 0 deletions ydb/core/formats/arrow/accessor/abstract/request.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#pragma once
#include "constructor.h"

#include <ydb/library/formats/arrow/protos/accessor.pb.h>
#include <ydb/services/bg_tasks/abstract/interface.h>
#include <ydb/services/metadata/abstract/request_features.h>

#include <library/cpp/object_factory/object_factory.h>

namespace NKikimr::NArrow::NAccessor {

class IRequestedConstructor {
public:
using TFactory = NObjectFactory::TObjectFactory<IRequestedConstructor, TString>;
using TProto = NKikimrArrowAccessorProto::TRequestedConstructor;
private:
virtual TConclusion<NArrow::NAccessor::TConstructorContainer> DoBuildConstructor() const = 0;
virtual NKikimrArrowAccessorProto::TRequestedConstructor DoSerializeToProto() const = 0;
virtual bool DoDeserializeFromProto(const NKikimrArrowAccessorProto::TRequestedConstructor& proto) = 0;
virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) = 0;

public:
virtual ~IRequestedConstructor() = default;

NKikimrArrowAccessorProto::TRequestedConstructor SerializeToProto() const {
return DoSerializeToProto();
}

void SerializeToProto(NKikimrArrowAccessorProto::TRequestedConstructor& proto) const {
proto = DoSerializeToProto();
}

bool DeserializeFromProto(const NKikimrArrowAccessorProto::TRequestedConstructor& proto) {
return DoDeserializeFromProto(proto);
}

TConclusionStatus DeserializeFromRequest(NYql::TFeaturesExtractor& features) {
return DoDeserializeFromRequest(features);
}

TConclusion<TConstructorContainer> BuildConstructor() const {
return DoBuildConstructor();
}

virtual TString GetClassName() const = 0;
};

class TRequestedConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IRequestedConstructor> {
private:
using TBase = NBackgroundTasks::TInterfaceProtoContainer<IRequestedConstructor>;

public:
using TBase::TBase;
TConclusionStatus DeserializeFromRequest(NYql::TFeaturesExtractor& features);
};

} // namespace NKikimr::NArrow::NAccessor
17 changes: 17 additions & 0 deletions ydb/core/formats/arrow/accessor/abstract/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
LIBRARY()

PEERDIR(
contrib/libs/apache/arrow
ydb/library/conclusion
ydb/services/metadata/abstract
ydb/library/formats/arrow/accessor/abstract
ydb/library/formats/arrow/accessor/common
ydb/library/formats/arrow/protos
)

SRCS(
constructor.cpp
request.cpp
)

END()
46 changes: 46 additions & 0 deletions ydb/core/formats/arrow/accessor/composite_serial/accessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include "accessor.h"

namespace NKikimr::NArrow::NAccessor {

namespace {
class TSerializedChunkAccessor {
private:
const std::vector<TDeserializeChunkedArray::TChunk>& Chunks;
const std::shared_ptr<TColumnLoader>& Loader;
std::optional<IChunkedArray::TLocalChunkedArrayAddress>& Result;

public:
TSerializedChunkAccessor(const std::vector<TDeserializeChunkedArray::TChunk>& chunks, const std::shared_ptr<TColumnLoader>& loader,
std::optional<IChunkedArray::TLocalChunkedArrayAddress>& result)
: Chunks(chunks)
, Loader(loader)
, Result(result) {
}
ui64 GetChunksCount() const {
return Chunks.size();
}
ui64 GetChunkLength(const ui32 idx) const {
return Chunks[idx].GetRecordsCount();
}
void OnArray(const ui32 chunkIdx, const ui32 startPosition) const {
Result = IChunkedArray::TLocalChunkedArrayAddress(Chunks[chunkIdx].GetArrayVerified(Loader), startPosition, chunkIdx);
}
};
} // namespace

IChunkedArray::TLocalDataAddress TDeserializeChunkedArray::DoGetLocalData(
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const {
AFL_VERIFY(false);
return IChunkedArray::TLocalDataAddress(nullptr, 0, 0);
}

IChunkedArray::TLocalChunkedArrayAddress TDeserializeChunkedArray::DoGetLocalChunkedArray(
const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const {
std::optional<IChunkedArray::TLocalChunkedArrayAddress> result;
TSerializedChunkAccessor accessor(Chunks, Loader, result);
SelectChunk(chunkCurrent, position, accessor);
AFL_VERIFY(result);
return *result;
}

} // namespace NKikimr::NArrow::NAccessor
Loading
Loading