Skip to content

Commit

Permalink
add gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds committed Sep 24, 2024
1 parent b6f3226 commit f4308cc
Show file tree
Hide file tree
Showing 15 changed files with 114 additions and 62 deletions.
26 changes: 17 additions & 9 deletions ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
#include <ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>

Expand Down Expand Up @@ -189,13 +190,21 @@ void Init(
}

if (protoConfig.GetRowDispatcher().GetEnabled()) {
NYql::TPqGatewayServices pqServices(
yqSharedResources->UserSpaceYdbDriver,
nullptr,
nullptr,
std::make_shared<NYql::TPqGatewayConfig>(),
nullptr);

auto rowDispatcher = NFq::NewRowDispatcherService(
protoConfig.GetRowDispatcher(),
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
credentialsFactory,
tenant,
yqCounters->GetSubgroup("subsystem", "row_dispatcher"));
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
CreatePqNativeGateway(pqServices));
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
}

Expand All @@ -206,14 +215,13 @@ void Init(
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());

RegisterDqInputTransformLookupActorFactory(*asyncIoFactory);
// NYql::TPqGatewayServices pqServices(
// driver,
// nullptr,
// nullptr,
// std::make_shared<TPqGatewayConfig>(),
// nullptr
// );
// RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, CreatePqNativeGateway(pqServices), yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
NYql::TPqGatewayServices pqServices(
yqSharedResources->UserSpaceYdbDriver,
nullptr,
nullptr,
std::make_shared<NYql::TPqGatewayConfig>(),
nullptr);
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, CreatePqNativeGateway(pqServices), yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));

s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ struct TActorFactory : public IActorFactory {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters) const override {
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const override {

auto actorPtr = NFq::NewTopicSession(
topicPath,
Expand All @@ -24,7 +25,8 @@ struct TActorFactory : public IActorFactory {
partitionId,
std::move(driver),
credentialsProviderFactory,
counters
counters,
pqGateway
);
return NActors::TlsActivationContext->ExecutorThread.RegisterActor(actorPtr.release(), NActors::TMailboxType::HTSwap, Max<ui32>());
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <util/generic/ptr.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>

namespace NFq::NRowDispatcher {

Expand All @@ -17,7 +18,8 @@ struct IActorFactory : public TThrRefBase {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters) const = 0;
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const = 0;
};

IActorFactory::TPtr CreateActorFactory();
Expand Down
20 changes: 14 additions & 6 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
NFq::NRowDispatcher::IActorFactory::TPtr ActorFactory;
const ::NMonitoring::TDynamicCounterPtr Counters;
TRowDispatcherMetrics Metrics;
NYql::IPqGateway::TPtr PqGateway;

struct ConsumerCounters {
ui64 NewDataArrived = 0;
Expand Down Expand Up @@ -175,7 +176,8 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters);
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

void Bootstrap();

Expand Down Expand Up @@ -237,7 +239,8 @@ TRowDispatcher::TRowDispatcher(
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters)
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
: Config(config)
, CredentialsProviderFactory(credentialsProviderFactory)
, YqSharedResources(yqSharedResources)
Expand All @@ -246,7 +249,8 @@ TRowDispatcher::TRowDispatcher(
, Tenant(tenant)
, ActorFactory(actorFactory)
, Counters(counters)
, Metrics(counters) {
, Metrics(counters)
, PqGateway(pqGateway) {
}

void TRowDispatcher::Bootstrap() {
Expand Down Expand Up @@ -387,7 +391,9 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
CredentialsFactory,
ev->Get()->Record.GetToken(),
source.GetAddBearerToToken()),
Counters);
Counters,
PqGateway
);
SessionInfo& sessionInfo = topicSessionInfo.Sessions[sessionActorId];
sessionInfo.Consumers[ev->Sender] = consumerInfo;
} else {
Expand Down Expand Up @@ -587,7 +593,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters)
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
{
return std::unique_ptr<NActors::IActor>(new TRowDispatcher(
config,
Expand All @@ -596,7 +603,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
credentialsFactory,
tenant,
actorFactory,
counters));
counters,
pqGateway));
}

} // namespace NFq
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/actors/core/actor.h>

#include <memory>
Expand All @@ -21,6 +22,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters);
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

} // namespace NFq
6 changes: 4 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& tenant,
const ::NMonitoring::TDynamicCounterPtr& counters)
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
{
return NewRowDispatcher(
config,
Expand All @@ -24,7 +25,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
credentialsFactory,
tenant,
NFq::NRowDispatcher::CreateActorFactory(),
counters);
counters,
pqGateway);
}

} // namespace NFq
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include "events/data_plane.h"

#include <ydb/library/actors/core/actor.h>
Expand All @@ -20,6 +21,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& tenant,
const ::NMonitoring::TDynamicCounterPtr& counters);
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

} // namespace NFq
23 changes: 14 additions & 9 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 PartitionId;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
std::unique_ptr<NYdb::NTopic::TTopicClient> TopicClient;
NYql::ITopicClient::TPtr TopicClient;
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
const i64 BufferSize;
TString LogPrefix;
Expand All @@ -155,6 +155,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
TMaybe<TParserInputType> CurrentParserTypes;
const ::NMonitoring::TDynamicCounterPtr Counters;
TTopicSessionMetrics Metrics;
NYql::IPqGateway::TPtr PqGateway;

