Skip to content

Commit

Permalink
feat(maestro): WIP, still missing monitor piece.
Browse files Browse the repository at this point in the history
  • Loading branch information
Luiz Pegoraro committed Sep 18, 2023
1 parent c7f920c commit 6759d35
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 178 deletions.
103 changes: 0 additions & 103 deletions maestro/deployment/deploy_service.go

This file was deleted.

41 changes: 33 additions & 8 deletions maestro/deployment/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Service interface {
// GetDeploymentByCollectorName to be used to get the deployment information for creating the collector or monitoring the collector
GetDeploymentByCollectorName(ctx context.Context, collectorName string) (*Deployment, error)
// NotifyCollector add collector information to deployment
NotifyCollector(ctx context.Context, ownerID string, sinkId string, collectorName string, operation string, status string, errorMessage string) error
NotifyCollector(ctx context.Context, ownerID string, sinkId string, operation string, status string, errorMessage string) (string, error)
}

type deploymentService struct {
Expand Down Expand Up @@ -66,7 +66,7 @@ func (d *deploymentService) CreateDeployment(ctx context.Context, deployment *De
}
d.logger.Info("added deployment", zap.String("id", added.Id),
zap.String("ownerID", added.OwnerID), zap.String("sinkID", added.SinkID))
err = d.maestroProducer.PublishSinkStatus(added.OwnerID, added.SinkID, "unknown", "")
err = d.maestroProducer.PublishSinkStatus(ctx, added.OwnerID, added.SinkID, "unknown", "")
if err != nil {
return err
}
Expand Down Expand Up @@ -108,15 +108,22 @@ func (d *deploymentService) GetDeployment(ctx context.Context, ownerID string, s
// UpdateDeployment will stop the running collector if any, and change the deployment, it will not spin the collector back up,
// it will wait for the next sink.activity
func (d *deploymentService) UpdateDeployment(ctx context.Context, deployment *Deployment) error {
now := time.Now()
got, err := d.dbRepository.FindByOwnerAndSink(ctx, deployment.OwnerID, deployment.SinkID)
if err != nil {
return errors.New("could not find deployment to update")
}
// Spin down the collector if it is running
err = d.kubecontrol.KillOtelCollector(ctx, got.OwnerID, got.SinkID)
if err != nil {
d.logger.Warn("could not stop running collector, will try to update anyway", zap.Error(err))
}
err = deployment.Merge(*got)
if err != nil {
d.logger.Error("error during merge of deployments", zap.Error(err))
return err
}
deployment.LastCollectorStopTime = &now
if deployment == nil {
return errors.New("deployment is nil")
}
Expand All @@ -129,17 +136,35 @@ func (d *deploymentService) UpdateDeployment(ctx context.Context, deployment *De
return nil
}

func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string, sinkId string, collectorName string, operation string, status string, errorMessage string) error {
func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string, sinkId string, operation string, status string, errorMessage string) (string, error) {
got, err := d.dbRepository.FindByOwnerAndSink(ctx, ownerID, sinkId)
if err != nil {
return errors.New("could not find deployment to update")
return "", errors.New("could not find deployment to update")
}
now := time.Now()
got.CollectorName = collectorName
if operation == "delete" {
got.LastCollectorStopTime = &now
err = d.kubecontrol.KillOtelCollector(ctx, got.OwnerID, got.SinkID)
if err != nil {
d.logger.Warn("could not stop running collector, will try to update anyway", zap.Error(err))
}
} else if operation == "deploy" {
got.LastCollectorDeployTime = &now
// Spin up the collector
if got.LastCollectorDeployTime != nil || got.LastCollectorDeployTime.Before(now) {
if got.LastCollectorStopTime != nil || got.LastCollectorStopTime.Before(now) {
d.logger.Debug("collector is not running deploying")
manifest, err := d.configBuilder.BuildDeploymentConfig(got)
if err != nil {
d.logger.Error("error during build deployment config", zap.Error(err))
return "", err
}
got.CollectorName, err = d.kubecontrol.CreateOtelCollector(ctx, got.OwnerID, got.SinkID, manifest)
got.LastCollectorDeployTime = &now
} else {
d.logger.Info("collector is already running")
}
}

}
if status != "" {
got.LastStatus = status
Expand All @@ -151,13 +176,13 @@ func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string,
}
updated, err := d.dbRepository.Update(ctx, got)
if err != nil {
return err
return "", err
}
d.logger.Info("updated deployment information for collector and status or error",
zap.String("ownerID", updated.OwnerID), zap.String("sinkID", updated.SinkID),
zap.String("collectorName", updated.CollectorName),
zap.String("status", updated.LastStatus), zap.String("errorMessage", updated.LastErrorMessage))
return nil
return updated.CollectorName, nil
}

// UpdateStatus this will change the status in postgres and notify sinks service to show new status to user
Expand Down
48 changes: 10 additions & 38 deletions maestro/kubecontrol/kubecontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"os"
"os/exec"
"strings"
"time"
)

const namespace = "otelcollectors"
Expand Down Expand Up @@ -58,19 +57,13 @@ func NewService(logger *zap.Logger) Service {

type Service interface {
// CreateOtelCollector - create an existing collector by id
CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error

// DeleteOtelCollector - delete an existing collector by id
DeleteOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error

// UpdateOtelCollector - update an existing collector by id
UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error
CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) (string, error)

// KillOtelCollector - kill an existing collector by id, terminating by the ownerID, sinkID without the file
KillOtelCollector(ctx context.Context, ownerID, sinkID string) error
}

func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) error {
func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) (string, error) {
_, status, err := svc.getDeploymentState(ctx, ownerID, sinkId)
fileContent := []byte(manifest)
tmp := strings.Split(string(fileContent), "\n")
Expand All @@ -83,7 +76,7 @@ func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerI
err = os.WriteFile("/tmp/otel-collector-"+sinkId+".json", []byte(newContent), 0644)
if err != nil {
svc.logger.Error("failed to write file content", zap.Error(err))
return err
return "", err
}
stdOutListenFunction := func(out *bufio.Scanner, err *bufio.Scanner) {
for out.Scan() {
Expand All @@ -100,8 +93,9 @@ func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerI
if err == nil {
svc.logger.Info(fmt.Sprintf("successfully %s the otel-collector for sink-id: %s", operation, sinkId))
}

return nil
// TODO this will be retrieved once we move to K8s SDK
collectorName := fmt.Sprintf("otelcol-%s-%s", ownerID, sinkId)
return collectorName, nil
}

func execCmd(_ context.Context, cmd *exec.Cmd, logger *zap.Logger, stdOutFunc func(stdOut *bufio.Scanner, stdErr *bufio.Scanner)) (*bufio.Scanner, *bufio.Scanner, error) {
Expand Down Expand Up @@ -146,35 +140,13 @@ func (svc *deployService) getDeploymentState(ctx context.Context, _, sinkId stri
return "", "deleted", nil
}

func (svc *deployService) CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error {
err := svc.collectorDeploy(ctx, "apply", ownerID, sinkID, deploymentEntry)
func (svc *deployService) CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) (string, error) {
col, err := svc.collectorDeploy(ctx, "apply", ownerID, sinkID, deploymentEntry)
if err != nil {
return err
return "", err
}

return nil
}

func (svc *deployService) UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error {
err := svc.DeleteOtelCollector(ctx, ownerID, sinkID, deploymentEntry)
if err != nil {
return err
}
// Time to wait until K8s completely removes before re-creating
time.Sleep(3 * time.Second)
err = svc.CreateOtelCollector(ctx, ownerID, sinkID, deploymentEntry)
if err != nil {
return err
}
return nil
}

func (svc *deployService) DeleteOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error {
err := svc.collectorDeploy(ctx, "delete", ownerID, sinkID, deploymentEntry)
if err != nil {
return err
}
return nil
return col, nil
}

func (svc *deployService) KillOtelCollector(ctx context.Context, deploymentName string, sinkId string) error {
Expand Down
10 changes: 5 additions & 5 deletions maestro/redis/consumer/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"errors"
"github.com/go-redis/redis/v8"
"github.com/orb-community/orb/maestro/deployment"
maestroredis "github.com/orb-community/orb/maestro/redis"
"github.com/orb-community/orb/maestro/service"
sinkspb "github.com/orb-community/orb/sinks/pb"
"go.uber.org/zap"
)
Expand All @@ -17,7 +17,7 @@ type SinksListenerController interface {

type sinksListenerService struct {
logger *zap.Logger
deploymentService deployment.DeployService
deploymentService service.EventService
redisClient *redis.Client
sinksClient sinkspb.SinkServiceClient
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func (ls *sinksListenerService) SubscribeSinksEvents(ctx context.Context) error
}
}

// handleSinksUpdate logic moved to deployment.DeployService
// handleSinksUpdate logic moved to deployment.EventService
func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
ls.logger.Info("Received maestro UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := ls.deploymentService.HandleSinkUpdate(ctx, event)
Expand All @@ -94,7 +94,7 @@ func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event mae
return nil
}

// handleSinksDelete logic moved to deployment.DeployService
// handleSinksDelete logic moved to deployment.EventService
func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
ls.logger.Info("Received maestro DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := ls.deploymentService.HandleSinkDelete(ctx, event)
Expand All @@ -104,7 +104,7 @@ func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event mae
return nil
}

// handleSinksCreate logic moved to deployment.DeployService
// handleSinksCreate logic moved to deployment.EventService
func (ls *sinksListenerService) handleSinksCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
ls.logger.Info("Received event to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := ls.deploymentService.HandleSinkCreate(ctx, event)
Expand Down
18 changes: 2 additions & 16 deletions maestro/redis/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,6 @@ const (
exists = "BUSYGROUP Consumer Group name already exists"
)

type Subscriber interface {
CreateDeploymentEntry(ctx context.Context, sink config.SinkData) error
GetDeploymentEntryFromSinkId(ctx context.Context, sinkId string) (string, error)

UpdateSinkCache(ctx context.Context, data config.SinkData) (err error)
UpdateSinkStateCache(ctx context.Context, data config.SinkData) (err error)
PublishSinkStateChange(sink *sinkspb.SinkRes, status string, logsErr error, err error)

GetActivity(sinkID string) (int64, error)
RemoveSinkActivity(ctx context.Context, sinkId string) error

SubscribeSinkerEvents(context context.Context) error
}

type eventStore struct {
kafkaUrl string
kubecontrol kubecontrol.Service
Expand All @@ -60,8 +46,8 @@ type eventStore struct {
}

func NewEventStore(streamRedisClient, sinkerKeyRedisClient *redis.Client, kafkaUrl string, kubecontrol kubecontrol.Service,
esconsumer string, sinksClient sinkspb.SinkServiceClient, logger *zap.Logger, service deployment.Service) Subscriber {
return eventStore{
esconsumer string, sinksClient sinkspb.SinkServiceClient, logger *zap.Logger, service deployment.Service) *eventStore {
return &eventStore{
kafkaUrl: kafkaUrl,
kubecontrol: kubecontrol,
streamRedisClient: streamRedisClient,
Expand Down
2 changes: 0 additions & 2 deletions maestro/redis/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type SinkerUpdateEvent struct {
SinkID string
Owner string
State string
Msg string
Timestamp time.Time
}

Expand All @@ -38,7 +37,6 @@ func (cse SinkerUpdateEvent) Encode() map[string]interface{} {
"sink_id": cse.SinkID,
"owner": cse.Owner,
"state": cse.State,
"msg": cse.Msg,
"timestamp": cse.Timestamp.Unix(),
"operation": SinkerUpdate,
}
Expand Down
Loading

0 comments on commit 6759d35

Please sign in to comment.