Skip to content

Commit

Permalink
Kafka: kafka client created in main for ui
Browse files Browse the repository at this point in the history
  • Loading branch information
matskramer committed Sep 18, 2024
1 parent 26d6fdf commit 0f3c868
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 22 deletions.
16 changes: 15 additions & 1 deletion cmd/ui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"vc/internal/ui/apiv1"
"vc/internal/ui/httpserver"
"vc/pkg/configuration"
"vc/pkg/kafka"
"vc/pkg/logger"
"vc/pkg/trace"
)
Expand Down Expand Up @@ -45,7 +46,20 @@ func main() {
panic(err)
}

apiClient, err := apiv1.New(ctx, cfg, tracer, log.New("api_client"))
var kafkaMessageProducer *apiv1.KafkaMessageProducer
if cfg.Common.Kafka.Enabled {
// Start max one producer client for each service
var err error
kafkaMessageProducer, err = apiv1.NewKafkaMessageProducer(kafka.CommonProducerConfig(cfg), ctx, cfg, tracer, log)
if err != nil {
panic(err)
}
services["kafkaMessageProducer"] = kafkaMessageProducer
} else {
log.Info("Kafka disabled - no Kafka message producer created")
}

apiClient, err := apiv1.New(ctx, cfg, tracer, kafkaMessageProducer, log.New("api_client"))
services["apiClient"] = apiClient
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/apigw/httpserver/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (s *Service) endpointUpload(ctx context.Context, c *gin.Context) (any, erro
return nil, err
}

if s.kafkaMessageProducer != nil {
if s.config.Common.Kafka.Enabled {
err := s.kafkaMessageProducer.Upload(request)
if err != nil {
span.SetStatus(codes.Error, err.Error())
Expand Down
27 changes: 7 additions & 20 deletions internal/ui/apiv1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package apiv1

import (
"context"
"vc/pkg/kafka"
"vc/pkg/logger"
"vc/pkg/model"
"vc/pkg/trace"
Expand All @@ -18,23 +17,14 @@ type Client struct {
kafkaMessageProducer *KafkaMessageProducer
}

func New(ctx context.Context, cfg *model.Cfg, tp *trace.Tracer, log *logger.Log) (*Client, error) {
func New(ctx context.Context, cfg *model.Cfg, tp *trace.Tracer, kafkaMessageProducer *KafkaMessageProducer, log *logger.Log) (*Client, error) {
c := &Client{
cfg: cfg,
tp: tp,
log: log,
apigwClient: NewAPIGWClient(cfg, tp, log.New("apiwg_client")),
mockasClient: NewMockASClient(cfg, tp, log.New("mockas_client")),
}

if cfg.Common.Kafka.Enabled {
kafkaMessageProducer, err := NewKafkaMessageProducer(kafka.CommonProducerConfig(cfg), ctx, cfg, tp, log)
if err != nil {
return nil, err
}
c.kafkaMessageProducer = kafkaMessageProducer
} else {
log.Info("Kafka disabled - no Kafka message producer client created")
cfg: cfg,
tp: tp,
log: log,
apigwClient: NewAPIGWClient(cfg, tp, log.New("apiwg_client")),
mockasClient: NewMockASClient(cfg, tp, log.New("mockas_client")),
kafkaMessageProducer: kafkaMessageProducer,
}

c.log.Info("Started")
Expand All @@ -43,8 +33,5 @@ func New(ctx context.Context, cfg *model.Cfg, tp *trace.Tracer, log *logger.Log)
}

func (c *Client) Close(ctx context.Context) error {
if c.kafkaMessageProducer != nil {
return c.kafkaMessageProducer.Close(ctx)
}
return nil
}

0 comments on commit 0f3c868

Please sign in to comment.