Skip to content

Commit

Permalink
Component Status() can return error (#180)
Browse files Browse the repository at this point in the history
* Component Status can return error

* Fix test
  • Loading branch information
l0kix2 authored Mar 22, 2024
1 parent 64e519e commit 54d2909
Show file tree
Hide file tree
Showing 23 changed files with 192 additions and 159 deletions.
12 changes: 10 additions & 2 deletions controllers/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"fmt"
"time"

ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -153,7 +154,11 @@ func NewComponentManager(
return nil, err
}

componentStatus := c.Status(ctx)
componentStatus, err := c.Status(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get component %s status: %w", c.GetName(), err)
}

c.SetReadyCondition(componentStatus)
syncStatus := componentStatus.SyncStatus

Expand Down Expand Up @@ -205,7 +210,10 @@ func (cm *ComponentManager) Sync(ctx context.Context) (ctrl.Result, error) {

hasPending := false
for _, c := range cm.allComponents {
status := c.Status(ctx)
status, err := c.Status(ctx)
if err != nil {
return ctrl.Result{Requeue: true}, fmt.Errorf("failed to get status for %s: %w", c.GetName(), err)
}

if status.SyncStatus == components.SyncStatusPending ||
status.SyncStatus == components.SyncStatusUpdating {
Expand Down
3 changes: 2 additions & 1 deletion pkg/components/chyt.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (

ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1"

corev1 "k8s.io/api/core/v1"

"github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
"github.com/ytsaurus/yt-k8s-operator/pkg/labeller"
"github.com/ytsaurus/yt-k8s-operator/pkg/resources"
"github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig"
corev1 "k8s.io/api/core/v1"
)

type Chyt struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func SimpleStatus(status SyncStatus) ComponentStatus {
type Component interface {
Fetch(ctx context.Context) error
Sync(ctx context.Context) error
Status(ctx context.Context) ComponentStatus
Status(ctx context.Context) (ComponentStatus, error)
GetName() string
SetReadyCondition(status ComponentStatus)

Expand Down
15 changes: 7 additions & 8 deletions pkg/components/controller_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu
}
}

if !IsRunningStatus(ca.master.Status(ctx).SyncStatus) {
masterStatus, err := ca.master.Status(ctx)
if err != nil {
return masterStatus, err
}
if !IsRunningStatus(masterStatus.SyncStatus) {
return WaitingStatus(SyncStatusBlocked, ca.master.GetName()), err
}

Expand All @@ -91,13 +95,8 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu
return SimpleStatus(SyncStatusReady), err
}

func (ca *ControllerAgent) Status(ctx context.Context) ComponentStatus {
status, err := ca.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (ca *ControllerAgent) Status(ctx context.Context) (ComponentStatus, error) {
return ca.doSync(ctx, true)
}

func (ca *ControllerAgent) Sync(ctx context.Context) error {
Expand Down
15 changes: 7 additions & 8 deletions pkg/components/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error
}
}

if !IsRunningStatus(n.master.Status(ctx).SyncStatus) {
masterStatus, err := n.master.Status(ctx)
if err != nil {
return masterStatus, err
}
if !IsRunningStatus(masterStatus.SyncStatus) {
return WaitingStatus(SyncStatusBlocked, n.master.GetName()), err
}

Expand All @@ -98,13 +102,8 @@ func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error
return SimpleStatus(SyncStatusReady), err
}

func (n *DataNode) Status(ctx context.Context) ComponentStatus {
status, err := n.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (n *DataNode) Status(ctx context.Context) (ComponentStatus, error) {
return n.doSync(ctx, true)
}

func (n *DataNode) Sync(ctx context.Context) error {
Expand Down
9 changes: 2 additions & 7 deletions pkg/components/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,8 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro
return SimpleStatus(SyncStatusReady), err
}

func (d *Discovery) Status(ctx context.Context) ComponentStatus {
status, err := d.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (d *Discovery) Status(ctx context.Context) (ComponentStatus, error) {
return d.doSync(ctx, true)
}

func (d *Discovery) Sync(ctx context.Context) error {
Expand Down
15 changes: 7 additions & 8 deletions pkg/components/exec_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ func (n *ExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error
}
}

if !IsRunningStatus(n.master.Status(ctx).SyncStatus) {
masterStatus, err := n.master.Status(ctx)
if err != nil {
return masterStatus, err
}
if !IsRunningStatus(masterStatus.SyncStatus) {
return WaitingStatus(SyncStatusBlocked, n.master.GetName()), err
}

Expand All @@ -95,13 +99,8 @@ func (n *ExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error
return SimpleStatus(SyncStatusReady), err
}

func (n *ExecNode) Status(ctx context.Context) ComponentStatus {
status, err := n.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (n *ExecNode) Status(ctx context.Context) (ComponentStatus, error) {
return n.doSync(ctx, true)
}

func (n *ExecNode) Sync(ctx context.Context) error {
Expand Down
15 changes: 7 additions & 8 deletions pkg/components/httpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err
}
}

if !IsRunningStatus(hp.master.Status(ctx).SyncStatus) {
masterStatus, err := hp.master.Status(ctx)
if err != nil {
return masterStatus, err
}
if !IsRunningStatus(masterStatus.SyncStatus) {
return WaitingStatus(SyncStatusBlocked, hp.master.GetName()), err
}

Expand Down Expand Up @@ -144,13 +148,8 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err
return SimpleStatus(SyncStatusReady), err
}

func (hp *HttpProxy) Status(ctx context.Context) ComponentStatus {
status, err := hp.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (hp *HttpProxy) Status(ctx context.Context) (ComponentStatus, error) {
return hp.doSync(ctx, true)
}

func (hp *HttpProxy) Sync(ctx context.Context) error {
Expand Down
9 changes: 2 additions & 7 deletions pkg/components/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,8 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error)
return m.initJob.Sync(ctx, dry)
}

func (m *Master) Status(ctx context.Context) ComponentStatus {
status, err := m.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (m *Master) Status(ctx context.Context) (ComponentStatus, error) {
return m.doSync(ctx, true)
}

func (m *Master) Sync(ctx context.Context) error {
Expand Down
9 changes: 2 additions & 7 deletions pkg/components/master_caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,8 @@ func (mc *MasterCache) doSync(ctx context.Context, dry bool) (ComponentStatus, e
return SimpleStatus(SyncStatusReady), err
}

func (mc *MasterCache) Status(ctx context.Context) ComponentStatus {
status, err := mc.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (mc *MasterCache) Status(ctx context.Context) (ComponentStatus, error) {
return mc.doSync(ctx, true)
}

func (mc *MasterCache) Sync(ctx context.Context) error {
Expand Down
21 changes: 12 additions & 9 deletions pkg/components/query_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,22 @@ func (qt *QueryTracker) doSync(ctx context.Context, dry bool) (ComponentStatus,
}

for _, tnd := range qt.tabletNodes {
if !IsRunningStatus(tnd.Status(ctx).SyncStatus) {
tndStatus, err := tnd.Status(ctx)
if err != nil {
return tndStatus, err
}
if !IsRunningStatus(tndStatus.SyncStatus) {
return WaitingStatus(SyncStatusBlocked, "tablet nodes"), err
}
}

var ytClient yt.Client
if qt.ytsaurus.GetClusterState() != ytv1.ClusterStateUpdating {
if qt.ytsaurusClient.Status(ctx).SyncStatus != SyncStatusReady {
ytClientStatus, err := qt.ytsaurusClient.Status(ctx)
if err != nil {
return ytClientStatus, err
}
if ytClientStatus.SyncStatus != SyncStatusReady {
return WaitingStatus(SyncStatusBlocked, qt.ytsaurusClient.GetName()), err
}

Expand Down Expand Up @@ -344,13 +352,8 @@ func (qt *QueryTracker) init(ctx context.Context, ytClient yt.Client) (err error
return
}

func (qt *QueryTracker) Status(ctx context.Context) ComponentStatus {
status, err := qt.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (qt *QueryTracker) Status(ctx context.Context) (ComponentStatus, error) {
return qt.doSync(ctx, true)
}

func (qt *QueryTracker) Sync(ctx context.Context) error {
Expand Down
27 changes: 17 additions & 10 deletions pkg/components/queue_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er
}
}

if !IsRunningStatus(qa.master.Status(ctx).SyncStatus) {
masterStatus, err := qa.master.Status(ctx)
if err != nil {
return masterStatus, err
}
if !IsRunningStatus(masterStatus.SyncStatus) {
return WaitingStatus(SyncStatusBlocked, qa.master.GetName()), err
}

Expand All @@ -127,7 +131,11 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er
return WaitingStatus(SyncStatusBlocked, "tablet nodes"), fmt.Errorf("cannot initialize queue agent without tablet nodes")
}
for _, tnd := range qa.tabletNodes {
if !IsRunningStatus(tnd.Status(ctx).SyncStatus) {
tndStatus, err := tnd.Status(ctx)
if err != nil {
return tndStatus, err
}
if !IsRunningStatus(tndStatus.SyncStatus) {
return WaitingStatus(SyncStatusBlocked, tnd.GetName()), err
}
}
Expand Down Expand Up @@ -157,7 +165,11 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er

var ytClient yt.Client
if qa.ytsaurus.GetClusterState() != ytv1.ClusterStateUpdating {
if qa.ytsaurusClient.Status(ctx).SyncStatus != SyncStatusReady {
ytClientStatus, err := qa.ytsaurusClient.Status(ctx)
if err != nil {
return ytClientStatus, err
}
if ytClientStatus.SyncStatus != SyncStatusReady {
return WaitingStatus(SyncStatusBlocked, qa.ytsaurusClient.GetName()), err
}

Expand Down Expand Up @@ -298,13 +310,8 @@ func (qa *QueueAgent) prepareInitQueueAgentState() {
container.EnvFrom = []corev1.EnvFromSource{qa.secret.GetEnvSource()}
}

func (qa *QueueAgent) Status(ctx context.Context) ComponentStatus {
status, err := qa.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (qa *QueueAgent) Status(ctx context.Context) (ComponentStatus, error) {
return qa.doSync(ctx, true)
}

func (qa *QueueAgent) Sync(ctx context.Context) error {
Expand Down
15 changes: 7 additions & 8 deletions pkg/components/rpcproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro
}
}

if !IsRunningStatus(rp.master.Status(ctx).SyncStatus) {
masterStatus, err := rp.master.Status(ctx)
if err != nil {
return masterStatus, err
}
if !IsRunningStatus(masterStatus.SyncStatus) {
return WaitingStatus(SyncStatusBlocked, rp.master.GetName()), err
}

Expand Down Expand Up @@ -144,13 +148,8 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro
return SimpleStatus(SyncStatusReady), err
}

func (rp *RpcProxy) Status(ctx context.Context) ComponentStatus {
status, err := rp.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (rp *RpcProxy) Status(ctx context.Context) (ComponentStatus, error) {
return rp.doSync(ctx, true)
}

func (rp *RpcProxy) Sync(ctx context.Context) error {
Expand Down
27 changes: 17 additions & 10 deletions pkg/components/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,8 @@ func (s *Scheduler) Fetch(ctx context.Context) error {
)
}

func (s *Scheduler) Status(ctx context.Context) ComponentStatus {
status, err := s.doSync(ctx, true)
if err != nil {
panic(err)
}

return status
func (s *Scheduler) Status(ctx context.Context) (ComponentStatus, error) {
return s.doSync(ctx, true)
}

func (s *Scheduler) Sync(ctx context.Context) error {
Expand Down Expand Up @@ -150,13 +145,21 @@ func (s *Scheduler) doSync(ctx context.Context, dry bool) (ComponentStatus, erro
}
}

if !IsRunningStatus(s.master.Status(ctx).SyncStatus) {
masterStatus, err := s.master.Status(ctx)
if err != nil {
return masterStatus, err
}
if !IsRunningStatus(masterStatus.SyncStatus) {
return WaitingStatus(SyncStatusBlocked, s.master.GetName()), err
}

if s.execNodes == nil || len(s.execNodes) > 0 {
for _, end := range s.execNodes {
if !IsRunningStatus(end.Status(ctx).SyncStatus) {
endStatus, err := end.Status(ctx)
if err != nil {
return endStatus, err
}
if !IsRunningStatus(endStatus.SyncStatus) {
// It makes no sense to start scheduler without exec nodes.
return WaitingStatus(SyncStatusBlocked, end.GetName()), err
}
Expand Down Expand Up @@ -204,7 +207,11 @@ func (s *Scheduler) initOpAchieve(ctx context.Context, dry bool) (ComponentStatu
}

for _, tnd := range s.tabletNodes {
if !IsRunningStatus(tnd.Status(ctx).SyncStatus) {
tndStatus, err := tnd.Status(ctx)
if err != nil {
return tndStatus, err
}
if !IsRunningStatus(tndStatus.SyncStatus) {
// Wait for tablet nodes to proceed with operations archive init.
return WaitingStatus(SyncStatusBlocked, tnd.GetName()), err
}
Expand Down
Loading

0 comments on commit 54d2909

Please sign in to comment.