Skip to content

Commit

Permalink
Switch to dry behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
l0kix2 committed Mar 22, 2024
1 parent 4f14eaa commit 5dea6a8
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 133 deletions.
22 changes: 16 additions & 6 deletions pkg/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ func NeedSyncStatus(message string) ComponentStatus {
return ComponentStatus{SyncStatus: SyncStatusNeedSync, Message: message}
}

func UpdatingStatus(message string) ComponentStatus {
return ComponentStatus{SyncStatus: SyncStatusUpdating, Message: message}
}

func ReadyStatus() ComponentStatus {
return ComponentStatus{SyncStatus: SyncStatusReady}
}
Expand Down Expand Up @@ -180,7 +176,11 @@ func (c *localServerComponent) runUntilNoErr(
ctx context.Context,
run func(ctx context.Context) error,
onSuccess Condition,
dry bool,
) error {
if dry {
return nil
}
if err := run(ctx); err != nil {
return fmt.Errorf("failed to run %s for cond %s: %w", c.GetName(), onSuccess, err)
}
Expand All @@ -194,11 +194,21 @@ func (c *localServerComponent) runUntilOk(
ctx context.Context,
run func(ctx context.Context) (bool, error),
onSuccess Condition,
dry bool,
) error {
return c.runUntilOkWithCleanup(ctx, run, nil, onSuccess)
return c.runUntilOkWithCleanup(ctx, run, nil, onSuccess, dry)
}

func (c *localServerComponent) runUntilOkWithCleanup(ctx context.Context, run func(ctx context.Context) (bool, error), cleanup func(ctx context.Context) error, onSuccess Condition) error {
func (c *localServerComponent) runUntilOkWithCleanup(
ctx context.Context,
run func(ctx context.Context) (bool, error),
cleanup func(ctx context.Context) error,
onSuccess Condition,
dry bool,
) error {
if dry {
return nil
}
done, err := run(ctx)
if err != nil {
return fmt.Errorf("failed to run %s for cond %s: %w", c.GetName(), onSuccess, err)
Expand Down
105 changes: 40 additions & 65 deletions pkg/components/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"go.ytsaurus.tech/library/go/ptr"

ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1"
"github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
"github.com/ytsaurus/yt-k8s-operator/pkg/labeller"
Expand Down Expand Up @@ -62,51 +61,57 @@ func (d *Discovery) Fetch(ctx context.Context) error {
}

func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, error) {
var err error

if ytv1.IsReadyToUpdateClusterState(d.ytsaurus.GetClusterState()) && d.server.needUpdate() {
return SimpleStatus(SyncStatusNeedLocalUpdate), err
// Initial component creation.
builtStartedCond := buildStarted(d.GetName())
if d.condManager.Is(not(builtStartedCond)) {
return NeedSyncStatus("build is started"), d.runUntilNoErr(ctx, d.server.Sync, builtStartedCond, dry)
}

if d.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
}
builtCond := buildFinished(d.GetName())
if d.condManager.Is(not(builtCond)) {
return NeedSyncStatus("build is not finished"), d.runUntilOk(
ctx,
func(ctx context.Context) (bool, error) {
diff, err := d.server.hasDiff(ctx)
return !diff, err
},
builtCond,
dry,
)
}

if d.NeedSync() {
if !dry {
err = d.server.Sync(ctx)
// Update in case of a diff.
needUpdate, err := d.server.hasDiff(ctx)
if err != nil {
return NeedSyncStatus(""), err
}
updateRequiredCond := updateRequired(d.GetName())
if needUpdate {
if err = d.condManager.SetCond(ctx, updateRequiredCond); err != nil {
return NeedSyncStatus(""), err
}
return WaitingStatus(SyncStatusPending, "components"), err
}

if !d.server.arePodsReady(ctx) {
return WaitingStatus(SyncStatusBlocked, "pods"), err
if d.condManager.Is(updateRequiredCond) {
return NeedSyncStatus(""), d.runUntilOkWithCleanup(
ctx,
func(ctx context.Context) (bool, error) {
return d.handleUpdate(ctx, dry)
},
d.handlePostUpdate,
not(updateRequiredCond),
dry,
)
}

return SimpleStatus(SyncStatusReady), err
return ReadyStatus(), nil
}

func (d *Discovery) Status(ctx context.Context) (ComponentStatus, error) {
if err := d.Fetch(ctx); err != nil {
return ComponentStatus{}, fmt.Errorf("failed to fetch component %s: %w", d.GetName(), err)
}

if d.condManager.Is(not(buildFinished(d.GetName()))) {
return NeedSyncStatus("initial build not yet have finished"), nil
}

needUpdate, err := d.server.hasDiff(ctx)
if err != nil {
return ComponentStatus{}, err
}

if needUpdate || d.condManager.Is(updateRequired(d.GetName())) {
return NeedSyncStatus("component needs update"), nil
}

return ReadyStatus(), nil
return d.doSync(ctx, true)
}

func (d *Discovery) StatusOld(ctx context.Context) ComponentStatus {
Expand All @@ -118,44 +123,14 @@ func (d *Discovery) StatusOld(ctx context.Context) ComponentStatus {
}

func (d *Discovery) Sync(ctx context.Context) error {
srv := d.server.(*serverImpl)

// Initial component creation.
builtStartedCond := buildStarted(d.GetName())
if d.condManager.Is(not(builtStartedCond)) {
return d.runUntilNoErr(ctx, d.server.Sync, builtStartedCond)
}

builtCond := buildFinished(d.GetName())
if d.condManager.Is(not(builtCond)) {
return d.runUntilOk(ctx, func(ctx context.Context) (bool, error) {
diff, err := d.server.hasDiff(ctx)
return !diff, err
}, builtCond)
}

// Update in case of a diff.
needUpdate, err := srv.hasDiff(ctx)
if err != nil {
return err
}
updateRequiredCond := updateRequired(d.GetName())
if needUpdate {
if err = d.condManager.SetCond(ctx, updateRequiredCond); err != nil {
return err
}
}
if d.condManager.Is(updateRequiredCond) {
return d.runUntilOkWithCleanup(ctx, d.handleUpdate, d.handlePostUpdate, not(updateRequiredCond))
}

return nil
_, err := d.doSync(ctx, false)
return err
}

func (d *Discovery) handleUpdate(ctx context.Context) (bool, error) {
func (d *Discovery) handleUpdate(ctx context.Context, dry bool) (bool, error) {
podsWereRemoved := podsRemoved(d.GetName())
if d.condManager.Is(not(podsWereRemoved)) {
return false, d.runUntilNoErr(ctx, d.server.removePods, podsWereRemoved)
return false, d.runUntilNoErr(ctx, d.server.removePods, podsWereRemoved, dry)
}
return true, nil
}
Expand Down
131 changes: 69 additions & 62 deletions pkg/components/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,88 +340,95 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error)
return m.initJob.Sync(ctx, dry)
}

func (m *Master) Status(ctx context.Context) (ComponentStatus, error) {
if err := m.Fetch(ctx); err != nil {
return ComponentStatus{}, fmt.Errorf("failed to fetch component %s: %w", m.GetName(), err)
}

if m.condManager.Is(not(buildFinished(m.GetName()))) {
return NeedSyncStatus("initial build not yet have finished"), nil
}

needUpdate, err := m.server.hasDiff(ctx)
if err != nil {
return ComponentStatus{}, err
}

if needUpdate || m.condManager.Is(updateRequired(m.GetName())) {
return NeedSyncStatus("component needs update"), nil
}

return ReadyStatus(), nil
}

func (m *Master) StatusOld(ctx context.Context) ComponentStatus {
status, err := m.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
}

func (m *Master) Sync(ctx context.Context) error {
func (m *Master) doSync2(ctx context.Context, dry bool) (ComponentStatus, error) {
// 1. Initial component creation.
builtStartedCond := buildStarted(m.GetName())
if m.condManager.Is(not(builtStartedCond)) {
return m.runUntilNoErr(ctx, m.server.Sync, builtStartedCond)
return NeedSyncStatus("build not started"),
m.runUntilNoErr(ctx, m.server.Sync, builtStartedCond, dry)
}

builtCond := buildFinished(m.GetName())
if m.condManager.Is(not(builtCond)) {
return m.runUntilOk(ctx, func(ctx context.Context) (bool, error) {
diff, err := m.server.hasDiff(ctx)
return !diff, err
}, builtCond)
return NeedSyncStatus("not built yet"),
m.runUntilOk(
ctx,
func(ctx context.Context) (bool, error) {
diff, err := m.server.hasDiff(ctx)
return !diff, err
},
builtCond,
dry,
)
}

// 2. Initialization once in a lifetime of the main component
initCond := initializationFinished(m.GetName())
if m.condManager.Is(not(initCond)) {
return m.runUntilOk(ctx, m.handleInitialization, initCond)
return NeedSyncStatus("init not done"),
m.runUntilOk(ctx, m.handleInitialization, initCond, dry)
}

// 3. Update in case of a diff until full component update is completed.
needUpdate, err := m.server.hasDiff(ctx)
if err != nil {
return err
return NeedSyncStatus(""), err
}
updateRequiredCond := updateRequired(m.GetName())
if needUpdate {
if err = m.condManager.SetCond(ctx, updateRequiredCond); err != nil {
return err
return NeedSyncStatus(""), err
}
}
if m.condManager.Is(updateRequiredCond) {
if m.condManager.Is(not(masterUpdatePossibleCond)) {
return m.runUntilOk(ctx, func(ctx context.Context) (bool, error) {
// TODO: put message in the condition reason
ok, _, err := m.ytClient.HandlePossibilityCheck(ctx)
if err != nil {
return false, err
}
return ok, nil
}, masterUpdatePossibleCond)
return NeedSyncStatus(""),
m.runUntilOk(ctx,
func(ctx context.Context) (bool, error) {
// TODO: put message in the condition reason
ok, _, err := m.ytClient.HandlePossibilityCheck(ctx)
if err != nil {
return false, err
}
return ok, nil
},
masterUpdatePossibleCond,
dry,
)
}
return m.runUntilOkWithCleanup(
ctx,
m.handleUpdate,
m.cleanupAfterUpdate,
not(updateRequiredCond),
)
return NeedSyncStatus(""),
m.runUntilOkWithCleanup(
ctx,
func(ctx context.Context) (bool, error) {
return m.handleUpdate(ctx, dry)
},
m.cleanupAfterUpdate,
not(updateRequiredCond),
dry,
)
}
return ReadyStatus(), nil
}

return nil
func (m *Master) Status(ctx context.Context) (ComponentStatus, error) {
if err := m.Fetch(ctx); err != nil {
return ComponentStatus{}, fmt.Errorf("failed to fetch component %s: %w", m.GetName(), err)
}
return m.doSync2(ctx, true)
}

func (m *Master) StatusOld(ctx context.Context) ComponentStatus {
status, err := m.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
}

func (m *Master) Sync(ctx context.Context) error {
_, err := m.doSync2(ctx, false)
return err
}

func (m *Master) handleInitialization(ctx context.Context) (bool, error) {
Expand All @@ -430,11 +437,11 @@ func (m *Master) handleInitialization(ctx context.Context) (bool, error) {
return st.SyncStatus == SyncStatusReady, err
}

func (m *Master) handleUpdate(ctx context.Context) (bool, error) {
func (m *Master) handleUpdate(ctx context.Context, dry bool) (bool, error) {
if m.condManager.Is(not(masterSafeModeEnabledCond)) {
return false, m.runUntilNoErr(ctx, func(ctx context.Context) error {
return m.ytClient.EnableSafeMode(ctx)
}, masterSafeModeEnabledCond)
}, masterSafeModeEnabledCond, dry)
}
if m.condManager.Is(not(masterSnapshotsBuildStartedCond)) {
return false, m.runUntilNoErr(ctx, func(ctx context.Context) error {
Expand All @@ -447,34 +454,34 @@ func (m *Master) handleUpdate(ctx context.Context) (bool, error) {
}

return m.ytClient.StartBuildMasterSnapshots(ctx, monitoringPaths)
}, masterSnapshotsBuildStartedCond)
}, masterSnapshotsBuildStartedCond, dry)
}
if m.condManager.Is(not(masterSnapshotsBuildFinishedCond)) {
return false, m.runUntilOk(ctx, func(ctx context.Context) (bool, error) {
paths := m.getStoredMasterMonitoringPaths()
return m.ytClient.AreMasterSnapshotsBuilt(ctx, paths)
}, masterSnapshotsBuildFinishedCond)
}, masterSnapshotsBuildFinishedCond, dry)
}
podsWereRemoved := podsRemoved(m.GetName())
if m.condManager.Is(not(podsWereRemoved)) {
return false, m.runUntilNoErr(ctx, m.server.removePods, podsWereRemoved)
return false, m.runUntilNoErr(ctx, m.server.removePods, podsWereRemoved, dry)
}
if m.condManager.Is(not(masterExitReadOnlyPrepared)) {
return false, m.runUntilNoErr(ctx, func(ctx context.Context) error {
return m.exitReadOnlyJob.prepareRestart(ctx, false)
}, masterExitReadOnlyPrepared)
}, masterExitReadOnlyPrepared, dry)
}
if m.condManager.Is(not(masterExitReadOnlyFinished)) {
return false, m.runUntilOk(ctx, func(ctx context.Context) (done bool, err error) {
m.exitReadOnlyJob.SetInitScript(m.createInitScript())
st, err := m.exitReadOnlyJob.Sync(ctx, false)
return st.SyncStatus == SyncStatusReady, err
}, masterExitReadOnlyFinished)
}, masterExitReadOnlyFinished, dry)
}
if m.condManager.Is(not(masterSafeModeDisabledCond)) {
return false, m.runUntilNoErr(ctx, func(ctx context.Context) error {
return m.ytClient.DisableSafeMode(ctx)
}, masterSafeModeDisabledCond)
}, masterSafeModeDisabledCond, dry)
}
return true, nil
}
Expand Down

0 comments on commit 5dea6a8

Please sign in to comment.