Skip to content

Commit

Permalink
Fixes after main rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
l0kix2 committed May 24, 2024
1 parent 3e0a832 commit 90d7ab0
Show file tree
Hide file tree
Showing 65 changed files with 2,987 additions and 130 deletions.
4 changes: 3 additions & 1 deletion controllers/component_manager.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

// TODO: file will be deleted after this refactoring. No need to review changes.

import (
"context"
"fmt"
Expand Down Expand Up @@ -40,7 +42,7 @@ func NewComponentManager(
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

d := components.NewDiscovery(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus, nil)
var hps []components.Component
for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
Expand Down
161 changes: 161 additions & 0 deletions controllers/component_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package controllers

import (
"context"

apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy"
"github.com/ytsaurus/yt-k8s-operator/pkg/components"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
"github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig"
)

type component interface {
Sync(ctx context.Context) error
Status(ctx context.Context) (components.ComponentStatus, error)
GetName() string
GetType() consts.ComponentType
}

type masterComponent interface {
component
BuildInitial(ctx context.Context) error
IsBuildInitially(ctx context.Context) (bool, error)
NeedBuild(ctx context.Context) (bool, error)
IsRebuildStarted() bool
}

type componentRegistry struct {
comps map[string]component
master masterComponent
byType map[consts.ComponentType][]component
}

func (cr *componentRegistry) add(comp component) {
cr.comps[comp.GetName()] = comp
cr.byType[comp.GetType()] = append(cr.byType[comp.GetType()], comp)
if comp.GetType() == consts.MasterType {
cr.master = comp.(masterComponent)
}
}
func (cr *componentRegistry) list() []component {
var result []component
for _, comp := range cr.comps {
result = append(result, comp)
}
return result
}
func (cr *componentRegistry) listByType(types ...consts.ComponentType) []component {
var result []component
for _, compType := range types {
result = append(result, cr.byType[compType]...)
}
return result
}

// TODO (l0kix2): cleanup New* for components since they not need deps anymore.
func buildComponentRegistry(
ytsaurus *apiProxy.Ytsaurus,
) *componentRegistry {
registry := &componentRegistry{
comps: make(map[string]component),
byType: make(map[consts.ComponentType][]component),
}

resource := ytsaurus.GetResource()
clusterDomain := getClusterDomain(ytsaurus.APIProxy().Client())
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

yc := components.NewYtsaurusClient(cfgen, ytsaurus, nil)
registry.add(yc)

d := components.NewDiscovery(cfgen, ytsaurus)
registry.add(d)

m := components.NewMaster(cfgen, ytsaurus, yc)
registry.add(m)

for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hp := components.NewHTTPProxy(cfgen, ytsaurus, nil, hpSpec)
registry.add(hp)
}

nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain)
if resource.Spec.DataNodes != nil && len(resource.Spec.DataNodes) > 0 {
for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes {
dnd := components.NewDataNode(nodeCfgGen, ytsaurus, nil, dndSpec)
registry.add(dnd)
}
}

if resource.Spec.UI != nil {
ui := components.NewUI(cfgen, ytsaurus, nil)
registry.add(ui)
}

if resource.Spec.RPCProxies != nil && len(resource.Spec.RPCProxies) > 0 {
for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies {
rp := components.NewRPCProxy(cfgen, ytsaurus, nil, rpSpec)
registry.add(rp)
}
}

if resource.Spec.TCPProxies != nil && len(resource.Spec.TCPProxies) > 0 {
for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies {
tp := components.NewTCPProxy(cfgen, ytsaurus, nil, tpSpec)
registry.add(tp)
}
}

if resource.Spec.ExecNodes != nil && len(resource.Spec.ExecNodes) > 0 {
for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes {
end := components.NewExecNode(nodeCfgGen, ytsaurus, nil, endSpec)
registry.add(end)
}
}

tndCount := 0
if resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
for idx, tndSpec := range ytsaurus.GetResource().Spec.TabletNodes {
tnd := components.NewTabletNode(nodeCfgGen, ytsaurus, yc, tndSpec, idx == 0)
registry.add(tnd)
tndCount++
}
}
if resource.Spec.Schedulers != nil {
s := components.NewScheduler(cfgen, ytsaurus, nil, nil, nil)
registry.add(s)
}

if resource.Spec.ControllerAgents != nil {
ca := components.NewControllerAgent(cfgen, ytsaurus, nil)
registry.add(ca)
}

var q component
if resource.Spec.QueryTrackers != nil && resource.Spec.Schedulers != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
q = components.NewQueryTracker(cfgen, ytsaurus, yc, nil)
registry.add(q)
}

if resource.Spec.QueueAgents != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
qa := components.NewQueueAgent(cfgen, ytsaurus, yc, nil, nil)
registry.add(qa)
}

if resource.Spec.YQLAgents != nil {
yqla := components.NewYQLAgent(cfgen, ytsaurus, nil)
registry.add(yqla)
}

if (resource.Spec.DeprecatedChytController != nil || resource.Spec.StrawberryController != nil) && resource.Spec.Schedulers != nil {
strawberry := components.NewStrawberryController(cfgen, ytsaurus, nil, nil, nil)
registry.add(strawberry)
}

