Skip to content

Commit

Permalink
ms & sch & ds & tnd
Browse files Browse the repository at this point in the history
  • Loading branch information
l0kix2 committed Mar 22, 2024
1 parent 54d2909 commit 100761c
Show file tree
Hide file tree
Showing 28 changed files with 1,948 additions and 37 deletions.
4 changes: 3 additions & 1 deletion controllers/component_manager.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

// TODO: file will be deleted after this refactoring. No need to review changes.

import (
"context"
"fmt"
Expand Down Expand Up @@ -41,7 +43,7 @@ func NewComponentManager(
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

d := components.NewDiscovery(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus, nil)
var hps []components.Component
for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
Expand Down
150 changes: 150 additions & 0 deletions controllers/component_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package controllers

import (
"context"

apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy"
"github.com/ytsaurus/yt-k8s-operator/pkg/components"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
"github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig"
)

type component interface {
Sync(ctx context.Context) error
Status(ctx context.Context) (components.ComponentStatus, error)
GetName() string
GetType() consts.ComponentType
}

type componentRegistry struct {
comps map[string]component
byType map[consts.ComponentType][]component
}

func (cr *componentRegistry) add(comp component) {
cr.comps[comp.GetName()] = comp
compsOfSameType := cr.byType[comp.GetType()]
compsOfSameType = append(compsOfSameType, comp)
}

func (cr *componentRegistry) list() []component {
var result []component
for _, comp := range cr.comps {
result = append(result, comp)
}
return result
}
func (cr *componentRegistry) listByType(types ...consts.ComponentType) []component {
var result []component
for _, compType := range types {
result = append(result, cr.byType[compType]...)
}
return result
}

// TODO (l0kix2): cleanup New* for components since they not need deps anymore.
func buildComponentRegistry(
ytsaurus *apiProxy.Ytsaurus,
) *componentRegistry {
registry := &componentRegistry{
comps: make(map[string]component),
}

resource := ytsaurus.GetResource()
clusterDomain := getClusterDomain(ytsaurus.APIProxy().Client())
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

yc := components.NewYtsaurusClient(cfgen, ytsaurus, nil)
registry.add(yc)

d := components.NewDiscovery(cfgen, ytsaurus)
registry.add(d)

m := components.NewMaster(cfgen, ytsaurus, yc)
registry.add(m)

for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hp := components.NewHTTPProxy(cfgen, ytsaurus, nil, hpSpec)
registry.add(hp)
}

nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain)
if resource.Spec.DataNodes != nil && len(resource.Spec.DataNodes) > 0 {
for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes {
dnd := components.NewDataNode(nodeCfgGen, ytsaurus, nil, dndSpec)
registry.add(dnd)
}
}

if resource.Spec.UI != nil {
ui := components.NewUI(cfgen, ytsaurus, nil)
registry.add(ui)
}

if resource.Spec.RPCProxies != nil && len(resource.Spec.RPCProxies) > 0 {
for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies {
rp := components.NewRPCProxy(cfgen, ytsaurus, nil, rpSpec)
registry.add(rp)
}
}

if resource.Spec.TCPProxies != nil && len(resource.Spec.TCPProxies) > 0 {
for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies {
tp := components.NewTCPProxy(cfgen, ytsaurus, nil, tpSpec)
registry.add(tp)
}
}

if resource.Spec.ExecNodes != nil && len(resource.Spec.ExecNodes) > 0 {
for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes {
end := components.NewExecNode(nodeCfgGen, ytsaurus, nil, endSpec)
registry.add(end)
}
}

tndCount := 0
if resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
for idx, tndSpec := range ytsaurus.GetResource().Spec.TabletNodes {
tnd := components.NewTabletNode(nodeCfgGen, ytsaurus, yc, tndSpec, idx == 0)
registry.add(tnd)
tndCount++
}
}
if resource.Spec.Schedulers != nil {
s := components.NewScheduler(cfgen, ytsaurus, nil, nil, nil)
registry.add(s)
}

if resource.Spec.ControllerAgents != nil {
ca := components.NewControllerAgent(cfgen, ytsaurus, nil)
registry.add(ca)
}

var q component
if resource.Spec.QueryTrackers != nil && resource.Spec.Schedulers != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
q = components.NewQueryTracker(cfgen, ytsaurus, yc, nil)
registry.add(q)
}

if resource.Spec.QueueAgents != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
qa := components.NewQueueAgent(cfgen, ytsaurus, yc, nil, nil)
registry.add(qa)
}

if resource.Spec.YQLAgents != nil {
yqla := components.NewYQLAgent(cfgen, ytsaurus, nil)
registry.add(yqla)
}

if (resource.Spec.DeprecatedChytController != nil || resource.Spec.StrawberryController != nil) && resource.Spec.Schedulers != nil {
strawberry := components.NewStrawberryController(cfgen, ytsaurus, nil, nil, nil)
registry.add(strawberry)
}

if resource.Spec.MasterCaches != nil {
mc := components.NewMasterCache(cfgen, ytsaurus)
registry.add(mc)
}

return registry
}
65 changes: 65 additions & 0 deletions controllers/helpers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package controllers

import (
"context"
"fmt"
"net"
"os"
"strings"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1"
"github.com/ytsaurus/yt-k8s-operator/pkg/components"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
)

