diff --git a/cmd/ui/main.go b/cmd/ui/main.go index 5f6c7b04..bf9cf86e 100644 --- a/cmd/ui/main.go +++ b/cmd/ui/main.go @@ -11,6 +11,7 @@ import ( "vc/internal/ui/apiv1" "vc/internal/ui/httpserver" "vc/pkg/configuration" + "vc/pkg/kafka" "vc/pkg/logger" "vc/pkg/trace" ) @@ -45,7 +46,20 @@ func main() { panic(err) } - apiClient, err := apiv1.New(ctx, cfg, tracer, log.New("api_client")) + var kafkaMessageProducer *apiv1.KafkaMessageProducer + if cfg.Common.Kafka.Enabled { + // Start max one producer client for each service + var err error + kafkaMessageProducer, err = apiv1.NewKafkaMessageProducer(kafka.CommonProducerConfig(cfg), ctx, cfg, tracer, log) + if err != nil { + panic(err) + } + services["kafkaMessageProducer"] = kafkaMessageProducer + } else { + log.Info("Kafka disabled - no Kafka message producer created") + } + + apiClient, err := apiv1.New(ctx, cfg, tracer, kafkaMessageProducer, log.New("api_client")) services["apiClient"] = apiClient if err != nil { panic(err) diff --git a/internal/apigw/httpserver/endpoints.go b/internal/apigw/httpserver/endpoints.go index 309923f2..a5734b50 100644 --- a/internal/apigw/httpserver/endpoints.go +++ b/internal/apigw/httpserver/endpoints.go @@ -20,7 +20,7 @@ func (s *Service) endpointUpload(ctx context.Context, c *gin.Context) (any, erro return nil, err } - if s.kafkaMessageProducer != nil { + if s.config.Common.Kafka.Enabled { err := s.kafkaMessageProducer.Upload(request) if err != nil { span.SetStatus(codes.Error, err.Error()) diff --git a/internal/ui/apiv1/client.go b/internal/ui/apiv1/client.go index 93cb3179..d6cf5bfd 100644 --- a/internal/ui/apiv1/client.go +++ b/internal/ui/apiv1/client.go @@ -2,7 +2,6 @@ package apiv1 import ( "context" - "vc/pkg/kafka" "vc/pkg/logger" "vc/pkg/model" "vc/pkg/trace" @@ -18,23 +17,14 @@ type Client struct { kafkaMessageProducer *KafkaMessageProducer } -func New(ctx context.Context, cfg *model.Cfg, tp *trace.Tracer, log *logger.Log) (*Client, error) { +func New(ctx context.Context, cfg *model.Cfg, tp *trace.Tracer, kafkaMessageProducer *KafkaMessageProducer, log *logger.Log) (*Client, error) { c := &Client{ - cfg: cfg, - tp: tp, - log: log, - apigwClient: NewAPIGWClient(cfg, tp, log.New("apiwg_client")), - mockasClient: NewMockASClient(cfg, tp, log.New("mockas_client")), - } - - if cfg.Common.Kafka.Enabled { - kafkaMessageProducer, err := NewKafkaMessageProducer(kafka.CommonProducerConfig(cfg), ctx, cfg, tp, log) - if err != nil { - return nil, err - } - c.kafkaMessageProducer = kafkaMessageProducer - } else { - log.Info("Kafka disabled - no Kafka message producer client created") + cfg: cfg, + tp: tp, + log: log, + apigwClient: NewAPIGWClient(cfg, tp, log.New("apiwg_client")), + mockasClient: NewMockASClient(cfg, tp, log.New("mockas_client")), + kafkaMessageProducer: kafkaMessageProducer, } c.log.Info("Started") @@ -43,8 +33,5 @@ func New(ctx context.Context, cfg *model.Cfg, tp *trace.Tracer, log *logger.Log) } func (c *Client) Close(ctx context.Context) error { - if c.kafkaMessageProducer != nil { - return c.kafkaMessageProducer.Close(ctx) - } return nil }