Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataplane api for mounting unmounting topics #1457

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/redpanda-data/benthos/v4 v4.35.0
github.com/redpanda-data/common-go/api v0.0.0-20240918135346-6c838a508d64
github.com/redpanda-data/common-go/net v0.1.1-0.20240429123545-4da3d2b371f7
github.com/redpanda-data/common-go/rpadmin v0.1.3
github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240924011720-7c27dd07c8df
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.32.0
Expand Down
2 changes: 2 additions & 0 deletions backend/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ github.com/redpanda-data/common-go/net v0.1.1-0.20240429123545-4da3d2b371f7 h1:M
github.com/redpanda-data/common-go/net v0.1.1-0.20240429123545-4da3d2b371f7/go.mod h1:UJIi/yUxGOBYXUrfUsOkxfYxcb/ll7mZrwae/i+U2kc=
github.com/redpanda-data/common-go/rpadmin v0.1.3 h1:JRdr4rHcdr+A0hHr+viJYnPm+dP01bsgVUoQLM7Kz44=
github.com/redpanda-data/common-go/rpadmin v0.1.3/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4=
github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240924011720-7c27dd07c8df h1:xIOCcyLOVi/7GlzizOeQ0UyXpCprDV1mhlmNqDCyPXQ=
github.com/redpanda-data/common-go/rpadmin v0.1.7-0.20240924011720-7c27dd07c8df/go.mod h1:I7umqhnMhIOSEnIA3fvLtdQU7QO/SbWGCwFfFDs3De4=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rickb777/period v1.0.6 h1:f4TcHBtL/4qa4D44eqgxs7785/kfLKUjRI7XYI2HCvk=
github.com/rickb777/period v1.0.6/go.mod h1:TKkPHI/WSyjjVdeVCyqwBoQg0Cdb/jRvnc8FFdq2cgw=
Expand Down
58 changes: 46 additions & 12 deletions backend/pkg/api/connect/service/topic/v1alpha2/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,27 @@ package topic
import (
"fmt"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/twmb/franz-go/pkg/kmsg"

common "github.com/redpanda-data/console/backend/pkg/api/connect/service/common/v1alpha2"
v1alpha2 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha2"
)