const (
Expand All @@ -30,3 +37,61 @@ func getClusterDomain(client client.Client) string {

return clusterDomain
}

func logComponentStatuses(
ctx context.Context,
registry *componentRegistry,
statuses map[string]components.ComponentStatus,
componentsOrder [][]consts.ComponentType,
resource *ytv1.Ytsaurus,
) {
logger := log.FromContext(ctx)

var readyComponents []string
var notReadyComponents []string

for batchIndex, typesInBatch := range componentsOrder {
compsInBatch := registry.listByType(typesInBatch...)
for _, comp := range compsInBatch {
name := comp.GetName()
status := statuses[name]

if status.SyncStatus == components.SyncStatusReady {
readyComponents = append(readyComponents, name)
} else {
notReadyComponents = append(notReadyComponents, name)
}

logger.V(1).Info(
fmt.Sprintf(
"%d.%s %s: %s",
batchIndex,
statusToSymbol(status.SyncStatus),
name,
status.Message,
),
)
}
}

// NB: This log is mentioned at https://ytsaurus.tech/docs/ru/admin-guide/install-ytsaurus
logger.Info("Ytsaurus sync status",
"notReadyComponents", notReadyComponents,
"readyComponents", readyComponents,
"updateState", resource.Status.UpdateStatus.State,
"clusterState", resource.Status.State)

}

func statusToSymbol(st components.SyncStatus) string {
switch st {
case components.SyncStatusReady:
return "[v]"
case components.SyncStatusBlocked:
return "[x]"
case components.SyncStatusUpdating:
return "[.]"
default:
return "[ ]"
}
}
4 changes: 3 additions & 1 deletion controllers/sync.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

// TODO: file will be deleted after this refactoring. No need to review changes.

import (
"context"
"time"
Expand Down Expand Up @@ -243,7 +245,7 @@ func getComponentNames(components []components.Component) []string {
return names
}

func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
func (r *YtsaurusReconciler) SyncOld(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
logger := log.FromContext(ctx)

if !resource.Spec.IsManaged {
Expand Down
128 changes: 128 additions & 0 deletions controllers/ytsaurus_flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package controllers

import (
"context"
"errors"
"fmt"

ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1"
"github.com/ytsaurus/yt-k8s-operator/pkg/components"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
)

func getStatuses(
ctx context.Context,
registry *componentRegistry,
) (map[string]components.ComponentStatus, error) {
statuses := make(map[string]components.ComponentStatus)
for _, c := range registry.list() {
componentStatus, err := c.Status(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get component %s status: %w", c.GetName(), err)
}
statuses[c.GetName()] = componentStatus
}

return statuses, nil
}

// componentsOrder is an order in which components will be built.
// The main rules are:
// - if component A needs component B for building (running jobs, using yt client, etc), it
// should be placed in some of the sections after component B section.
var componentsOrder = [][]consts.ComponentType{
{
// Discovery doesn't depend on anyone.
consts.DiscoveryType,
// Proxy are placed first since it is needed for ytsaurus client to work.
consts.HttpProxyType,
},
{
consts.YtsaurusClientType,
},
{
// Master uses ytsaurus client when being updated, so if both are down,
// ytsaurus client (and proxies) should be built first.
consts.MasterType,
},
{
consts.UIType,
consts.RpcProxyType,
consts.TcpProxyType,
consts.DataNodeType,
consts.MasterCacheType,
},
{
consts.TabletNodeType,
consts.ExecNodeType,
},
{
consts.SchedulerType,
consts.ControllerAgentType,
consts.QueryTrackerType,
consts.QueueAgentType,
consts.YqlAgentType,
},
{
consts.StrawberryControllerType,
},
}

func syncComponents(
ctx context.Context,
registry *componentRegistry,
resource *ytv1.Ytsaurus,
) (components.ComponentStatus, error) {
statuses, err := getStatuses(ctx, registry)
if err != nil {
return components.ComponentStatus{}, err
}
logComponentStatuses(ctx, registry, statuses, componentsOrder, resource)

var batchToSync []component
for _, typesInBatch := range componentsOrder {
compsInBatch := registry.listByType(typesInBatch...)
for _, comp := range compsInBatch {
status := statuses[comp.GetName()]
if status.SyncStatus != components.SyncStatusReady && batchToSync == nil {
batchToSync = compsInBatch
}
}
}

if batchToSync == nil {
// YTsaurus is running and happy.
return components.ComponentStatus{SyncStatus: components.SyncStatusReady}, nil
}

// Run sync for non-ready components in the batch.
batchNotReadyStatuses := make(map[string]components.ComponentStatus)
var errList []error
for _, comp := range batchToSync {
status := statuses[comp.GetName()]
if status.SyncStatus == components.SyncStatusReady {
continue
}
batchNotReadyStatuses[comp.GetName()] = status
if err = comp.Sync(ctx); err != nil {
errList = append(errList, fmt.Errorf("failed to sync %s: %w", comp.GetName(), err))
}
}

if len(errList) != 0 {
return components.ComponentStatus{}, errors.Join(errList...)
}

// Choosing the most important status for the batch to report up.
batchStatus := components.ComponentStatus{
SyncStatus: components.SyncStatusUpdating,
Message: "",
}
for compName, st := range batchNotReadyStatuses {
if st.SyncStatus == components.SyncStatusBlocked {
batchStatus.SyncStatus = components.SyncStatusBlocked
}
batchStatus.Message += fmt.Sprintf("; %s=%s (%s)", compName, st.SyncStatus, st.Message)
}
return batchStatus, nil
}
Loading

0 comments on commit 100761c

Please sign in to comment.