public:
explicit TTopicSession(
Expand All @@ -164,7 +165,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters);
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

void Bootstrap();
void PassAway() override;
Expand All @@ -173,7 +175,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {

private:
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const;
NYdb::NTopic::TTopicClient& GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams);
NYql::ITopicClient& GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams);
NYdb::NTopic::TReadSessionSettings GetReadSessionSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const;
void CreateTopicSession();
void CloseTopicSession();
Expand Down Expand Up @@ -244,7 +246,8 @@ TTopicSession::TTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters)
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
: TopicPath(topicPath)
, RowDispatcherActorId(rowDispatcherActorId)
, PartitionId(partitionId)
Expand All @@ -254,6 +257,7 @@ TTopicSession::TTopicSession(
, LogPrefix("TopicSession")
, Config(config)
, Counters(counters)
, PqGateway(pqGateway)
{
}

Expand Down Expand Up @@ -302,9 +306,9 @@ NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(const N
return opts;
}

NYdb::NTopic::TTopicClient& TTopicSession::GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
NYql::ITopicClient& TTopicSession::GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
if (!TopicClient) {
TopicClient = std::make_unique<NYdb::NTopic::TTopicClient>(Driver, GetTopicClientSettings(sourceParams));
TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings(sourceParams));
}
return *TopicClient;
}
Expand Down Expand Up @@ -718,7 +722,7 @@ void TTopicSession::StopReadSession() {
ReadSession->Close(TDuration::Zero());
ReadSession.reset();
}
TopicClient.reset();
TopicClient.Reset();
}

void TTopicSession::SendDataArrived(ClientsInfo& info) {
Expand Down Expand Up @@ -769,8 +773,9 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters) {
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, counters));
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) {
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, pqGateway));
}

} // namespace NFq
6 changes: 5 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/topic_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>

#include <ydb/library/security/ydb_credentials_provider_factory.h>

#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>

#include <ydb/library/actors/core/actor.h>

#include <memory>
Expand All @@ -19,6 +22,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters);
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

} // namespace NFq
14 changes: 12 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <ydb/core/testlib/basics/helpers.h>
#include <ydb/core/testlib/actor_helpers.h>
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>

namespace {

Expand All @@ -32,7 +33,8 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory {
ui32 /*partitionId*/,
NYdb::TDriver /*driver*/,
std::shared_ptr<NYdb::ICredentialsProviderFactory> /*credentialsProviderFactory*/,
const ::NMonitoring::TDynamicCounterPtr& /*counters*/) const override {
const ::NMonitoring::TDynamicCounterPtr& /*counters*/,
const NYql::IPqGateway::TPtr& /*pqGateway*/) const override {
auto actorId = Runtime.AllocateEdgeActor();
ActorIds.push(actorId);
return actorId;
Expand Down Expand Up @@ -70,6 +72,13 @@ class TFixture : public NUnitTest::TBaseFixture {
ReadActorId1 = Runtime.AllocateEdgeActor();
ReadActorId2 = Runtime.AllocateEdgeActor();
TestActorFactory = MakeIntrusive<TTestActorFactory>(Runtime);

NYql::TPqGatewayServices pqServices(
yqSharedResources->UserSpaceYdbDriver,
nullptr,
nullptr,
std::make_shared<NYql::TPqGatewayConfig>(),
nullptr);

RowDispatcher = Runtime.Register(NewRowDispatcher(
config,
Expand All @@ -78,7 +87,8 @@ class TFixture : public NUnitTest::TBaseFixture {
credentialsFactory,
"Tenant",
TestActorFactory,
MakeIntrusive<NMonitoring::TDynamicCounters>()
MakeIntrusive<NMonitoring::TDynamicCounters>(),
CreatePqNativeGateway(pqServices)
).release());

Runtime.EnableScheduleForActor(RowDispatcher);
Expand Down
15 changes: 14 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/tests/fq/pq_async_io/ut_helpers.h>

#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>

namespace {

using namespace NKikimr;
Expand Down Expand Up @@ -42,14 +44,25 @@ class TFixture : public NUnitTest::TBaseFixture {
Config.SetSendStatusPeriodSec(2);
Config.SetWithoutConsumer(true);

auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));

NYql::TPqGatewayServices pqServices(
yqSharedResources->UserSpaceYdbDriver,
nullptr,
nullptr,
std::make_shared<NYql::TPqGatewayConfig>(),
nullptr);

TopicSession = Runtime.Register(NewTopicSession(
topicPath,
Config,
RowDispatcherActorId,
0,
Driver,
CredentialsProviderFactory,
MakeIntrusive<NMonitoring::TDynamicCounters>()
MakeIntrusive<NMonitoring::TDynamicCounters>(),
CreatePqNativeGateway(pqServices)
).release());
Runtime.EnableScheduleForActor(TopicSession);

Expand Down
Loading

0 comments on commit f4308cc

Please sign in to comment.