From e0d17425aeba03e76deb7bcdb27a053f7ceb0f8d Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Tue, 24 Oct 2023 20:33:10 -0600 Subject: [PATCH 1/3] simplify status and perform in correct order --- controllers/cosmosfullnode_controller.go | 17 +- internal/cosmos/cache_controller.go | 5 - internal/cosmos/status_collection.go | 44 --- internal/fullnode/pod_control.go | 81 +++-- internal/fullnode/pod_control_test.go | 369 +++++++++++------------ 5 files changed, 236 insertions(+), 280 deletions(-) diff --git a/controllers/cosmosfullnode_controller.go b/controllers/cosmosfullnode_controller.go index 221cecfe..aec64940 100644 --- a/controllers/cosmosfullnode_controller.go +++ b/controllers/cosmosfullnode_controller.go @@ -71,7 +71,7 @@ func NewFullNode( configMapControl: fullnode.NewConfigMapControl(client), nodeKeyControl: fullnode.NewNodeKeyControl(client), peerCollector: fullnode.NewPeerCollector(client), - podControl: fullnode.NewPodControl(client, cacheController), + podControl: fullnode.NewPodControl(client), pvcControl: fullnode.NewPVCControl(client), recorder: recorder, serviceControl: fullnode.NewServiceControl(client), @@ -118,7 +118,10 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque reporter := kube.NewEventReporter(logger, r.recorder, crd) fullnode.ResetStatus(crd) - defer r.updateStatus(ctx, crd) + + syncInfo := fullnode.SyncInfoStatus(ctx, crd, r.cacheController) + + defer r.updateStatus(ctx, crd, &syncInfo) errs := &kube.ReconcileErrors{} @@ -169,7 +172,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque } // Reconcile pods. - podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums) + podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, &syncInfo) if err != nil { errs.Append(err) } @@ -218,16 +221,14 @@ func (r *CosmosFullNodeReconciler) resultWithErr(crd *cosmosv1.CosmosFullNode, e return stopResult, err } -func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) { - consensus := fullnode.SyncInfoStatus(ctx, crd, r.cacheController) - +func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo *cosmosv1.SyncInfoStatus) { if err := r.statusClient.SyncUpdate(ctx, client.ObjectKeyFromObject(crd), func(status *cosmosv1.FullNodeStatus) { status.ObservedGeneration = crd.Status.ObservedGeneration status.Phase = crd.Status.Phase status.StatusMessage = crd.Status.StatusMessage status.Peers = crd.Status.Peers - status.SyncInfo = &consensus - for _, v := range consensus.Pods { + status.SyncInfo = syncInfo + for _, v := range syncInfo.Pods { if v.Height != nil && *v.Height > 0 { if status.Height == nil { status.Height = make(map[string]uint64) diff --git a/internal/cosmos/cache_controller.go b/internal/cosmos/cache_controller.go index f3577dd7..a4b71d92 100644 --- a/internal/cosmos/cache_controller.go +++ b/internal/cosmos/cache_controller.go @@ -168,11 +168,6 @@ func (c *CacheController) SyncedPods(ctx context.Context, controller client.Obje return kube.AvailablePods(c.Collect(ctx, controller).SyncedPods(), 5*time.Second, time.Now()) } -// PodsWithStatus returns all pods with their status. 
-func (c *CacheController) PodsWithStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) []PodStatus { - return c.Collect(ctx, client.ObjectKeyFromObject(crd)).PodsWithStatus(crd) -} - func (c *CacheController) listPods(ctx context.Context, controller client.ObjectKey) ([]corev1.Pod, error) { var pods corev1.PodList if err := c.client.List(ctx, &pods, diff --git a/internal/cosmos/status_collection.go b/internal/cosmos/status_collection.go index 672df04b..aefa018f 100644 --- a/internal/cosmos/status_collection.go +++ b/internal/cosmos/status_collection.go @@ -6,7 +6,6 @@ import ( "time" "github.com/samber/lo" - cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1" "github.com/strangelove-ventures/cosmos-operator/internal/kube" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -114,46 +113,3 @@ func (coll StatusCollection) Synced() StatusCollection { func (coll StatusCollection) SyncedPods() []*corev1.Pod { return lo.Map(coll.Synced(), func(status StatusItem, _ int) *corev1.Pod { return status.GetPod() }) } - -// PodStatus is the status of a pod. -type PodStatus struct { - Pod *corev1.Pod - RPCReachable bool - Synced bool - AwaitingUpgrade bool -} - -// PodsWithStatus returns all pods with their status. -func (coll StatusCollection) PodsWithStatus(crd *cosmosv1.CosmosFullNode) []PodStatus { - out := make([]PodStatus, len(coll)) - for i, status := range coll { - ps := PodStatus{ - Pod: status.GetPod(), - } - if crd.Spec.ChainSpec.Versions != nil { - instanceHeight := uint64(0) - if height, ok := crd.Status.Height[status.Pod.Name]; ok { - instanceHeight = height - } - var image string - for _, version := range crd.Spec.ChainSpec.Versions { - if instanceHeight < version.UpgradeHeight { - break - } - image = version.Image - } - if image != "" && status.Pod.Spec.Containers[0].Image != image { - ps.AwaitingUpgrade = true - } - } - if status.Err == nil { - ps.RPCReachable = true - if !status.Status.Result.SyncInfo.CatchingUp { - ps.Synced = true - } - } - - out[i] = ps - } - return out -} diff --git a/internal/fullnode/pod_control.go b/internal/fullnode/pod_control.go index 789b2208..28a81bd9 100644 --- a/internal/fullnode/pod_control.go +++ b/internal/fullnode/pod_control.go @@ -5,7 +5,6 @@ import ( "fmt" cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1" - "github.com/strangelove-ventures/cosmos-operator/internal/cosmos" "github.com/strangelove-ventures/cosmos-operator/internal/diff" "github.com/strangelove-ventures/cosmos-operator/internal/kube" corev1 "k8s.io/api/core/v1" @@ -16,10 +15,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -type PodFilter interface { - PodsWithStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus -} - // Client is a controller client. It is a subset of client.Client. type Client interface { client.Reader @@ -31,22 +26,26 @@ type Client interface { // PodControl reconciles pods for a CosmosFullNode. type PodControl struct { client Client - podFilter PodFilter computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int } // NewPodControl returns a valid PodControl. -func NewPodControl(client Client, filter PodFilter) PodControl { +func NewPodControl(client Client) PodControl { return PodControl{ client: client, - podFilter: filter, computeRollout: kube.ComputeRollout, } } // Reconcile is the control loop for pods. The bool return value, if true, indicates the controller should requeue // the request. 
-func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd *cosmosv1.CosmosFullNode, cksums ConfigChecksums) (bool, kube.ReconcileError) { +func (pc PodControl) Reconcile( + ctx context.Context, + reporter kube.Reporter, + crd *cosmosv1.CosmosFullNode, + cksums ConfigChecksums, + syncInfo *cosmosv1.SyncInfoStatus, +) (bool, kube.ReconcileError) { var pods corev1.PodList if err := pc.client.List(ctx, &pods, client.InNamespace(crd.Namespace), @@ -62,7 +61,7 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd diffed := diff.New(ptrSlice(pods.Items), wantPods) for _, pod := range diffed.Creates() { - reporter.Info("Creating pod", "pod", pod.Name) + reporter.Info("Creating pod", "name", pod.Name) if err := ctrl.SetControllerReference(crd, pod, pc.client.Scheme()); err != nil { return true, kube.TransientError(fmt.Errorf("set controller reference on pod %q: %w", pod.Name, err)) } @@ -72,7 +71,7 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd } for _, pod := range diffed.Deletes() { - reporter.Info("Deleting pod", "pod", pod.Name) + reporter.Info("Deleting pod", "name", pod.Name) if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); kube.IgnoreNotFound(err) != nil { return true, kube.TransientError(fmt.Errorf("delete pod %q: %w", pod.Name, err)) } @@ -86,37 +85,54 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd diffedUpdates := diffed.Updates() if len(diffedUpdates) > 0 { var ( - podsWithStatus = pc.podFilter.PodsWithStatus(ctx, crd) upgradePods = make(map[string]bool) otherUpdates = make(map[string]*corev1.Pod) - rpcReachablePods = make(map[string]bool) - inSyncPods = make(map[string]bool) - deletedPods = make(map[string]bool) + rpcReachablePods = 0 + inSyncPods = 0 ) - for _, ps := range podsWithStatus { - if ps.Synced { - inSyncPods[ps.Pod.Name] = true + PodsLoop: + for _, ps := range syncInfo.Pods { + for _, existing := range pods.Items { + if existing.Name == ps.Pod { + if existing.DeletionTimestamp != nil { + continue PodsLoop + } + break + } + } + + if ps.InSync != nil && *ps.InSync { + inSyncPods++ } - if ps.RPCReachable { - rpcReachablePods[ps.Pod.Name] = true + rpcReachable := ps.Error == nil + if rpcReachable { + rpcReachablePods++ } for _, update := range diffedUpdates { - if ps.Pod.Name == update.Name { - if ps.AwaitingUpgrade { - if !ps.RPCReachable { - upgradePods[ps.Pod.Name] = true - reporter.Info("Deleting pod for version upgrade", "podName", ps.Pod.Name) + if ps.Pod == update.Name { + awaitingUpgrade := false + for _, existing := range pods.Items { + if existing.Name == ps.Pod { + if existing.Spec.Containers[0].Image != update.Spec.Containers[0].Image { + awaitingUpgrade = true + } + break + } + } + if awaitingUpgrade { + if !rpcReachable { + upgradePods[ps.Pod] = true + reporter.Info("Deleting pod for version upgrade", "name", ps.Pod) // Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it. 
- if err := pc.client.Delete(ctx, ps.Pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil { - return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", ps.Pod.Name, err)) + if err := pc.client.Delete(ctx, update, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil { + return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", ps.Pod, err)) } - deletedPods[ps.Pod.Name] = true } else { - otherUpdates[ps.Pod.Name] = ps.Pod + otherUpdates[ps.Pod] = update } } else { - otherUpdates[ps.Pod.Name] = ps.Pod + otherUpdates[ps.Pod] = update } break } @@ -125,9 +141,9 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd // If we don't have any pods in sync, we are down anyways, so we can use the number of RPC reachable pods for computing the rollout, // with the goal of recovering the pods as quickly as possible. - ready := len(inSyncPods) + ready := inSyncPods if ready == 0 { - ready = len(rpcReachablePods) + ready = rpcReachablePods } numUpdates := pc.computeRollout(crd.Spec.RolloutStrategy.MaxUnavailable, int(crd.Spec.Replicas), ready) @@ -150,7 +166,6 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil { return true, kube.TransientError(fmt.Errorf("update pod %q: %w", podName, err)) } - deletedPods[podName] = true updated++ if updated >= numUpdates { // done for this round diff --git a/internal/fullnode/pod_control_test.go b/internal/fullnode/pod_control_test.go index 41480a69..6d9a1c53 100644 --- a/internal/fullnode/pod_control_test.go +++ b/internal/fullnode/pod_control_test.go @@ -5,9 +5,7 @@ import ( "fmt" "testing" - "github.com/samber/lo" cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1" - "github.com/strangelove-ventures/cosmos-operator/internal/cosmos" "github.com/strangelove-ventures/cosmos-operator/internal/diff" "github.com/strangelove-ventures/cosmos-operator/internal/kube" "github.com/stretchr/testify/require" @@ -17,19 +15,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -type mockPodFilter func(ctx context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus - -func (fn mockPodFilter) PodsWithStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus { - if ctx == nil { - panic("nil context") - } - return fn(ctx, crd) -} - -var panicPodFilter = mockPodFilter(func(context.Context, *cosmosv1.CosmosFullNode) []cosmos.PodStatus { - panic("SyncedPods should not be called") -}) - func TestPodControl_Reconcile(t *testing.T) { t.Parallel() @@ -53,8 +38,17 @@ func TestPodControl_Reconcile(t *testing.T) { Items: []corev1.Pod{*existing}, } - control := NewPodControl(&mClient, panicPodFilter) - requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil) + syncInfo := &cosmosv1.SyncInfoStatus{ + Pods: []cosmosv1.SyncInfoPodStatus{ + { + Pod: "hub-0", + InSync: ptr(true), + }, + }, + } + + control := NewPodControl(&mClient) + requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) require.NoError(t, err) require.False(t, requeue) @@ -82,8 +76,8 @@ func TestPodControl_Reconcile(t *testing.T) { }, } - control := NewPodControl(&mClient, panicPodFilter) - requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil) + control := NewPodControl(&mClient) + requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, 
nil) require.NoError(t, err) require.True(t, requeue) @@ -109,72 +103,79 @@ func TestPodControl_Reconcile(t *testing.T) { require.NoError(t, err) existing := diff.New(nil, pods).Creates() - var mClient mockPodClient - mClient.ObjectList = corev1.PodList{ - Items: valueSlice(existing), + mClient := mockPodClient{ + ObjectList: corev1.PodList{ + Items: valueSlice(existing), + }, + } + + syncInfo := &cosmosv1.SyncInfoStatus{ + Pods: []cosmosv1.SyncInfoPodStatus{ + { + Pod: "hub-0", + InSync: ptr(true), + }, + { + Pod: "hub-1", + InSync: ptr(true), + }, + { + Pod: "hub-2", + InSync: ptr(true), + }, + { + Pod: "hub-3", + InSync: ptr(true), + }, + { + Pod: "hub-4", + InSync: ptr(true), + }, + }, } - var didFilter bool - podFilter := mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus { - require.Equal(t, namespace, crd.Namespace) - require.Equal(t, "hub", crd.Name) - didFilter = true - return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus { - return cosmos.PodStatus{ - Pod: pod, - RPCReachable: true, - Synced: true, - } - }) - }) - - control := NewPodControl(&mClient, podFilter) - const stubRollout = 5 + control := NewPodControl(&mClient) control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) - require.Equal(t, stubRollout, ready) // mockPodFilter only returns 1 candidate as ready + require.Equal(t, 5, ready) // mockPodFilter only returns 1 candidate as ready return kube.ComputeRollout(maxUnavail, desired, ready) } // Trigger updates crd.Spec.PodTemplate.Image = "new-image" - requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil) + requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) require.NoError(t, err) require.True(t, requeue) - require.True(t, didFilter) - require.Zero(t, mClient.CreateCount) require.Equal(t, 2, mClient.DeleteCount) - didFilter = false - podFilter = mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus { - require.Equal(t, namespace, crd.Namespace) - require.Equal(t, "hub", crd.Name) - didFilter = true - return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus { - ps := cosmos.PodStatus{ - Pod: pod, - RPCReachable: true, - Synced: true, - } - if i < 2 { - ps.RPCReachable = false - ps.Synced = false - } - return ps - }) - }) - - control = NewPodControl(&mClient, podFilter) - - requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil) + existing[0].Spec.Containers[0].Image = "new-image" + existing[1].Spec.Containers[0].Image = "new-image" + + recalculatePodRevision(existing[0], 0) + recalculatePodRevision(existing[1], 1) + mClient.ObjectList = corev1.PodList{ + Items: valueSlice(existing), + } + + syncInfo.Pods[0].InSync = nil + syncInfo.Pods[0].Error = ptr("upgrade in progress") + + syncInfo.Pods[1].InSync = nil + syncInfo.Pods[1].Error = ptr("upgrade in progress") + + control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { + require.EqualValues(t, crd.Spec.Replicas, desired) + require.Equal(t, 3, ready) // mockPodFilter only returns 1 candidate as ready + return kube.ComputeRollout(maxUnavail, desired, ready) + } + + requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) require.NoError(t, err) require.True(t, requeue) - require.True(t, didFilter) - require.Zero(t, mClient.CreateCount) // should not delete any more yet. 
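For context on the readiness math these test hunks assert: Reconcile now counts in-sync pods straight from the syncInfo passed in and, when none are in sync, falls back to the number of RPC-reachable pods so a fully-down set can still roll forward; pods already carrying a deletion timestamp are skipped. A minimal sketch of that counting rule, using simplified stand-in types rather than the operator's cosmosv1 types (names here are illustrative only, not the patch's exact code):

package main

import "fmt"

// syncStatus is a simplified stand-in for the patch's SyncInfoPodStatus.
type syncStatus struct {
	InSync *bool
	Error  *string
}

// readyCount sketches the rollout-readiness rule: count in-sync pods, and if
// none are in sync fall back to the RPC-reachable count (no error reported).
// Pods flagged as deleting are excluded from both counts.
func readyCount(syncInfo map[string]*syncStatus, deleting map[string]bool) int {
	inSync, reachable := 0, 0
	for name, ps := range syncInfo {
		if deleting[name] {
			continue
		}
		if ps.InSync != nil && *ps.InSync {
			inSync++
		}
		if ps.Error == nil {
			reachable++
		}
	}
	if inSync == 0 {
		return reachable
	}
	return inSync
}

func main() {
	yes := true
	errMsg := "upgrade in progress"
	syncInfo := map[string]*syncStatus{
		"hub-0": {Error: &errMsg},
		"hub-1": {Error: &errMsg},
		"hub-2": {InSync: &yes},
		"hub-3": {InSync: &yes},
		"hub-4": {InSync: &yes},
	}
	fmt.Println(readyCount(syncInfo, map[string]bool{})) // 3: the ready value the second rollout in the test expects
}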
@@ -206,28 +207,44 @@ func TestPodControl_Reconcile(t *testing.T) { require.NoError(t, err) existing := diff.New(nil, pods).Creates() - var mClient mockPodClient - mClient.ObjectList = corev1.PodList{ - Items: valueSlice(existing), + mClient := mockPodClient{ + ObjectList: corev1.PodList{ + Items: valueSlice(existing), + }, + } + + // pods are at upgrade height and reachable + syncInfo := &cosmosv1.SyncInfoStatus{ + Pods: []cosmosv1.SyncInfoPodStatus{ + { + Pod: "hub-0", + Height: ptr(uint64(100)), + InSync: ptr(true), + }, + { + Pod: "hub-1", + Height: ptr(uint64(100)), + InSync: ptr(true), + }, + { + Pod: "hub-2", + Height: ptr(uint64(100)), + InSync: ptr(true), + }, + { + Pod: "hub-3", + Height: ptr(uint64(100)), + InSync: ptr(true), + }, + { + Pod: "hub-4", + Height: ptr(uint64(100)), + InSync: ptr(true), + }, + }, } - var didFilter bool - podFilter := mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus { - require.Equal(t, namespace, crd.Namespace) - require.Equal(t, "hub", crd.Name) - didFilter = true - return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus { - return cosmos.PodStatus{ - Pod: pod, - // pods are at or above upgrade height and not reachable - AwaitingUpgrade: true, - RPCReachable: true, - Synced: false, - } - }) - }) - - control := NewPodControl(&mClient, podFilter) + control := NewPodControl(&mClient) control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) @@ -242,14 +259,12 @@ func TestPodControl_Reconcile(t *testing.T) { // Reconcile 1, should update 0 and 1 - requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil) + requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) require.NoError(t, err) // only handled 2 updates, so should requeue. require.True(t, requeue) - require.True(t, didFilter) - require.Zero(t, mClient.CreateCount) require.Equal(t, 2, mClient.DeleteCount) @@ -262,82 +277,51 @@ func TestPodControl_Reconcile(t *testing.T) { Items: valueSlice(existing), } - // 2 are now unavailable, working on upgrade - - didFilter = false - podFilter = mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus { - require.Equal(t, namespace, crd.Namespace) - require.Equal(t, "hub", crd.Name) - didFilter = true - return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus { - ps := cosmos.PodStatus{ - Pod: pod, - RPCReachable: true, - Synced: true, - } - if i < 2 { - ps.RPCReachable = false - ps.Synced = false - } else { - ps.AwaitingUpgrade = true - } - return ps - }) - }) - - control = NewPodControl(&mClient, podFilter) + // 0 and 1 are now unavailable, working on upgrade + syncInfo.Pods[0].InSync = nil + syncInfo.Pods[0].Error = ptr("upgrade in progress") + + syncInfo.Pods[1].InSync = nil + syncInfo.Pods[1].Error = ptr("upgrade in progress") // Reconcile 2, should not update anything because 0 and 1 are still in progress. - requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil) + control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { + require.EqualValues(t, crd.Spec.Replicas, desired) + require.Equal(t, 3, ready) + return kube.ComputeRollout(maxUnavail, desired, ready) + } + + requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) require.NoError(t, err) // no further updates yet, should requeue. 
require.True(t, requeue) - require.True(t, didFilter) - require.Zero(t, mClient.CreateCount) // should not delete any more yet. require.Equal(t, 2, mClient.DeleteCount) // mock out that one of the pods completed the upgrade. should begin upgrading one more + syncInfo.Pods[0].InSync = ptr(true) + syncInfo.Pods[0].Height = ptr(uint64(101)) + syncInfo.Pods[0].Error = nil + + // Reconcile 3, should update pod 2 (only one) because 1 is still in progress, but 0 is done. + + control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { + require.EqualValues(t, crd.Spec.Replicas, desired) + require.Equal(t, 4, ready) + return kube.ComputeRollout(maxUnavail, desired, ready) + } - didFilter = false - podFilter = mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus { - require.Equal(t, namespace, crd.Namespace) - require.Equal(t, "hub", crd.Name) - didFilter = true - return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus { - ps := cosmos.PodStatus{ - Pod: pod, - RPCReachable: true, - Synced: true, - } - if i == 1 { - ps.RPCReachable = false - ps.Synced = false - } - if i >= 2 { - ps.AwaitingUpgrade = true - } - return ps - }) - }) - - control = NewPodControl(&mClient, podFilter) - - // Reconcile 3, should update 2 (only one) because 1 is still in progress, but 0 is done. - - requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil) + requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) require.NoError(t, err) // only handled 1 updates, so should requeue. require.True(t, requeue) - require.True(t, didFilter) - require.Zero(t, mClient.CreateCount) // should delete one more @@ -350,37 +334,28 @@ func TestPodControl_Reconcile(t *testing.T) { } // mock out that both pods completed the upgrade. should begin upgrading the last 2 + syncInfo.Pods[1].InSync = ptr(true) + syncInfo.Pods[1].Height = ptr(uint64(101)) + syncInfo.Pods[1].Error = nil - didFilter = false - podFilter = mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus { - require.Equal(t, namespace, crd.Namespace) - require.Equal(t, "hub", crd.Name) - didFilter = true - return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus { - ps := cosmos.PodStatus{ - Pod: pod, - RPCReachable: true, - Synced: true, - } - if i >= 3 { - ps.AwaitingUpgrade = true - } - return ps - }) - }) - - control = NewPodControl(&mClient, podFilter) + syncInfo.Pods[2].InSync = ptr(true) + syncInfo.Pods[2].Height = ptr(uint64(101)) + syncInfo.Pods[2].Error = nil // Reconcile 4, should update 3 and 4 because the rest are done. - requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil) + control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { + require.EqualValues(t, crd.Spec.Replicas, desired) + require.Equal(t, 5, ready) + return kube.ComputeRollout(maxUnavail, desired, ready) + } + + requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) require.NoError(t, err) // all updates are now handled, no longer need requeue. 
require.False(t, requeue) - require.True(t, didFilter) - require.Zero(t, mClient.CreateCount) // should delete the last 2 @@ -413,28 +388,44 @@ func TestPodControl_Reconcile(t *testing.T) { require.NoError(t, err) existing := diff.New(nil, pods).Creates() - var mClient mockPodClient - mClient.ObjectList = corev1.PodList{ - Items: valueSlice(existing), + mClient := mockPodClient{ + ObjectList: corev1.PodList{ + Items: valueSlice(existing), + }, } - var didFilter bool - podFilter := mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus { - require.Equal(t, namespace, crd.Namespace) - require.Equal(t, "hub", crd.Name) - didFilter = true - return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus { - return cosmos.PodStatus{ - Pod: pod, - // pods are at or above upgrade height and not reachable - AwaitingUpgrade: true, - RPCReachable: false, - Synced: false, - } - }) - }) - - control := NewPodControl(&mClient, podFilter) + // pods are at upgrade height and reachable + syncInfo := &cosmosv1.SyncInfoStatus{ + Pods: []cosmosv1.SyncInfoPodStatus{ + { + Pod: "hub-0", + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), + }, + { + Pod: "hub-1", + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), + }, + { + Pod: "hub-2", + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), + }, + { + Pod: "hub-3", + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), + }, + { + Pod: "hub-4", + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), + }, + }, + } + + control := NewPodControl(&mClient) control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) @@ -447,14 +438,12 @@ func TestPodControl_Reconcile(t *testing.T) { crd.Status.Height[pod.Name] = 100 } - requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil) + requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) require.NoError(t, err) // all updates are handled, so should not requeue require.False(t, requeue) - require.True(t, didFilter) - require.Zero(t, mClient.CreateCount) require.Equal(t, 5, mClient.DeleteCount) }) From 9f5ee71b00d2688fb47bf7482d5d6b0d750385bb Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Tue, 24 Oct 2023 22:01:55 -0600 Subject: [PATCH 2/3] add tests for deletion timestamps --- internal/fullnode/pod_control_test.go | 75 ++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/internal/fullnode/pod_control_test.go b/internal/fullnode/pod_control_test.go index 6d9a1c53..bafa123d 100644 --- a/internal/fullnode/pod_control_test.go +++ b/internal/fullnode/pod_control_test.go @@ -149,10 +149,35 @@ func TestPodControl_Reconcile(t *testing.T) { require.True(t, requeue) require.Zero(t, mClient.CreateCount) + + now := metav1.Now() + existing[0].DeletionTimestamp = ptr(now) + existing[1].DeletionTimestamp = ptr(now) + + mClient.ObjectList = corev1.PodList{ + Items: valueSlice(existing), + } + + control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { + require.EqualValues(t, crd.Spec.Replicas, desired) + require.Equal(t, 3, ready) // only 3 should be marked ready because 2 are in the deleting state. 
+ return kube.ComputeRollout(maxUnavail, desired, ready) + } + + requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) + require.NoError(t, err) + + require.True(t, requeue) + + // pod status has not changed, but 0 and 1 are now in deleting state. + // should not delete any more. require.Equal(t, 2, mClient.DeleteCount) + // once pod deletion is complete, new pods are created with new image. existing[0].Spec.Containers[0].Image = "new-image" existing[1].Spec.Containers[0].Image = "new-image" + existing[0].DeletionTimestamp = nil + existing[1].DeletionTimestamp = nil recalculatePodRevision(existing[0], 0) recalculatePodRevision(existing[1], 1) @@ -168,7 +193,7 @@ func TestPodControl_Reconcile(t *testing.T) { control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) - require.Equal(t, 3, ready) // mockPodFilter only returns 1 candidate as ready + require.Equal(t, 3, ready) return kube.ComputeRollout(maxUnavail, desired, ready) } @@ -268,8 +293,33 @@ func TestPodControl_Reconcile(t *testing.T) { require.Zero(t, mClient.CreateCount) require.Equal(t, 2, mClient.DeleteCount) + now := metav1.Now() + existing[0].DeletionTimestamp = ptr(now) + existing[1].DeletionTimestamp = ptr(now) + + mClient.ObjectList = corev1.PodList{ + Items: valueSlice(existing), + } + + control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { + require.EqualValues(t, crd.Spec.Replicas, desired) + require.Equal(t, 3, ready) // only 3 should be marked ready because 2 are in the deleting state. + return kube.ComputeRollout(maxUnavail, desired, ready) + } + + requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) + require.NoError(t, err) + + require.True(t, requeue) + + // pod status has not changed, but 0 and 1 are now in deleting state. + // should not delete any more. + require.Equal(t, 2, mClient.DeleteCount) + existing[0].Spec.Containers[0].Image = "new-image" existing[1].Spec.Containers[0].Image = "new-image" + existing[0].DeletionTimestamp = nil + existing[1].DeletionTimestamp = nil recalculatePodRevision(existing[0], 0) recalculatePodRevision(existing[1], 1) @@ -327,7 +377,30 @@ func TestPodControl_Reconcile(t *testing.T) { // should delete one more require.Equal(t, 3, mClient.DeleteCount) + now = metav1.Now() + existing[2].DeletionTimestamp = ptr(now) + + mClient.ObjectList = corev1.PodList{ + Items: valueSlice(existing), + } + + control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { + require.EqualValues(t, crd.Spec.Replicas, desired) + require.Equal(t, 3, ready) // only 3 should be marked ready because 2 is in the deleting state and 1 is still in progress upgrading. + return kube.ComputeRollout(maxUnavail, desired, ready) + } + + requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) + require.NoError(t, err) + + require.True(t, requeue) + + // pod status has not changed, but 2 is now in deleting state. + // should not delete any more. 
+ require.Equal(t, 3, mClient.DeleteCount) + existing[2].Spec.Containers[0].Image = "new-image" + existing[2].DeletionTimestamp = nil recalculatePodRevision(existing[2], 2) mClient.ObjectList = corev1.PodList{ Items: valueSlice(existing), From f81358f6590bd3e878242f61d2b556ea95ec87f6 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Wed, 25 Oct 2023 01:29:54 -0600 Subject: [PATCH 3/3] cleanup tests and add cache invalidation --- api/v1/cosmosfullnode_types.go | 9 +- api/v1/zz_generated.deepcopy.go | 36 +- .../cosmos.strange.love_cosmosfullnodes.yaml | 53 ++- controllers/cosmosfullnode_controller.go | 12 +- internal/cosmos/cache_controller.go | 16 + internal/fullnode/pod_control.go | 112 +++--- internal/fullnode/pod_control_test.go | 335 ++++++++---------- internal/fullnode/status.go | 16 +- internal/fullnode/status_test.go | 32 +- 9 files changed, 295 insertions(+), 326 deletions(-) diff --git a/api/v1/cosmosfullnode_types.go b/api/v1/cosmosfullnode_types.go index d536390a..062c4313 100644 --- a/api/v1/cosmosfullnode_types.go +++ b/api/v1/cosmosfullnode_types.go @@ -138,21 +138,14 @@ type FullNodeStatus struct { // Current sync information. Collected every 60s. // +optional - SyncInfo *SyncInfoStatus `json:"syncInfo,omitempty"` + SyncInfo map[string]*SyncInfoPodStatus `json:"sync,omitempty"` // Latest Height information. collected when node starts up and when RPC is successfully queried. // +optional Height map[string]uint64 `json:"height,omitempty"` } -type SyncInfoStatus struct { - // The latest consensus state of pods. - Pods []SyncInfoPodStatus `json:"pods"` -} - type SyncInfoPodStatus struct { - // Pod's name. - Pod string `json:"pod"` // When consensus information was fetched. Timestamp metav1.Time `json:"timestamp"` // Latest height if no error encountered. diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index b528590a..b7e88e0a 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -320,8 +320,18 @@ func (in *FullNodeStatus) DeepCopyInto(out *FullNodeStatus) { } if in.SyncInfo != nil { in, out := &in.SyncInfo, &out.SyncInfo - *out = new(SyncInfoStatus) - (*in).DeepCopyInto(*out) + *out = make(map[string]*SyncInfoPodStatus, len(*in)) + for key, val := range *in { + var outVal *SyncInfoPodStatus + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = new(SyncInfoPodStatus) + (*in).DeepCopyInto(*out) + } + (*out)[key] = outVal + } } if in.Height != nil { in, out := &in.Height, &out.Height @@ -762,25 +772,3 @@ func (in *SyncInfoPodStatus) DeepCopy() *SyncInfoPodStatus { in.DeepCopyInto(out) return out } - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SyncInfoStatus) DeepCopyInto(out *SyncInfoStatus) { - *out = *in - if in.Pods != nil { - in, out := &in.Pods, &out.Pods - *out = make([]SyncInfoPodStatus, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SyncInfoStatus. 
-func (in *SyncInfoStatus) DeepCopy() *SyncInfoStatus { - if in == nil { - return nil - } - out := new(SyncInfoStatus) - in.DeepCopyInto(out) - return out -} diff --git a/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml b/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml index 1b340d34..a76cf9d2 100644 --- a/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml +++ b/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml @@ -6009,39 +6009,28 @@ spec: status: description: A generic message for the user. May contain errors. type: string - syncInfo: + sync: + additionalProperties: + properties: + error: + description: Error message if unable to fetch consensus state. + type: string + height: + description: Latest height if no error encountered. + format: int64 + type: integer + inSync: + description: If the pod reports itself as in sync with chain + tip. + type: boolean + timestamp: + description: When consensus information was fetched. + format: date-time + type: string + required: + - timestamp + type: object description: Current sync information. Collected every 60s. - properties: - pods: - description: The latest consensus state of pods. - items: - properties: - error: - description: Error message if unable to fetch consensus - state. - type: string - height: - description: Latest height if no error encountered. - format: int64 - type: integer - inSync: - description: If the pod reports itself as in sync with chain - tip. - type: boolean - pod: - description: Pod's name. - type: string - timestamp: - description: When consensus information was fetched. - format: date-time - type: string - required: - - pod - - timestamp - type: object - type: array - required: - - pods type: object required: - observedGeneration diff --git a/controllers/cosmosfullnode_controller.go b/controllers/cosmosfullnode_controller.go index aec64940..f60d370a 100644 --- a/controllers/cosmosfullnode_controller.go +++ b/controllers/cosmosfullnode_controller.go @@ -71,7 +71,7 @@ func NewFullNode( configMapControl: fullnode.NewConfigMapControl(client), nodeKeyControl: fullnode.NewNodeKeyControl(client), peerCollector: fullnode.NewPeerCollector(client), - podControl: fullnode.NewPodControl(client), + podControl: fullnode.NewPodControl(client, cacheController), pvcControl: fullnode.NewPVCControl(client), recorder: recorder, serviceControl: fullnode.NewServiceControl(client), @@ -121,7 +121,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque syncInfo := fullnode.SyncInfoStatus(ctx, crd, r.cacheController) - defer r.updateStatus(ctx, crd, &syncInfo) + defer r.updateStatus(ctx, crd, syncInfo) errs := &kube.ReconcileErrors{} @@ -172,7 +172,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque } // Reconcile pods. 
- podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, &syncInfo) + podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, syncInfo) if err != nil { errs.Append(err) } @@ -221,19 +221,19 @@ func (r *CosmosFullNodeReconciler) resultWithErr(crd *cosmosv1.CosmosFullNode, e return stopResult, err } -func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo *cosmosv1.SyncInfoStatus) { +func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo map[string]*cosmosv1.SyncInfoPodStatus) { if err := r.statusClient.SyncUpdate(ctx, client.ObjectKeyFromObject(crd), func(status *cosmosv1.FullNodeStatus) { status.ObservedGeneration = crd.Status.ObservedGeneration status.Phase = crd.Status.Phase status.StatusMessage = crd.Status.StatusMessage status.Peers = crd.Status.Peers status.SyncInfo = syncInfo - for _, v := range syncInfo.Pods { + for k, v := range syncInfo { if v.Height != nil && *v.Height > 0 { if status.Height == nil { status.Height = make(map[string]uint64) } - status.Height[v.Pod] = *v.Height + status.Height[k] = *v.Height } } }); err != nil { diff --git a/internal/cosmos/cache_controller.go b/internal/cosmos/cache_controller.go index a4b71d92..9db49969 100644 --- a/internal/cosmos/cache_controller.go +++ b/internal/cosmos/cache_controller.go @@ -149,6 +149,22 @@ func (c *CacheController) Reconcile(ctx context.Context, req reconcile.Request) return finishResult, nil } +// Invalidate removes the given pods status from the cache. +func (c *CacheController) Invalidate(controller client.ObjectKey, pods []string) { + v, _ := c.cache.Get(controller) + now := time.Now() + for _, s := range v { + for _, pod := range pods { + if s.Pod.Name == pod { + s.Status = CometStatus{} + s.Err = fmt.Errorf("invalidated") + s.TS = now + } + } + } + c.cache.Update(controller, v) +} + // Collect returns a StatusCollection for the given controller. Only returns cached CometStatus. func (c *CacheController) Collect(ctx context.Context, controller client.ObjectKey) StatusCollection { pods, err := c.listPods(ctx, controller) diff --git a/internal/fullnode/pod_control.go b/internal/fullnode/pod_control.go index 28a81bd9..8fc8a0d0 100644 --- a/internal/fullnode/pod_control.go +++ b/internal/fullnode/pod_control.go @@ -23,17 +23,23 @@ type Client interface { Scheme() *runtime.Scheme } +type CacheInvalidator interface { + Invalidate(controller client.ObjectKey, pods []string) +} + // PodControl reconciles pods for a CosmosFullNode. type PodControl struct { - client Client - computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int + client Client + cacheInvalidator CacheInvalidator + computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int } // NewPodControl returns a valid PodControl. 
-func NewPodControl(client Client) PodControl { +func NewPodControl(client Client, cacheInvalidator CacheInvalidator) PodControl { return PodControl{ - client: client, - computeRollout: kube.ComputeRollout, + client: client, + cacheInvalidator: cacheInvalidator, + computeRollout: kube.ComputeRollout, } } @@ -44,7 +50,7 @@ func (pc PodControl) Reconcile( reporter kube.Reporter, crd *cosmosv1.CosmosFullNode, cksums ConfigChecksums, - syncInfo *cosmosv1.SyncInfoStatus, + syncInfo map[string]*cosmosv1.SyncInfoPodStatus, ) (bool, kube.ReconcileError) { var pods corev1.PodList if err := pc.client.List(ctx, &pods, @@ -70,11 +76,24 @@ func (pc PodControl) Reconcile( } } + var invalidateCache []string + + defer func() { + if pc.cacheInvalidator == nil { + return + } + if len(invalidateCache) > 0 { + pc.cacheInvalidator.Invalidate(client.ObjectKeyFromObject(crd), invalidateCache) + } + }() + for _, pod := range diffed.Deletes() { reporter.Info("Deleting pod", "name", pod.Name) if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); kube.IgnoreNotFound(err) != nil { return true, kube.TransientError(fmt.Errorf("delete pod %q: %w", pod.Name, err)) } + delete(syncInfo, pod.Name) + invalidateCache = append(invalidateCache, pod.Name) } if len(diffed.Creates())+len(diffed.Deletes()) > 0 { @@ -85,54 +104,49 @@ func (pc PodControl) Reconcile( diffedUpdates := diffed.Updates() if len(diffedUpdates) > 0 { var ( - upgradePods = make(map[string]bool) - otherUpdates = make(map[string]*corev1.Pod) + updatedPods = 0 rpcReachablePods = 0 inSyncPods = 0 + otherUpdates = []*corev1.Pod{} ) - PodsLoop: - for _, ps := range syncInfo.Pods { - for _, existing := range pods.Items { - if existing.Name == ps.Pod { - if existing.DeletionTimestamp != nil { - continue PodsLoop - } - break - } - } + for _, existing := range pods.Items { + podName := existing.Name - if ps.InSync != nil && *ps.InSync { - inSyncPods++ + if existing.DeletionTimestamp != nil { + // Pod is being deleted, so we skip it. + continue } - rpcReachable := ps.Error == nil - if rpcReachable { - rpcReachablePods++ + + var rpcReachable bool + if ps, ok := syncInfo[podName]; ok { + if ps.InSync != nil && *ps.InSync { + inSyncPods++ + } + rpcReachable = ps.Error == nil + if rpcReachable { + rpcReachablePods++ + } } for _, update := range diffedUpdates { - if ps.Pod == update.Name { - awaitingUpgrade := false - for _, existing := range pods.Items { - if existing.Name == ps.Pod { - if existing.Spec.Containers[0].Image != update.Spec.Containers[0].Image { - awaitingUpgrade = true - } - break - } - } - if awaitingUpgrade { + if podName == update.Name { + if existing.Spec.Containers[0].Image != update.Spec.Containers[0].Image { + // awaiting upgrade if !rpcReachable { - upgradePods[ps.Pod] = true - reporter.Info("Deleting pod for version upgrade", "name", ps.Pod) + updatedPods++ + reporter.Info("Deleting pod for version upgrade", "name", podName) // Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it. 
if err := pc.client.Delete(ctx, update, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil { - return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", ps.Pod, err)) + return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", podName, err)) } + syncInfo[podName].InSync = nil + syncInfo[podName].Error = ptr("version upgrade in progress") + invalidateCache = append(invalidateCache, podName) } else { - otherUpdates[ps.Pod] = update + otherUpdates = append(otherUpdates, update) } } else { - otherUpdates[ps.Pod] = update + otherUpdates = append(otherUpdates, update) } break } @@ -148,32 +162,34 @@ func (pc PodControl) Reconcile( numUpdates := pc.computeRollout(crd.Spec.RolloutStrategy.MaxUnavailable, int(crd.Spec.Replicas), ready) - updated := len(upgradePods) - - if updated == len(diffedUpdates) { + if updatedPods == len(diffedUpdates) { // All pods are updated. return false, nil } - if updated >= numUpdates { + if updatedPods >= numUpdates { // Signal requeue. return true, nil } - for podName, pod := range otherUpdates { - reporter.Info("Deleting pod for update", "podName", podName) + for _, pod := range otherUpdates { + podName := pod.Name + reporter.Info("Deleting pod for update", "name", podName) // Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it. if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil { return true, kube.TransientError(fmt.Errorf("update pod %q: %w", podName, err)) } - updated++ - if updated >= numUpdates { + syncInfo[podName].InSync = nil + syncInfo[podName].Error = ptr("update in progress") + invalidateCache = append(invalidateCache, podName) + updatedPods++ + if updatedPods >= numUpdates { // done for this round break } } - if len(diffedUpdates) == updated { + if len(diffedUpdates) == updatedPods { // All pods are updated. 
return false, nil } diff --git a/internal/fullnode/pod_control_test.go b/internal/fullnode/pod_control_test.go index bafa123d..023857c2 100644 --- a/internal/fullnode/pod_control_test.go +++ b/internal/fullnode/pod_control_test.go @@ -15,11 +15,51 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +type mockPodClient struct{ mockClient[*corev1.Pod] } + +func newMockPodClient(pods []*corev1.Pod) *mockPodClient { + return &mockPodClient{ + mockClient: mockClient[*corev1.Pod]{ + ObjectList: corev1.PodList{ + Items: valueSlice(pods), + }, + }, + } +} + +func (c *mockPodClient) setPods(pods []*corev1.Pod) { + c.ObjectList = corev1.PodList{ + Items: valueSlice(pods), + } +} + +func (c *mockPodClient) upgradePods( + t *testing.T, + crdName string, + ordinals ...int, +) { + existing := ptrSlice(c.ObjectList.(corev1.PodList).Items) + for _, ordinal := range ordinals { + updatePod(t, crdName, ordinal, existing, newPodWithNewImage, true) + } + c.setPods(existing) +} + +func (c *mockPodClient) deletePods( + t *testing.T, + crdName string, + ordinals ...int, +) { + existing := ptrSlice(c.ObjectList.(corev1.PodList).Items) + for _, ordinal := range ordinals { + updatePod(t, crdName, ordinal, existing, deletedPod, false) + } + c.setPods(existing) +} + func TestPodControl_Reconcile(t *testing.T) { t.Parallel() - type mockPodClient = mockClient[*corev1.Pod] - ctx := context.Background() const namespace = "test" @@ -31,23 +71,17 @@ func TestPodControl_Reconcile(t *testing.T) { pods, err := BuildPods(&crd, nil) require.NoError(t, err) - existing := diff.New(nil, pods).Creates()[0] + existing := diff.New(nil, pods).Creates() - var mClient mockPodClient - mClient.ObjectList = corev1.PodList{ - Items: []corev1.Pod{*existing}, - } + require.Len(t, existing, 1) - syncInfo := &cosmosv1.SyncInfoStatus{ - Pods: []cosmosv1.SyncInfoPodStatus{ - { - Pod: "hub-0", - InSync: ptr(true), - }, - }, + mClient := newMockPodClient(existing) + + syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{ + "hub-0": {InSync: ptr(true)}, } - control := NewPodControl(&mClient) + control := NewPodControl(mClient, nil) requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo) require.NoError(t, err) require.False(t, requeue) @@ -68,15 +102,12 @@ func TestPodControl_Reconcile(t *testing.T) { crd.Namespace = namespace crd.Spec.Replicas = 3 - var mClient mockPodClient - mClient.ObjectList = corev1.PodList{ - Items: []corev1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "hub-98"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "hub-99"}}, - }, - } + mClient := newMockPodClient([]*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "hub-98"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "hub-99"}}, + }) - control := NewPodControl(&mClient) + control := NewPodControl(mClient, nil) requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, nil) require.NoError(t, err) require.True(t, requeue) @@ -101,40 +132,18 @@ func TestPodControl_Reconcile(t *testing.T) { pods, err := BuildPods(&crd, nil) require.NoError(t, err) - existing := diff.New(nil, pods).Creates() - mClient := mockPodClient{ - ObjectList: corev1.PodList{ - Items: valueSlice(existing), - }, - } + mClient := newMockPodClient(diff.New(nil, pods).Creates()) - syncInfo := &cosmosv1.SyncInfoStatus{ - Pods: []cosmosv1.SyncInfoPodStatus{ - { - Pod: "hub-0", - InSync: ptr(true), - }, - { - Pod: "hub-1", - InSync: ptr(true), - }, - { - Pod: "hub-2", - InSync: ptr(true), - }, - { - Pod: "hub-3", - InSync: ptr(true), - }, - { - Pod: "hub-4", - InSync: ptr(true), - }, - }, + syncInfo := 
map[string]*cosmosv1.SyncInfoPodStatus{ + "hub-0": {InSync: ptr(true)}, + "hub-1": {InSync: ptr(true)}, + "hub-2": {InSync: ptr(true)}, + "hub-3": {InSync: ptr(true)}, + "hub-4": {InSync: ptr(true)}, } - control := NewPodControl(&mClient) + control := NewPodControl(mClient, nil) control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) @@ -150,13 +159,7 @@ func TestPodControl_Reconcile(t *testing.T) { require.Zero(t, mClient.CreateCount) - now := metav1.Now() - existing[0].DeletionTimestamp = ptr(now) - existing[1].DeletionTimestamp = ptr(now) - - mClient.ObjectList = corev1.PodList{ - Items: valueSlice(existing), - } + mClient.deletePods(t, crd.Name, 0, 1) control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) @@ -174,22 +177,13 @@ func TestPodControl_Reconcile(t *testing.T) { require.Equal(t, 2, mClient.DeleteCount) // once pod deletion is complete, new pods are created with new image. - existing[0].Spec.Containers[0].Image = "new-image" - existing[1].Spec.Containers[0].Image = "new-image" - existing[0].DeletionTimestamp = nil - existing[1].DeletionTimestamp = nil - - recalculatePodRevision(existing[0], 0) - recalculatePodRevision(existing[1], 1) - mClient.ObjectList = corev1.PodList{ - Items: valueSlice(existing), - } + mClient.upgradePods(t, crd.Name, 0, 1) - syncInfo.Pods[0].InSync = nil - syncInfo.Pods[0].Error = ptr("upgrade in progress") + syncInfo["hub-0"].InSync = nil + syncInfo["hub-0"].Error = ptr("upgrade in progress") - syncInfo.Pods[1].InSync = nil - syncInfo.Pods[1].Error = ptr("upgrade in progress") + syncInfo["hub-1"].InSync = nil + syncInfo["hub-1"].Error = ptr("upgrade in progress") control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) @@ -232,44 +226,33 @@ func TestPodControl_Reconcile(t *testing.T) { require.NoError(t, err) existing := diff.New(nil, pods).Creates() - mClient := mockPodClient{ - ObjectList: corev1.PodList{ - Items: valueSlice(existing), - }, - } + mClient := newMockPodClient(existing) // pods are at upgrade height and reachable - syncInfo := &cosmosv1.SyncInfoStatus{ - Pods: []cosmosv1.SyncInfoPodStatus{ - { - Pod: "hub-0", - Height: ptr(uint64(100)), - InSync: ptr(true), - }, - { - Pod: "hub-1", - Height: ptr(uint64(100)), - InSync: ptr(true), - }, - { - Pod: "hub-2", - Height: ptr(uint64(100)), - InSync: ptr(true), - }, - { - Pod: "hub-3", - Height: ptr(uint64(100)), - InSync: ptr(true), - }, - { - Pod: "hub-4", - Height: ptr(uint64(100)), - InSync: ptr(true), - }, + syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{ + "hub-0": { + Height: ptr(uint64(100)), + InSync: ptr(true), + }, + "hub-1": { + Height: ptr(uint64(100)), + InSync: ptr(true), + }, + "hub-2": { + Height: ptr(uint64(100)), + InSync: ptr(true), + }, + "hub-3": { + Height: ptr(uint64(100)), + InSync: ptr(true), + }, + "hub-4": { + Height: ptr(uint64(100)), + InSync: ptr(true), }, } - control := NewPodControl(&mClient) + control := NewPodControl(mClient, nil) control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) @@ -293,13 +276,7 @@ func TestPodControl_Reconcile(t *testing.T) { require.Zero(t, mClient.CreateCount) require.Equal(t, 2, mClient.DeleteCount) - now := metav1.Now() - existing[0].DeletionTimestamp = ptr(now) - existing[1].DeletionTimestamp = 
ptr(now) - - mClient.ObjectList = corev1.PodList{ - Items: valueSlice(existing), - } + mClient.deletePods(t, crd.Name, 0, 1) control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) @@ -316,23 +293,14 @@ func TestPodControl_Reconcile(t *testing.T) { // should not delete any more. require.Equal(t, 2, mClient.DeleteCount) - existing[0].Spec.Containers[0].Image = "new-image" - existing[1].Spec.Containers[0].Image = "new-image" - existing[0].DeletionTimestamp = nil - existing[1].DeletionTimestamp = nil - - recalculatePodRevision(existing[0], 0) - recalculatePodRevision(existing[1], 1) - mClient.ObjectList = corev1.PodList{ - Items: valueSlice(existing), - } + mClient.upgradePods(t, crd.Name, 0, 1) // 0 and 1 are now unavailable, working on upgrade - syncInfo.Pods[0].InSync = nil - syncInfo.Pods[0].Error = ptr("upgrade in progress") + syncInfo["hub-0"].InSync = nil + syncInfo["hub-0"].Error = ptr("upgrade in progress") - syncInfo.Pods[1].InSync = nil - syncInfo.Pods[1].Error = ptr("upgrade in progress") + syncInfo["hub-1"].InSync = nil + syncInfo["hub-1"].Error = ptr("upgrade in progress") // Reconcile 2, should not update anything because 0 and 1 are still in progress. @@ -354,9 +322,9 @@ func TestPodControl_Reconcile(t *testing.T) { require.Equal(t, 2, mClient.DeleteCount) // mock out that one of the pods completed the upgrade. should begin upgrading one more - syncInfo.Pods[0].InSync = ptr(true) - syncInfo.Pods[0].Height = ptr(uint64(101)) - syncInfo.Pods[0].Error = nil + syncInfo["hub-0"].InSync = ptr(true) + syncInfo["hub-0"].Height = ptr(uint64(101)) + syncInfo["hub-0"].Error = nil // Reconcile 3, should update pod 2 (only one) because 1 is still in progress, but 0 is done. @@ -377,12 +345,7 @@ func TestPodControl_Reconcile(t *testing.T) { // should delete one more require.Equal(t, 3, mClient.DeleteCount) - now = metav1.Now() - existing[2].DeletionTimestamp = ptr(now) - - mClient.ObjectList = corev1.PodList{ - Items: valueSlice(existing), - } + mClient.deletePods(t, crd.Name, 2) control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) @@ -399,21 +362,16 @@ func TestPodControl_Reconcile(t *testing.T) { // should not delete any more. require.Equal(t, 3, mClient.DeleteCount) - existing[2].Spec.Containers[0].Image = "new-image" - existing[2].DeletionTimestamp = nil - recalculatePodRevision(existing[2], 2) - mClient.ObjectList = corev1.PodList{ - Items: valueSlice(existing), - } + mClient.upgradePods(t, crd.Name, 2) // mock out that both pods completed the upgrade. should begin upgrading the last 2 - syncInfo.Pods[1].InSync = ptr(true) - syncInfo.Pods[1].Height = ptr(uint64(101)) - syncInfo.Pods[1].Error = nil + syncInfo["hub-1"].InSync = ptr(true) + syncInfo["hub-1"].Height = ptr(uint64(101)) + syncInfo["hub-1"].Error = nil - syncInfo.Pods[2].InSync = ptr(true) - syncInfo.Pods[2].Height = ptr(uint64(101)) - syncInfo.Pods[2].Error = nil + syncInfo["hub-2"].InSync = ptr(true) + syncInfo["hub-2"].Height = ptr(uint64(101)) + syncInfo["hub-2"].Error = nil // Reconcile 4, should update 3 and 4 because the rest are done. 
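The upgrade test above steps pods 0-4 through the controller's per-pod update decision one rollout round at a time. Roughly, an update candidate whose running image differs from the desired pod's image and whose RPC is unreachable is deleted immediately for the version upgrade, any other update waits for the MaxUnavailable rollout budget, and pods that already have a deletion timestamp are skipped. A small sketch of that decision under those assumptions (simplified signature and names, not the operator's exact code):

package main

import "fmt"

type updateKind int

const (
	skipPod          updateKind = iota // pod already has a deletion timestamp
	deleteForUpgrade                   // image changed and RPC is down: recreate right away
	deferToRollout                     // counted against the MaxUnavailable rollout budget
)

// classifyUpdate sketches how a pod needing an update is handled: an image
// change on an unreachable pod is treated as a version upgrade and deleted
// immediately; everything else is deferred to the normal rollout.
func classifyUpdate(deleting bool, currentImage, wantImage string, rpcReachable bool) updateKind {
	if deleting {
		return skipPod
	}
	if currentImage != wantImage && !rpcReachable {
		return deleteForUpgrade
	}
	return deferToRollout
}

func main() {
	// A pod that halted at the upgrade height: image differs, RPC unreachable.
	fmt.Println(classifyUpdate(false, "gaia:v1", "gaia:v2", false) == deleteForUpgrade) // true
	// A reachable pod with only a non-image change waits for the rollout budget.
	fmt.Println(classifyUpdate(false, "gaia:v2", "gaia:v2", true) == deferToRollout) // true
}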
@@ -461,44 +419,33 @@ func TestPodControl_Reconcile(t *testing.T) { require.NoError(t, err) existing := diff.New(nil, pods).Creates() - mClient := mockPodClient{ - ObjectList: corev1.PodList{ - Items: valueSlice(existing), - }, - } + mClient := newMockPodClient(existing) // pods are at upgrade height and reachable - syncInfo := &cosmosv1.SyncInfoStatus{ - Pods: []cosmosv1.SyncInfoPodStatus{ - { - Pod: "hub-0", - Height: ptr(uint64(100)), - Error: ptr("panic at upgrade height"), - }, - { - Pod: "hub-1", - Height: ptr(uint64(100)), - Error: ptr("panic at upgrade height"), - }, - { - Pod: "hub-2", - Height: ptr(uint64(100)), - Error: ptr("panic at upgrade height"), - }, - { - Pod: "hub-3", - Height: ptr(uint64(100)), - Error: ptr("panic at upgrade height"), - }, - { - Pod: "hub-4", - Height: ptr(uint64(100)), - Error: ptr("panic at upgrade height"), - }, + syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{ + "hub-0": { + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), + }, + "hub-1": { + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), + }, + "hub-2": { + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), + }, + "hub-3": { + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), + }, + "hub-4": { + Height: ptr(uint64(100)), + Error: ptr("panic at upgrade height"), }, } - control := NewPodControl(&mClient) + control := NewPodControl(mClient, nil) control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int { require.EqualValues(t, crd.Spec.Replicas, desired) @@ -530,3 +477,27 @@ func recalculatePodRevision(pod *corev1.Pod, ordinal int) { pod.Labels["app.kubernetes.io/revision"] = rev1 pod.Annotations["app.kubernetes.io/ordinal"] = fmt.Sprintf("%d", ordinal) } + +func newPodWithNewImage(pod *corev1.Pod) { + pod.DeletionTimestamp = nil + pod.Spec.Containers[0].Image = "new-image" +} + +func deletedPod(pod *corev1.Pod) { + pod.DeletionTimestamp = ptr(metav1.Now()) +} + +func updatePod(t *testing.T, crdName string, ordinal int, pods []*corev1.Pod, updateFn func(pod *corev1.Pod), recalc bool) { + podName := fmt.Sprintf("%s-%d", crdName, ordinal) + for _, pod := range pods { + if pod.Name == podName { + updateFn(pod) + if recalc { + recalculatePodRevision(pod, ordinal) + } + return + } + } + + require.FailNow(t, "pod not found", podName) +} diff --git a/internal/fullnode/status.go b/internal/fullnode/status.go index 02842c05..4ca46360 100644 --- a/internal/fullnode/status.go +++ b/internal/fullnode/status.go @@ -3,7 +3,6 @@ package fullnode import ( "context" - "github.com/samber/lo" cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1" "github.com/strangelove-ventures/cosmos-operator/internal/cosmos" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -27,24 +26,25 @@ func SyncInfoStatus( ctx context.Context, crd *cosmosv1.CosmosFullNode, collector StatusCollector, -) cosmosv1.SyncInfoStatus { - var status cosmosv1.SyncInfoStatus +) map[string]*cosmosv1.SyncInfoPodStatus { + status := make(map[string]*cosmosv1.SyncInfoPodStatus, crd.Spec.Replicas) coll := collector.Collect(ctx, client.ObjectKeyFromObject(crd)) - status.Pods = lo.Map(coll, func(item cosmos.StatusItem, _ int) cosmosv1.SyncInfoPodStatus { + for _, item := range coll { var stat cosmosv1.SyncInfoPodStatus - stat.Pod = item.GetPod().Name + podName := item.GetPod().Name stat.Timestamp = metav1.NewTime(item.Timestamp()) comet, err := item.GetStatus() if err != nil { stat.Error = ptr(err.Error()) - return stat + status[podName] = &stat + 
continue } stat.Height = ptr(comet.LatestBlockHeight()) stat.InSync = ptr(!comet.Result.SyncInfo.CatchingUp) - return stat - }) + status[podName] = &stat + } return status } diff --git a/internal/fullnode/status_test.go b/internal/fullnode/status_test.go index b3763169..03a52ee0 100644 --- a/internal/fullnode/status_test.go +++ b/internal/fullnode/status_test.go @@ -72,24 +72,20 @@ func TestSyncInfoStatus(t *testing.T) { } wantTS := metav1.NewTime(ts) - want := cosmosv1.SyncInfoStatus{ - Pods: []cosmosv1.SyncInfoPodStatus{ - { - Pod: "pod-0", - Timestamp: wantTS, - Height: ptr(uint64(9999)), - InSync: ptr(false), - }, - {Pod: "pod-1", - Timestamp: wantTS, - Height: ptr(uint64(10000)), - InSync: ptr(true), - }, - { - Pod: "pod-2", - Timestamp: wantTS, - Error: ptr("some error"), - }, + want := map[string]*cosmosv1.SyncInfoPodStatus{ + "pod-0": { + Timestamp: wantTS, + Height: ptr(uint64(9999)), + InSync: ptr(false), + }, + "pod-1": { + Timestamp: wantTS, + Height: ptr(uint64(10000)), + InSync: ptr(true), + }, + "pod-2": { + Timestamp: wantTS, + Error: ptr("some error"), }, }
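With this series, FullNodeStatus.SyncInfo becomes a map keyed by pod name (serialized as `sync`), and updateStatus folds any fresh, non-zero heights from it into status.Height keyed the same way. A minimal sketch of that per-pod height bookkeeping, using local stand-in types rather than the operator's API (illustrative only):

package main

import "fmt"

// podSyncInfo is a simplified stand-in for the patch's SyncInfoPodStatus.
type podSyncInfo struct {
	Height *uint64
	InSync *bool
	Error  *string
}

// latestHeights records the latest reported, non-zero height per pod and
// leaves pods with no fresh report untouched, mirroring the bookkeeping
// updateStatus performs when persisting status.
func latestHeights(sync map[string]*podSyncInfo, existing map[string]uint64) map[string]uint64 {
	out := existing
	if out == nil {
		out = make(map[string]uint64)
	}
	for pod, info := range sync {
		if info == nil || info.Height == nil || *info.Height == 0 {
			continue
		}
		out[pod] = *info.Height
	}
	return out
}

func main() {
	h := uint64(10000)
	someErr := "some error"
	sync := map[string]*podSyncInfo{
		"pod-0": {Height: &h},
		"pod-1": {Error: &someErr}, // no fresh height: keeps its previously recorded value
	}
	fmt.Println(latestHeights(sync, map[string]uint64{"pod-1": 9998}))
}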