if resource.Spec.MasterCaches != nil {
mc := components.NewMasterCache(cfgen, ytsaurus)
registry.add(mc)
}

return registry
}
108 changes: 108 additions & 0 deletions controllers/helpers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package controllers

import (
"context"
"fmt"
"net"
"os"
"strings"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1"
"github.com/ytsaurus/yt-k8s-operator/pkg/components"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
)

const (
Expand All @@ -30,3 +37,104 @@ func getClusterDomain(client client.Client) string {

return clusterDomain
}

func logComponentStatuses(
ctx context.Context,
registry *componentRegistry,
statuses map[string]components.ComponentStatus,
componentsOrder [][]consts.ComponentType,
resource *ytv1.Ytsaurus,
) error {
logger := log.FromContext(ctx)
logLine := logger.V(1).Info

var readyComponents []string
var notReadyComponents []string

masterBuildStatus, err := getStatusForMasterBuild(ctx, registry.master)
if err != nil {
return err
}
logLine(
fmt.Sprintf(
"%s %s %s: %s",
"0.",
statusToSymbol(masterBuildStatus.SyncStatus),
registry.master.GetName()+" Build",
masterBuildStatus.Message,
),
)

for batchIndex := 1; batchIndex <= len(componentsOrder); batchIndex++ {
typesInBatch := componentsOrder[batchIndex-1]
compsInBatch := registry.listByType(typesInBatch...)
for compIndex, comp := range compsInBatch {
name := comp.GetName()
status := statuses[name]

if status.SyncStatus == components.SyncStatusReady {
readyComponents = append(readyComponents, name)
} else {
notReadyComponents = append(notReadyComponents, name)
}

logName := name
if name == registry.master.GetName() {
logName = registry.master.GetName() + " Update"
}

batchIndexStr := " "
if compIndex == 0 {
batchIndexStr = fmt.Sprintf("%d.", batchIndex)
}

logLine(
fmt.Sprintf(
"%s %s %s: %s",
batchIndexStr,
statusToSymbol(status.SyncStatus),
logName,
status.Message,
),
)
}
}

// NB: This log is mentioned at https://ytsaurus.tech/docs/ru/admin-guide/install-ytsaurus
logger.Info("Ytsaurus sync status",
"notReadyComponents", notReadyComponents,
"readyComponents", readyComponents,
"updateState", resource.Status.UpdateStatus.State,
"clusterState", resource.Status.State)
return nil
}

func statusToSymbol(st components.SyncStatus) string {
switch st {
case components.SyncStatusReady:
return "[v]"
case components.SyncStatusBlocked:
return "[x]"
case components.SyncStatusUpdating:
return "[.]"
default:
return "[ ]"
}
}

func stageToUpdateStatus(st components.ComponentStatus) ytv1.UpdateState {
if st.Stage == components.MasterUpdatePossibleCheckStepName && st.SyncStatus == components.SyncStatusBlocked {
return ytv1.UpdateStateImpossibleToStart
}

return map[string]ytv1.UpdateState{
components.MasterUpdatePossibleCheckStepName: ytv1.UpdateStatePossibilityCheck,
components.MasterEnableSafeModeStepName: ytv1.UpdateStateWaitingForSafeModeEnabled,
components.MasterBuildSnapshotsStepName: ytv1.UpdateStateWaitingForSnapshots,
components.MasterCheckSnapshotsBuiltStepName: ytv1.UpdateStateWaitingForSnapshots,
components.MasterStartPrepareMasterExitReadOnlyStepName: ytv1.UpdateStateWaitingForMasterExitReadOnly,
components.MasterWaitMasterExitReadOnlyPreparedStepName: ytv1.UpdateStateWaitingForMasterExitReadOnly,
components.MasterWaitMasterExitsReadOnlyStepName: ytv1.UpdateStateWaitingForMasterExitReadOnly,
components.MasterDisableSafeModeStepName: ytv1.UpdateStateWaitingForSafeModeDisabled,
}[st.Stage]
}
4 changes: 3 additions & 1 deletion controllers/sync.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

// TODO: file will be deleted after this refactoring. No need to review changes.

import (
"context"
"fmt"
Expand Down Expand Up @@ -504,7 +506,7 @@ func chooseUpdateFlow(spec ytv1.YtsaurusSpec, needUpdate []components.Component)
}
}

func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
func (r *YtsaurusReconciler) SyncOld(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
logger := log.FromContext(ctx)

if !resource.Spec.IsManaged {
Expand Down
3 changes: 2 additions & 1 deletion controllers/ytsaurus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -48,7 +49,7 @@ func (r *YtsaurusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

var ytsaurus ytv1.Ytsaurus
if err := r.Get(ctx, req.NamespacedName, &ytsaurus); err != nil {
logger.Error(err, "unable to fetch Ytsaurus")
logger.Info("unable to fetch Ytsaurus")
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
Expand Down
Loading

0 comments on commit 90d7ab0

Please sign in to comment.