Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[draft] Split full update flow by components / linear components workflow #179

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ linters:
- ineffassign # detects when assignments to existing variables are not used
- staticcheck # is a go vet on steroids, applying a ton of static analysis checks
- typecheck # like the front-end of a Go compiler, parses and type-checks Go code
- unused # checks for unused constants, variables, functions and types
# TODO: uncomment later when code would be deleted
# - unused # checks for unused constants, variables, functions and types

## preset "bugs"
- asasalint # checks for pass []any as any in variadic func(...any)
Expand Down Expand Up @@ -113,6 +114,8 @@ linters:
# - whitespace # detects leading and trailing whitespace
# - wrapcheck # checks that errors returned from external packages are wrapped
# - wsl # add or remove empty lines
disable:
- unused

linters-settings:
cyclop:
Expand Down
4 changes: 3 additions & 1 deletion controllers/component_manager.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

// TODO: file will be deleted after this refactoring. No need to review changes.

import (
"context"
"fmt"
Expand Down Expand Up @@ -40,7 +42,7 @@ func NewComponentManager(
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

d := components.NewDiscovery(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus, nil)
var hps []components.Component
for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
Expand Down
162 changes: 162 additions & 0 deletions controllers/component_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package controllers

import (
"context"

apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy"
"github.com/ytsaurus/yt-k8s-operator/pkg/components"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
"github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig"
)

type component interface {
Sync(ctx context.Context) error
Status(ctx context.Context) (components.ComponentStatus, error)
GetName() string
GetType() consts.ComponentType
}

type masterComponent interface {
component
BuildInitial(ctx context.Context) error
IsBuildInitially(ctx context.Context) (bool, error)
NeedBuild(ctx context.Context) (bool, error)
IsRebuildStarted() bool
}

type componentRegistry struct {
comps map[string]component
master masterComponent
byType map[consts.ComponentType][]component
}

func (cr *componentRegistry) add(comp component) {
cr.comps[comp.GetName()] = comp
cr.byType[comp.GetType()] = append(cr.byType[comp.GetType()], comp)
if comp.GetType() == consts.MasterType {
cr.master = comp.(masterComponent)
}
}

// func (cr *componentRegistry) list() []component {
// var result []component
// for _, comp := range cr.comps {
// result = append(result, comp)
// }
// return result
// }
func (cr *componentRegistry) listByType(types ...consts.ComponentType) []component {
var result []component
for _, compType := range types {
result = append(result, cr.byType[compType]...)
}
return result
}

// TODO (l0kix2): cleanup New* for components since they not need deps anymore.
func buildComponentRegistry(
ytsaurus *apiProxy.Ytsaurus,
) *componentRegistry {
registry := &componentRegistry{
comps: make(map[string]component),
byType: make(map[consts.ComponentType][]component),
}

resource := ytsaurus.GetResource()
clusterDomain := getClusterDomain(ytsaurus.APIProxy().Client())
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

yc := components.NewYtsaurusClient(cfgen, ytsaurus, nil)
registry.add(yc)

d := components.NewDiscovery(cfgen, ytsaurus)
registry.add(d)

m := components.NewMaster(cfgen, ytsaurus, yc)
registry.add(m)

for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hp := components.NewHTTPProxy(cfgen, ytsaurus, nil, hpSpec)
registry.add(hp)
}

nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain)
if resource.Spec.DataNodes != nil && len(resource.Spec.DataNodes) > 0 {
for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes {
dnd := components.NewDataNode(nodeCfgGen, ytsaurus, nil, dndSpec)
registry.add(dnd)
}
}

if resource.Spec.UI != nil {
ui := components.NewUI(cfgen, ytsaurus, nil)
registry.add(ui)
}

if resource.Spec.RPCProxies != nil && len(resource.Spec.RPCProxies) > 0 {
for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies {
rp := components.NewRPCProxy(cfgen, ytsaurus, nil, rpSpec)
registry.add(rp)
}
}

if resource.Spec.TCPProxies != nil && len(resource.Spec.TCPProxies) > 0 {
for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies {
tp := components.NewTCPProxy(cfgen, ytsaurus, nil, tpSpec)
registry.add(tp)
}
}

if resource.Spec.ExecNodes != nil && len(resource.Spec.ExecNodes) > 0 {
for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes {
end := components.NewExecNode(nodeCfgGen, ytsaurus, nil, endSpec)
registry.add(end)
}
}

tndCount := 0
if resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
for idx, tndSpec := range ytsaurus.GetResource().Spec.TabletNodes {
tnd := components.NewTabletNode(nodeCfgGen, ytsaurus, yc, tndSpec, idx == 0)
registry.add(tnd)
tndCount++
}
}
if resource.Spec.Schedulers != nil {
s := components.NewScheduler(cfgen, ytsaurus, nil, nil, nil)
registry.add(s)
}

if resource.Spec.ControllerAgents != nil {
ca := components.NewControllerAgent(cfgen, ytsaurus, nil)
registry.add(ca)
}

var q component
if resource.Spec.QueryTrackers != nil && resource.Spec.Schedulers != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
q = components.NewQueryTracker(cfgen, ytsaurus, yc, nil)
registry.add(q)
}

if resource.Spec.QueueAgents != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
qa := components.NewQueueAgent(cfgen, ytsaurus, yc, nil, nil)
registry.add(qa)
}

