Skip to content

Commit

Permalink
feat(sinker): cleaning and fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Luiz Pegoraro committed Sep 25, 2023
1 parent 2390bf1 commit 9e67a63
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 38 deletions.
2 changes: 1 addition & 1 deletion maestro/kubecontrol/kubecontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Service interface {
CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) (string, error)

// KillOtelCollector - kill an existing collector by id, terminating by the ownerID, sinkID without the file
KillOtelCollector(ctx context.Context, ownerID, sinkID string) error
KillOtelCollector(ctx context.Context, deploymentName, sinkID string) error
}

func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) (string, error) {
Expand Down
41 changes: 4 additions & 37 deletions maestro/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,8 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
if sink == nil {
svc.logger.Warn("collector not found for sink, depleting collector", zap.String("collector name", collector.Name))
sinkId := collector.Name[5:41]
deploymentEntry, err := svc.deploymentSvc.GetDeploymentByCollectorName(ctx, collector.Name)
if err != nil {
svc.logger.Error("did not find collector entry for sink", zap.String("sink-id", sinkId))
deploymentName := "otel-" + sinkId
err = svc.kubecontrol.KillOtelCollector(ctx, deploymentName, sinkId)
if err != nil {
svc.logger.Error("error removing otel collector, manual intervention required", zap.Error(err))
}
continue
}
err = svc.kubecontrol.DeleteOtelCollector(ctx, "", sinkId, deploymentEntry)
deploymentName := "otel-" + sinkId
err = svc.kubecontrol.KillOtelCollector(ctx, deploymentName, sinkId)
if err != nil {
svc.logger.Error("error removing otel collector", zap.Error(err))
}
Expand All @@ -203,39 +194,15 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
svc.logger.Error("error during analyze logs", zap.Error(logsErr))
continue
}
lastActivity, activityErr := svc.eventStore.GetActivity(sink.Id)
// if logs reported 'active' status
// here we should check if LastActivity is up-to-date, otherwise we need to set sink as idle
idleLimit := time.Now().Unix() - idleTimeSeconds // within 10 minutes
if idleLimit >= lastActivity {
//changing state on sinks
svc.eventStore.PublishSinkStateChange(sink, "idle", logsErr, err)
//changing state on redis sinker
data.State.SetFromString("idle")
svc.eventStore.UpdateSinkStateCache(ctx, data)
deploymentEntry, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id)
if errDeploy != nil {
svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(activityErr))
continue
}
err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id, deploymentEntry)
if err != nil {
svc.logger.Error("error removing otel collector", zap.Error(err))
}
continue
}

//set the new sink status if changed during checks
if sink.GetState() != status && status != "" {
svc.logger.Info("changing sink status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
if err != nil {
svc.logger.Error("error updating status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("error_message (opt)", err.Error()), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
} else {
svc.logger.Info("updating status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
// changing state on sinks
svc.eventStore.PublishSinkStateChange(sink, status, logsErr, err)
// changing state on redis sinker
data.State.SetFromString(status)
svc.eventStore.UpdateSinkStateCache(ctx, data)
_ = svc.deploymentSvc.UpdateStatus(ctx, sink.OwnerID, sink.Id, status, logsErr.Error())
}
}
}
Expand Down

0 comments on commit 9e67a63

Please sign in to comment.