type kafkaClientMapper struct {
type mapper struct {
commonKafkaClientMapper common.KafkaClientMapper
}

// createTopicRequestToKafka maps the proto request to create a topic into a kmsg.CreateTopicsRequestTopic.
func (k *kafkaClientMapper) createTopicRequestToKafka(req *v1alpha2.CreateTopicRequest) *kmsg.CreateTopicsRequest {
func (k *mapper) createTopicRequestToKafka(req *v1alpha2.CreateTopicRequest) *kmsg.CreateTopicsRequest {
kafkaReq := kmsg.NewCreateTopicsRequest()
kafkaReq.ValidateOnly = req.ValidateOnly
kafkaReq.Topics = []kmsg.CreateTopicsRequestTopic{k.createTopicRequestTopicToKafka(req.Topic)}
return &kafkaReq
}

// createTopicRequestToKafka maps the proto message for creating a topic to kmsg.CreateTopicsRequestTopic.
func (*kafkaClientMapper) createTopicRequestTopicToKafka(topicReq *v1alpha2.CreateTopicRequest_Topic) kmsg.CreateTopicsRequestTopic {
func (*mapper) createTopicRequestTopicToKafka(topicReq *v1alpha2.CreateTopicRequest_Topic) kmsg.CreateTopicsRequestTopic {
partitionCount := int32(-1)
if topicReq.PartitionCount != nil {
partitionCount = *topicReq.PartitionCount
Expand Down Expand Up @@ -68,15 +69,15 @@ func (*kafkaClientMapper) createTopicRequestTopicToKafka(topicReq *v1alpha2.Crea
return req
}

func (*kafkaClientMapper) createTopicResponseTopicToProto(topic kmsg.CreateTopicsResponseTopic) *v1alpha2.CreateTopicResponse {
func (*mapper) createTopicResponseTopicToProto(topic kmsg.CreateTopicsResponseTopic) *v1alpha2.CreateTopicResponse {
return &v1alpha2.CreateTopicResponse{
Name: topic.Topic,
PartitionCount: topic.NumPartitions,
ReplicationFactor: int32(topic.ReplicationFactor),
}
}

func (*kafkaClientMapper) describeTopicConfigsToKafka(req *v1alpha2.GetTopicConfigurationsRequest) kmsg.DescribeConfigsRequest {
func (*mapper) describeTopicConfigsToKafka(req *v1alpha2.GetTopicConfigurationsRequest) kmsg.DescribeConfigsRequest {
configResource := kmsg.NewDescribeConfigsRequestResource()
configResource.ResourceType = kmsg.ConfigResourceTypeTopic
configResource.ResourceName = req.TopicName
Expand All @@ -90,7 +91,7 @@ func (*kafkaClientMapper) describeTopicConfigsToKafka(req *v1alpha2.GetTopicConf
return kafkaReq
}

func (k *kafkaClientMapper) describeTopicConfigsToProto(resources []kmsg.DescribeConfigsResponseResourceConfig) ([]*v1alpha2.Topic_Configuration, error) {
func (k *mapper) describeTopicConfigsToProto(resources []kmsg.DescribeConfigsResponseResourceConfig) ([]*v1alpha2.Topic_Configuration, error) {
mappedResources := make([]*v1alpha2.Topic_Configuration, len(resources))
for i, resource := range resources {
configType, err := k.commonKafkaClientMapper.ConfigTypeToProto(resource.ConfigType)
Expand Down Expand Up @@ -127,7 +128,7 @@ func (k *kafkaClientMapper) describeTopicConfigsToProto(resources []kmsg.Describ
return mappedResources, nil
}

func (*kafkaClientMapper) deleteTopicToKmsg(req *v1alpha2.DeleteTopicRequest) kmsg.DeleteTopicsRequest {
func (*mapper) deleteTopicToKmsg(req *v1alpha2.DeleteTopicRequest) kmsg.DeleteTopicsRequest {
kafkaReq := kmsg.NewDeleteTopicsRequest()
kafkaReq.TopicNames = []string{req.Name}
kafkaReq.Topics = []kmsg.DeleteTopicsRequestTopic{
Expand All @@ -139,7 +140,7 @@ func (*kafkaClientMapper) deleteTopicToKmsg(req *v1alpha2.DeleteTopicRequest) km
return kafkaReq
}

func (k *kafkaClientMapper) updateTopicConfigsToKafka(req *v1alpha2.UpdateTopicConfigurationsRequest) (*kmsg.IncrementalAlterConfigsRequest, error) {
func (k *mapper) updateTopicConfigsToKafka(req *v1alpha2.UpdateTopicConfigurationsRequest) (*kmsg.IncrementalAlterConfigsRequest, error) {
// We only have one resource (a single topic) whose configs we want to update incrementally
// The API allows to add many, independent resources of different types. Because we always only
// want to patch configs for a single Kafka topic, we can simplify the mapping here by hardcoding
Expand Down Expand Up @@ -170,7 +171,7 @@ func (k *kafkaClientMapper) updateTopicConfigsToKafka(req *v1alpha2.UpdateTopicC
return &kafkaReq, nil
}

func (k *kafkaClientMapper) kafkaMetadataToProto(metadata *kmsg.MetadataResponse) []*v1alpha2.ListTopicsResponse_Topic {
func (k *mapper) kafkaMetadataToProto(metadata *kmsg.MetadataResponse) []*v1alpha2.ListTopicsResponse_Topic {
topics := make([]*v1alpha2.ListTopicsResponse_Topic, len(metadata.Topics))
for i, topicMetadata := range metadata.Topics {
topics[i] = k.kafkaTopicMetadataToProto(topicMetadata)
Expand All @@ -179,7 +180,7 @@ func (k *kafkaClientMapper) kafkaMetadataToProto(metadata *kmsg.MetadataResponse
return topics
}

func (*kafkaClientMapper) kafkaTopicMetadataToProto(topicMetadata kmsg.MetadataResponseTopic) *v1alpha2.ListTopicsResponse_Topic {
func (*mapper) kafkaTopicMetadataToProto(topicMetadata kmsg.MetadataResponseTopic) *v1alpha2.ListTopicsResponse_Topic {
// We iterate through all partitions to figure out the replication factor,
// in case we get an error for the first partitions
replicationFactor := -1
Expand All @@ -197,7 +198,7 @@ func (*kafkaClientMapper) kafkaTopicMetadataToProto(topicMetadata kmsg.MetadataR
}
}

func (k *kafkaClientMapper) setTopicConfigurationsToKafka(req *v1alpha2.SetTopicConfigurationsRequest) *kmsg.AlterConfigsRequest {
func (k *mapper) setTopicConfigurationsToKafka(req *v1alpha2.SetTopicConfigurationsRequest) *kmsg.AlterConfigsRequest {
alterConfigResource := kmsg.NewAlterConfigsRequestResource()
alterConfigResource.ResourceType = kmsg.ConfigResourceTypeTopic
alterConfigResource.ResourceName = req.TopicName
Expand All @@ -212,10 +213,43 @@ func (k *kafkaClientMapper) setTopicConfigurationsToKafka(req *v1alpha2.SetTopic
return &kafkaReq
}

func (*kafkaClientMapper) setTopicConfigurationsResourceToKafka(req *v1alpha2.SetTopicConfigurationsRequest_SetConfiguration) kmsg.AlterConfigsRequestResourceConfig {
func (*mapper) setTopicConfigurationsResourceToKafka(req *v1alpha2.SetTopicConfigurationsRequest_SetConfiguration) kmsg.AlterConfigsRequestResourceConfig {
kafkaReq := kmsg.NewAlterConfigsRequestResourceConfig()
kafkaReq.Name = req.Name
kafkaReq.Value = req.Value

return kafkaReq
}

func (*mapper) topicMountRequestToAdminAPI(req *v1alpha2.MountTopicsRequest) rpadmin.MountConfiguration {
topics := make([]rpadmin.InboundTopic, len(req.Topics))
for i, topic := range req.Topics {
var alias *rpadmin.NamespacedTopic
if topic.Alias != nil {
alias = &rpadmin.NamespacedTopic{
Namespace: kmsg.StringPtr("kafka"),
Topic: *topic.Alias,
}
}

topics[i] = rpadmin.InboundTopic{
SourceTopic: rpadmin.NamespacedTopic{Topic: topic.SourceTopic},
Alias: alias,
}
}

return rpadmin.MountConfiguration{Topics: topics}
}

func (*mapper) topicUnmountRequestToAdminAPI(req *v1alpha2.UnmountTopicsRequest) rpadmin.UnmountConfiguration {
namespacedTopics := make([]rpadmin.NamespacedTopic, len(req.Topics))
for i, topic := range req.Topics {
namespacedTopics[i] = rpadmin.NamespacedTopic{
Topic: topic,
}
}

return rpadmin.UnmountConfiguration{
Topics: namespacedTopics,
}
}
4 changes: 2 additions & 2 deletions backend/pkg/api/connect/service/topic/v1alpha2/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestDeleteTopicRequestToKafka(t *testing.T) {
},
}

kafkaMapper := kafkaClientMapper{}
kafkaMapper := mapper{}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestDescribeTopicConfigsToKafka(t *testing.T) {
},
}

kafkaMapper := kafkaClientMapper{}
kafkaMapper := mapper{}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
59 changes: 49 additions & 10 deletions backend/pkg/api/connect/service/topic/v1alpha2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,20 @@ import (
"github.com/redpanda-data/console/backend/pkg/console"
v1alpha2 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha2"
"github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha2/dataplanev1alpha2connect"
"github.com/redpanda-data/console/backend/pkg/redpanda"
)

var _ dataplanev1alpha2connect.TopicServiceHandler = (*Service)(nil)

// Service that implements the UserServiceHandler interface. This includes all
// RPCs to manage Redpanda or Kafka users.
type Service struct {
cfg *config.Config
logger *zap.Logger
consoleSvc console.Servicer
mapper kafkaClientMapper
defaulter defaulter
cfg *config.Config
logger *zap.Logger
consoleSvc console.Servicer
redpandaSvc *redpanda.Service
mapper mapper
defaulter defaulter
}

// ListTopics lists all Kafka topics with their most important metadata.
Expand Down Expand Up @@ -335,13 +337,15 @@ func (s *Service) SetTopicConfigurations(ctx context.Context, req *connect.Reque
func NewService(cfg *config.Config,
logger *zap.Logger,
consoleSvc console.Servicer,
redpandaSvc *redpanda.Service,
) *Service {
return &Service{
cfg: cfg,
logger: logger,
consoleSvc: consoleSvc,
mapper: kafkaClientMapper{},
defaulter: defaulter{},
cfg: cfg,
logger: logger,
consoleSvc: consoleSvc,
redpandaSvc: redpandaSvc,
mapper: mapper{},
defaulter: defaulter{},
}
}

Expand Down Expand Up @@ -387,3 +391,38 @@ func (s *Service) CreateTopic(ctx context.Context, req *connect.Request[v1alpha2
connectResponse.Header().Set("x-http-code", strconv.Itoa(http.StatusCreated))
return connectResponse, nil
}

// MountTopics initiates the process of mounting one or more topics from tiered
// storage, so that the topics become accessible by Kafka clients.
func (s *Service) MountTopics(ctx context.Context, req *connect.Request[v1alpha2.MountTopicsRequest]) (*connect.Response[v1alpha2.MountTopicsResponse], error) {
if !s.cfg.Redpanda.AdminAPI.Enabled {
return nil, apierrors.NewRedpandaAdminAPINotConfiguredError()
}

mountConfig := s.mapper.topicMountRequestToAdminAPI(req.Msg)

migrationInfo, err := s.redpandaSvc.MountTopics(ctx, mountConfig)
if err != nil {
return nil, apierrors.NewConnectErrorFromRedpandaAdminAPIError(err, "")
}

return connect.NewResponse(&v1alpha2.MountTopicsResponse{MigrationId: int32(migrationInfo.ID)}), nil
}

// UnmountTopics initiates the process of unmounting one or more topics from
// local brokers to the configured tiered storage. This frees up the cluster
// resources and the topic is not accessible anymore until it's re-mounted.
func (s *Service) UnmountTopics(ctx context.Context, req *connect.Request[v1alpha2.UnmountTopicsRequest]) (*connect.Response[v1alpha2.UnmountTopicsResponse], error) {
if !s.cfg.Redpanda.AdminAPI.Enabled {
return nil, apierrors.NewRedpandaAdminAPINotConfiguredError()
}

unmountConfig := s.mapper.topicUnmountRequestToAdminAPI(req.Msg)

migrationInfo, err := s.redpandaSvc.UnmountTopics(ctx, unmountConfig)
if err != nil {
return nil, apierrors.NewConnectErrorFromRedpandaAdminAPIError(err, "")
}

return connect.NewResponse(&v1alpha2.UnmountTopicsResponse{MigrationId: int32(migrationInfo.ID)}), nil
}
2 changes: 1 addition & 1 deletion backend/pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (api *API) setupConnectWithGRPCGateway(r chi.Router) {
// v1alpha2

aclSvc := apiaclsvc.NewService(api.Cfg, api.Logger.Named("kafka_service"), api.ConsoleSvc)
topicSvc := topicsvc.NewService(api.Cfg, api.Logger.Named("topic_service"), api.ConsoleSvc)
topicSvc := topicsvc.NewService(api.Cfg, api.Logger.Named("topic_service"), api.ConsoleSvc, api.RedpandaSvc)
var userSvc dataplanev1alpha2connect.UserServiceHandler = apiusersvc.NewService(api.Cfg, api.Logger.Named("user_service"), api.RedpandaSvc, api.ConsoleSvc, api.Hooks.Authorization.IsProtectedKafkaUser)
transformSvc := transformsvc.NewService(api.Cfg, api.Logger.Named("transform_service"), api.RedpandaSvc, v)
kafkaConnectSvc := apikafkaconnectsvc.NewService(api.Cfg, api.Logger.Named("kafka_connect_service"), api.ConnectSvc)
Expand Down
Loading
Loading