Skip to content

Commit

Permalink
YQ kqprun pass allow local files into runtime listing (ydb-platform#7844
Browse files Browse the repository at this point in the history
)
  • Loading branch information
uzhastik committed Sep 15, 2024
1 parent cf25aff commit 131b40d
Show file tree
Hide file tree
Showing 20 changed files with 86 additions and 49 deletions.
5 changes: 3 additions & 2 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer) {
bool enableInfer,
bool allowLocalFiles) {
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
{
ToString(NYql::EDatabaseType::ObjectStorage),
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer)
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles)
},
{
ToString(NYql::EDatabaseType::ClickHouse),
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/external_sources/external_source_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
NActors::TActorSystem* actorSystem = nullptr,
size_t pathsLimit = 50000,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory = nullptr,
bool enableInfer = false);
bool enableInfer = false,
bool allowLocalFiles = false);

}
26 changes: 15 additions & 11 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer)
bool enableInfer,
bool allowLocalFiles)
: HostnamePatterns(hostnamePatterns)
, PathsLimit(pathsLimit)
, ActorSystem(actorSystem)
, CredentialsFactory(std::move(credentialsFactory))
, EnableInfer(enableInfer)
, AllowLocalFiles(allowLocalFiles)
{}

virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
Expand Down Expand Up @@ -341,14 +343,14 @@ struct TObjectStorageExternalSource : public IExternalSource {

NYql::NS3Lister::TListingRequest request {
.Url = meta->DataSourceLocation,
.Credentials = credentials
};
TVector<NYql::NS3Lister::TListingRequest> requests;

if (!projection) {
auto error = NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request);
if (error) {
throw yexception() << *error;
.Credentials = credentials,
.Pattern = effectiveFilePattern,
}, Nothing(), AllowLocalFiles);
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
auto& listRes = listResFut.GetValue();
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
throw yexception() << error.Issues.ToString();
}
requests.push_back(request);
} else {
Expand Down Expand Up @@ -804,6 +806,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
NActors::TActorSystem* ActorSystem = nullptr;
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> CredentialsFactory;
const bool EnableInfer = false;
const bool AllowLocalFiles;
};

}
Expand All @@ -813,8 +816,9 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer);
bool enableInfer,
bool allowLocalFiles) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles);
}

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer);
bool enableInfer,
bool allowLocalFiles);

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location);

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/external_sources/object_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ namespace NKikimr {

Y_UNIT_TEST_SUITE(ObjectStorageTest) {
Y_UNIT_TEST(SuccessValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general));
}

Y_UNIT_TEST(FailedCreate) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->insert({"a", "b"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a");
}

Y_UNIT_TEST(FailedValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->insert({"projection.h", "b"});
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void Init(
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);

s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());
s3ActorsFactory->RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
httpGateway, s3HttpRetryPolicy);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(

if (federatedQuerySetup) {
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig, nullptr, federatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
s3ActorsFactory->RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);

if (federatedQuerySetup->ConnectorClient) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,8 @@ class TKqpHost : public IKqpHost {
ActorSystem,
FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit(),
FederatedQuerySetup ? FederatedQuerySetup->CredentialsFactory : nullptr,
Config->FeatureFlags.GetEnableExternalSourceSchemaInference());
Config->FeatureFlags.GetEnableExternalSourceSchemaInference(),
FederatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
}
}

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6977,7 +6977,10 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory(
std::vector<TString>(hostnamePatterns.begin(), hostnamePatterns.end()),
nullptr,
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit()
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit(),
nullptr,
appConfig.GetFeatureFlags().GetEnableExternalSourceSchemaInference(),
appConfig.GetQueryServiceConfig().GetS3().GetAllowLocalFiles()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,17 @@ namespace NYql::NDq {
IHTTPGateway::TPtr gateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
::NMonitoring::TDynamicCounterPtr counters) override {
::NMonitoring::TDynamicCounterPtr counters,
bool allowLocalFiles) override {

#if defined(_linux_) || defined(_darwin_)
NDB::registerFormats();
factory.RegisterSource<NS3::TSource>("S3Source",
[credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
[credentialsFactory, gateway, retryPolicy, cfg, counters, allowLocalFiles](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
std::move(settings), args.InputIndex, args.StatsLevel, args.TxId, args.SecureParams,
args.TaskParams, args.ReadRanges, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
counters, args.TaskCounters, args.MemoryQuotaManager);
counters, args.TaskCounters, args.MemoryQuotaManager, allowLocalFiles);
});
#else
Y_UNUSED(factory);
Expand Down
14 changes: 10 additions & 4 deletions ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
NActors::TActorId fileQueueActor,
ui64 fileQueueBatchSizeLimit,
ui64 fileQueueBatchObjectCountLimit,
ui64 fileQueueConsumersCountDelta)
ui64 fileQueueConsumersCountDelta,
bool allowLocalFiles)
: ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
, HolderFactory(holderFactory)
Expand All @@ -76,6 +77,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
, FileQueueActor(fileQueueActor)
, AddPathIndex(addPathIndex)
, SizeLimit(sizeLimit)
, AllowLocalFiles(allowLocalFiles)
, Counters(counters)
, TaskCounters(taskCounters)
, FileSizeLimit(fileSizeLimit)
Expand Down Expand Up @@ -116,7 +118,8 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
AuthInfo,
Pattern,
PatternVariant,
NYql::NS3Lister::ES3PatternType::Wildcard));
NYql::NS3Lister::ES3PatternType::Wildcard,
AllowLocalFiles));
}

LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex << ", FileQueue: " << FileQueueActor << (UseRuntimeListing ? " (remote)" : " (local"));
Expand Down Expand Up @@ -466,6 +469,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
const bool AddPathIndex;
const ui64 SizeLimit;
TDuration CpuTime;
const bool AllowLocalFiles;

std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;

Expand Down Expand Up @@ -520,7 +524,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
NActors::TActorId fileQueueActor,
ui64 fileQueueBatchSizeLimit,
ui64 fileQueueBatchObjectCountLimit,
ui64 fileQueueConsumersCountDelta
ui64 fileQueueConsumersCountDelta,
bool allowLocalFiles
) {
const auto actor = new TS3ReadActor(
inputIndex,
Expand All @@ -546,7 +551,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
fileQueueActor,
fileQueueBatchSizeLimit,
fileQueueBatchObjectCountLimit,
fileQueueConsumersCountDelta
fileQueueConsumersCountDelta,
allowLocalFiles
);

return {actor, actor};
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
NActors::TActorId fileQueueActor,
ui64 fileQueueBatchSizeLimit,
ui64 fileQueueBatchObjectCountLimit,
ui64 fileQueueConsumersCountDelta
ui64 fileQueueConsumersCountDelta,
bool allowLocalFiles
);

} // namespace NYql::NDq
17 changes: 11 additions & 6 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
ui64 fileQueueBatchObjectCountLimit,
ui64 fileQueueConsumersCountDelta,
bool asyncDecoding,
bool asyncDecompressing
bool asyncDecompressing,
bool allowLocalFiles
) : ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
, HolderFactory(holderFactory)
Expand All @@ -1325,7 +1326,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
, FileQueueBatchObjectCountLimit(fileQueueBatchObjectCountLimit)
, FileQueueConsumersCountDelta(fileQueueConsumersCountDelta)
, AsyncDecoding(asyncDecoding)
, AsyncDecompressing(asyncDecompressing) {
, AsyncDecompressing(asyncDecompressing)
, AllowLocalFiles(allowLocalFiles) {
if (Counters) {
QueueDataSize = Counters->GetCounter("QueueDataSize");
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
Expand Down Expand Up @@ -1397,7 +1399,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
AuthInfo,
Pattern,
PatternVariant,
ES3PatternType::Wildcard));
ES3PatternType::Wildcard,
AllowLocalFiles));
}
FileQueueEvents.Init(TxId, SelfId(), SelfId());
FileQueueEvents.OnNewRecipientId(FileQueueActor);
Expand Down Expand Up @@ -1904,6 +1907,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
ui64 FileQueueConsumersCountDelta;
const bool AsyncDecoding;
const bool AsyncDecompressing;
const bool AllowLocalFiles;
bool IsCurrentBatchEmpty = false;
bool IsFileQueueEmpty = false;
bool IsWaitingFileQueueResponse = false;
Expand Down Expand Up @@ -2067,7 +2071,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const TS3ReadActorFactoryConfig& cfg,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
IMemoryQuotaManager::TPtr memoryQuotaManager)
IMemoryQuotaManager::TPtr memoryQuotaManager,
bool allowLocalFiles)
{
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();

Expand Down Expand Up @@ -2257,7 +2262,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
params.GetAsyncDecoding(), params.GetAsyncDecompressing());
params.GetAsyncDecoding(), params.GetAsyncDecompressing(), allowLocalFiles);

return {actor, actor};
} else {
Expand All @@ -2268,7 +2273,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta, allowLocalFiles);
}
}

Expand Down
6 changes: 4 additions & 2 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ NActors::IActor* CreateS3FileQueueActor(
TS3Credentials::TAuthInfo authInfo,
TString pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType);
NS3Lister::ES3PatternType patternType,
bool allowLocalFiles);

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadActor(
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
Expand All @@ -50,6 +51,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
const TS3ReadActorFactoryConfig& cfg,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
IMemoryQuotaManager::TPtr memoryQuotaManager);
IMemoryQuotaManager::TPtr memoryQuotaManager,
bool allowLocalFiles);

} // namespace NYql::NDq
Loading

0 comments on commit 131b40d

Please sign in to comment.