if resource.Spec.YQLAgents != nil {
yqla := components.NewYQLAgent(cfgen, ytsaurus, nil)
registry.add(yqla)
}

if (resource.Spec.DeprecatedChytController != nil || resource.Spec.StrawberryController != nil) && resource.Spec.Schedulers != nil {
strawberry := components.NewStrawberryController(cfgen, ytsaurus, nil, nil, nil)
registry.add(strawberry)
}

if resource.Spec.MasterCaches != nil {
mc := components.NewMasterCache(cfgen, ytsaurus)
registry.add(mc)
}

return registry
}
108 changes: 108 additions & 0 deletions controllers/helpers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package controllers

import (
"context"
"fmt"
"net"
"os"
"strings"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1"
"github.com/ytsaurus/yt-k8s-operator/pkg/components"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
)

const (
Expand All @@ -30,3 +37,104 @@ func getClusterDomain(client client.Client) string {

return clusterDomain
}

func logComponentStatuses(
ctx context.Context,
registry *componentRegistry,
statuses map[string]components.ComponentStatus,
componentsOrder [][]consts.ComponentType,
resource *ytv1.Ytsaurus,
) error {
logger := log.FromContext(ctx)
logLine := logger.V(1).Info

var readyComponents []string
var notReadyComponents []string

masterBuildStatus, err := getStatusForMasterBuild(ctx, registry.master)
if err != nil {
return err
}
logLine(
fmt.Sprintf(
"%s %s %s: %s",
"0.",
statusToSymbol(masterBuildStatus.SyncStatus),
registry.master.GetName()+" Build",
masterBuildStatus.Message,
),
)

for batchIndex := 1; batchIndex <= len(componentsOrder); batchIndex++ {
typesInBatch := componentsOrder[batchIndex-1]
compsInBatch := registry.listByType(typesInBatch...)
for compIndex, comp := range compsInBatch {
name := comp.GetName()
status := statuses[name]

if status.SyncStatus == components.SyncStatusReady {
readyComponents = append(readyComponents, name)
} else {
notReadyComponents = append(notReadyComponents, name)
}

logName := name
if name == registry.master.GetName() {
logName = registry.master.GetName() + " Update"
}

batchIndexStr := " "
if compIndex == 0 {
batchIndexStr = fmt.Sprintf("%d.", batchIndex)
}

logLine(
fmt.Sprintf(
"%s %s %s: %s",
batchIndexStr,
statusToSymbol(status.SyncStatus),
logName,
status.Message,
),
)
}
}

// NB: This log is mentioned at https://ytsaurus.tech/docs/ru/admin-guide/install-ytsaurus
logger.Info("Ytsaurus sync status",
"notReadyComponents", notReadyComponents,
"readyComponents", readyComponents,
"updateState", resource.Status.UpdateStatus.State,
"clusterState", resource.Status.State)
return nil
}

func statusToSymbol(st components.SyncStatus) string {
switch st {
case components.SyncStatusReady:
return "[v]"
case components.SyncStatusBlocked:
return "[x]"
case components.SyncStatusUpdating:
return "[.]"
default:
return "[ ]"
}
}

func stageToUpdateStatus(st components.ComponentStatus) ytv1.UpdateState {
if st.Stage == components.MasterUpdatePossibleCheckStepName && st.SyncStatus == components.SyncStatusBlocked {
return ytv1.UpdateStateImpossibleToStart
}

return map[string]ytv1.UpdateState{
components.MasterUpdatePossibleCheckStepName: ytv1.UpdateStatePossibilityCheck,
components.MasterEnableSafeModeStepName: ytv1.UpdateStateWaitingForSafeModeEnabled,
components.MasterBuildSnapshotsStepName: ytv1.UpdateStateWaitingForSnapshots,
components.MasterCheckSnapshotsBuiltStepName: ytv1.UpdateStateWaitingForSnapshots,
components.MasterStartPrepareMasterExitReadOnlyStepName: ytv1.UpdateStateWaitingForMasterExitReadOnly,
components.MasterWaitMasterExitReadOnlyPreparedStepName: ytv1.UpdateStateWaitingForMasterExitReadOnly,
components.MasterWaitMasterExitsReadOnlyStepName: ytv1.UpdateStateWaitingForMasterExitReadOnly,
components.MasterDisableSafeModeStepName: ytv1.UpdateStateWaitingForSafeModeDisabled,
}[st.Stage]
}
4 changes: 3 additions & 1 deletion controllers/sync.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

// TODO: file will be deleted after this refactoring. No need to review changes.

import (
"context"
"fmt"
Expand Down Expand Up @@ -504,7 +506,7 @@ func chooseUpdateFlow(spec ytv1.YtsaurusSpec, needUpdate []components.Component)
}
}

func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
func (r *YtsaurusReconciler) SyncOld(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
logger := log.FromContext(ctx)

if !resource.Spec.IsManaged {
Expand Down
3 changes: 2 additions & 1 deletion controllers/ytsaurus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -48,7 +49,7 @@ func (r *YtsaurusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

var ytsaurus ytv1.Ytsaurus
if err := r.Get(ctx, req.NamespacedName, &ytsaurus); err != nil {
logger.Error(err, "unable to fetch Ytsaurus")
logger.Info("unable to fetch Ytsaurus")
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
Expand Down
Loading
Loading