From 131b40de47f21aa79faa894dcb8f17f504c71066 Mon Sep 17 00:00:00 2001 From: Sergey Uzhakov Date: Sun, 15 Sep 2024 14:18:41 +0300 Subject: [PATCH] YQ kqprun pass allow local files into runtime listing (#7844) --- .../external_source_factory.cpp | 5 ++-- .../external_source_factory.h | 3 ++- ydb/core/external_sources/object_storage.cpp | 26 +++++++++++-------- ydb/core/external_sources/object_storage.h | 3 ++- .../external_sources/object_storage_ut.cpp | 6 ++--- ydb/core/fq/libs/init/init.cpp | 2 +- .../kqp/compute_actor/kqp_compute_actor.cpp | 2 +- ydb/core/kqp/host/kqp_host.cpp | 3 ++- ydb/core/tx/schemeshard/schemeshard_impl.cpp | 5 +++- .../s3/actors/yql_s3_actors_factory_impl.cpp | 7 ++--- .../s3/actors/yql_s3_raw_read_actor.cpp | 14 +++++++--- .../s3/actors/yql_s3_raw_read_actor.h | 3 ++- .../providers/s3/actors/yql_s3_read_actor.cpp | 17 +++++++----- .../providers/s3/actors/yql_s3_read_actor.h | 6 +++-- .../s3/actors/yql_s3_source_factory.h | 5 ++-- .../s3/actors/yql_s3_source_queue.cpp | 15 +++++++---- .../providers/s3/actors/yql_s3_source_queue.h | 3 ++- .../actors_factory/yql_s3_actors_factory.cpp | 4 ++- .../s3/actors_factory/yql_s3_actors_factory.h | 3 ++- .../s3/provider/yql_s3_dq_integration.cpp | 3 ++- 20 files changed, 86 insertions(+), 49 deletions(-) diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index c0be11d62eab..ee6884383f63 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -36,12 +36,13 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector credentialsFactory, - bool enableInfer) { + bool enableInfer, + bool allowLocalFiles) { std::vector hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end()); return MakeIntrusive(TMap{ { ToString(NYql::EDatabaseType::ObjectStorage), - CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer) + CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles) }, { ToString(NYql::EDatabaseType::ClickHouse), diff --git a/ydb/core/external_sources/external_source_factory.h b/ydb/core/external_sources/external_source_factory.h index 7b49e0e74544..27b5ebd8e0ec 100644 --- a/ydb/core/external_sources/external_source_factory.h +++ b/ydb/core/external_sources/external_source_factory.h @@ -15,6 +15,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector credentialsFactory = nullptr, - bool enableInfer = false); + bool enableInfer = false, + bool allowLocalFiles = false); } diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index f496628abf4b..b87652320112 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -41,12 +41,14 @@ struct TObjectStorageExternalSource : public IExternalSource { NActors::TActorSystem* actorSystem, size_t pathsLimit, std::shared_ptr credentialsFactory, - bool enableInfer) + bool enableInfer, + bool allowLocalFiles) : HostnamePatterns(hostnamePatterns) , PathsLimit(pathsLimit) , ActorSystem(actorSystem) , CredentialsFactory(std::move(credentialsFactory)) , EnableInfer(enableInfer) + , AllowLocalFiles(allowLocalFiles) {} virtual TString Pack(const NKikimrExternalSources::TSchema& schema, @@ -341,14 +343,14 @@ struct TObjectStorageExternalSource : public IExternalSource { NYql::NS3Lister::TListingRequest request { .Url = meta->DataSourceLocation, - .Credentials = credentials - }; - TVector requests; - - if (!projection) { - auto error = NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request); - if (error) { - throw yexception() << *error; + .Credentials = credentials, + .Pattern = effectiveFilePattern, + }, Nothing(), AllowLocalFiles); + auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture& listResFut) { + auto& listRes = listResFut.GetValue(); + if (std::holds_alternative(listRes)) { + auto& error = std::get(listRes); + throw yexception() << error.Issues.ToString(); } requests.push_back(request); } else { @@ -804,6 +806,7 @@ struct TObjectStorageExternalSource : public IExternalSource { NActors::TActorSystem* ActorSystem = nullptr; std::shared_ptr CredentialsFactory; const bool EnableInfer = false; + const bool AllowLocalFiles; }; } @@ -813,8 +816,9 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector credentialsFactory, - bool enableInfer) { - return MakeIntrusive(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer); + bool enableInfer, + bool allowLocalFiles) { + return MakeIntrusive(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles); } NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location) { diff --git a/ydb/core/external_sources/object_storage.h b/ydb/core/external_sources/object_storage.h index ae99009e5cde..74de7a69eb87 100644 --- a/ydb/core/external_sources/object_storage.h +++ b/ydb/core/external_sources/object_storage.h @@ -13,7 +13,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector credentialsFactory, - bool enableInfer); + bool enableInfer, + bool allowLocalFiles); NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location); diff --git a/ydb/core/external_sources/object_storage_ut.cpp b/ydb/core/external_sources/object_storage_ut.cpp index 887d1c1d4bbf..b37a12f38704 100644 --- a/ydb/core/external_sources/object_storage_ut.cpp +++ b/ydb/core/external_sources/object_storage_ut.cpp @@ -8,14 +8,14 @@ namespace NKikimr { Y_UNIT_TEST_SUITE(ObjectStorageTest) { Y_UNIT_TEST(SuccessValidation) { - auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false); + auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false); NKikimrExternalSources::TSchema schema; NKikimrExternalSources::TGeneral general; UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general)); } Y_UNIT_TEST(FailedCreate) { - auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false); + auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false); NKikimrExternalSources::TSchema schema; NKikimrExternalSources::TGeneral general; general.mutable_attributes()->insert({"a", "b"}); @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) { } Y_UNIT_TEST(FailedValidation) { - auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false); + auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false); NKikimrExternalSources::TSchema schema; NKikimrExternalSources::TGeneral general; general.mutable_attributes()->insert({"projection.h", "b"}); diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index acd5518a98cf..18054bb09fa5 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -227,7 +227,7 @@ void Init( RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg, - yqCounters->GetSubgroup("subsystem", "S3ReadActor")); + yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles()); s3ActorsFactory->RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 74b0f6f14f79..cc4502ca840c 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -78,7 +78,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory( if (federatedQuerySetup) { auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); - s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy); + s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig, nullptr, federatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles()); s3ActorsFactory->RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy); if (federatedQuerySetup->ConnectorClient) { diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index dddb3f725f7a..54ecd955cf73 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1072,7 +1072,8 @@ class TKqpHost : public IKqpHost { ActorSystem, FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit(), FederatedQuerySetup ? FederatedQuerySetup->CredentialsFactory : nullptr, - Config->FeatureFlags.GetEnableExternalSourceSchemaInference()); + Config->FeatureFlags.GetEnableExternalSourceSchemaInference(), + FederatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles()); } } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 4cb4ddd93d22..4d0d3d321384 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6977,7 +6977,10 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory( std::vector(hostnamePatterns.begin(), hostnamePatterns.end()), nullptr, - appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit() + appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit(), + nullptr, + appConfig.GetFeatureFlags().GetEnableExternalSourceSchemaInference(), + appConfig.GetQueryServiceConfig().GetS3().GetAllowLocalFiles() ); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.cpp index 7affd433e1a8..202cbebf3d21 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.cpp @@ -64,16 +64,17 @@ namespace NYql::NDq { IHTTPGateway::TPtr gateway, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, - ::NMonitoring::TDynamicCounterPtr counters) override { + ::NMonitoring::TDynamicCounterPtr counters, + bool allowLocalFiles) override { #if defined(_linux_) || defined(_darwin_) NDB::registerFormats(); factory.RegisterSource("S3Source", - [credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { + [credentialsFactory, gateway, retryPolicy, cfg, counters, allowLocalFiles](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.StatsLevel, args.TxId, args.SecureParams, args.TaskParams, args.ReadRanges, args.ComputeActorId, credentialsFactory, retryPolicy, cfg, - counters, args.TaskCounters, args.MemoryQuotaManager); + counters, args.TaskCounters, args.MemoryQuotaManager, allowLocalFiles); }); #else Y_UNUSED(factory); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp index d338c1dc2bdc..8dc2e0c73501 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp @@ -59,7 +59,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID NActors::TActorId fileQueueActor, ui64 fileQueueBatchSizeLimit, ui64 fileQueueBatchObjectCountLimit, - ui64 fileQueueConsumersCountDelta) + ui64 fileQueueConsumersCountDelta, + bool allowLocalFiles) : ReadActorFactoryCfg(readActorFactoryCfg) , Gateway(std::move(gateway)) , HolderFactory(holderFactory) @@ -76,6 +77,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID , FileQueueActor(fileQueueActor) , AddPathIndex(addPathIndex) , SizeLimit(sizeLimit) + , AllowLocalFiles(allowLocalFiles) , Counters(counters) , TaskCounters(taskCounters) , FileSizeLimit(fileSizeLimit) @@ -116,7 +118,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID AuthInfo, Pattern, PatternVariant, - NYql::NS3Lister::ES3PatternType::Wildcard)); + NYql::NS3Lister::ES3PatternType::Wildcard, + AllowLocalFiles)); } LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex << ", FileQueue: " << FileQueueActor << (UseRuntimeListing ? " (remote)" : " (local")); @@ -466,6 +469,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID const bool AddPathIndex; const ui64 SizeLimit; TDuration CpuTime; + const bool AllowLocalFiles; std::queue> Blocks; @@ -520,7 +524,8 @@ std::pair CreateRawRead NActors::TActorId fileQueueActor, ui64 fileQueueBatchSizeLimit, ui64 fileQueueBatchObjectCountLimit, - ui64 fileQueueConsumersCountDelta + ui64 fileQueueConsumersCountDelta, + bool allowLocalFiles ) { const auto actor = new TS3ReadActor( inputIndex, @@ -546,7 +551,8 @@ std::pair CreateRawRead fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, - fileQueueConsumersCountDelta + fileQueueConsumersCountDelta, + allowLocalFiles ); return {actor, actor}; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h index 3c76d44bf7d8..c30817fd1fde 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h @@ -36,7 +36,8 @@ std::pair CreateRawRead NActors::TActorId fileQueueActor, ui64 fileQueueBatchSizeLimit, ui64 fileQueueBatchObjectCountLimit, - ui64 fileQueueConsumersCountDelta + ui64 fileQueueConsumersCountDelta, + bool allowLocalFiles ); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 8892e3746de6..80c95c048255 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1298,7 +1298,8 @@ class TS3StreamReadActor : public TActorBootstrapped, public ui64 fileQueueBatchObjectCountLimit, ui64 fileQueueConsumersCountDelta, bool asyncDecoding, - bool asyncDecompressing + bool asyncDecompressing, + bool allowLocalFiles ) : ReadActorFactoryCfg(readActorFactoryCfg) , Gateway(std::move(gateway)) , HolderFactory(holderFactory) @@ -1325,7 +1326,8 @@ class TS3StreamReadActor : public TActorBootstrapped, public , FileQueueBatchObjectCountLimit(fileQueueBatchObjectCountLimit) , FileQueueConsumersCountDelta(fileQueueConsumersCountDelta) , AsyncDecoding(asyncDecoding) - , AsyncDecompressing(asyncDecompressing) { + , AsyncDecompressing(asyncDecompressing) + , AllowLocalFiles(allowLocalFiles) { if (Counters) { QueueDataSize = Counters->GetCounter("QueueDataSize"); QueueDataLimit = Counters->GetCounter("QueueDataLimit"); @@ -1397,7 +1399,8 @@ class TS3StreamReadActor : public TActorBootstrapped, public AuthInfo, Pattern, PatternVariant, - ES3PatternType::Wildcard)); + ES3PatternType::Wildcard, + AllowLocalFiles)); } FileQueueEvents.Init(TxId, SelfId(), SelfId()); FileQueueEvents.OnNewRecipientId(FileQueueActor); @@ -1904,6 +1907,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public ui64 FileQueueConsumersCountDelta; const bool AsyncDecoding; const bool AsyncDecompressing; + const bool AllowLocalFiles; bool IsCurrentBatchEmpty = false; bool IsFileQueueEmpty = false; bool IsWaitingFileQueueResponse = false; @@ -2067,7 +2071,8 @@ std::pair CreateS3ReadActor( const TS3ReadActorFactoryConfig& cfg, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters, - IMemoryQuotaManager::TPtr memoryQuotaManager) + IMemoryQuotaManager::TPtr memoryQuotaManager, + bool allowLocalFiles) { const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry(); @@ -2257,7 +2262,7 @@ std::pair CreateS3ReadActor( std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy, cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager, params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta, - params.GetAsyncDecoding(), params.GetAsyncDecompressing()); + params.GetAsyncDecoding(), params.GetAsyncDecompressing(), allowLocalFiles); return {actor, actor}; } else { @@ -2268,7 +2273,7 @@ std::pair CreateS3ReadActor( return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy, cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, - params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta); + params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta, allowLocalFiles); } } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 2b808254c213..d3a6a5339629 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -31,7 +31,8 @@ NActors::IActor* CreateS3FileQueueActor( TS3Credentials::TAuthInfo authInfo, TString pattern, NYql::NS3Lister::ES3PatternVariant patternVariant, - NS3Lister::ES3PatternType patternType); + NS3Lister::ES3PatternType patternType, + bool allowLocalFiles); std::pair CreateS3ReadActor( const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, @@ -50,6 +51,7 @@ std::pair CreateS3ReadA const TS3ReadActorFactoryConfig& cfg, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters, - IMemoryQuotaManager::TPtr memoryQuotaManager); + IMemoryQuotaManager::TPtr memoryQuotaManager, + bool allowLocalFiles); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index a0f6030f33b3..12a3ca3a79d4 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -10,9 +10,10 @@ inline void RegisterS3ReadActorFactory( IHTTPGateway::TPtr gateway = IHTTPGateway::Make(), const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(), const TS3ReadActorFactoryConfig& factoryConfig = {}, - ::NMonitoring::TDynamicCounterPtr counters = nullptr) { + ::NMonitoring::TDynamicCounterPtr counters = nullptr, + bool allowLocalFiles = false) { CreateS3ActorsFactory()->RegisterS3ReadActorFactory( - factory, credentialsFactory, gateway, retryPolicy, factoryConfig, counters + factory, credentialsFactory, gateway, retryPolicy, factoryConfig, counters, allowLocalFiles ); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp index a91a90d85a64..c144c536c724 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp @@ -176,7 +176,8 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped TS3Credentials::TAuthInfo authInfo, TString pattern, NS3Lister::ES3PatternVariant patternVariant, - NS3Lister::ES3PatternType patternType) + NS3Lister::ES3PatternType patternType, + bool allowLocalFiles) : TxId(std::move(txId)) , PrefetchSize(prefetchSize) , FileSizeLimit(fileSizeLimit) @@ -192,7 +193,8 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped , AuthInfo(std::move(authInfo)) , Pattern(std::move(pattern)) , PatternVariant(patternVariant) - , PatternType(patternType) { + , PatternType(patternType) + , AllowLocalFiles(allowLocalFiles) { for (size_t i = 0; i < paths.size(); ++i) { NS3::FileQueue::TObjectPath object; object.SetPath(paths[i].Path); @@ -500,7 +502,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped PatternType, object.GetPath()}, Nothing(), - false, + AllowLocalFiles, NActors::TActivationContext::ActorSystem()); Fetch(); return true; @@ -621,6 +623,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped const TString Pattern; const NS3Lister::ES3PatternVariant PatternVariant; const NS3Lister::ES3PatternType PatternType; + const bool AllowLocalFiles; static constexpr TDuration PoisonTimeout = TDuration::Hours(3); static constexpr TDuration RoundRobinStageTimeout = TDuration::Seconds(3); @@ -642,7 +645,8 @@ NActors::IActor* CreateS3FileQueueActor( TS3Credentials::TAuthInfo authInfo, TString pattern, NS3Lister::ES3PatternVariant patternVariant, - NS3Lister::ES3PatternType patternType) { + NS3Lister::ES3PatternType patternType, + bool allowLocalFiles) { return new TS3FileQueueActor( txId, paths, @@ -659,7 +663,8 @@ NActors::IActor* CreateS3FileQueueActor( authInfo, pattern, patternVariant, - patternType + patternType, + allowLocalFiles ); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h index 12a90ffa3faa..2239b5f9f414 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h @@ -27,6 +27,7 @@ NActors::IActor* CreateS3FileQueueActor( TS3Credentials::TAuthInfo authInfo, TString pattern, NYql::NS3Lister::ES3PatternVariant patternVariant, - NS3Lister::ES3PatternType patternType); + NS3Lister::ES3PatternType patternType, + bool allowLocalFiles); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.cpp b/ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.cpp index 3b14babb1dc3..389ddb3ec875 100644 --- a/ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.cpp +++ b/ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.cpp @@ -45,7 +45,8 @@ namespace NYql::NDq { IHTTPGateway::TPtr gateway, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& factoryConfig = {}, - ::NMonitoring::TDynamicCounterPtr counters = nullptr) override { + ::NMonitoring::TDynamicCounterPtr counters = nullptr, + bool allowLocalFiles = false) override { Y_UNUSED(factory); Y_UNUSED(credentialsFactory); @@ -53,6 +54,7 @@ namespace NYql::NDq { Y_UNUSED(retryPolicy); Y_UNUSED(factoryConfig); Y_UNUSED(counters); + Y_UNUSED(allowLocalFiles); } }; diff --git a/ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h b/ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h index 7bb632569075..5e52fc3330ba 100644 --- a/ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h +++ b/ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h @@ -48,7 +48,8 @@ namespace NYql::NDq { IHTTPGateway::TPtr gateway, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg = {}, - ::NMonitoring::TDynamicCounterPtr counters = nullptr) = 0; + ::NMonitoring::TDynamicCounterPtr counters = nullptr, + bool allowLocalFiles = false) = 0; }; std::shared_ptr CreateDefaultS3ActorsFactory(); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 58228a557d1a..0cd8905a9912 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -546,7 +546,8 @@ class TS3DqIntegration: public TDqIntegrationBase { GetAuthInfo(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster)), pathPattern, pathPatternVariant, - NS3Lister::ES3PatternType::Wildcard + NS3Lister::ES3PatternType::Wildcard, + State_->Configuration->AllowLocalFiles ), NActors::TMailboxType::HTSwap, State_->ExecutorPoolId