Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ kqprun: pass the allow-local-files flag into runtime listing #7844

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer) {
bool enableInfer,
bool allowLocalFiles) {
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
{
ToString(NYql::EDatabaseType::ObjectStorage),
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer)
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles)
},
{
ToString(NYql::EDatabaseType::ClickHouse),
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/external_sources/external_source_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
NActors::TActorSystem* actorSystem = nullptr,
size_t pathsLimit = 50000,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory = nullptr,
bool enableInfer = false);
bool enableInfer = false,
bool allowLocalFiles = false);

}
12 changes: 8 additions & 4 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer)
bool enableInfer,
bool allowLocalFiles)
: HostnamePatterns(hostnamePatterns)
, PathsLimit(pathsLimit)
, ActorSystem(actorSystem)
, CredentialsFactory(std::move(credentialsFactory))
, EnableInfer(enableInfer)
, AllowLocalFiles(allowLocalFiles)
{}

virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
Expand Down Expand Up @@ -320,7 +322,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
.Url = meta->DataSourceLocation,
.Credentials = credentials,
.Pattern = effectiveFilePattern,
}, Nothing(), false);
}, Nothing(), AllowLocalFiles);
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
auto& listRes = listResFut.GetValue();
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
Expand Down Expand Up @@ -613,6 +615,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
NActors::TActorSystem* ActorSystem = nullptr;
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> CredentialsFactory;
const bool EnableInfer = false;
const bool AllowLocalFiles;
};

}
Expand All @@ -622,8 +625,9 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer);
bool enableInfer,
bool allowLocalFiles) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles);
}

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer);
bool enableInfer,
bool allowLocalFiles);

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/external_sources/object_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ namespace NKikimr {

Y_UNIT_TEST_SUITE(ObjectStorageTest) {
Y_UNIT_TEST(SuccessValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general));
}

Y_UNIT_TEST(FailedCreate) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->insert({"a", "b"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a");
}

Y_UNIT_TEST(FailedValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->insert({"projection.h", "b"});
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void Init(
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);

s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());
s3ActorsFactory->RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
httpGateway, s3HttpRetryPolicy);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(

if (federatedQuerySetup) {
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig);
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig, nullptr, federatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
s3ActorsFactory->RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);

if (federatedQuerySetup->ConnectorClient) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,8 @@ class TKqpHost : public IKqpHost {
ActorSystem,
FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit(),
FederatedQuerySetup ? FederatedQuerySetup->CredentialsFactory : nullptr,
Config->FeatureFlags.GetEnableExternalSourceSchemaInference());
Config->FeatureFlags.GetEnableExternalSourceSchemaInference(),
FederatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
}
}

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6978,7 +6978,10 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory(
std::vector<TString>(hostnamePatterns.begin(), hostnamePatterns.end()),
nullptr,
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit()
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit(),
nullptr,
appConfig.GetFeatureFlags().GetEnableExternalSourceSchemaInference(),
appConfig.GetQueryServiceConfig().GetS3().GetAllowLocalFiles()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,17 @@ namespace NYql::NDq {
IHTTPGateway::TPtr gateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
::NMonitoring::TDynamicCounterPtr counters) override {
::NMonitoring::TDynamicCounterPtr counters,
bool allowLocalFiles) override {

#if defined(_linux_) || defined(_darwin_)
NDB::registerFormats();
factory.RegisterSource<NS3::TSource>("S3Source",
[credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
[credentialsFactory, gateway, retryPolicy, cfg, counters, allowLocalFiles](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
std::move(settings), args.InputIndex, args.StatsLevel, args.TxId, args.SecureParams,
args.TaskParams, args.ReadRanges, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
counters, args.TaskCounters, args.MemoryQuotaManager);
counters, args.TaskCounters, args.MemoryQuotaManager, allowLocalFiles);
});
#else
Y_UNUSED(factory);
Expand Down
14 changes: 10 additions & 4 deletions ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
NActors::TActorId fileQueueActor,
ui64 fileQueueBatchSizeLimit,
ui64 fileQueueBatchObjectCountLimit,
ui64 fileQueueConsumersCountDelta)
ui64 fileQueueConsumersCountDelta,
bool allowLocalFiles)
: ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
, HolderFactory(holderFactory)
Expand All @@ -76,6 +77,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
, FileQueueActor(fileQueueActor)
, AddPathIndex(addPathIndex)
, SizeLimit(sizeLimit)
, AllowLocalFiles(allowLocalFiles)
, Counters(counters)
, TaskCounters(taskCounters)
, FileSizeLimit(fileSizeLimit)
Expand Down Expand Up @@ -116,7 +118,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
Credentials,
Pattern,
PatternVariant,
NYql::NS3Lister::ES3PatternType::Wildcard));
NYql::NS3Lister::ES3PatternType::Wildcard,
AllowLocalFiles));
}

LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex << ", FileQueue: " << FileQueueActor << (UseRuntimeListing ? " (remote)" : " (local"));
Expand Down Expand Up @@ -467,6 +470,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
const bool AddPathIndex;
const ui64 SizeLimit;
TDuration CpuTime;
const bool AllowLocalFiles;

std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;

Expand Down Expand Up @@ -521,7 +525,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
NActors::TActorId fileQueueActor,
ui64 fileQueueBatchSizeLimit,
ui64 fileQueueBatchObjectCountLimit,
ui64 fileQueueConsumersCountDelta
ui64 fileQueueConsumersCountDelta,
bool allowLocalFiles
) {
const auto actor = new TS3ReadActor(
inputIndex,
Expand All @@ -547,7 +552,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
fileQueueActor,
fileQueueBatchSizeLimit,
fileQueueBatchObjectCountLimit,
fileQueueConsumersCountDelta
fileQueueConsumersCountDelta,
allowLocalFiles
);

return {actor, actor};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
NActors::TActorId fileQueueActor,
ui64 fileQueueBatchSizeLimit,
ui64 fileQueueBatchObjectCountLimit,
ui64 fileQueueConsumersCountDelta
ui64 fileQueueConsumersCountDelta,
bool allowLocalFiles
);

} // namespace NYql::NDq
17 changes: 11 additions & 6 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
ui64 fileQueueBatchObjectCountLimit,
ui64 fileQueueConsumersCountDelta,
bool asyncDecoding,
bool asyncDecompressing
bool asyncDecompressing,
bool allowLocalFiles
) : ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
, HolderFactory(holderFactory)
Expand All @@ -1319,7 +1320,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
, FileQueueBatchObjectCountLimit(fileQueueBatchObjectCountLimit)
, FileQueueConsumersCountDelta(fileQueueConsumersCountDelta)
, AsyncDecoding(asyncDecoding)
, AsyncDecompressing(asyncDecompressing) {
, AsyncDecompressing(asyncDecompressing)
, AllowLocalFiles(allowLocalFiles) {
if (Counters) {
QueueDataSize = Counters->GetCounter("QueueDataSize");
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
Expand Down Expand Up @@ -1396,7 +1398,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
Credentials,
Pattern,
PatternVariant,
ES3PatternType::Wildcard));
ES3PatternType::Wildcard,
AllowLocalFiles));
}
FileQueueEvents.Init(TxId, SelfId(), SelfId());
FileQueueEvents.OnNewRecipientId(FileQueueActor);
Expand Down Expand Up @@ -1904,6 +1907,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
ui64 FileQueueConsumersCountDelta;
const bool AsyncDecoding;
const bool AsyncDecompressing;
const bool AllowLocalFiles;
bool IsCurrentBatchEmpty = false;
bool IsFileQueueEmpty = false;
bool IsWaitingFileQueueResponse = false;
Expand Down Expand Up @@ -2067,7 +2071,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const TS3ReadActorFactoryConfig& cfg,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
IMemoryQuotaManager::TPtr memoryQuotaManager)
IMemoryQuotaManager::TPtr memoryQuotaManager,
bool allowLocalFiles)
{
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();

Expand Down Expand Up @@ -2257,7 +2262,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
params.GetAsyncDecoding(), params.GetAsyncDecompressing());
params.GetAsyncDecoding(), params.GetAsyncDecompressing(), allowLocalFiles);

return {actor, actor};
} else {
Expand All @@ -2268,7 +2273,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta, allowLocalFiles);
}
}

Expand Down
6 changes: 4 additions & 2 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ NActors::IActor* CreateS3FileQueueActor(
const TS3Credentials& credentials,
TString pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType);
NS3Lister::ES3PatternType patternType,
bool allowLocalFiles);

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadActor(
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
Expand All @@ -48,6 +49,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
const TS3ReadActorFactoryConfig& cfg,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
IMemoryQuotaManager::TPtr memoryQuotaManager);
IMemoryQuotaManager::TPtr memoryQuotaManager,
bool allowLocalFiles);

} // namespace NYql::NDq
5 changes: 3 additions & 2 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ void RegisterS3ReadActorFactory(
IHTTPGateway::TPtr gateway = IHTTPGateway::Make(),
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(),
const TS3ReadActorFactoryConfig& factoryConfig = {},
::NMonitoring::TDynamicCounterPtr counters = nullptr) {
::NMonitoring::TDynamicCounterPtr counters = nullptr,
bool allowLocalFiles = false) {
CreateS3ActorsFactory()->RegisterS3ReadActorFactory(
factory, credentialsFactory, gateway, retryPolicy, factoryConfig, counters
factory, credentialsFactory, gateway, retryPolicy, factoryConfig, counters, allowLocalFiles
);
}

Expand Down
Loading
Loading