diff --git a/controllers/component_manager.go b/controllers/component_manager.go index eafb2c79..4e69337f 100644 --- a/controllers/component_manager.go +++ b/controllers/component_manager.go @@ -1,5 +1,7 @@ package controllers +// TODO: file will be deleted after this refactoring. No need to review changes. + import ( "context" "fmt" @@ -41,7 +43,7 @@ func NewComponentManager( cfgen := ytconfig.NewGenerator(resource, clusterDomain) d := components.NewDiscovery(cfgen, ytsaurus) - m := components.NewMaster(cfgen, ytsaurus) + m := components.NewMaster(cfgen, ytsaurus, nil) var hps []components.Component for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies { hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec)) diff --git a/controllers/component_registry.go b/controllers/component_registry.go new file mode 100644 index 00000000..88ef58ee --- /dev/null +++ b/controllers/component_registry.go @@ -0,0 +1,161 @@ +package controllers + +import ( + "context" + + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/components" + "github.com/ytsaurus/yt-k8s-operator/pkg/consts" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +type component interface { + Sync(ctx context.Context) error + Status(ctx context.Context) (components.ComponentStatus, error) + GetName() string + GetType() consts.ComponentType +} + +type masterComponent interface { + component + BuildInitial(ctx context.Context) error + IsBuildInitially(ctx context.Context) (bool, error) + NeedBuild(ctx context.Context) (bool, error) + IsRebuildStarted() bool +} + +type componentRegistry struct { + comps map[string]component + master masterComponent + byType map[consts.ComponentType][]component +} + +func (cr *componentRegistry) add(comp component) { + cr.comps[comp.GetName()] = comp + cr.byType[comp.GetType()] = append(cr.byType[comp.GetType()], comp) + if comp.GetType() == consts.MasterType { + cr.master = comp.(masterComponent) + } +} +func (cr *componentRegistry) list() []component { + var result []component + for _, comp := range cr.comps { + result = append(result, comp) + } + return result +} +func (cr *componentRegistry) listByType(types ...consts.ComponentType) []component { + var result []component + for _, compType := range types { + result = append(result, cr.byType[compType]...) + } + return result +} + +// TODO (l0kix2): cleanup New* for components since they not need deps anymore. +func buildComponentRegistry( + ytsaurus *apiProxy.Ytsaurus, +) *componentRegistry { + registry := &componentRegistry{ + comps: make(map[string]component), + byType: make(map[consts.ComponentType][]component), + } + + resource := ytsaurus.GetResource() + clusterDomain := getClusterDomain(ytsaurus.APIProxy().Client()) + cfgen := ytconfig.NewGenerator(resource, clusterDomain) + + yc := components.NewYtsaurusClient(cfgen, ytsaurus, nil) + registry.add(yc) + + d := components.NewDiscovery(cfgen, ytsaurus) + registry.add(d) + + m := components.NewMaster(cfgen, ytsaurus, yc) + registry.add(m) + + for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies { + hp := components.NewHTTPProxy(cfgen, ytsaurus, nil, hpSpec) + registry.add(hp) + } + + nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain) + if resource.Spec.DataNodes != nil && len(resource.Spec.DataNodes) > 0 { + for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes { + dnd := components.NewDataNode(nodeCfgGen, ytsaurus, nil, dndSpec) + registry.add(dnd) + } + } + + if resource.Spec.UI != nil { + ui := components.NewUI(cfgen, ytsaurus, nil) + registry.add(ui) + } + + if resource.Spec.RPCProxies != nil && len(resource.Spec.RPCProxies) > 0 { + for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies { + rp := components.NewRPCProxy(cfgen, ytsaurus, nil, rpSpec) + registry.add(rp) + } + } + + if resource.Spec.TCPProxies != nil && len(resource.Spec.TCPProxies) > 0 { + for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies { + tp := components.NewTCPProxy(cfgen, ytsaurus, nil, tpSpec) + registry.add(tp) + } + } + + if resource.Spec.ExecNodes != nil && len(resource.Spec.ExecNodes) > 0 { + for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes { + end := components.NewExecNode(nodeCfgGen, ytsaurus, nil, endSpec) + registry.add(end) + } + } + + tndCount := 0 + if resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 { + for idx, tndSpec := range ytsaurus.GetResource().Spec.TabletNodes { + tnd := components.NewTabletNode(nodeCfgGen, ytsaurus, yc, tndSpec, idx == 0) + registry.add(tnd) + tndCount++ + } + } + if resource.Spec.Schedulers != nil { + s := components.NewScheduler(cfgen, ytsaurus, nil, nil, nil) + registry.add(s) + } + + if resource.Spec.ControllerAgents != nil { + ca := components.NewControllerAgent(cfgen, ytsaurus, nil) + registry.add(ca) + } + + var q component + if resource.Spec.QueryTrackers != nil && resource.Spec.Schedulers != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 { + q = components.NewQueryTracker(cfgen, ytsaurus, yc, nil) + registry.add(q) + } + + if resource.Spec.QueueAgents != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 { + qa := components.NewQueueAgent(cfgen, ytsaurus, yc, nil, nil) + registry.add(qa) + } + + if resource.Spec.YQLAgents != nil { + yqla := components.NewYQLAgent(cfgen, ytsaurus, nil) + registry.add(yqla) + } + + if (resource.Spec.DeprecatedChytController != nil || resource.Spec.StrawberryController != nil) && resource.Spec.Schedulers != nil { + strawberry := components.NewStrawberryController(cfgen, ytsaurus, nil, nil, nil) + registry.add(strawberry) + } + + if resource.Spec.MasterCaches != nil { + mc := components.NewMasterCache(cfgen, ytsaurus) + registry.add(mc) + } + + return registry +} diff --git a/controllers/helpers.go b/controllers/helpers.go index 0603c1f7..3539b6f6 100644 --- a/controllers/helpers.go +++ b/controllers/helpers.go @@ -1,11 +1,18 @@ package controllers import ( + "context" + "fmt" "net" "os" "strings" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/components" + "github.com/ytsaurus/yt-k8s-operator/pkg/consts" ) const ( @@ -30,3 +37,105 @@ func getClusterDomain(client client.Client) string { return clusterDomain } + +func logComponentStatuses( + ctx context.Context, + registry *componentRegistry, + statuses map[string]components.ComponentStatus, + componentsOrder [][]consts.ComponentType, + resource *ytv1.Ytsaurus, +) error { + logger := log.FromContext(ctx) + //logLine := logger.V(1).Info + logLine := fmt.Println + + var readyComponents []string + var notReadyComponents []string + + masterBuildStatus, err := getStatusForMasterBuild(ctx, registry.master) + if err != nil { + return err + } + logLine( + fmt.Sprintf( + "%s %s %s: %s", + "0.", + statusToSymbol(masterBuildStatus.SyncStatus), + registry.master.GetName()+" Build", + masterBuildStatus.Message, + ), + ) + + for batchIndex := 1; batchIndex <= len(componentsOrder); batchIndex++ { + typesInBatch := componentsOrder[batchIndex-1] + compsInBatch := registry.listByType(typesInBatch...) + for compIndex, comp := range compsInBatch { + name := comp.GetName() + status := statuses[name] + + if status.SyncStatus == components.SyncStatusReady { + readyComponents = append(readyComponents, name) + } else { + notReadyComponents = append(notReadyComponents, name) + } + + logName := name + if name == registry.master.GetName() { + logName = registry.master.GetName() + " Update" + } + + batchIndexStr := " " + if compIndex == 0 { + batchIndexStr = fmt.Sprintf("%d.", batchIndex) + } + + logLine( + fmt.Sprintf( + "%s %s %s: %s", + batchIndexStr, + statusToSymbol(status.SyncStatus), + logName, + status.Message, + ), + ) + } + } + + // NB: This log is mentioned at https://ytsaurus.tech/docs/ru/admin-guide/install-ytsaurus + logger.Info("Ytsaurus sync status", + "notReadyComponents", notReadyComponents, + "readyComponents", readyComponents, + "updateState", resource.Status.UpdateStatus.State, + "clusterState", resource.Status.State) + return nil +} + +func statusToSymbol(st components.SyncStatus) string { + switch st { + case components.SyncStatusReady: + return "[v]" + case components.SyncStatusBlocked: + return "[x]" + case components.SyncStatusUpdating: + return "[.]" + default: + return "[ ]" + } +} + +func stageToUpdateStatus(st components.ComponentStatus) ytv1.UpdateState { + if st.Stage == components.MasterUpdatePossibleCheckStepName && st.SyncStatus == components.SyncStatusBlocked { + return ytv1.UpdateStateImpossibleToStart + } + + return map[string]ytv1.UpdateState{ + components.MasterUpdatePossibleCheckStepName: ytv1.UpdateStatePossibilityCheck, + components.MasterEnableSafeModeStepName: ytv1.UpdateStateWaitingForSafeModeEnabled, + components.MasterBuildSnapshotsStepName: ytv1.UpdateStateWaitingForSnapshots, + components.MasterCheckSnapshotsBuiltStepName: ytv1.UpdateStateWaitingForSnapshots, + components.MasterStartPrepareMasterExitReadOnlyStepName: ytv1.UpdateStateWaitingForMasterExitReadOnly, + components.MasterWaitMasterExitReadOnlyPreparedStepName: ytv1.UpdateStateWaitingForMasterExitReadOnly, + components.MasterWaitMasterExitsReadOnlyStepName: ytv1.UpdateStateWaitingForMasterExitReadOnly, + components.MasterDisableSafeModeStepName: ytv1.UpdateStateWaitingForSafeModeDisabled, + }[st.Stage] +} diff --git a/controllers/sync.go b/controllers/sync.go index 7717303b..367f0046 100644 --- a/controllers/sync.go +++ b/controllers/sync.go @@ -1,5 +1,7 @@ package controllers +// TODO: file will be deleted after this refactoring. No need to review changes. + import ( "context" "time" @@ -243,7 +245,7 @@ func getComponentNames(components []components.Component) []string { return names } -func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) { +func (r *YtsaurusReconciler) SyncOld(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) { logger := log.FromContext(ctx) if !resource.Spec.IsManaged { diff --git a/controllers/ytsaurus_controller.go b/controllers/ytsaurus_controller.go index 621dd4b8..2743c305 100644 --- a/controllers/ytsaurus_controller.go +++ b/controllers/ytsaurus_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -48,7 +49,7 @@ func (r *YtsaurusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c var ytsaurus ytv1.Ytsaurus if err := r.Get(ctx, req.NamespacedName, &ytsaurus); err != nil { - logger.Error(err, "unable to fetch Ytsaurus") + logger.Info("unable to fetch Ytsaurus") // we'll ignore not-found errors, since they can't be fixed by an immediate // requeue (we'll need to wait for a new notification), and we can get them // on deleted requests. diff --git a/controllers/ytsaurus_flow.go b/controllers/ytsaurus_flow.go new file mode 100644 index 00000000..8f8b1eaa --- /dev/null +++ b/controllers/ytsaurus_flow.go @@ -0,0 +1,174 @@ +package controllers + +import ( + "context" + "errors" + "fmt" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/components" + "github.com/ytsaurus/yt-k8s-operator/pkg/consts" +) + +func getStatuses( + ctx context.Context, + registry *componentRegistry, + order [][]consts.ComponentType, +) (map[string]components.ComponentStatus, error) { + statuses := make(map[string]components.ComponentStatus) + for _, batch := range order { + batchComps := registry.listByType(batch...) + for _, c := range batchComps { + componentStatus, err := c.Status(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get component %s status: %w", c.GetName(), err) + } + statuses[c.GetName()] = componentStatus + } + } + return statuses, nil +} + +// componentsOrder is an order in which components will be built. +// The main rules are: +// - if component A needs component B for building (running jobs, using yt client, etc), it +// should be placed in some of the sections after component B section. +var componentsOrder = [][]consts.ComponentType{ + // At first, we check if master is *built* (not updated) before everything else. + { + consts.YtsaurusClientType, + consts.DiscoveryType, + consts.HttpProxyType, + consts.RpcProxyType, + consts.TcpProxyType, + consts.DataNodeType, + consts.ExecNodeType, + consts.MasterCacheType, + }, + { + consts.TabletNodeType, + consts.UIType, + consts.ControllerAgentType, + consts.YqlAgentType, + }, + { + consts.SchedulerType, + consts.QueryTrackerType, + consts.QueueAgentType, + }, + { + consts.StrawberryControllerType, + }, + { + // Here we UPDATE master after all the components, because it shouldn't be newer + // than others. + // Currently, we guarantee that only for the case when components are not redefine their images. + consts.MasterType, + }, +} + +func syncComponents( + ctx context.Context, + registry *componentRegistry, + resource *ytv1.Ytsaurus, +) (components.ComponentStatus, error) { + statuses, err := getStatuses(ctx, registry, componentsOrder) + if err != nil { + return components.ComponentStatus{}, err + } + if err = logComponentStatuses(ctx, registry, statuses, componentsOrder, resource); err != nil { + return components.ComponentStatus{}, err + } + + // Special check before everything other component (including master) update. + masterBuildStatus, err := getStatusForMasterBuild(ctx, registry.master) + if err != nil { + return components.ComponentStatus{}, err + } + switch masterBuildStatus.SyncStatus { + case components.SyncStatusBlocked: + return masterBuildStatus, nil + case components.SyncStatusNeedSync: + return masterBuildStatus, registry.master.BuildInitial(ctx) + } + + var batchToSync []component + for _, typesInBatch := range componentsOrder { + compsInBatch := registry.listByType(typesInBatch...) + for _, comp := range compsInBatch { + status := statuses[comp.GetName()] + if status.SyncStatus != components.SyncStatusReady && batchToSync == nil { + batchToSync = compsInBatch + } + } + } + + if batchToSync == nil { + // YTsaurus is running and happy. + return components.ComponentStatus{SyncStatus: components.SyncStatusReady}, nil + } + + // Run sync for non-ready components in the batch. + batchNotReadyStatuses := make(map[string]components.ComponentStatus) + var errList []error + for _, comp := range batchToSync { + status := statuses[comp.GetName()] + if status.SyncStatus == components.SyncStatusReady { + continue + } + batchNotReadyStatuses[comp.GetName()] = status + if err = comp.Sync(ctx); err != nil { + errList = append(errList, fmt.Errorf("failed to sync %s: %w", comp.GetName(), err)) + } + } + + if len(errList) != 0 { + return components.ComponentStatus{}, errors.Join(errList...) + } + + // Choosing the most important status for the batch to report up. + batchStatus := components.ComponentStatus{ + SyncStatus: components.SyncStatusUpdating, + Message: "", + Stage: "", + } + for compName, st := range batchNotReadyStatuses { + if st.SyncStatus == components.SyncStatusBlocked { + batchStatus.SyncStatus = components.SyncStatusBlocked + } + if compName == registry.master.GetName() { + // TODO: add scheduler (and maybe query tracker if they will be in separate batches) + batchStatus.Stage = st.Stage + } + batchStatus.Message += fmt.Sprintf("; %s=%s (%s)", compName, st.SyncStatus, st.Message) + } + return batchStatus, nil +} + +func getStatusForMasterBuild(ctx context.Context, master masterComponent) (components.ComponentStatus, error) { + masterBuiltInitially, err := master.IsBuildInitially(ctx) + if err != nil { + return components.ComponentStatus{}, err + } + masterNeedBuild, err := master.NeedBuild(ctx) + if err != nil { + return components.ComponentStatus{}, err + } + masterRebuildStarted := master.IsRebuildStarted() + + if !masterBuiltInitially { + // This only happens once on cluster initialization. + return components.NeedSyncStatus("master initial build"), nil + } + + if masterNeedBuild && !masterRebuildStarted { + // Not all the master's sub-resources are running, and it is NOT because master is in update stage + // (in which is reasonable to expect some not-yet-built sub-resources). + // So we can't proceed with update, because almost every component need working master to be updated properly. + return components.ComponentStatus{ + SyncStatus: components.SyncStatusBlocked, + Message: "Master is not built, cluster can't start the update", + }, nil + } + return components.ReadyStatus(), nil +} diff --git a/controllers/ytsaurus_sync.go b/controllers/ytsaurus_sync.go new file mode 100644 index 00000000..aa16b41f --- /dev/null +++ b/controllers/ytsaurus_sync.go @@ -0,0 +1,68 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/components" +) + +var ( + requeueNot = ctrl.Result{Requeue: false} + requeueASAP = ctrl.Result{Requeue: true} + requeueSoon = ctrl.Result{RequeueAfter: 1 * time.Second} + requeueLater = ctrl.Result{RequeueAfter: 1 * time.Minute} +) + +func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + if !resource.Spec.IsManaged { + logger.Info("Ytsaurus cluster is not managed by controller, do nothing") + return ctrl.Result{RequeueAfter: time.Minute}, nil + } + + ytsaurus := apiProxy.NewYtsaurus(resource, r.Client, r.Recorder, r.Scheme) + compRegistry := buildComponentRegistry(ytsaurus) + st, err := syncComponents(ctx, compRegistry, ytsaurus.GetResource()) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to sync components: %w", err) + } + + var requeue ctrl.Result + var clusterState ytv1.ClusterState + + switch st.SyncStatus { + case components.SyncStatusReady: + logger.Info("YTsaurus running and happy") + requeue = requeueNot + clusterState = ytv1.ClusterStateRunning + case components.SyncStatusBlocked: + logger.Info(fmt.Sprintf("Components update is blocked. Human is needed. %s", st.Message)) + requeue = requeueLater + clusterState = ytv1.ClusterStateCancelUpdate + default: + requeue = requeueSoon + clusterState = ytv1.ClusterStateUpdating + } + + stateManager := components.NewStateManagerFromYtsaurus(ytsaurus) + err = stateManager.SetClusterState(ctx, clusterState) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to save cluster state to %s: %w", clusterState, err) + } + // TODO: move that after statuses collection and before sync, so in case + // correct stage is also indicated. + updateState := stageToUpdateStatus(st) + err = stateManager.SetClusterUpdateState(ctx, updateState) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to save update state to %s: %w", updateState, err) + } + return requeue, nil +} diff --git a/main.go b/main.go index 29f88c93..307357a7 100644 --- a/main.go +++ b/main.go @@ -23,10 +23,11 @@ import ( "go.uber.org/zap/zapcore" - "github.com/ytsaurus/yt-k8s-operator/controllers" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" + "github.com/ytsaurus/yt-k8s-operator/controllers" + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -65,7 +66,7 @@ func main() { "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") opts := zap.Options{ - Development: true, + Development: false, TimeEncoder: zapcore.ISO8601TimeEncoder, } opts.BindFlags(flag.CommandLine) diff --git a/pkg/components/component.go b/pkg/components/component.go index 556ae23f..0616fc56 100644 --- a/pkg/components/component.go +++ b/pkg/components/component.go @@ -6,6 +6,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" "github.com/ytsaurus/yt-k8s-operator/pkg/labeller" ) @@ -13,12 +14,14 @@ import ( type SyncStatus string const ( - SyncStatusBlocked SyncStatus = "Blocked" SyncStatusNeedFullUpdate SyncStatus = "NeedFullUpdate" SyncStatusNeedLocalUpdate SyncStatus = "NeedLocalUpdate" SyncStatusPending SyncStatus = "Pending" - SyncStatusReady SyncStatus = "Ready" SyncStatusUpdating SyncStatus = "Updating" + + SyncStatusNeedSync SyncStatus = "NeedSync" + SyncStatusReady SyncStatus = "Ready" + SyncStatusBlocked SyncStatus = "Blocked" ) func IsRunningStatus(status SyncStatus) bool { @@ -28,18 +31,31 @@ func IsRunningStatus(status SyncStatus) bool { type ComponentStatus struct { SyncStatus SyncStatus Message string + Stage string } func NewComponentStatus(status SyncStatus, message string) ComponentStatus { - return ComponentStatus{status, message} + return ComponentStatus{SyncStatus: status, Message: message} } func WaitingStatus(status SyncStatus, event string) ComponentStatus { - return ComponentStatus{status, fmt.Sprintf("Wait for %s", event)} + return ComponentStatus{SyncStatus: status, Message: fmt.Sprintf("Wait for %s", event)} } func SimpleStatus(status SyncStatus) ComponentStatus { - return ComponentStatus{status, string(status)} + return ComponentStatus{SyncStatus: status, Message: string(status)} +} + +func NeedSyncStatus(message string) ComponentStatus { + return ComponentStatus{SyncStatus: SyncStatusNeedSync, Message: message} +} + +func UpdatingStatus(message string) ComponentStatus { + return ComponentStatus{SyncStatus: SyncStatusUpdating, Message: message} +} + +func ReadyStatus() ComponentStatus { + return ComponentStatus{SyncStatus: SyncStatusReady} } type Component interface { @@ -53,6 +69,32 @@ type Component interface { IsUpdatable() bool } +type conditionManagerIface interface { + SetTrue(context.Context, ConditionName) error + SetTrueMsg(context.Context, ConditionName, string) error + SetFalse(context.Context, ConditionName) error + SetFalseMsg(context.Context, ConditionName, string) error + Set(context.Context, ConditionName, bool) error + SetMsg(context.Context, ConditionName, bool, string) error + SetCond(context.Context, Condition) error + SetCondMany(context.Context, ...Condition) error + SetCondMsg(context.Context, Condition, string) error + IsTrue(ConditionName) bool + IsFalse(ConditionName) bool + Is(condition Condition) bool + IsSatisfied(condition Condition) bool + IsNotSatisfied(condition Condition) bool + All(conds ...Condition) bool + Any(conds ...Condition) bool +} + +type stateManagerInterface interface { + SetTabletCellBundles(context.Context, []ytv1.TabletCellBundleInfo) error + GetTabletCellBundles() []ytv1.TabletCellBundleInfo + SetMasterMonitoringPaths(context.Context, []string) error + GetMasterMonitoringPaths() []string +} + // Following structs are used as a base for implementing YTsaurus components objects. // baseComponent is a base struct intendend for use in the simplest components and remote components // (the ones that don't have access to the ytsaurus resource). @@ -73,6 +115,11 @@ func (c *baseComponent) GetName() string { type localComponent struct { baseComponent ytsaurus *apiproxy.Ytsaurus + + // currently we have it in the component, but in the future we may + // want to receive it from the outside of the component. + condManager conditionManagerIface + stateManager stateManagerInterface } // localServerComponent is a base structs for components which have access to ytsaurus resource, @@ -89,6 +136,8 @@ func newLocalComponent( return localComponent{ baseComponent: baseComponent{labeller: labeller}, ytsaurus: ytsaurus, + condManager: NewConditionManagerFromYtsaurus(ytsaurus), + stateManager: NewStateManagerFromYtsaurus(ytsaurus), } } @@ -111,13 +160,8 @@ func newLocalServerComponent( server server, ) localServerComponent { return localServerComponent{ - localComponent: localComponent{ - baseComponent: baseComponent{ - labeller: labeller, - }, - ytsaurus: ytsaurus, - }, - server: server, + localComponent: newLocalComponent(labeller, ytsaurus), + server: server, } } diff --git a/pkg/components/component_flow.go b/pkg/components/component_flow.go new file mode 100644 index 00000000..bc3f87fc --- /dev/null +++ b/pkg/components/component_flow.go @@ -0,0 +1,192 @@ +package components + +import ( + "context" + "fmt" + + "github.com/ytsaurus/yt-k8s-operator/pkg/resources" +) + +const ( + StepStartBuild = "StartBuild" + StepWaitBuildFinished = "WaitBuildFinished" + StepInitStarted = "InitStarted" + StepInitFinished = "InitFinished" + StepCheckUpdateRequired = "CheckUpdateRequired" + StepUpdate = "Update" + StepStartRebuild = "StartRebuild" + StepWaitPodsRemoved = "WaitPodsRemoved" + StepPodsCreate = "PodsCreate" + StepWaitRebuildFinished = "WaitRebuildFinished" +) + +type withName interface { + GetName() string +} + +type fetchableWithName interface { + resources.Fetchable + withName +} + +func flowToStatus(ctx context.Context, c fetchableWithName, flow Step, condManager conditionManagerIface) (ComponentStatus, error) { + if err := c.Fetch(ctx); err != nil { + return ComponentStatus{}, fmt.Errorf("failed to fetch component %s: %w", c.GetName(), err) + } + + return flow.Status(ctx, condManager) +} + +func flowToSync(ctx context.Context, flow Step, condManager conditionManagerIface) error { + _, err := flow.Run(ctx, condManager) + return err +} + +// TODO: make all components have standard method for build/check issync. +func getStandardStartBuildStep(c withName, build func(ctx context.Context) error) StepRun { + name := c.GetName() + buildStartedCond := buildStarted(name) + return StepRun{ + StepMeta: StepMeta{ + Name: StepStartBuild, + RunIfCondition: not(buildStartedCond), + OnSuccessCondition: buildStartedCond, + }, + Body: build, + } +} +func getStandardWaitBuildFinishedStep(c withName, check func(ctx context.Context) (ok bool, err error)) StepCheck { + name := c.GetName() + builtFinishedCond := buildFinished(name) + return StepCheck{ + StepMeta: StepMeta{ + Name: StepWaitBuildFinished, + RunIfCondition: not(builtFinishedCond), + OnSuccessCondition: builtFinishedCond, + }, + Body: check, + } +} +func getStandardStartRebuildStep(c withName, run func(ctx context.Context) error) StepRun { + name := c.GetName() + rebuildStartedCond := rebuildStarted(name) + return StepRun{ + StepMeta: StepMeta{ + Name: StepStartRebuild, + RunIfCondition: not(rebuildStartedCond), + OnSuccessCondition: rebuildStartedCond, + }, + Body: run, + } +} +func getStandardWaitPodsRemovedStep(c withName, check func(ctx context.Context) bool) StepCheck { + name := c.GetName() + podsRemovedCond := podsRemoved(name) + return StepCheck{ + StepMeta: StepMeta{ + Name: StepWaitPodsRemoved, + RunIfCondition: not(podsRemovedCond), + OnSuccessCondition: podsRemovedCond, + }, + Body: func(ctx context.Context) (ok bool, err error) { + return check(ctx), nil + }, + } +} +func getStandardPodsCreateStep(c withName, build func(ctx context.Context) error) StepRun { + name := c.GetName() + podsCreatedCond := podsCreated(name) + return StepRun{ + StepMeta: StepMeta{ + Name: StepPodsCreate, + RunIfCondition: not(podsCreatedCond), + OnSuccessCondition: podsCreatedCond, + }, + Body: build, + } +} +func getStandardWaiRebuildFinishedStep(c withName, check func(ctx context.Context) (ok bool, err error)) StepCheck { + name := c.GetName() + rebuildFinishedCond := rebuildFinished(name) + return StepCheck{ + StepMeta: StepMeta{ + Name: StepWaitRebuildFinished, + RunIfCondition: not(rebuildFinishedCond), + OnSuccessCondition: rebuildFinishedCond, + }, + Body: check, + } +} +func getStandardInitFinishedStep(c withName, check func(ctx context.Context) (ok bool, err error)) StepCheck { + name := c.GetName() + initCond := initializationFinished(name) + return StepCheck{ + StepMeta: StepMeta{ + Name: StepInitFinished, + RunIfCondition: not(initCond), + OnSuccessCondition: initCond, + }, + Body: check, + } +} +func getStandardUpdateStep( + c withName, + condManager conditionManagerIface, + check func(ctx context.Context) (bool, error), + steps []Step, +) StepComposite { + name := c.GetName() + updateRequiredCond := updateRequired(name) + + allSteps := []Step{ + StepRun{ + StepMeta: StepMeta{ + Name: StepCheckUpdateRequired, + RunIfCondition: not(updateRequiredCond), + OnSuccessCondition: updateRequiredCond, + // If update started — setting updateRequired unconditionally. + }, + }, + } + allSteps = append(allSteps, steps...) + + var onSuccessConditions []Condition + for _, stepI := range allSteps { + var cond Condition + switch step := stepI.(type) { + case StepRun: + cond = step.OnSuccessCondition + case StepCheck: + cond = step.OnSuccessCondition + case StepComposite: + cond = step.OnSuccessCondition + } + onSuccessConditions = append(onSuccessConditions, not(cond)) + } + + return StepComposite{ + StepMeta: StepMeta{ + Name: StepUpdate, + // Update should be run if either diff exists or updateRequired condition is set, + // because a diff should disappear in the middle of the update, but it still needs + // to finish actions after the update. + StatusFunc: func(ctx context.Context) (SyncStatus, string, error) { + inSync, err := check(ctx) + if err != nil { + return "", "", err + } + if !inSync || condManager.IsSatisfied(updateRequiredCond) { + return SyncStatusNeedSync, "", nil + } + return SyncStatusReady, "", nil + }, + OnSuccessFunc: func(ctx context.Context) error { + return condManager.SetCondMany( + ctx, + onSuccessConditions..., + ) + }, + }, + Body: allSteps, + } +} diff --git a/pkg/components/component_local_test.go b/pkg/components/component_local_test.go new file mode 100644 index 00000000..88b6f392 --- /dev/null +++ b/pkg/components/component_local_test.go @@ -0,0 +1,109 @@ +package components + +import ( + "context" + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/record" + ptr "k8s.io/utils/pointer" + ctrlrt "sigs.k8s.io/controller-runtime" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/testutil" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +const ( + ytsaurusName = "testsaurus" + domain = "testdomain" +) + +func prepareTest(t *testing.T, namespace string) (*testutil.TestHelper, *apiproxy.Ytsaurus, *ytconfig.Generator) { + h := testutil.NewTestHelper(t, namespace, filepath.Join("..", "..", "config", "crd", "bases")) + h.Start(func(mgr ctrlrt.Manager) error { return nil }) + + ytsaurusResource := testutil.BuildMinimalYtsaurus(namespace, ytsaurusName) + // Deploy of ytsaurus spec is required, so it could set valid owner references for child resources. + testutil.DeployObject(h, &ytsaurusResource) + + scheme := runtime.NewScheme() + utilruntime.Must(ytv1.AddToScheme(scheme)) + fakeRecorder := record.NewFakeRecorder(100) + + ytsaurus := apiproxy.NewYtsaurus(&ytsaurusResource, h.GetK8sClient(), fakeRecorder, scheme) + cfgen := ytconfig.NewGenerator(ytsaurus.GetResource(), domain) + return h, ytsaurus, cfgen +} + +func syncUntilReady(t *testing.T, h *testutil.TestHelper, component Component) { + t.Logf("Start initial build for %s", component.GetName()) + defer t.Logf("Finished initial build for %s", component.GetName()) + ctx := context.Background() + testutil.Eventually(h, component.GetName()+" became ready", func() bool { + st, err := component.Status(ctx) + require.NoError(t, err) + if st.SyncStatus == SyncStatusReady { + return true + } + require.NoError(t, component.Sync(ctx)) + return false + }) +} + +func testComponentFlow( + t *testing.T, + shortName, longName, firstInstanceSuffix string, + build func(*ytconfig.Generator, *apiproxy.Ytsaurus) Component, + setImage func(*ytv1.Ytsaurus, *string), +) { + ctx := context.Background() + namespace := longName + h, ytsaurus, cfgen := prepareTest(t, namespace) + // TODO: separate helper so no need to remember to call stop + defer h.Stop() + ytsaurusResource := ytsaurus.GetResource() + + // initial creation + component := build(cfgen, ytsaurus) + syncUntilReady(t, h, component) + + cmData := testutil.FetchConfigMapData(h, "yt-"+longName+firstInstanceSuffix+"-config", "ytserver-"+longName+".yson") + require.Contains(t, cmData, "ms-0.masters."+namespace+".svc."+domain+":9010") + + // TODO: replace with get + testutil.FetchEventually( + h, + shortName+firstInstanceSuffix, + &appsv1.StatefulSet{}, + ) + + // update + // TODO: update2 to be sure that first update doesn't end with wrong state + // after fix jobs deletion + for i := 1; i <= 1; i++ { + t.Logf("Update %s #%d", shortName+firstInstanceSuffix, i) + newImage := ptr.String(fmt.Sprintf("new-image-%d", i)) + setImage(ytsaurusResource, newImage) + + component = build(cfgen, ytsaurus) + testutil.Eventually(h, shortName+firstInstanceSuffix+" became ready", func() bool { + st, err := component.Status(ctx) + require.NoError(t, err) + if st.SyncStatus == SyncStatusReady { + return true + } + require.NoError(t, component.Sync(ctx)) + return false + }) + sts := &appsv1.StatefulSet{} + testutil.GetObject(h, shortName+firstInstanceSuffix, sts) + require.Equal(t, *newImage, sts.Spec.Template.Spec.Containers[0].Image) + } +} diff --git a/pkg/components/conditions.go b/pkg/components/conditions.go new file mode 100644 index 00000000..4ba54881 --- /dev/null +++ b/pkg/components/conditions.go @@ -0,0 +1,281 @@ +package components + +import ( + "context" + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" +) + +type ConditionName string + +type Condition struct { + Name ConditionName + Val bool +} + +func (c Condition) String() string { + if c.Val { + return string(c.Name) + } + return fmt.Sprintf("!%s", c.Name) +} + +func not(condDep Condition) Condition { + return Condition{ + Name: condDep.Name, + Val: !condDep.Val, + } +} +func isTrue(cond ConditionName) Condition { + // '^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$' for conditions. + replaced := strings.Replace(string(cond), "-", "_", -1) + return Condition{Name: ConditionName(replaced), Val: true} +} + +// buildFinished means that component was fully built initally. +func buildStarted(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sBuildStarted", compName))) +} +func buildFinished(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sBuildFinished", compName))) +} + +func initializationStarted(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%snitializationStarted", compName))) +} +func initializationFinished(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%snitializationFinished", compName))) +} +func updateRequired(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sUpdateRequired", compName))) +} +func rebuildStarted(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sRebuildStarted", compName))) +} +func podsRemoved(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sPodsRemoved", compName))) +} +func podsCreated(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sPodsCreated", compName))) +} +func rebuildFinished(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sRebuildFinished", compName))) +} + +type baseStateManager struct { + client client.Client + ytsaurus *ytv1.Ytsaurus +} + +type ConditionManager struct { + baseStateManager +} +type StateManager struct { + baseStateManager +} + +func NewConditionManagerFromYtsaurus(ytsaurus *apiproxy.Ytsaurus) *ConditionManager { + return NewConditionManager( + ytsaurus.APIProxy().Client(), + ytsaurus.GetResource(), + ) +} + +func NewConditionManager(client client.Client, ytsaurus *ytv1.Ytsaurus) *ConditionManager { + return &ConditionManager{ + baseStateManager{ + client: client, + ytsaurus: ytsaurus, + }, + } +} + +func NewStateManagerFromYtsaurus(ytsaurus *apiproxy.Ytsaurus) *StateManager { + return NewStateManager( + ytsaurus.APIProxy().Client(), + ytsaurus.GetResource(), + ) +} + +func NewStateManager(client client.Client, ytsaurus *ytv1.Ytsaurus) *StateManager { + return &StateManager{ + baseStateManager{ + client: client, + ytsaurus: ytsaurus, + }, + } +} + +// TODO: refactor this for managers to receive interface which can persist state on resource +// to reuse for other CRDS & don't have such wide interfaces for managers. +func (m *baseStateManager) updateStatusRetryOnConflict(ctx context.Context, change func(ytsaurusResource *ytv1.Ytsaurus)) error { + tryUpdate := func(ytsaurus *ytv1.Ytsaurus) error { + // N.B. Status().Update(...) updates not only status sub-resource, but also main + // ytsaurus resource which is the same reference in a lot of places of our code, + // and it bites us, because we're modifying spec (monitoring port). + // Not sure if it is good workaround, need to think about it, + // but we should get rid of overriding port and possibly using references to the main resource. + ytsaurusTemp := &ytv1.Ytsaurus{ + ObjectMeta: metav1.ObjectMeta{ + Name: ytsaurus.Name, + Namespace: ytsaurus.Namespace, + ResourceVersion: ytsaurus.ResourceVersion, + }, + Status: ytsaurus.Status, + } + change(ytsaurusTemp) + + err := m.client.Status().Update(ctx, ytsaurusTemp) + m.ytsaurus.Status = ytsaurusTemp.Status + // You have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err + } + + err := tryUpdate(m.ytsaurus) + if err == nil || !errors.IsConflict(err) { + return err + } + + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Fetch the resource here; you need to refetch it on every try, since + // if you got a conflict on the last update attempt then you need to get + // the current version before making your own changes. + ytsaurus := ytv1.Ytsaurus{} + name := types.NamespacedName{ + Namespace: m.ytsaurus.Namespace, + Name: m.ytsaurus.Name, + } + if err = m.client.Get(ctx, name, &ytsaurus); err != nil { + return err + } + + return tryUpdate(&ytsaurus) + }) +} + +func (cm *ConditionManager) SetTrue(ctx context.Context, condName ConditionName) error { + return cm.SetTrueMsg(ctx, condName, "") +} +func (cm *ConditionManager) SetTrueMsg(ctx context.Context, condName ConditionName, msg string) error { + return cm.SetMsg(ctx, condName, true, msg) +} +func (cm *ConditionManager) SetFalse(ctx context.Context, condName ConditionName) error { + return cm.SetFalseMsg(ctx, condName, "") +} +func (cm *ConditionManager) SetFalseMsg(ctx context.Context, condName ConditionName, msg string) error { + return cm.SetMsg(ctx, condName, false, msg) +} +func (cm *ConditionManager) Set(ctx context.Context, condName ConditionName, val bool) error { + return cm.SetMsg(ctx, condName, val, "") +} +func (cm *ConditionManager) SetCond(ctx context.Context, cond Condition) error { + return cm.SetMsg(ctx, cond.Name, cond.Val, "") +} +func (cm *ConditionManager) SetCondMany(ctx context.Context, conds ...Condition) error { + var metaconds []metav1.Condition + for _, cond := range conds { + metaconds = append(metaconds, cm.buildCond(cond.Name, cond.Val, "")) + } + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + for _, metacond := range metaconds { + meta.SetStatusCondition(&ytsaurus.Status.Conditions, metacond) + } + }) +} +func (cm *ConditionManager) SetCondMsg(ctx context.Context, cond Condition, msg string) error { + return cm.SetMsg(ctx, cond.Name, cond.Val, msg) +} +func (cm *ConditionManager) buildCond(condName ConditionName, val bool, msg string) metav1.Condition { + return metav1.Condition{ + Type: string(condName), + Status: map[bool]metav1.ConditionStatus{ + true: metav1.ConditionTrue, + false: metav1.ConditionFalse, + }[val], + // DO we need better reason? + Reason: string(condName), + Message: msg, + } +} +func (cm *ConditionManager) SetMsg(ctx context.Context, condName ConditionName, val bool, msg string) error { + metacond := cm.buildCond(condName, val, msg) + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + meta.SetStatusCondition(&ytsaurus.Status.Conditions, metacond) + }) +} +func (cm *ConditionManager) IsTrue(condName ConditionName) bool { + return meta.IsStatusConditionTrue(cm.ytsaurus.Status.Conditions, string(condName)) +} +func (cm *ConditionManager) IsFalse(condName ConditionName) bool { + return !cm.IsTrue(condName) +} +func (cm *ConditionManager) Is(cond Condition) bool { + return cm.IsSatisfied(cond) +} +func (cm *ConditionManager) All(conds ...Condition) bool { + for _, cond := range conds { + if cm.IsNotSatisfied(cond) { + return false + } + } + return true +} +func (cm *ConditionManager) Any(conds ...Condition) bool { + for _, cond := range conds { + if cm.IsSatisfied(cond) { + return true + } + } + return false +} +func (cm *ConditionManager) IsSatisfied(cond Condition) bool { + return cm.IsTrue(cond.Name) == cond.Val +} +func (cm *ConditionManager) IsNotSatisfied(cond Condition) bool { + return !cm.IsSatisfied(cond) +} +func (cm *ConditionManager) Get(condName ConditionName) bool { + if cm.IsTrue(condName) { + return true + } else { + return false + } +} + +func (cm *StateManager) SetClusterState(ctx context.Context, clusterState ytv1.ClusterState) error { + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + ytsaurus.Status.State = clusterState + }) +} +func (cm *StateManager) SetClusterUpdateState(ctx context.Context, updateState ytv1.UpdateState) error { + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + ytsaurus.Status.UpdateStatus.State = updateState + }) +} +func (cm *StateManager) SetTabletCellBundles(ctx context.Context, cells []ytv1.TabletCellBundleInfo) error { + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + ytsaurus.Status.UpdateStatus.TabletCellBundles = cells + }) +} +func (cm *StateManager) SetMasterMonitoringPaths(ctx context.Context, paths []string) error { + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + ytsaurus.Status.UpdateStatus.MasterMonitoringPaths = paths + }) +} +func (cm *StateManager) GetTabletCellBundles() []ytv1.TabletCellBundleInfo { + return cm.ytsaurus.Status.UpdateStatus.TabletCellBundles +} +func (cm *StateManager) GetMasterMonitoringPaths() []string { + return cm.ytsaurus.Status.UpdateStatus.MasterMonitoringPaths +} diff --git a/pkg/components/config_helper.go b/pkg/components/config_helper.go index 2ce1fc63..e24ad1e5 100644 --- a/pkg/components/config_helper.go +++ b/pkg/components/config_helper.go @@ -10,12 +10,13 @@ import ( "github.com/BurntSushi/toml" "github.com/google/go-cmp/cmp" + "go.ytsaurus.tech/yt/go/yson" + corev1 "k8s.io/api/core/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" "github.com/ytsaurus/yt-k8s-operator/pkg/labeller" "github.com/ytsaurus/yt-k8s-operator/pkg/resources" "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" - "go.ytsaurus.tech/yt/go/yson" - corev1 "k8s.io/api/core/v1" ) const ( @@ -209,6 +210,7 @@ func (h *ConfigHelper) Build() *corev1.ConfigMap { for fileName := range h.generators { data, err := h.getConfig(fileName) if err != nil { + // TODO: fix suppression of the error, it will fail with NPE in places of call return nil } diff --git a/pkg/components/controller_agent.go b/pkg/components/controller_agent.go index b9389ceb..d96ebc30 100644 --- a/pkg/components/controller_agent.go +++ b/pkg/components/controller_agent.go @@ -96,10 +96,10 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu } func (ca *ControllerAgent) Status(ctx context.Context) (ComponentStatus, error) { - return ca.doSync(ctx, true) + return flowToStatus(ctx, ca, ca.getFlow(), ca.condManager) } func (ca *ControllerAgent) Sync(ctx context.Context) error { - _, err := ca.doSync(ctx, false) - return err + return flowToSync(ctx, ca.getFlow(), ca.condManager) + } diff --git a/pkg/components/controller_agent_flow.go b/pkg/components/controller_agent_flow.go new file mode 100644 index 00000000..7a91ff78 --- /dev/null +++ b/pkg/components/controller_agent_flow.go @@ -0,0 +1,21 @@ +package components + +func (ca *ControllerAgent) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(ca, ca.server.Sync), + getStandardWaitBuildFinishedStep(ca, ca.server.inSync), + getStandardUpdateStep( + ca, + ca.condManager, + ca.server.inSync, + []Step{ + getStandardStartRebuildStep(ca, ca.server.removePods), + getStandardWaitPodsRemovedStep(ca, ca.server.arePodsRemoved), + getStandardPodsCreateStep(ca, ca.server.Sync), + getStandardWaiRebuildFinishedStep(ca, ca.server.inSync), + }, + ), + }, + } +} diff --git a/pkg/components/controller_agent_local_test.go b/pkg/components/controller_agent_local_test.go new file mode 100644 index 00000000..d425cf5b --- /dev/null +++ b/pkg/components/controller_agent_local_test.go @@ -0,0 +1,28 @@ +package components + +import ( + "testing" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +func TestControllerAgentFlow(t *testing.T) { + testComponentFlow( + t, + "ca", + "controller-agent", + "", + func(cfgen *ytconfig.Generator, ytsaurus *apiProxy.Ytsaurus) Component { + return NewControllerAgent( + cfgen, + ytsaurus, + nil, + ) + }, + func(ytsaurus *ytv1.Ytsaurus, image *string) { + ytsaurus.Spec.ControllerAgents.Image = image + }, + ) +} diff --git a/pkg/components/data_node.go b/pkg/components/data_node.go index 7cb22794..86a92a51 100644 --- a/pkg/components/data_node.go +++ b/pkg/components/data_node.go @@ -103,10 +103,9 @@ func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error } func (n *DataNode) Status(ctx context.Context) (ComponentStatus, error) { - return n.doSync(ctx, true) + return flowToStatus(ctx, n, n.getFlow(), n.condManager) } func (n *DataNode) Sync(ctx context.Context) error { - _, err := n.doSync(ctx, false) - return err + return flowToSync(ctx, n.getFlow(), n.condManager) } diff --git a/pkg/components/data_node_flow.go b/pkg/components/data_node_flow.go new file mode 100644 index 00000000..71305e75 --- /dev/null +++ b/pkg/components/data_node_flow.go @@ -0,0 +1,21 @@ +package components + +func (n *DataNode) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(n, n.server.Sync), + getStandardWaitBuildFinishedStep(n, n.server.inSync), + getStandardUpdateStep( + n, + n.condManager, + n.server.inSync, + []Step{ + getStandardStartRebuildStep(n, n.server.removePods), + getStandardWaitPodsRemovedStep(n, n.server.arePodsRemoved), + getStandardPodsCreateStep(n, n.server.Sync), + getStandardWaiRebuildFinishedStep(n, n.server.inSync), + }, + ), + }, + } +} diff --git a/pkg/components/data_node_local_test.go b/pkg/components/data_node_local_test.go new file mode 100644 index 00000000..1fe03bc4 --- /dev/null +++ b/pkg/components/data_node_local_test.go @@ -0,0 +1,30 @@ +package components + +import ( + "testing" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +func TestDataNodeFlow(t *testing.T) { + testComponentFlow( + t, + "dnd", + "data-node", + "-dn-1", + func(_ *ytconfig.Generator, ytsaurus *apiProxy.Ytsaurus) Component { + cfgen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), domain) + return NewDataNode( + cfgen, + ytsaurus, + nil, + ytsaurus.GetResource().Spec.DataNodes[0], + ) + }, + func(ytsaurus *ytv1.Ytsaurus, image *string) { + ytsaurus.Spec.DataNodes[0].Image = image + }, + ) +} diff --git a/pkg/components/discovery.go b/pkg/components/discovery.go index 87c2f8be..ec671c4e 100644 --- a/pkg/components/discovery.go +++ b/pkg/components/discovery.go @@ -88,10 +88,9 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } func (d *Discovery) Status(ctx context.Context) (ComponentStatus, error) { - return d.doSync(ctx, true) + return flowToStatus(ctx, d, d.getFlow(), d.condManager) } func (d *Discovery) Sync(ctx context.Context) error { - _, err := d.doSync(ctx, false) - return err + return flowToSync(ctx, d.getFlow(), d.condManager) } diff --git a/pkg/components/discovery_flow.go b/pkg/components/discovery_flow.go new file mode 100644 index 00000000..4e3fd949 --- /dev/null +++ b/pkg/components/discovery_flow.go @@ -0,0 +1,21 @@ +package components + +func (d *Discovery) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(d, d.server.Sync), + getStandardWaitBuildFinishedStep(d, d.server.inSync), + getStandardUpdateStep( + d, + d.condManager, + d.server.inSync, + []Step{ + getStandardStartRebuildStep(d, d.server.removePods), + getStandardWaitPodsRemovedStep(d, d.server.arePodsRemoved), + getStandardPodsCreateStep(d, d.server.Sync), + getStandardWaiRebuildFinishedStep(d, d.server.inSync), + }, + ), + }, + } +} diff --git a/pkg/components/discovery_local_test.go b/pkg/components/discovery_local_test.go new file mode 100644 index 00000000..caaf0b25 --- /dev/null +++ b/pkg/components/discovery_local_test.go @@ -0,0 +1,24 @@ +package components + +import ( + "testing" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +func TestDiscoveryFlow(t *testing.T) { + testComponentFlow( + t, + "ds", + "discovery", + "", + func(cfgen *ytconfig.Generator, ytsaurus *apiProxy.Ytsaurus) Component { + return NewDiscovery(cfgen, ytsaurus) + }, + func(ytsaurus *ytv1.Ytsaurus, image *string) { + ytsaurus.Spec.Discovery.Image = image + }, + ) +} diff --git a/pkg/components/exec_node.go b/pkg/components/exec_node.go index ea0605e0..186f35d3 100644 --- a/pkg/components/exec_node.go +++ b/pkg/components/exec_node.go @@ -117,10 +117,9 @@ func (n *ExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error } func (n *ExecNode) Status(ctx context.Context) (ComponentStatus, error) { - return n.doSync(ctx, true) + return flowToStatus(ctx, n, n.getFlow(), n.condManager) } func (n *ExecNode) Sync(ctx context.Context) error { - _, err := n.doSync(ctx, false) - return err + return flowToSync(ctx, n.getFlow(), n.condManager) } diff --git a/pkg/components/exec_node_flow.go b/pkg/components/exec_node_flow.go new file mode 100644 index 00000000..3ecf661d --- /dev/null +++ b/pkg/components/exec_node_flow.go @@ -0,0 +1,21 @@ +package components + +func (n *ExecNode) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(n, n.server.Sync), + getStandardWaitBuildFinishedStep(n, n.server.inSync), + getStandardUpdateStep( + n, + n.condManager, + n.server.inSync, + []Step{ + getStandardStartRebuildStep(n, n.server.removePods), + getStandardWaitPodsRemovedStep(n, n.server.arePodsRemoved), + getStandardPodsCreateStep(n, n.server.Sync), + getStandardWaiRebuildFinishedStep(n, n.server.inSync), + }, + ), + }, + } +} diff --git a/pkg/components/exec_node_local_test.go b/pkg/components/exec_node_local_test.go new file mode 100644 index 00000000..fee1ecb5 --- /dev/null +++ b/pkg/components/exec_node_local_test.go @@ -0,0 +1,30 @@ +package components + +import ( + "testing" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +func TestExecNodeFlow(t *testing.T) { + testComponentFlow( + t, + "end", + "exec-node", + "", + func(_ *ytconfig.Generator, ytsaurus *apiProxy.Ytsaurus) Component { + cfgen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), domain) + return NewExecNode( + cfgen, + ytsaurus, + nil, + ytsaurus.GetResource().Spec.ExecNodes[0], + ) + }, + func(ytsaurus *ytv1.Ytsaurus, image *string) { + ytsaurus.Spec.ExecNodes[0].Image = image + }, + ) +} diff --git a/pkg/components/flow.go b/pkg/components/flow.go new file mode 100644 index 00000000..760fc427 --- /dev/null +++ b/pkg/components/flow.go @@ -0,0 +1,164 @@ +package components + +import ( + "context" + "fmt" +) + +type Step interface { + StepName() string + // Status answers if the body can and should be run. + // If st == Ready — no need to run. + // If st == NeedSync — need to run. + // If st == Blocked — run is impossible. + // msg is an optional human-readable explanation of status. + // err is an error of status checking. + Status(ctx context.Context, conds conditionManagerIface) (ComponentStatus, error) + // Run is a body of a step, it will be run only if Status() returns NeedSync. + // If ok is true and no error, run considered successful and postRun executed. + Run(ctx context.Context, conds conditionManagerIface) (ok bool, err error) + // PostRun can be used for cleanup or some other post-actions, + // typical usage is to clean up some intermediate conditions which are set in step. + PostRun(ctx context.Context, conds conditionManagerIface) error +} + +type StepMeta struct { + Name string + // RunIfCondition should be satisfied for step to run. + RunIfCondition Condition + // StatusFunc should return NeedSync status for step to run. + // If both RunIfCondition and StatusFunc a specified, then both should be resolved as true. + StatusFunc func(ctx context.Context) (st SyncStatus, msg string, err error) + + // OnSuccessCondition will be set after successful execution of the step. + OnSuccessCondition Condition + // OnSuccessCondition will be called after successful execution of the step. + OnSuccessFunc func(ctx context.Context) error +} + +func (m StepMeta) StepName() string { return m.Name } + +// Status decides if step is ready/blocked/need sync based on statusCondition and statusFunc +// (ones that not provided considered as satisfied and resulted in Ready status). +func (m StepMeta) Status( + ctx context.Context, + conds conditionManagerIface, +) (ComponentStatus, error) { + if m.RunIfCondition.Name != "" && conds.IsNotSatisfied(m.RunIfCondition) { + return ComponentStatus{ + SyncStatus: SyncStatusReady, + Message: fmt.Sprintf("%s ok", m.Name), + Stage: m.Name, + }, nil + } + if m.StatusFunc != nil { + syncSt, msg, err := m.StatusFunc(ctx) + return ComponentStatus{ + SyncStatus: syncSt, + Message: msg, + Stage: m.Name, + }, err + } + return ComponentStatus{ + SyncStatus: SyncStatusNeedSync, + Message: fmt.Sprintf("%s not done", m.Name), + Stage: m.Name, + }, nil +} + +func (m StepMeta) PostRun( + ctx context.Context, + conds conditionManagerIface, +) error { + if m.OnSuccessCondition.Name != "" { + if err := conds.SetCond(ctx, m.OnSuccessCondition); err != nil { + return err + } + } + if m.OnSuccessFunc != nil { + return m.OnSuccessFunc(ctx) + } + return nil +} + +// StepRun has Body func which only returns error, +// run considered successful if no error returned. +type StepRun struct { + StepMeta + Body func(ctx context.Context) error +} + +func (s StepRun) Run(ctx context.Context, _ conditionManagerIface) (bool, error) { + if s.Body == nil { + return true, nil + } + err := s.Body(ctx) + return err == nil, err +} + +// StepCheck has Body func which returns ok and error, +// run considered successful if ok is true and no error returned. +type StepCheck struct { + StepMeta + Body func(ctx context.Context) (ok bool, err error) +} + +func (s StepCheck) Run(ctx context.Context, _ conditionManagerIface) (bool, error) { + if s.Body == nil { + return true, nil + } + return s.Body(ctx) +} + +type StepComposite struct { + StepMeta + Body []Step +} + +func (s StepComposite) Run(ctx context.Context, conds conditionManagerIface) (bool, error) { + for _, step := range s.Body { + st, err := step.Status(ctx, conds) + if err != nil { + return false, err + } + if st.SyncStatus == SyncStatusReady { + continue + } + + runOk, err := step.Run(ctx, conds) + if err != nil { + return false, err + } + if runOk { + err = step.PostRun(ctx, conds) + return err != nil, err + } + return false, nil + } + return true, nil +} + +// Status of StepComposite is more complex: +// - at first it checks if it itself need to run +// - and since its body consists of steps — it locates first not-ready step and return its status, +// so Run method would go in step list to found step and execute it. +func (s StepComposite) Status(ctx context.Context, conds conditionManagerIface) (ComponentStatus, error) { + st, err := s.StepMeta.Status(ctx, conds) + if st.SyncStatus == SyncStatusReady || err != nil { + return st, err + } + + for _, step := range s.Body { + st, err = step.Status(ctx, conds) + if err != nil { + return ComponentStatus{}, err + } + if st.SyncStatus != SyncStatusReady { + return st, nil + } + } + return ComponentStatus{ + SyncStatus: SyncStatusReady, + Message: "ok", + }, nil +} diff --git a/pkg/components/httpproxy.go b/pkg/components/httpproxy.go index 76a9219e..0b0a6210 100644 --- a/pkg/components/httpproxy.go +++ b/pkg/components/httpproxy.go @@ -148,11 +148,35 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err return SimpleStatus(SyncStatusReady), err } +func (hp *HttpProxy) doServerSync(ctx context.Context) error { + statefulSet := hp.server.buildStatefulSet() + if hp.httpsSecret != nil { + hp.httpsSecret.AddVolume(&statefulSet.Spec.Template.Spec) + hp.httpsSecret.AddVolumeMount(&statefulSet.Spec.Template.Spec.Containers[0]) + } + err := hp.server.Sync(ctx) + if err != nil { + return err + } + + s := hp.balancingService.Build() + s.Spec.Type = hp.serviceType + return hp.balancingService.Sync(ctx) +} + +func (hp *HttpProxy) serverInSync(ctx context.Context) (bool, error) { + srvInSync, err := hp.server.inSync(ctx) + if err != nil { + return false, err + } + balancerExists := resources.Exists(hp.balancingService) + return srvInSync && balancerExists, nil +} + func (hp *HttpProxy) Status(ctx context.Context) (ComponentStatus, error) { - return hp.doSync(ctx, true) + return flowToStatus(ctx, hp, hp.getFlow(), hp.condManager) } func (hp *HttpProxy) Sync(ctx context.Context) error { - _, err := hp.doSync(ctx, false) - return err + return flowToSync(ctx, hp.getFlow(), hp.condManager) } diff --git a/pkg/components/httpproxy_flow.go b/pkg/components/httpproxy_flow.go new file mode 100644 index 00000000..ad461a46 --- /dev/null +++ b/pkg/components/httpproxy_flow.go @@ -0,0 +1,21 @@ +package components + +func (hp *HttpProxy) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(hp, hp.doServerSync), + getStandardWaitBuildFinishedStep(hp, hp.serverInSync), + getStandardUpdateStep( + hp, + hp.condManager, + hp.serverInSync, + []Step{ + getStandardStartRebuildStep(hp, hp.server.removePods), + getStandardWaitPodsRemovedStep(hp, hp.server.arePodsRemoved), + getStandardPodsCreateStep(hp, hp.doServerSync), + getStandardWaiRebuildFinishedStep(hp, hp.serverInSync), + }, + ), + }, + } +} diff --git a/pkg/components/httpproxy_local_test.go b/pkg/components/httpproxy_local_test.go new file mode 100644 index 00000000..d8210730 --- /dev/null +++ b/pkg/components/httpproxy_local_test.go @@ -0,0 +1,29 @@ +package components + +import ( + "testing" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +func TestHTTPProxyFlow(t *testing.T) { + testComponentFlow( + t, + "hp", + "http-proxy", + "", + func(cfgen *ytconfig.Generator, ytsaurus *apiProxy.Ytsaurus) Component { + return NewHTTPProxy( + cfgen, + ytsaurus, + nil, + ytsaurus.GetResource().Spec.HTTPProxies[0], + ) + }, + func(ytsaurus *ytv1.Ytsaurus, image *string) { + ytsaurus.Spec.HTTPProxies[0].Image = image + }, + ) +} diff --git a/pkg/components/init_job.go b/pkg/components/init_job.go index 5ff8554f..4b231b1e 100644 --- a/pkg/components/init_job.go +++ b/pkg/components/init_job.go @@ -139,8 +139,8 @@ func (j *InitJob) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { if j.conditionsManager.IsStatusConditionTrue(j.initCompletedCondition) { return ComponentStatus{ - SyncStatusReady, - fmt.Sprintf("%s completed", j.initJob.Name()), + SyncStatus: SyncStatusReady, + Message: fmt.Sprintf("%s completed", j.initJob.Name()), }, err } diff --git a/pkg/components/master.go b/pkg/components/master.go index e29daf53..db51b50c 100644 --- a/pkg/components/master.go +++ b/pkg/components/master.go @@ -26,16 +26,27 @@ const ( defaultHostAddressLabel = "kubernetes.io/hostname" ) +type ytsaurusClientForMaster interface { + HandlePossibilityCheck(context.Context) (bool, string, error) + EnableSafeMode(context.Context) error + DisableSafeMode(context.Context) error + GetMasterMonitoringPaths(context.Context) ([]string, error) + StartBuildMasterSnapshots(context.Context, []string) error + AreMasterSnapshotsBuilt(context.Context, []string) (bool, error) +} + type Master struct { localServerComponent cfgen *ytconfig.Generator + ytClient ytsaurusClientForMaster + initJob *InitJob exitReadOnlyJob *InitJob adminCredentials corev1.Secret } -func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Master { +func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, ytClient ytsaurusClientForMaster) *Master { resource := ytsaurus.GetResource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, @@ -84,6 +95,7 @@ func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Master { return &Master{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, + ytClient: ytClient, initJob: initJob, exitReadOnlyJob: exitReadOnlyJob, } @@ -319,13 +331,36 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error) return m.initJob.Sync(ctx, dry) } +func (m *Master) IsBuildInitially(ctx context.Context) (bool, error) { + if err := m.Fetch(ctx); err != nil { + return false, fmt.Errorf("failed to fetch component %s: %w", m.GetName(), err) + } + return m.condManager.Is(initializationFinished(m.GetName())), nil +} + +func (m *Master) NeedBuild(ctx context.Context) (bool, error) { + if err := m.Fetch(ctx); err != nil { + return false, fmt.Errorf("failed to fetch component %s: %w", m.GetName(), err) + } + return m.server.needBuild(), nil +} + +func (m *Master) IsRebuildStarted() bool { + return m.condManager.Is(rebuildStarted(m.GetName())) +} + +func (m *Master) BuildInitial(ctx context.Context) error { + return flowToSync(ctx, m.getBuildFlow(), m.condManager) +} + +// Status for master is only about update stage. func (m *Master) Status(ctx context.Context) (ComponentStatus, error) { - return m.doSync(ctx, true) + return flowToStatus(ctx, m, m.getUpdateFlow(), m.condManager) } +// Sync for master is only about update stage. func (m *Master) Sync(ctx context.Context) error { - _, err := m.doSync(ctx, false) - return err + return flowToSync(ctx, m.getUpdateFlow(), m.condManager) } func (m *Master) doServerSync(ctx context.Context) error { diff --git a/pkg/components/master_caches.go b/pkg/components/master_caches.go index c1588a3c..27f211a7 100644 --- a/pkg/components/master_caches.go +++ b/pkg/components/master_caches.go @@ -87,12 +87,11 @@ func (mc *MasterCache) doSync(ctx context.Context, dry bool) (ComponentStatus, e } func (mc *MasterCache) Status(ctx context.Context) (ComponentStatus, error) { - return mc.doSync(ctx, true) + return flowToStatus(ctx, mc, mc.getFlow(), mc.condManager) } func (mc *MasterCache) Sync(ctx context.Context) error { - _, err := mc.doSync(ctx, false) - return err + return flowToSync(ctx, mc.getFlow(), mc.condManager) } func (mc *MasterCache) doServerSync(ctx context.Context) error { diff --git a/pkg/components/master_caches_flow.go b/pkg/components/master_caches_flow.go new file mode 100644 index 00000000..88fbe437 --- /dev/null +++ b/pkg/components/master_caches_flow.go @@ -0,0 +1,21 @@ +package components + +func (mc *MasterCache) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(mc, mc.server.Sync), + getStandardWaitBuildFinishedStep(mc, mc.server.inSync), + getStandardUpdateStep( + mc, + mc.condManager, + mc.server.inSync, + []Step{ + getStandardStartRebuildStep(mc, mc.server.removePods), + getStandardWaitPodsRemovedStep(mc, mc.server.arePodsRemoved), + getStandardPodsCreateStep(mc, mc.server.Sync), + getStandardWaiRebuildFinishedStep(mc, mc.server.inSync), + }, + ), + }, + } +} diff --git a/pkg/components/master_flow.go b/pkg/components/master_flow.go new file mode 100644 index 00000000..805c1742 --- /dev/null +++ b/pkg/components/master_flow.go @@ -0,0 +1,158 @@ +package components + +import ( + "context" +) + +var ( + masterUpdatePossibleCond = isTrue("MasterUpdatePossible") + masterSafeModeEnabledCond = isTrue("MasterSafeModeEnabled") + masterSnapshotsBuildStartedCond = isTrue("MasterSnapshotsBuildStarted") + masterSnapshotsBuildFinishedCond = isTrue("MasterSnapshotsBuildFinished") + masterExitReadOnlyPrepareStartedCond = isTrue("MasterExitReadOnlyPrepareStarted") + masterExitReadOnlyPrepareFinishedCond = isTrue("MasterExitReadOnlyPrepareFinished") + masterExitReadOnlyFinished = isTrue("MasterExitReadOnlyFinished") + masterSafeModeDisabledCond = isTrue("MasterSafeModeDisabled") +) + +var ( + MasterUpdatePossibleCheckStepName = "UpdatePossibleCheck" + MasterEnableSafeModeStepName = "EnableSafeMode" + MasterBuildSnapshotsStepName = "BuildSnapshots" + MasterCheckSnapshotsBuiltStepName = "CheckSnapshotsBuilt" + MasterStartPrepareMasterExitReadOnlyStepName = "StartPrepareMasterExitReadOnly" + MasterWaitMasterExitReadOnlyPreparedStepName = "WaitMasterExitReadOnlyPrepared" + MasterWaitMasterExitsReadOnlyStepName = "WaitMasterExitsReadOnly" + MasterDisableSafeModeStepName = "DisableSafeMode" +) + +func (m *Master) getBuildFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(m, m.doServerSync), + getStandardWaitBuildFinishedStep(m, m.server.inSync), + getStandardInitFinishedStep(m, func(ctx context.Context) (ok bool, err error) { + m.initJob.SetInitScript(m.createInitScript()) + st, err := m.initJob.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }), + }, + } +} + +func (m *Master) getUpdateFlow() Step { + return getStandardUpdateStep( + m, + m.condManager, + m.server.inSync, + []Step{ + StepCheck{ + StepMeta: StepMeta{ + Name: MasterUpdatePossibleCheckStepName, + StatusFunc: func(ctx context.Context) (SyncStatus, string, error) { + if m.condManager.IsSatisfied(masterUpdatePossibleCond) { + return SyncStatusReady, "", nil + } + possible, msg, err := m.ytClient.HandlePossibilityCheck(ctx) + // N.B.: here we return NeedSync (not Ready), so empty body of step become executed + // and masterUpdatePossibleCond became set. + st := SyncStatusNeedSync + if !possible { + st = SyncStatusBlocked + } + return st, msg, err + }, + OnSuccessCondition: masterUpdatePossibleCond, + }, + }, + StepRun{ + StepMeta: StepMeta{ + Name: MasterEnableSafeModeStepName, + RunIfCondition: not(masterSafeModeEnabledCond), + OnSuccessCondition: masterSafeModeEnabledCond, + }, + Body: m.ytClient.EnableSafeMode, + }, + StepRun{ + StepMeta: StepMeta{ + Name: MasterBuildSnapshotsStepName, + RunIfCondition: not(masterSnapshotsBuildStartedCond), + OnSuccessCondition: masterSnapshotsBuildStartedCond, + }, + Body: func(ctx context.Context) error { + monitoringPaths, err := m.ytClient.GetMasterMonitoringPaths(ctx) + if err != nil { + return err + } + if err = m.storeMasterMonitoringPaths(ctx, monitoringPaths); err != nil { + return err + } + return m.ytClient.StartBuildMasterSnapshots(ctx, monitoringPaths) + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: MasterCheckSnapshotsBuiltStepName, + RunIfCondition: not(masterSnapshotsBuildFinishedCond), + OnSuccessCondition: masterSnapshotsBuildFinishedCond, + }, + Body: func(ctx context.Context) (ok bool, err error) { + paths := m.getStoredMasterMonitoringPaths() + return m.ytClient.AreMasterSnapshotsBuilt(ctx, paths) + }, + }, + getStandardStartRebuildStep(m, m.server.removePods), + getStandardWaitPodsRemovedStep(m, m.server.arePodsRemoved), + getStandardPodsCreateStep(m, m.doServerSync), + getStandardWaiRebuildFinishedStep(m, m.server.inSync), + StepRun{ + StepMeta: StepMeta{ + Name: MasterStartPrepareMasterExitReadOnlyStepName, + RunIfCondition: not(masterExitReadOnlyPrepareStartedCond), + OnSuccessCondition: masterExitReadOnlyPrepareStartedCond, + }, + Body: func(ctx context.Context) error { + return m.exitReadOnlyJob.prepareRestart(ctx, false) + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: MasterWaitMasterExitReadOnlyPreparedStepName, + RunIfCondition: not(masterExitReadOnlyPrepareFinishedCond), + OnSuccessCondition: masterExitReadOnlyPrepareFinishedCond, + }, + Body: func(ctx context.Context) (bool, error) { + return m.exitReadOnlyJob.isRestartPrepared(), nil + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: MasterWaitMasterExitsReadOnlyStepName, + RunIfCondition: not(masterExitReadOnlyFinished), + OnSuccessCondition: masterExitReadOnlyFinished, + }, + Body: func(ctx context.Context) (ok bool, err error) { + m.exitReadOnlyJob.SetInitScript(m.createExitReadOnlyScript()) + st, err := m.exitReadOnlyJob.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }, + }, + StepRun{ + StepMeta: StepMeta{ + Name: MasterDisableSafeModeStepName, + RunIfCondition: not(masterSafeModeDisabledCond), + OnSuccessCondition: masterSafeModeDisabledCond, + }, + Body: m.ytClient.DisableSafeMode, + }, + }, + ) +} + +func (m *Master) storeMasterMonitoringPaths(ctx context.Context, paths []string) error { + return m.stateManager.SetMasterMonitoringPaths(ctx, paths) +} + +func (m *Master) getStoredMasterMonitoringPaths() []string { + return m.stateManager.GetMasterMonitoringPaths() +} diff --git a/pkg/components/master_local_test.go b/pkg/components/master_local_test.go new file mode 100644 index 00000000..e2fd287d --- /dev/null +++ b/pkg/components/master_local_test.go @@ -0,0 +1,85 @@ +package components + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + ptr "k8s.io/utils/pointer" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/testutil" +) + +type fakeYtsaurusForMaster struct { +} + +func (y *fakeYtsaurusForMaster) HandlePossibilityCheck(context.Context) (bool, string, error) { + return true, "", nil +} +func (y *fakeYtsaurusForMaster) EnableSafeMode(context.Context) error { + return nil +} +func (y *fakeYtsaurusForMaster) DisableSafeMode(context.Context) error { + return nil +} +func (y *fakeYtsaurusForMaster) GetMasterMonitoringPaths(context.Context) ([]string, error) { + return []string{"path1", "path2"}, nil +} +func (y *fakeYtsaurusForMaster) StartBuildMasterSnapshots(context.Context, []string) error { + return nil +} +func (y *fakeYtsaurusForMaster) AreMasterSnapshotsBuilt(context.Context, []string) (bool, error) { + return true, nil +} + +func TestMasterFlow(t *testing.T) { + ctx := context.Background() + shortName := "ms" + longName := "master" + namespace := longName + + h, ytsaurus, cfgen := prepareTest(t, namespace) + defer h.Stop() + + component := NewMaster(cfgen, ytsaurus, &fakeYtsaurusForMaster{}) + testutil.Eventually(h, shortName+" became ready", func() bool { + needBuild, err := component.NeedBuild(ctx) + require.NoError(t, err) + if !needBuild { + return true + } + require.NoError(t, component.BuildInitial(ctx)) + return false + }) + + cmData := testutil.FetchConfigMapData(h, "yt-"+longName+"-config", "ytserver-"+longName+".yson") + require.Contains(t, cmData, "ms-0.masters."+namespace+".svc."+domain+":9010") + + // TODO: replace with get + testutil.FetchEventually( + h, + shortName, + &appsv1.StatefulSet{}, + ) + + setImage := func(ytsaurus *ytv1.Ytsaurus, image *string) { + ytsaurus.Spec.PrimaryMasters.Image = image + } + // update + // TODO: update2 to be sure that first update doesn't end with wrong state + // after fix jobs deletion + for i := 1; i <= 1; i++ { + t.Logf("Update %s #%d", shortName, i) + newImage := ptr.String(fmt.Sprintf("new-image-%d", i)) + setImage(ytsaurus.GetResource(), newImage) + + component = NewMaster(cfgen, ytsaurus, &fakeYtsaurusForMaster{}) + syncUntilReady(t, h, component) + sts := &appsv1.StatefulSet{} + testutil.GetObject(h, shortName, sts) + require.Equal(t, *newImage, sts.Spec.Template.Spec.Containers[0].Image) + } +} diff --git a/pkg/components/microservice.go b/pkg/components/microservice.go index 02312c4c..0be129b7 100644 --- a/pkg/components/microservice.go +++ b/pkg/components/microservice.go @@ -3,15 +3,17 @@ package components import ( "context" - v1 "github.com/ytsaurus/yt-k8s-operator/api/v1" ptr "k8s.io/utils/pointer" + v1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" "github.com/ytsaurus/yt-k8s-operator/pkg/labeller" "github.com/ytsaurus/yt-k8s-operator/pkg/resources" "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" ) // microservice manages common resources of YTsaurus service component diff --git a/pkg/components/query_tracker.go b/pkg/components/query_tracker.go index ad1a7f79..b66f307b 100644 --- a/pkg/components/query_tracker.go +++ b/pkg/components/query_tracker.go @@ -200,7 +200,7 @@ func (qt *QueryTracker) doSync(ctx context.Context, dry bool) (ComponentStatus, } if !dry { - qt.prepareInitQueryTrackerState() + qt.prepareInitQueryTrackerState(qt.initQTState) } status, err := qt.initQTState.Sync(ctx, dry) if err != nil || status.SyncStatus != SyncStatusReady { @@ -213,6 +213,28 @@ func (qt *QueryTracker) doSync(ctx context.Context, dry bool) (ComponentStatus, return WaitingStatus(SyncStatusPending, fmt.Sprintf("setting %s condition", qt.initCondition)), err } +func (qt *QueryTracker) doServerSync(ctx context.Context) error { + if qt.secret.NeedSync(consts.TokenSecretKey, "") { + secretSpec := qt.secret.Build() + secretSpec.StringData = map[string]string{ + consts.TokenSecretKey: ytconfig.RandString(30), + } + if err := qt.secret.Sync(ctx); err != nil { + return err + } + } + return qt.server.Sync(ctx) +} + +func (qt *QueryTracker) serverInSync(ctx context.Context) (bool, error) { + srvInSync, err := qt.server.inSync(ctx) + if err != nil { + return false, err + } + secretNeedSync := qt.secret.NeedSync(consts.TokenSecretKey, "") + return srvInSync && !secretNeedSync, nil +} + func (qt *QueryTracker) createUser(ctx context.Context, ytClient yt.Client) (err error) { logger := log.FromContext(ctx) @@ -383,15 +405,14 @@ func (qt *QueryTracker) init(ctx context.Context, ytClient yt.Client) (err error } func (qt *QueryTracker) Status(ctx context.Context) (ComponentStatus, error) { - return qt.doSync(ctx, true) + return flowToStatus(ctx, qt, qt.getFlow(), qt.condManager) } func (qt *QueryTracker) Sync(ctx context.Context) error { - _, err := qt.doSync(ctx, false) - return err + return flowToSync(ctx, qt.getFlow(), qt.condManager) } -func (qt *QueryTracker) prepareInitQueryTrackerState() { +func (qt *QueryTracker) prepareInitQueryTrackerState(initJob *InitJob) { path := "/usr/bin/init_query_tracker_state" script := []string{ @@ -400,9 +421,9 @@ func (qt *QueryTracker) prepareInitQueryTrackerState() { path, path, qt.cfgen.GetHTTPProxiesServiceAddress(consts.DefaultHTTPProxyRole)), } - qt.initQTState.SetInitScript(strings.Join(script, "\n")) - job := qt.initQTState.Build() - container := &job.Spec.Template.Spec.Containers[0] + initJob.SetInitScript(strings.Join(script, "\n")) + batchJob := initJob.Build() + container := &batchJob.Spec.Template.Spec.Containers[0] container.EnvFrom = []corev1.EnvFromSource{qt.secret.GetEnvSource()} } diff --git a/pkg/components/query_tracker_flow.go b/pkg/components/query_tracker_flow.go new file mode 100644 index 00000000..6f8efded --- /dev/null +++ b/pkg/components/query_tracker_flow.go @@ -0,0 +1,87 @@ +package components + +import ( + "context" +) + +var ( + qtInitFinishedCond = isTrue("QTInitFinished") + qtUserCreatedCond = isTrue("QTUserCreated") + qtInitQTStatePrepareStartedCond = isTrue("QTInitQTStatePrepareStarted") + qtInitQTStatePrepareFinishedCond = isTrue("QTInitQTStatePrepareFinished") + qtInitQTStateFinishedCond = isTrue("QTInitQTStateFinished") +) + +func (qt *QueryTracker) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(qt, qt.doServerSync), + getStandardWaitBuildFinishedStep(qt, qt.serverInSync), + StepRun{ + StepMeta: StepMeta{ + Name: "QueryTrackerInit", + RunIfCondition: not(qtInitFinishedCond), + OnSuccessCondition: qtInitFinishedCond, + }, + Body: func(ctx context.Context) error { + return qt.init(ctx, qt.ytsaurusClient.GetYtClient()) + }, + }, + StepRun{ + StepMeta: StepMeta{ + Name: "QueryTrackerCreateUser", + RunIfCondition: not(qtUserCreatedCond), + OnSuccessCondition: qtUserCreatedCond, + }, + Body: func(ctx context.Context) error { + return qt.createUser(ctx, qt.ytsaurusClient.GetYtClient()) + }, + }, + // initQTState should be done on first build also? + getStandardUpdateStep( + qt, + qt.condManager, + qt.serverInSync, + []Step{ + getStandardStartRebuildStep(qt, qt.server.removePods), + getStandardWaitPodsRemovedStep(qt, qt.server.arePodsRemoved), + getStandardPodsCreateStep(qt, qt.doServerSync), + getStandardWaiRebuildFinishedStep(qt, qt.serverInSync), + // TODO: Suppose this job should be done once in init also. + StepRun{ + StepMeta: StepMeta{ + Name: "StartInitQTState", + RunIfCondition: not(qtInitQTStatePrepareStartedCond), + OnSuccessCondition: qtInitQTStatePrepareStartedCond, + }, + Body: func(ctx context.Context) error { + return qt.initQTState.prepareRestart(ctx, false) + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: "WaitInitQTStatePrepared", + RunIfCondition: not(qtInitQTStatePrepareFinishedCond), + OnSuccessCondition: qtInitQTStatePrepareFinishedCond, + }, + Body: func(ctx context.Context) (bool, error) { + return qt.initQTState.isRestartPrepared(), nil + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: "WaitInitQTStateFinished", + RunIfCondition: not(qtInitQTStateFinishedCond), + OnSuccessCondition: qtInitQTStateFinishedCond, + }, + Body: func(ctx context.Context) (ok bool, err error) { + qt.prepareInitQueryTrackerState(qt.initQTState) + st, err := qt.initQTState.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }, + }, + }, + ), + }, + } +} diff --git a/pkg/components/queue_agent.go b/pkg/components/queue_agent.go index b0481b32..ff8c67eb 100644 --- a/pkg/components/queue_agent.go +++ b/pkg/components/queue_agent.go @@ -184,7 +184,7 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er } if !dry { - qa.prepareInitQueueAgentState() + qa.prepareInitQueueAgentState(qa.initQAState) } status, err := qa.initQAState.Sync(ctx, dry) if err != nil || status.SyncStatus != SyncStatusReady { @@ -212,6 +212,28 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er return WaitingStatus(SyncStatusPending, fmt.Sprintf("setting %s condition", qa.initCondition)), err } +func (qa *QueueAgent) doServerSync(ctx context.Context) error { + if qa.secret.NeedSync(consts.TokenSecretKey, "") { + secretSpec := qa.secret.Build() + secretSpec.StringData = map[string]string{ + consts.TokenSecretKey: ytconfig.RandString(30), + } + if err := qa.secret.Sync(ctx); err != nil { + return err + } + } + return qa.server.Sync(ctx) +} + +func (qa *QueueAgent) serverInSync(ctx context.Context) (bool, error) { + srvInSync, err := qa.server.inSync(ctx) + if err != nil { + return false, err + } + secretNeedSync := qa.secret.NeedSync(consts.TokenSecretKey, "") + return srvInSync && !secretNeedSync, nil +} + func (qa *QueueAgent) createUser(ctx context.Context, ytClient yt.Client) (err error) { logger := log.FromContext(ctx) @@ -295,7 +317,7 @@ func (qa *QueueAgent) init(ctx context.Context, ytClient yt.Client) (err error) return } -func (qa *QueueAgent) prepareInitQueueAgentState() { +func (qa *QueueAgent) prepareInitQueueAgentState(job *InitJob) { path := "/usr/bin/init_queue_agent_state" script := []string{ @@ -305,16 +327,15 @@ func (qa *QueueAgent) prepareInitQueueAgentState() { } qa.initQAState.SetInitScript(strings.Join(script, "\n")) - job := qa.initQAState.Build() - container := &job.Spec.Template.Spec.Containers[0] + batchJob := job.Build() + container := &batchJob.Spec.Template.Spec.Containers[0] container.EnvFrom = []corev1.EnvFromSource{qa.secret.GetEnvSource()} } func (qa *QueueAgent) Status(ctx context.Context) (ComponentStatus, error) { - return qa.doSync(ctx, true) + return flowToStatus(ctx, qa, qa.getFlow(), qa.condManager) } func (qa *QueueAgent) Sync(ctx context.Context) error { - _, err := qa.doSync(ctx, false) - return err + return flowToSync(ctx, qa.getFlow(), qa.condManager) } diff --git a/pkg/components/queue_agent_flow.go b/pkg/components/queue_agent_flow.go new file mode 100644 index 00000000..3690c126 --- /dev/null +++ b/pkg/components/queue_agent_flow.go @@ -0,0 +1,87 @@ +package components + +import ( + "context" +) + +var ( + qaInitFinishedCond = isTrue("QAInitFinished") + qaUserCreatedCond = isTrue("QAUserCreated") + qaInitQAStatePrepareStartedCond = isTrue("QAInitQAStatePrepareStarted") + qaInitQAStatePrepareFinishedCond = isTrue("QAInitQAStatePrepareFinished") + qaInitQAStateFinishedCond = isTrue("QAInitQAStateFinished") +) + +func (qa *QueueAgent) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(qa, qa.doServerSync), + getStandardWaitBuildFinishedStep(qa, qa.serverInSync), + StepRun{ + StepMeta: StepMeta{ + Name: "QueueAgentInit", + RunIfCondition: not(qaInitFinishedCond), + OnSuccessCondition: qaInitFinishedCond, + }, + Body: func(ctx context.Context) error { + return qa.init(ctx, qa.ytsaurusClient.GetYtClient()) + }, + }, + StepRun{ + StepMeta: StepMeta{ + Name: "QueueAgentCreateUser", + RunIfCondition: not(qaUserCreatedCond), + OnSuccessCondition: qaUserCreatedCond, + }, + Body: func(ctx context.Context) error { + return qa.createUser(ctx, qa.ytsaurusClient.GetYtClient()) + }, + }, + // TODO: initQAState on first build also + getStandardUpdateStep( + qa, + qa.condManager, + qa.serverInSync, + []Step{ + getStandardStartRebuildStep(qa, qa.server.removePods), + getStandardWaitPodsRemovedStep(qa, qa.server.arePodsRemoved), + getStandardPodsCreateStep(qa, qa.doServerSync), + getStandardWaiRebuildFinishedStep(qa, qa.serverInSync), + // TODO: Suppose this job should be done once in init also. + StepRun{ + StepMeta: StepMeta{ + Name: "StartInitQAState", + RunIfCondition: not(qaInitQAStatePrepareStartedCond), + OnSuccessCondition: qaInitQAStatePrepareStartedCond, + }, + Body: func(ctx context.Context) error { + return qa.initQAState.prepareRestart(ctx, false) + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: "WaitInitQAStatePrepared", + RunIfCondition: not(qaInitQAStatePrepareFinishedCond), + OnSuccessCondition: qaInitQAStatePrepareFinishedCond, + }, + Body: func(ctx context.Context) (bool, error) { + return qa.initQAState.isRestartPrepared(), nil + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: "WaitInitQAStateFinished", + RunIfCondition: not(qaInitQAStateFinishedCond), + OnSuccessCondition: qaInitQAStateFinishedCond, + }, + Body: func(ctx context.Context) (ok bool, err error) { + qa.prepareInitQueueAgentState(qa.initQAState) + st, err := qa.initQAState.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }, + }, + }, + ), + }, + } +} diff --git a/pkg/components/rpcproxy.go b/pkg/components/rpcproxy.go index c28b57ad..c0e8c234 100644 --- a/pkg/components/rpcproxy.go +++ b/pkg/components/rpcproxy.go @@ -148,11 +148,35 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return SimpleStatus(SyncStatusReady), err } +func (rp *RpcProxy) doServerSync(ctx context.Context) error { + statefulSet := rp.server.buildStatefulSet() + if secret := rp.tlsSecret; secret != nil { + secret.AddVolume(&statefulSet.Spec.Template.Spec) + secret.AddVolumeMount(&statefulSet.Spec.Template.Spec.Containers[0]) + } + err := rp.server.Sync(ctx) + if err != nil { + return err + } + + s := rp.balancingService.Build() + s.Spec.Type = *rp.serviceType + return rp.balancingService.Sync(ctx) +} + +func (rp *RpcProxy) serverInSync(ctx context.Context) (bool, error) { + srvInSync, err := rp.server.inSync(ctx) + if err != nil { + return false, err + } + balancerExists := resources.Exists(rp.balancingService) + return srvInSync && balancerExists, nil +} + func (rp *RpcProxy) Status(ctx context.Context) (ComponentStatus, error) { - return rp.doSync(ctx, true) + return flowToStatus(ctx, rp, rp.getFlow(), rp.condManager) } func (rp *RpcProxy) Sync(ctx context.Context) error { - _, err := rp.doSync(ctx, false) - return err + return flowToSync(ctx, rp.getFlow(), rp.condManager) } diff --git a/pkg/components/rpcproxy_flow.go b/pkg/components/rpcproxy_flow.go new file mode 100644 index 00000000..99a26b07 --- /dev/null +++ b/pkg/components/rpcproxy_flow.go @@ -0,0 +1,21 @@ +package components + +func (rp *RpcProxy) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(rp, rp.doServerSync), + getStandardWaitBuildFinishedStep(rp, rp.serverInSync), + getStandardUpdateStep( + rp, + rp.condManager, + rp.serverInSync, + []Step{ + getStandardStartRebuildStep(rp, rp.server.removePods), + getStandardWaitPodsRemovedStep(rp, rp.server.arePodsRemoved), + getStandardPodsCreateStep(rp, rp.doServerSync), + getStandardWaiRebuildFinishedStep(rp, rp.serverInSync), + }, + ), + }, + } +} diff --git a/pkg/components/scheduler.go b/pkg/components/scheduler.go index 629806e0..46d8617c 100644 --- a/pkg/components/scheduler.go +++ b/pkg/components/scheduler.go @@ -108,12 +108,11 @@ func (s *Scheduler) Fetch(ctx context.Context) error { } func (s *Scheduler) Status(ctx context.Context) (ComponentStatus, error) { - return s.doSync(ctx, true) + return flowToStatus(ctx, s, s.getFlow(), s.condManager) } func (s *Scheduler) Sync(ctx context.Context) error { - _, err := s.doSync(ctx, false) - return err + return flowToSync(ctx, s.getFlow(), s.condManager) } func (s *Scheduler) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { @@ -196,6 +195,28 @@ func (s *Scheduler) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return s.initOpAchieve(ctx, dry) } +func (s *Scheduler) doServerSync(ctx context.Context) error { + if s.secret.NeedSync(consts.TokenSecretKey, "") { + secretSpec := s.secret.Build() + secretSpec.StringData = map[string]string{ + consts.TokenSecretKey: ytconfig.RandString(30), + } + if err := s.secret.Sync(ctx); err != nil { + return err + } + } + return s.server.Sync(ctx) +} + +func (s *Scheduler) serverInSync(ctx context.Context) (bool, error) { + srvInSync, err := s.server.inSync(ctx) + if err != nil { + return false, err + } + secretNeedSync := s.secret.NeedSync(consts.TokenSecretKey, "") + return srvInSync && !secretNeedSync, nil +} + func (s *Scheduler) initOpAchieve(ctx context.Context, dry bool) (ComponentStatus, error) { if !dry { s.initUser.SetInitScript(s.createInitUserScript()) @@ -218,7 +239,7 @@ func (s *Scheduler) initOpAchieve(ctx context.Context, dry bool) (ComponentStatu } if !dry { - s.prepareInitOperationsArchive() + s.prepareInitOperationsArchive(s.initOpArchive) } return s.initOpArchive.Sync(ctx, dry) } @@ -302,7 +323,7 @@ export INIT_OP_ARCHIVE=/usr/bin/init_operation_archive fi ` -func (s *Scheduler) prepareInitOperationsArchive() { +func (s *Scheduler) prepareInitOperationsArchive(job *InitJob) { script := []string{ initJobWithNativeDriverPrologue(), setInitOpArchivePath, @@ -311,8 +332,8 @@ func (s *Scheduler) prepareInitOperationsArchive() { SetWithIgnoreExisting("//sys/cluster_nodes/@config", "'{\"%true\" = {job_agent={enable_job_reporter=%true}}}'"), } - s.initOpArchive.SetInitScript(strings.Join(script, "\n")) - job := s.initOpArchive.Build() - container := &job.Spec.Template.Spec.Containers[0] + job.SetInitScript(strings.Join(script, "\n")) + batchJob := s.initOpArchive.Build() + container := &batchJob.Spec.Template.Spec.Containers[0] container.EnvFrom = []corev1.EnvFromSource{s.secret.GetEnvSource()} } diff --git a/pkg/components/scheduler_flow.go b/pkg/components/scheduler_flow.go new file mode 100644 index 00000000..0f1e0913 --- /dev/null +++ b/pkg/components/scheduler_flow.go @@ -0,0 +1,69 @@ +package components + +import ( + "context" +) + +var ( + schedulerUpdateOpArchivePrepareStartedCond = isTrue("UpdateOpArchivePrepareStarted") + schedulerUpdateOpArchivePrepareFinishedCond = isTrue("UpdateOpArchivePrepareFinished") + schedulerUpdateOpArchiveFinishedCond = isTrue("UpdateOpArchiveFinished") +) + +func (s *Scheduler) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(s, s.doServerSync), + getStandardWaitBuildFinishedStep(s, s.serverInSync), + getStandardInitFinishedStep(s, func(ctx context.Context) (ok bool, err error) { + s.initUser.SetInitScript(s.createInitUserScript()) + st, err := s.initUser.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }), + getStandardUpdateStep( + s, + s.condManager, + s.serverInSync, + []Step{ + getStandardStartRebuildStep(s, s.server.removePods), + getStandardWaitPodsRemovedStep(s, s.server.arePodsRemoved), + getStandardPodsCreateStep(s, s.doServerSync), + getStandardWaiRebuildFinishedStep(s, s.serverInSync), + // TODO: Suppose this job should be done once in init also. + StepRun{ + StepMeta: StepMeta{ + Name: "StartPrepareUpdateOpArchive", + RunIfCondition: not(schedulerUpdateOpArchivePrepareStartedCond), + OnSuccessCondition: schedulerUpdateOpArchivePrepareStartedCond, + }, + Body: func(ctx context.Context) error { + return s.initOpArchive.prepareRestart(ctx, false) + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: "WaitUpdateOpArchivePrepared", + RunIfCondition: not(schedulerUpdateOpArchivePrepareFinishedCond), + OnSuccessCondition: schedulerUpdateOpArchivePrepareFinishedCond, + }, + Body: func(ctx context.Context) (bool, error) { + return s.initOpArchive.isRestartPrepared(), nil + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: "WaitUpdateOpArchive", + RunIfCondition: not(schedulerUpdateOpArchiveFinishedCond), + OnSuccessCondition: schedulerUpdateOpArchiveFinishedCond, + }, + Body: func(ctx context.Context) (ok bool, err error) { + s.prepareInitOperationsArchive(s.initOpArchive) + st, err := s.initOpArchive.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }, + }, + }, + ), + }, + } +} diff --git a/pkg/components/scheduler_local_test.go b/pkg/components/scheduler_local_test.go new file mode 100644 index 00000000..ba779ad7 --- /dev/null +++ b/pkg/components/scheduler_local_test.go @@ -0,0 +1,24 @@ +package components + +import ( + "testing" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +func TestSchedulerFlow(t *testing.T) { + testComponentFlow( + t, + "sch", + "scheduler", + "", + func(cfgen *ytconfig.Generator, ytsaurus *apiProxy.Ytsaurus) Component { + return NewScheduler(cfgen, ytsaurus, nil, nil, nil) + }, + func(ytsaurus *ytv1.Ytsaurus, image *string) { + ytsaurus.Spec.Schedulers.Image = image + }, + ) +} diff --git a/pkg/components/server.go b/pkg/components/server.go index 529c1f4c..2d483c41 100644 --- a/pkg/components/server.go +++ b/pkg/components/server.go @@ -35,6 +35,7 @@ type server interface { needSync() bool buildStatefulSet() *appsv1.StatefulSet rebuildStatefulSet() *appsv1.StatefulSet + inSync(ctx context.Context) (bool, error) } type serverImpl struct { @@ -221,6 +222,36 @@ func (s *serverImpl) needUpdate() bool { return needReload } +// hasDiff checks if any is different from the desired state. +// Currently, it only triggered if +// - any of sub-resources are missing, +// - sts have actual image +// - config needs reload +// +// In the future we want it to check more. +func (s *serverImpl) hasDiff(ctx context.Context) (bool, error) { + if !s.exists() { + return true, nil + } + if !s.arePodsReady(ctx) { + return true, nil + } + if !s.podsImageCorrespondsToSpec() { + return true, nil + } + + needReload, err := s.configHelper.NeedReload() + if err != nil { + return false, err + } + return needReload, nil +} + +func (s *serverImpl) inSync(ctx context.Context) (bool, error) { + diff, err := s.hasDiff(ctx) + return !diff, err +} + func (s *serverImpl) arePodsReady(ctx context.Context) bool { return s.statefulSet.ArePodsReady(ctx, s.instanceSpec.MinReadyInstanceCount) } diff --git a/pkg/components/strawberry_controller.go b/pkg/components/strawberry_controller.go index 2b477d5f..117f57a9 100644 --- a/pkg/components/strawberry_controller.go +++ b/pkg/components/strawberry_controller.go @@ -316,11 +316,30 @@ func (c *StrawberryController) doSync(ctx context.Context, dry bool) (ComponentS return SimpleStatus(SyncStatusReady), err } +func (c *StrawberryController) doServerSync(ctx context.Context) error { + if c.secret.NeedSync(consts.TokenSecretKey, "") { + s := c.secret.Build() + s.StringData = map[string]string{ + consts.TokenSecretKey: ytconfig.RandString(30), + } + if err := c.secret.Sync(ctx); err != nil { + return err + } + } + return c.syncComponents(ctx) +} + +func (c *StrawberryController) serverInSync(ctx context.Context) (bool, error) { + // TODO: check diff between microservice needSync & server in sync + srvInSync := c.microservice.needSync() + secretNeedSync := c.secret.NeedSync(consts.TokenSecretKey, "") + return srvInSync && !secretNeedSync, nil +} + func (c *StrawberryController) Status(ctx context.Context) (ComponentStatus, error) { - return c.doSync(ctx, true) + return flowToStatus(ctx, c, c.getFlow(), c.condManager) } func (c *StrawberryController) Sync(ctx context.Context) error { - _, err := c.doSync(ctx, false) - return err + return flowToSync(ctx, c.getFlow(), c.condManager) } diff --git a/pkg/components/strawberry_controller_flow.go b/pkg/components/strawberry_controller_flow.go new file mode 100644 index 00000000..77fddfb8 --- /dev/null +++ b/pkg/components/strawberry_controller_flow.go @@ -0,0 +1,42 @@ +package components + +import ( + "context" +) + +func (c *StrawberryController) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(c, c.doServerSync), + getStandardWaitBuildFinishedStep(c, c.serverInSync), + StepCheck{ + StepMeta: StepMeta{ + Name: "InitUserAndUrl", + RunIfCondition: not(initializationStarted(c.GetName())), + OnSuccessCondition: initializationStarted(c.GetName()), + }, + Body: func(ctx context.Context) (ok bool, err error) { + c.initUserAndUrlJob.SetInitScript(c.createInitUserAndUrlScript()) + st, err := c.initUserAndUrlJob.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }, + }, + getStandardInitFinishedStep(c, func(ctx context.Context) (ok bool, err error) { + c.initChytClusterJob.SetInitScript(c.createInitChytClusterScript()) + st, err := c.initChytClusterJob.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }), + getStandardUpdateStep( + c, + c.condManager, + c.serverInSync, + []Step{ + getStandardStartRebuildStep(c, c.microservice.removePods), + getStandardWaitPodsRemovedStep(c, c.microservice.arePodsRemoved), + getStandardPodsCreateStep(c, c.doServerSync), + getStandardWaiRebuildFinishedStep(c, c.serverInSync), + }, + ), + }, + } +} diff --git a/pkg/components/suite_test.go b/pkg/components/suite_test.go index 0e0b678b..951d1b13 100644 --- a/pkg/components/suite_test.go +++ b/pkg/components/suite_test.go @@ -13,6 +13,7 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" mock_yt "github.com/ytsaurus/yt-k8s-operator/pkg/mock" ) @@ -109,6 +110,10 @@ func (fs *FakeServer) Sync(ctx context.Context) error { return nil } +func (fs *FakeServer) inSync(ctx context.Context) (bool, error) { + return true, nil +} + func (fs *FakeServer) buildStatefulSet() *appsv1.StatefulSet { return nil } @@ -141,6 +146,19 @@ func (fyc *FakeYtsaurusClient) GetYtClient() yt.Client { return fyc.client } +func (fyc *FakeYtsaurusClient) GetTabletCells(context.Context) ([]ytv1.TabletCellBundleInfo, error) { + return []ytv1.TabletCellBundleInfo{}, nil +} +func (fyc *FakeYtsaurusClient) RemoveTabletCells(context.Context) error { + return nil +} +func (fyc *FakeYtsaurusClient) RecoverTableCells(context.Context, []ytv1.TabletCellBundleInfo) error { + return nil +} +func (fyc *FakeYtsaurusClient) AreTabletCellsRemoved(context.Context) (bool, error) { + return true, nil +} + func (fyc *FakeYtsaurusClient) SetStatus(status ComponentStatus) { fyc.status = status } diff --git a/pkg/components/tablet_node.go b/pkg/components/tablet_node.go index d06840fd..4dc6e6cd 100644 --- a/pkg/components/tablet_node.go +++ b/pkg/components/tablet_node.go @@ -21,11 +21,24 @@ import ( const SysBundle string = "sys" const DefaultBundle string = "default" +type ytsaurusClientForTabletNodes interface { + GetYtClient() yt.Client + + GetTabletCells(context.Context) ([]ytv1.TabletCellBundleInfo, error) + RemoveTabletCells(context.Context) error + RecoverTableCells(context.Context, []ytv1.TabletCellBundleInfo) error + AreTabletCellsRemoved(context.Context) (bool, error) + + // TODO (l0kix2): remove later. + Status(ctx context.Context) (ComponentStatus, error) + GetName() string +} + type TabletNode struct { localServerComponent cfgen *ytconfig.NodeGenerator - ytsaurusClient internalYtsaurusClient + ytsaurusClient ytsaurusClientForTabletNodes initBundlesCondition string spec ytv1.TabletNodesSpec @@ -35,7 +48,7 @@ type TabletNode struct { func NewTabletNode( cfgen *ytconfig.NodeGenerator, ytsaurus *apiproxy.Ytsaurus, - ytsaurusClient internalYtsaurusClient, + ytsaurusClient ytsaurusClientForTabletNodes, spec ytv1.TabletNodesSpec, doInitiailization bool, ) *TabletNode { @@ -207,6 +220,82 @@ func (tn *TabletNode) doSync(ctx context.Context, dry bool) (ComponentStatus, er return WaitingStatus(SyncStatusPending, fmt.Sprintf("setting %s condition", tn.initBundlesCondition)), err } +func (tn *TabletNode) initializeBundles(ctx context.Context) error { + ytClient := tn.ytsaurusClient.GetYtClient() + + if exists, err := ytClient.NodeExists(ctx, ypath.Path(fmt.Sprintf("//sys/tablet_cell_bundles/%s", SysBundle)), nil); err == nil { + if !exists { + options := map[string]string{ + "changelog_account": "sys", + "snapshot_account": "sys", + } + + bootstrap := tn.getBundleBootstrap(SysBundle) + if bootstrap != nil { + if bootstrap.ChangelogPrimaryMedium != nil { + options["changelog_primary_medium"] = *bootstrap.ChangelogPrimaryMedium + } + if bootstrap.SnapshotPrimaryMedium != nil { + options["snapshot_primary_medium"] = *bootstrap.SnapshotPrimaryMedium + } + } + + _, err = ytClient.CreateObject(ctx, yt.NodeTabletCellBundle, &yt.CreateObjectOptions{ + Attributes: map[string]interface{}{ + "name": SysBundle, + "options": options, + }, + }) + + if err != nil { + return fmt.Errorf("creating tablet_cell_bundle failed: %w", err) + } + } + } else { + return err + } + + defaultBundleBootstrap := tn.getBundleBootstrap(DefaultBundle) + if defaultBundleBootstrap != nil { + path := ypath.Path(fmt.Sprintf("//sys/tablet_cell_bundles/%s", DefaultBundle)) + if defaultBundleBootstrap.ChangelogPrimaryMedium != nil { + err := ytClient.SetNode(ctx, path.Attr("options/changelog_primary_medium"), *defaultBundleBootstrap.ChangelogPrimaryMedium, nil) + if err != nil { + return fmt.Errorf("setting changelog_primary_medium for `default` bundle failed: %w", err) + } + } + + if defaultBundleBootstrap.SnapshotPrimaryMedium != nil { + err := ytClient.SetNode(ctx, path.Attr("options/snapshot_primary_medium"), *defaultBundleBootstrap.SnapshotPrimaryMedium, nil) + if err != nil { + return fmt.Errorf("Setting snapshot_primary_medium for `default` bundle failed: %w", err) + } + } + } + + for _, bundle := range []string{DefaultBundle, SysBundle} { + tabletCellCount := 1 + bootstrap := tn.getBundleBootstrap(bundle) + if bootstrap != nil { + tabletCellCount = bootstrap.TabletCellCount + } + err := CreateTabletCells(ctx, ytClient, bundle, tabletCellCount) + if err != nil { + return err + } + } + + return nil + + //tn.ytsaurus.SetStatusCondition(metav1.Condition{ + // Type: tn.initBundlesCondition, + // Status: metav1.ConditionTrue, + // Reason: "InitBundlesCompleted", + // Message: "Init bundles successfully completed", + //}) + +} + func (tn *TabletNode) getBundleBootstrap(bundle string) *ytv1.BundleBootstrapSpec { resource := tn.ytsaurus.GetResource() if resource.Spec.Bootstrap == nil || resource.Spec.Bootstrap.TabletCellBundles == nil { @@ -225,14 +314,12 @@ func (tn *TabletNode) getBundleBootstrap(bundle string) *ytv1.BundleBootstrapSpe } func (tn *TabletNode) Status(ctx context.Context) (ComponentStatus, error) { - return tn.doSync(ctx, true) + return flowToStatus(ctx, tn, tn.getFlow(), tn.condManager) } func (tn *TabletNode) Sync(ctx context.Context) error { - _, err := tn.doSync(ctx, false) - return err + return flowToSync(ctx, tn.getFlow(), tn.condManager) } - func (tn *TabletNode) Fetch(ctx context.Context) error { return resources.Fetch(ctx, tn.server) } diff --git a/pkg/components/tablet_node_flow.go b/pkg/components/tablet_node_flow.go new file mode 100644 index 00000000..54b2c606 --- /dev/null +++ b/pkg/components/tablet_node_flow.go @@ -0,0 +1,94 @@ +package components + +import ( + "context" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" +) + +var ( + tnTabletCellsBackupStartedCond = isTrue("TabletCellsBackupStarted") + tnTabletCellsBackupFinishedCond = isTrue("TabletCellsBackupFinished") + tnTabletCellsRecoveredCond = isTrue("TabletCellsRecovered") +) + +func (tn *TabletNode) getFlow() Step { + name := tn.GetName() + initFinishedCond := initializationFinished(name) + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(tn, tn.server.Sync), + getStandardWaitBuildFinishedStep(tn, tn.server.inSync), + StepRun{ + StepMeta: StepMeta{ + Name: StepInitFinished, + RunIfCondition: not(initFinishedCond), + OnSuccessCondition: initFinishedCond, + }, + Body: func(ctx context.Context) error { + if !tn.doInitialization { + return nil + } + return tn.initializeBundles(ctx) + }, + }, + getStandardUpdateStep( + tn, + tn.condManager, + tn.server.inSync, + []Step{ + StepRun{ + StepMeta: StepMeta{ + Name: "SaveTabletCellBundles", + RunIfCondition: not(tnTabletCellsBackupStartedCond), + OnSuccessCondition: tnTabletCellsBackupStartedCond, + }, + Body: func(ctx context.Context) error { + bundles, err := tn.ytsaurusClient.GetTabletCells(ctx) + if err != nil { + return err + } + if err = tn.storeTabletCellBundles(ctx, bundles); err != nil { + return err + } + return tn.ytsaurusClient.RemoveTabletCells(ctx) + }, + }, + StepCheck{ + StepMeta: StepMeta{ + Name: "CheckTabletCellsRemoved", + RunIfCondition: not(tnTabletCellsBackupFinishedCond), + OnSuccessCondition: tnTabletCellsBackupFinishedCond, + }, + Body: func(ctx context.Context) (ok bool, err error) { + return tn.ytsaurusClient.AreTabletCellsRemoved(ctx) + }, + }, + getStandardStartRebuildStep(tn, tn.server.removePods), + getStandardWaitPodsRemovedStep(tn, tn.server.arePodsRemoved), + getStandardPodsCreateStep(tn, tn.server.Sync), + getStandardWaiRebuildFinishedStep(tn, tn.server.inSync), + StepRun{ + StepMeta: StepMeta{ + Name: "RecoverTableCells", + RunIfCondition: not(tnTabletCellsRecoveredCond), + OnSuccessCondition: tnTabletCellsRecoveredCond, + }, + Body: func(ctx context.Context) error { + bundles := tn.getStoredTabletCellBundles() + return tn.ytsaurusClient.RecoverTableCells(ctx, bundles) + }, + }, + }, + ), + }, + } +} + +func (tn *TabletNode) storeTabletCellBundles(ctx context.Context, bundles []ytv1.TabletCellBundleInfo) error { + return tn.stateManager.SetTabletCellBundles(ctx, bundles) +} + +func (tn *TabletNode) getStoredTabletCellBundles() []ytv1.TabletCellBundleInfo { + return tn.stateManager.GetTabletCellBundles() +} diff --git a/pkg/components/tablet_nodes_local_test.go b/pkg/components/tablet_nodes_local_test.go new file mode 100644 index 00000000..4e36c6da --- /dev/null +++ b/pkg/components/tablet_nodes_local_test.go @@ -0,0 +1,68 @@ +package components + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "go.ytsaurus.tech/yt/go/yt" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + mock_yt "github.com/ytsaurus/yt-k8s-operator/pkg/mock" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +type fakeYtsaurusForTnd struct { + mockClient *mock_yt.MockClient +} + +func (y *fakeYtsaurusForTnd) GetYtClient() yt.Client { + return y.mockClient +} +func (y *fakeYtsaurusForTnd) GetName() string { return "fakeYtClient" } +func (y *fakeYtsaurusForTnd) Status(_ context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (y *fakeYtsaurusForTnd) GetTabletCells(context.Context) ([]ytv1.TabletCellBundleInfo, error) { + return []ytv1.TabletCellBundleInfo{}, nil +} +func (y *fakeYtsaurusForTnd) RemoveTabletCells(context.Context) error { + return nil +} +func (y *fakeYtsaurusForTnd) RecoverTableCells(context.Context, []ytv1.TabletCellBundleInfo) error { + return nil +} +func (y *fakeYtsaurusForTnd) AreTabletCellsRemoved(context.Context) (bool, error) { + return true, nil +} + +func TestTabletNodesFlow(t *testing.T) { + ctrl = gomock.NewController(t) + ytCli := mock_yt.NewMockClient(ctrl) + //ytCli.EXPECT().NodeExists(context.Background(), ypath.Path("//sys/tablet_cell_bundles/sys"), nil).Return(true, nil) + //count := 0 + //ytCli.EXPECT().GetNode(context.Background(), ypath.Path("//sys/tablet_cell_bundles/default/@tablet_cell_count"), &count, nil).Return(nil) + + testComponentFlow( + t, + "tnd", + "tablet-node", + "-tn-1", + func(_ *ytconfig.Generator, ytsaurus *apiProxy.Ytsaurus) Component { + cfgen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), domain) + return NewTabletNode( + cfgen, + ytsaurus, + &fakeYtsaurusForTnd{ytCli}, + ytsaurus.GetResource().Spec.TabletNodes[0], + // TODO: doInitialization: true + false, + ) + }, + func(ytsaurus *ytv1.Ytsaurus, image *string) { + ytsaurus.Spec.TabletNodes[0].Image = image + }, + ) +} diff --git a/pkg/components/tcpproxy.go b/pkg/components/tcpproxy.go index d7a689bb..cbb48927 100644 --- a/pkg/components/tcpproxy.go +++ b/pkg/components/tcpproxy.go @@ -133,11 +133,28 @@ func (tp *TcpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return SimpleStatus(SyncStatusReady), err } +func (tp *TcpProxy) doServerSync(ctx context.Context) error { + err := tp.server.Sync(ctx) + if err != nil { + return err + } + tp.balancingService.Build() + return tp.balancingService.Sync(ctx) +} + +func (tp *TcpProxy) serverInSync(ctx context.Context) (bool, error) { + srvInSync, err := tp.server.inSync(ctx) + if err != nil { + return false, err + } + balancerExists := resources.Exists(tp.balancingService) + return srvInSync && balancerExists, nil +} + func (tp *TcpProxy) Status(ctx context.Context) (ComponentStatus, error) { - return tp.doSync(ctx, true) + return flowToStatus(ctx, tp, tp.getFlow(), tp.condManager) } func (tp *TcpProxy) Sync(ctx context.Context) error { - _, err := tp.doSync(ctx, false) - return err + return flowToSync(ctx, tp.getFlow(), tp.condManager) } diff --git a/pkg/components/tcpproxy_flow.go b/pkg/components/tcpproxy_flow.go new file mode 100644 index 00000000..b16d8fc9 --- /dev/null +++ b/pkg/components/tcpproxy_flow.go @@ -0,0 +1,21 @@ +package components + +func (tp *TcpProxy) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(tp, tp.doServerSync), + getStandardWaitBuildFinishedStep(tp, tp.serverInSync), + getStandardUpdateStep( + tp, + tp.condManager, + tp.serverInSync, + []Step{ + getStandardStartRebuildStep(tp, tp.server.removePods), + getStandardWaitPodsRemovedStep(tp, tp.server.arePodsRemoved), + getStandardPodsCreateStep(tp, tp.doServerSync), + getStandardWaiRebuildFinishedStep(tp, tp.serverInSync), + }, + ), + }, + } +} diff --git a/pkg/components/ui.go b/pkg/components/ui.go index 1730365d..c768063b 100644 --- a/pkg/components/ui.go +++ b/pkg/components/ui.go @@ -313,11 +313,32 @@ func (u *UI) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { return SimpleStatus(SyncStatusReady), err } +func (u *UI) doServerSync(ctx context.Context) error { + if u.secret.NeedSync(consts.TokenSecretKey, "") { + token := ytconfig.RandString(30) + s := u.secret.Build() + s.StringData = map[string]string{ + consts.UISecretFileName: fmt.Sprintf("{\"oauthToken\" : \"%s\"}", token), + consts.TokenSecretKey: token, + } + if err := u.secret.Sync(ctx); err != nil { + return err + } + } + return u.syncComponents(ctx) +} + +func (u *UI) serverInSync(ctx context.Context) (bool, error) { + // TODO: check diff between microservice needSync & server in sync + srvInSync := u.microservice.needSync() + secretNeedSync := u.secret.NeedSync(consts.TokenSecretKey, "") + return srvInSync && !secretNeedSync, nil +} + func (u *UI) Status(ctx context.Context) (ComponentStatus, error) { - return u.doSync(ctx, true) + return flowToStatus(ctx, u, u.getFlow(), u.condManager) } func (u *UI) Sync(ctx context.Context) error { - _, err := u.doSync(ctx, false) - return err + return flowToSync(ctx, u.getFlow(), u.condManager) } diff --git a/pkg/components/ui_flow.go b/pkg/components/ui_flow.go new file mode 100644 index 00000000..844e1445 --- /dev/null +++ b/pkg/components/ui_flow.go @@ -0,0 +1,30 @@ +package components + +import ( + "context" +) + +func (u *UI) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(u, u.doServerSync), + getStandardWaitBuildFinishedStep(u, u.serverInSync), + getStandardInitFinishedStep(u, func(ctx context.Context) (ok bool, err error) { + u.initJob.SetInitScript(u.createInitScript()) + st, err := u.initJob.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }), + getStandardUpdateStep( + u, + u.condManager, + u.serverInSync, + []Step{ + getStandardStartRebuildStep(u, u.microservice.removePods), + getStandardWaitPodsRemovedStep(u, u.microservice.arePodsRemoved), + getStandardPodsCreateStep(u, u.doServerSync), + getStandardWaiRebuildFinishedStep(u, u.serverInSync), + }, + ), + }, + } +} diff --git a/pkg/components/yql_agent.go b/pkg/components/yql_agent.go index 34296d2a..6b90c14b 100644 --- a/pkg/components/yql_agent.go +++ b/pkg/components/yql_agent.go @@ -181,11 +181,42 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er return yqla.initEnvironment.Sync(ctx, dry) } +func (yqla *YqlAgent) doServerSync(ctx context.Context) error { + if yqla.secret.NeedSync(consts.TokenSecretKey, "") { + secretSpec := yqla.secret.Build() + secretSpec.StringData = map[string]string{ + consts.TokenSecretKey: ytconfig.RandString(30), + } + if err := yqla.secret.Sync(ctx); err != nil { + return err + } + } + ss := yqla.server.buildStatefulSet() + container := &ss.Spec.Template.Spec.Containers[0] + container.EnvFrom = []corev1.EnvFromSource{yqla.secret.GetEnvSource()} + if yqla.ytsaurus.GetResource().Spec.UseIPv6 && !yqla.ytsaurus.GetResource().Spec.UseIPv4 { + container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "1"}} + } else if !yqla.ytsaurus.GetResource().Spec.UseIPv6 && yqla.ytsaurus.GetResource().Spec.UseIPv4 { + container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "1"}, {Name: "YT_FORCE_IPV6", Value: "0"}} + } else { + container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "0"}} + } + return yqla.server.Sync(ctx) +} + +func (yqla *YqlAgent) serverInSync(ctx context.Context) (bool, error) { + srvInSync, err := yqla.server.inSync(ctx) + if err != nil { + return false, err + } + secretNeedSync := yqla.secret.NeedSync(consts.TokenSecretKey, "") + return srvInSync && !secretNeedSync, nil +} + func (yqla *YqlAgent) Status(ctx context.Context) (ComponentStatus, error) { - return yqla.doSync(ctx, true) + return flowToStatus(ctx, yqla, yqla.getFlow(), yqla.condManager) } func (yqla *YqlAgent) Sync(ctx context.Context) error { - _, err := yqla.doSync(ctx, false) - return err + return flowToSync(ctx, yqla.getFlow(), yqla.condManager) } diff --git a/pkg/components/yql_agent_flow.go b/pkg/components/yql_agent_flow.go new file mode 100644 index 00000000..5be866db --- /dev/null +++ b/pkg/components/yql_agent_flow.go @@ -0,0 +1,30 @@ +package components + +import ( + "context" +) + +func (yqla *YqlAgent) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(yqla, yqla.doServerSync), + getStandardWaitBuildFinishedStep(yqla, yqla.serverInSync), + getStandardInitFinishedStep(yqla, func(ctx context.Context) (ok bool, err error) { + yqla.initEnvironment.SetInitScript(yqla.createInitScript()) + st, err := yqla.initEnvironment.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }), + getStandardUpdateStep( + yqla, + yqla.condManager, + yqla.serverInSync, + []Step{ + getStandardStartRebuildStep(yqla, yqla.server.removePods), + getStandardWaitPodsRemovedStep(yqla, yqla.server.arePodsRemoved), + getStandardPodsCreateStep(yqla, yqla.doServerSync), + getStandardWaiRebuildFinishedStep(yqla, yqla.serverInSync), + }, + ), + }, + } +} diff --git a/pkg/components/ytsaurus_client.go b/pkg/components/ytsaurus_client.go index 0ee7f31f..1ef42971 100644 --- a/pkg/components/ytsaurus_client.go +++ b/pkg/components/ytsaurus_client.go @@ -78,11 +78,19 @@ func (yc *YtsaurusClient) IsUpdatable() bool { func (yc *YtsaurusClient) GetType() consts.ComponentType { return consts.YtsaurusClientType } func (yc *YtsaurusClient) Fetch(ctx context.Context) error { - return resources.Fetch(ctx, + if err := resources.Fetch(ctx, yc.secret, yc.initUserJob, yc.httpProxy, - ) + ); err != nil { + return err + } + if yc.ytClient == nil && !yc.secret.NeedSync(consts.TokenSecretKey, "") { + if err := yc.doInit(); err != nil { + return fmt.Errorf("failed to init yt client: %w", err) + } + } + return nil } func (yc *YtsaurusClient) createInitUserScript() string { @@ -399,13 +407,43 @@ func (yc *YtsaurusClient) doSync(ctx context.Context, dry bool) (ComponentStatus return SimpleStatus(SyncStatusReady), err } +func (yc *YtsaurusClient) doInit() error { + token, _ := yc.secret.GetValue(consts.TokenSecretKey) + timeout := time.Second * 10 + proxy, ok := os.LookupEnv("YTOP_PROXY") + disableProxyDiscovery := true + if !ok { + proxy = yc.cfgen.GetHTTPProxiesAddress(consts.DefaultHTTPProxyRole) + disableProxyDiscovery = false + } + var err error + yc.ytClient, err = ythttp.NewClient(&yt.Config{ + Proxy: proxy, + Token: token, + LightRequestTimeout: &timeout, + DisableProxyDiscovery: disableProxyDiscovery, + }) + return err +} + +func (yc *YtsaurusClient) doKubeSync(ctx context.Context) error { + s := yc.secret.Build() + s.StringData = map[string]string{ + consts.TokenSecretKey: ytconfig.RandString(30), + } + return yc.secret.Sync(ctx) +} + +func (yc *YtsaurusClient) isInSync(_ context.Context) (bool, error) { + return !yc.secret.NeedSync(consts.TokenSecretKey, ""), nil +} + func (yc *YtsaurusClient) Status(ctx context.Context) (ComponentStatus, error) { - return yc.doSync(ctx, true) + return flowToStatus(ctx, yc, yc.getFlow(), yc.condManager) } func (yc *YtsaurusClient) Sync(ctx context.Context) error { - _, err := yc.doSync(ctx, false) - return err + return flowToSync(ctx, yc.getFlow(), yc.condManager) } func (yc *YtsaurusClient) GetYtClient() yt.Client { diff --git a/pkg/components/ytsaurus_client_flow.go b/pkg/components/ytsaurus_client_flow.go new file mode 100644 index 00000000..83ac4459 --- /dev/null +++ b/pkg/components/ytsaurus_client_flow.go @@ -0,0 +1,31 @@ +package components + +import ( + "context" +) + +func (yc *YtsaurusClient) getFlow() Step { + return StepComposite{ + Body: []Step{ + getStandardStartBuildStep(yc, yc.doKubeSync), + getStandardWaitBuildFinishedStep(yc, yc.isInSync), + getStandardInitFinishedStep( + yc, + func(ctx context.Context) (ok bool, err error) { + yc.initUserJob.SetInitScript(yc.createInitUserScript()) + st, err := yc.initUserJob.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }, + ), + getStandardUpdateStep( + yc, + yc.condManager, + yc.isInSync, + []Step{ + getStandardStartRebuildStep(yc, yc.doKubeSync), + getStandardWaiRebuildFinishedStep(yc, yc.isInSync), + }, + ), + }, + } +} diff --git a/pkg/components/ytsaurus_client_local_test.go b/pkg/components/ytsaurus_client_local_test.go new file mode 100644 index 00000000..f67d26b6 --- /dev/null +++ b/pkg/components/ytsaurus_client_local_test.go @@ -0,0 +1,16 @@ +package components + +import ( + "testing" +) + +func TestYtsaurusClientFlow(t *testing.T) { + namespace := "ytclient" + h, ytsaurus, cfgen := prepareTest(t, namespace) + defer h.Stop() + + component := NewYtsaurusClient(cfgen, ytsaurus, nil) + syncUntilReady(t, h, component) + + // TODO: test update flow. +} diff --git a/pkg/testutil/builders.go b/pkg/testutil/builders.go index cf778d85..7ec7c55a 100644 --- a/pkg/testutil/builders.go +++ b/pkg/testutil/builders.go @@ -3,17 +3,17 @@ package testutil import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ptr "k8s.io/utils/pointer" ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" - "github.com/ytsaurus/yt-k8s-operator/pkg/consts" ) const ( testYtsaurusImage = "test-ytsaurus-image" dndsNameOne = "dn-1" + tndsNameOne = "tn-1" ) +// BuildMinimalYtsaurus builds not minimal anymore, should be splitted. func BuildMinimalYtsaurus(namespace, name string) ytv1.Ytsaurus { return ytv1.Ytsaurus{ ObjectMeta: v1.ObjectMeta{Namespace: namespace, Name: name}, @@ -27,14 +27,20 @@ func BuildMinimalYtsaurus(namespace, name string) ytv1.Ytsaurus { Discovery: ytv1.DiscoverySpec{ InstanceSpec: ytv1.InstanceSpec{ - InstanceCount: 3, - MonitoringPort: ptr.Int32(consts.DiscoveryMonitoringPort), + InstanceCount: 3, + //MonitoringPort: ptr.Int32(consts.DiscoveryMonitoringPort), + }, + }, + Schedulers: &ytv1.SchedulersSpec{ + InstanceSpec: ytv1.InstanceSpec{ + InstanceCount: 1, + //MonitoringPort: ptr.Int32(consts.SchedulerMonitoringPort), }, }, PrimaryMasters: ytv1.MastersSpec{ InstanceSpec: ytv1.InstanceSpec{ - InstanceCount: 3, - MonitoringPort: ptr.Int32(consts.MasterMonitoringPort), + InstanceCount: 3, + //MonitoringPort: ptr.Int32(consts.MasterMonitoringPort), Locations: []ytv1.LocationSpec{ { LocationType: "MasterChangelogs", @@ -56,6 +62,14 @@ func BuildMinimalYtsaurus(namespace, name string) ytv1.Ytsaurus { ServiceType: corev1.ServiceTypeNodePort, }, }, + TabletNodes: []ytv1.TabletNodesSpec{ + { + InstanceSpec: ytv1.InstanceSpec{ + InstanceCount: 3, + }, + Name: tndsNameOne, + }, + }, DataNodes: []ytv1.DataNodesSpec{ { InstanceSpec: ytv1.InstanceSpec{ @@ -71,6 +85,29 @@ func BuildMinimalYtsaurus(namespace, name string) ytv1.Ytsaurus { Name: dndsNameOne, }, }, + ExecNodes: []ytv1.ExecNodesSpec{ + { + InstanceSpec: ytv1.InstanceSpec{ + InstanceCount: 1, + Locations: []ytv1.LocationSpec{ + { + LocationType: "ChunkCache", + Path: "/yt/node-data/chunk-cache", + }, + { + LocationType: "Slots", + Path: "/yt/node-data/slots", + }, + }, + }, + }, + }, + ControllerAgents: &ytv1.ControllerAgentsSpec{ + InstanceSpec: ytv1.InstanceSpec{ + InstanceCount: 1, + //MonitoringPort: ptr.Int32(consts.ControllerAgentMonitoringPort), + }, + }, }, } } diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 1c73fa8d..5a2b5e9f 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -63,7 +63,9 @@ func TestAPIs(t *testing.T) { } var _ = SynchronizedBeforeSuite(func(ctx context.Context) []byte { - logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + logger := zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)) + logf.SetLogger(logger) + ctx = logf.IntoContext(ctx, logger) By("bootstrapping test environment") cfg, err := config.GetConfig() @@ -84,7 +86,9 @@ var _ = SynchronizedBeforeSuite(func(ctx context.Context) []byte { // Cannot serialize rest config here - just load again in each process and check host to be sure. return []byte(cfg.Host) }, func(ctx context.Context, host []byte) { - logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + logger := zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)) + logf.SetLogger(logger) + ctx = logf.IntoContext(ctx, logger) By("bootstrapping k8s client") diff --git a/test/e2e/ytsaurus_controller_test.go b/test/e2e/ytsaurus_controller_test.go index 0963b925..df46449a 100644 --- a/test/e2e/ytsaurus_controller_test.go +++ b/test/e2e/ytsaurus_controller_test.go @@ -32,7 +32,7 @@ const ( pollInterval = time.Millisecond * 250 reactionTimeout = time.Second * 150 bootstrapTimeout = time.Minute * 3 - upgradeTimeout = time.Minute * 5 + upgradeTimeout = time.Minute * 10 ) func getYtClient(g *ytconfig.Generator, namespace string) yt.Client { @@ -282,7 +282,7 @@ var _ = Describe("Basic test for Ytsaurus controller", func() { Expect(k8sClient.Update(ctx, ytsaurus)).Should(Succeed()) By("Waiting PossibilityCheck") - EventuallyYtsaurus(ctx, ytsaurusKey, reactionTimeout).Should(HaveClusterUpdateState(ytv1.UpdateStatePossibilityCheck)) + EventuallyYtsaurus(ctx, ytsaurusKey, upgradeTimeout).Should(HaveClusterUpdateState(ytv1.UpdateStatePossibilityCheck)) By("Check that master pod was NOT recreated at the PossibilityCheck stage") time.Sleep(1 * time.Second)