Skip to content

Commit

Permalink
Improve and clarify lastHealthyAt and lastFailedAt functionality
Browse files Browse the repository at this point in the history
Change definitelyHealthy to safeAsLastReplica
Change variable names in TimestampAfterTimestamp function
Set LastHealthyAt every time a replica appears RW in an engine
Add an atomic utility function for replica failed fields
Explain replica health related fields in comments and CRD
TimestampAfterTimestamp returns an error
Fix one more unit test
Further clarify the reason for getSafeAsLastReplicaCount

Signed-off-by: Eric Weber <[email protected]>
  • Loading branch information
ejweber authored and shuo-wu committed Jan 29, 2024
1 parent 8df86d9 commit 305ff2f
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 58 deletions.
4 changes: 1 addition & 3 deletions controller/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1893,9 +1893,7 @@ func (ec *EngineController) startRebuilding(e *longhorn.Engine, replicaName, add
return
}

now := util.Now()
replica.Spec.FailedAt = now
replica.Spec.LastFailedAt = now
setReplicaFailedAt(replica, util.Now())
replica.Spec.DesireState = longhorn.InstanceStateStopped
if _, err := ec.ds.UpdateReplica(replica); err != nil {
log.WithError(err).Errorf("Unable to mark failed rebuild on replica %v", replicaName)
Expand Down
9 changes: 9 additions & 0 deletions controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,12 @@ func handleReconcileErrorLogging(logger logrus.FieldLogger, err error, mesg stri
logger.WithError(err).Error(mesg)
}
}

// r.Spec.FailedAt and r.Spec.LastFailedAt should both be set when a replica failure occurs.
// r.Spec.FailedAt may be cleared (before rebuilding), but r.Spec.LastFailedAt must not be.
func setReplicaFailedAt(r *longhorn.Replica, timestamp string) {
r.Spec.FailedAt = timestamp
if timestamp != "" {
r.Spec.LastFailedAt = timestamp
}
}
79 changes: 41 additions & 38 deletions controller/volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,7 @@ func (c *VolumeController) ReconcileEngineReplicaState(v *longhorn.Volume, es ma
r.Name, r.Status.CurrentState, r.Spec.EngineName, r.Spec.Active, isNoAvailableBackend)
e.Spec.LogRequested = true
r.Spec.LogRequested = true
now := c.nowHandler()
r.Spec.FailedAt = now
r.Spec.LastFailedAt = now
setReplicaFailedAt(r, c.nowHandler())
r.Spec.DesireState = longhorn.InstanceStateStopped
}
}
Expand Down Expand Up @@ -710,24 +708,21 @@ func (c *VolumeController) ReconcileEngineReplicaState(v *longhorn.Volume, es ma
}
if r.Spec.FailedAt == "" {
log.Warnf("Replica %v is marked as failed, current state %v, mode %v, engine name %v, active %v", r.Name, r.Status.CurrentState, mode, r.Spec.EngineName, r.Spec.Active)
now := c.nowHandler()
r.Spec.FailedAt = now
r.Spec.LastFailedAt = now
setReplicaFailedAt(r, c.nowHandler())
e.Spec.LogRequested = true
r.Spec.LogRequested = true
}
r.Spec.DesireState = longhorn.InstanceStateStopped
} else if mode == longhorn.ReplicaModeRW {
// record once replica became healthy, so if it
// failed in the future, we can tell it apart
// from replica failed during rebuilding
now := c.nowHandler()
if r.Spec.HealthyAt == "" {
c.backoff.DeleteEntry(r.Name)
now := c.nowHandler()
// Set HealthyAt to distinguish this replica from one that has never been rebuilt.
r.Spec.HealthyAt = now
r.Spec.LastHealthyAt = now
r.Spec.RebuildRetryCount = 0
}
// Set LastHealthyAt to record the last time this replica became RW in an engine.
r.Spec.LastHealthyAt = now
healthyCount++
}
}
Expand All @@ -739,9 +734,7 @@ func (c *VolumeController) ReconcileEngineReplicaState(v *longhorn.Volume, es ma
r.Name, r.Status.CurrentState, r.Spec.EngineName, r.Spec.Active)
e.Spec.LogRequested = true
r.Spec.LogRequested = true
now := c.nowHandler()
r.Spec.FailedAt = now
r.Spec.LastFailedAt = now
setReplicaFailedAt(r, c.nowHandler())
r.Spec.DesireState = longhorn.InstanceStateStopped
}
}
Expand Down Expand Up @@ -909,16 +902,29 @@ func isHealthyAndActiveReplica(r *longhorn.Replica) bool {
return true
}

func isDefinitelyHealthyAndActiveReplica(r *longhorn.Replica) bool {
// Usually, we consider a replica to be healthy if its spec.HealthyAt != "". However, a corrupted replica can also have
// spec.HealthyAt != "". In the normal flow of events, such a replica will eventually fail and be rebuilt when the
// corruption is detected. However, in rare cases, all replicas may fail and the corrupted replica may temporarily be
// the only one available for autosalvage. When this happens, we clear spec.FailedAt from the corrupted replica and
// repeatedly fail to run an engine with it. We can likely manually recover from this situation, but only if we avoid
// cleaning up the other, non-corrupted replicas. This function provides a extra check in addition to
// isHealthyAndActiveReplica. If we see a replica with spec.LastFailedAt (indicating it has failed sometime in the past,
// even if we are currently attempting to use it), we confirm that it has spec.LastHealthyAt (indicating the last time
// it successfully became read/write in an engine) after spec.LastFailedAt. If the replica does not meet this condition,
// it is not "safe as last replica", and we should not clean up the other replicas for its volume.
func isSafeAsLastReplica(r *longhorn.Replica) bool {
if !isHealthyAndActiveReplica(r) {
return false
}
// An empty r.Spec.FailedAt doesn't necessarily indicate a healthy replica. The replica could be caught in an
// autosalvage loop. If it is, its r.Spec.FailedAt is repeatedly transitioning between empty and some time. Ensure
// the replica has become healthyAt since it last became failedAt.
// We know r.Spec.LastHealthyAt != "" because r.Spec.HealthyAt != "" from isHealthyAndActiveReplica.
if r.Spec.LastFailedAt != "" && !util.TimestampAfterTimestamp(r.Spec.LastHealthyAt, r.Spec.LastFailedAt) {
return false
if r.Spec.LastFailedAt != "" {
healthyAfterFailed, err := util.TimestampAfterTimestamp(r.Spec.LastHealthyAt, r.Spec.LastFailedAt)
if err != nil {
logrus.WithField("replica", r.Name).Errorf("Failed to verify if safe as last replica: %v", err)
}
if !healthyAfterFailed || err != nil {
return false
}
}
return true
}
Expand All @@ -933,10 +939,11 @@ func getHealthyAndActiveReplicaCount(rs map[string]*longhorn.Replica) int {
return count
}

func getDefinitelyHealthyAndActiveReplicaCount(rs map[string]*longhorn.Replica) int {
// See comments for isSafeAsLastReplica for an explanation of why we need this.
func getSafeAsLastReplicaCount(rs map[string]*longhorn.Replica) int {
count := 0
for _, r := range rs {
if isDefinitelyHealthyAndActiveReplica(r) {
if isSafeAsLastReplica(r) {
count++
}
}
Expand Down Expand Up @@ -989,7 +996,9 @@ func (c *VolumeController) cleanupReplicas(v *longhorn.Volume, es map[string]*lo
}

func (c *VolumeController) cleanupCorruptedOrStaleReplicas(v *longhorn.Volume, rs map[string]*longhorn.Replica) error {
healthyCount := getDefinitelyHealthyAndActiveReplicaCount(rs)
// See comments for isSafeAsLastReplica for an explanation of why we call getSafeAsLastReplicaCount instead of
// getHealthyAndActiveReplicaCount here.
safeAsLastReplicaCount := getSafeAsLastReplicaCount(rs)
cleanupLeftoverReplicas := !c.isVolumeUpgrading(v) && !isVolumeMigrating(v)
log := getLoggerForVolume(c.logger, v)

Expand Down Expand Up @@ -1018,15 +1027,15 @@ func (c *VolumeController) cleanupCorruptedOrStaleReplicas(v *longhorn.Volume, r
}

if datastore.IsDataEngineV1(v.Spec.DataEngine) {
if shouldCleanUpFailedReplicaV1(r, v.Spec.StaleReplicaTimeout, healthyCount, v.Spec.Image) {
if shouldCleanUpFailedReplicaV1(r, v.Spec.StaleReplicaTimeout, safeAsLastReplicaCount, v.Spec.Image) {
log.WithField("replica", r.Name).Info("Cleaning up corrupted, staled replica")
if err := c.deleteReplica(r, rs); err != nil {
return errors.Wrapf(err, "cannot clean up staled replica %v", r.Name)
}
}
} else {
// TODO: check `staled` flag after v2 volume supports online replica rebuilding
if healthyCount != 0 {
if safeAsLastReplicaCount != 0 {
if err := c.deleteReplica(r, rs); err != nil {
return errors.Wrapf(err, "failed to clean up staled replica %v", r.Name)
}
Expand All @@ -1038,8 +1047,6 @@ func (c *VolumeController) cleanupCorruptedOrStaleReplicas(v *longhorn.Volume, r
}

func (c *VolumeController) cleanupFailedToScheduledReplicas(v *longhorn.Volume, rs map[string]*longhorn.Replica) (err error) {
// We don't need the more rigorous getDefinitelyHealthyAndActiveReplicaCount here, because the replicas we will
// potentially delete are definitely worthless (contain no data).
healthyCount := getHealthyAndActiveReplicaCount(rs)
hasEvictionRequestedReplicas := hasReplicaEvictionRequested(rs)

Expand All @@ -1061,7 +1068,7 @@ func (c *VolumeController) cleanupFailedToScheduledReplicas(v *longhorn.Volume,
}

func (c *VolumeController) cleanupExtraHealthyReplicas(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (err error) {
healthyCount := getDefinitelyHealthyAndActiveReplicaCount(rs)
healthyCount := getHealthyAndActiveReplicaCount(rs)
if healthyCount <= v.Spec.NumberOfReplicas {
return nil
}
Expand Down Expand Up @@ -1413,7 +1420,7 @@ func (c *VolumeController) ReconcileVolumeState(v *longhorn.Volume, es map[strin
// Bring up the replicas for auto-salvage
for _, r := range failedUsableReplicas {
if util.TimestampWithinLimit(lastFailedAt, r.Spec.FailedAt, AutoSalvageTimeLimit) {
r.Spec.FailedAt = ""
setReplicaFailedAt(r, "")
log.WithField("replica", r.Name).Warn("Automatically salvaging volume replica")
msg := fmt.Sprintf("Replica %v of volume %v will be automatically salvaged", r.Name, v.Name)
c.eventRecorder.Event(v, corev1.EventTypeWarning, constant.EventReasonAutoSalvaged, msg)
Expand Down Expand Up @@ -1835,9 +1842,7 @@ func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *l
}
log.WithField("replica", r.Name).Warn(msg)
if r.Spec.FailedAt == "" {
now := c.nowHandler()
r.Spec.FailedAt = now
r.Spec.LastFailedAt = now
setReplicaFailedAt(r, c.nowHandler())
}
r.Spec.DesireState = longhorn.InstanceStateStopped
}
Expand Down Expand Up @@ -1959,9 +1964,7 @@ func (c *VolumeController) closeVolumeDependentResources(v *longhorn.Volume, e *
for _, r := range rs {
if r.Spec.HealthyAt == "" && r.Spec.FailedAt == "" && dataExists {
// This replica must have been rebuilding. Mark it as failed.
now := c.nowHandler()
r.Spec.FailedAt = now
r.Spec.LastFailedAt = now
setReplicaFailedAt(r, c.nowHandler())
// Unscheduled replicas are marked failed here when volume is detached.
// Check if NodeId or DiskID is empty to avoid deleting reusableFailedReplica when replenished.
if r.Spec.NodeID == "" || r.Spec.DiskID == "" {
Expand Down Expand Up @@ -2212,7 +2215,7 @@ func (c *VolumeController) replenishReplicas(v *longhorn.Volume, e *longhorn.Eng
if reusableFailedReplica != nil {
if !c.backoff.IsInBackOffSinceUpdate(reusableFailedReplica.Name, time.Now()) {
log.Infof("Failed replica %v will be reused during rebuilding", reusableFailedReplica.Name)
reusableFailedReplica.Spec.FailedAt = ""
setReplicaFailedAt(reusableFailedReplica, "")
reusableFailedReplica.Spec.HealthyAt = ""

if datastore.IsReplicaRebuildingFailed(reusableFailedReplica) {
Expand Down Expand Up @@ -4512,12 +4515,12 @@ func (c *VolumeController) ReconcilePersistentVolume(volume *longhorn.Volume) er
return nil
}

func shouldCleanUpFailedReplicaV1(r *longhorn.Replica, staleReplicaTimeout, definitelyHealthyCount int,
func shouldCleanUpFailedReplicaV1(r *longhorn.Replica, staleReplicaTimeout, safeAsLastReplicaCount int,
volumeCurrentImage string) bool {
// Even if healthyAt == "", lastHealthyAt != "" indicates this replica has some (potentially invalid) data. We MUST
// NOT delete it until we're sure the engine can start with another replica. In the worst case scenario, maybe we
// can recover data from this replica.
if r.Spec.LastHealthyAt != "" && definitelyHealthyCount == 0 {
if r.Spec.LastHealthyAt != "" && safeAsLastReplicaCount == 0 {
return false
}
// Failed to rebuild too many times.
Expand Down
1 change: 1 addition & 0 deletions controller/volume_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ func (s *TestSuite) TestVolumeLifeCycle(c *C) {
r.Status.Port = randomPort()
}
r.Spec.HealthyAt = getTestNow()
r.Spec.LastHealthyAt = r.Spec.HealthyAt
for _, e := range tc.engines {
if r.Spec.FailedAt == "" {
e.Status.ReplicaModeMap[name] = "RW"
Expand Down
4 changes: 4 additions & 0 deletions k8s/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2640,16 +2640,20 @@ spec:
evictionRequested:
type: boolean
failedAt:
description: FailedAt is set when a running replica fails or when a running engine is unable to use a replica for any reason. FailedAt indicates the time the failure occurred. When FailedAt is set, a replica is likely to have useful (though possibly stale) data. A replica with FailedAt set must be rebuilt from a non-failed replica (or it can be used in a salvage if all replicas are failed). FailedAt is cleared before a rebuild or salvage.
type: string
hardNodeAffinity:
type: string
healthyAt:
description: HealthyAt is set the first time a replica becomes read/write in an engine after creation or rebuild. HealthyAt indicates the time the last successful rebuild occurred. When HealthyAt is set, a replica is likely to have useful (though possibly stale) data. HealthyAt is cleared before a rebuild.
type: string
image:
type: string
lastFailedAt:
description: LastFailedAt is always set at the same time as FailedAt. Unlike FailedAt, LastFailedAt is never cleared. LastFailedAt is not a reliable indicator of the state of a replica's data. For example, a replica with LastFailedAt may already be healthy and in use again. However, because it is never cleared, it can be compared to LastHealthyAt to help prevent dangerous replica deletion in some corner cases.
type: string
lastHealthyAt:
description: LastHealthyAt is set every time a replica becomes read/write in an engine. Unlike HealthyAt, LastHealthyAt is never cleared. LastHealthyAt is not a reliable indicator of the state of a replica's data. For example, a replica with LastHealthyAt set may be in the middle of a rebuild. However, because it is never cleared, it can be compared to LastFailedAt to help prevent dangerous replica deletion in some corner cases.
type: string
logRequested:
type: boolean
Expand Down
15 changes: 15 additions & 0 deletions k8s/pkg/apis/longhorn/v1beta2/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,27 @@ type ReplicaSpec struct {
// +optional
EngineName string `json:"engineName"`
// +optional
// HealthyAt is set the first time a replica becomes read/write in an engine after creation or rebuild. HealthyAt
// indicates the time the last successful rebuild occurred. When HealthyAt is set, a replica is likely to have
// useful (though possibly stale) data. HealthyAt is cleared before a rebuild.
HealthyAt string `json:"healthyAt"`
// +optional
// LastHealthyAt is set every time a replica becomes read/write in an engine. Unlike HealthyAt, LastHealthyAt is
// never cleared. LastHealthyAt is not a reliable indicator of the state of a replica's data. For example, a
// replica with LastHealthyAt set may be in the middle of a rebuild. However, because it is never cleared, it can be
// compared to LastFailedAt to help prevent dangerous replica deletion in some corner cases.
LastHealthyAt string `json:"lastHealthyAt"`
// +optional
// FailedAt is set when a running replica fails or when a running engine is unable to use a replica for any reason.
// FailedAt indicates the time the failure occurred. When FailedAt is set, a replica is likely to have useful
// (though possibly stale) data. A replica with FailedAt set must be rebuilt from a non-failed replica (or it can
// be used in a salvage if all replicas are failed). FailedAt is cleared before a rebuild or salvage.
FailedAt string `json:"failedAt"`
// +optional
// LastFailedAt is always set at the same time as FailedAt. Unlike FailedAt, LastFailedAt is never cleared.
// LastFailedAt is not a reliable indicator of the state of a replica's data. For example, a replica with
// LastFailedAt may already be healthy and in use again. However, because it is never cleared, it can be compared to
// LastHealthyAt to help prevent dangerous replica deletion in some corner cases.
LastFailedAt string `json:"lastFailedAt"`
// +optional
DiskID string `json:"diskID"`
Expand Down
16 changes: 8 additions & 8 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,18 +261,18 @@ func TimestampWithinLimit(latest time.Time, ts string, limit time.Duration) bool
return deadline.After(latest)
}

func TimestampAfterTimestamp(after string, before string) bool {
afterT, err := time.Parse(time.RFC3339, after)
// TimestampAfterTimestamp returns true if timestamp1 is after timestamp2. It returns false otherwise and an error if
// either timestamp cannot be parsed.
func TimestampAfterTimestamp(timestamp1 string, timestamp2 string) (bool, error) {
time1, err := time.Parse(time.RFC3339, timestamp1)
if err != nil {
logrus.Errorf("Cannot parse after time %v", after)
return false
return false, errors.Wrapf(err, "cannot parse timestamp %v", timestamp1)
}
beforeT, err := time.Parse(time.RFC3339, before)
time2, err := time.Parse(time.RFC3339, timestamp2)
if err != nil {
logrus.Errorf("Cannot parse before time %v", before)
return false
return false, errors.Wrapf(err, "cannot parse timestamp %v", timestamp2)
}
return afterT.After(beforeT)
return time1.After(time2), nil
}

func ValidateString(name string) bool {
Expand Down
24 changes: 15 additions & 9 deletions util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,22 +186,28 @@ func (s *TestSuite) TestGetValidMountPoint(c *C) {

func TestTimestampAfterTimestamp(t *testing.T) {
tests := map[string]struct {
before string
after string
want bool
timestamp1 string
timestamp2 string
want bool
wantErr bool
}{
"beforeBadFormat": {"2024-01-02T18:37Z", "2024-01-02T18:16:37Z", false},
"afterBadFormat": {"2024-01-02T18:16:37Z", "2024-01-02T18:37Z", false},
"actuallyAfter": {"2024-01-02T18:17:37Z", "2024-01-02T18:16:37Z", true},
"actuallyBefore": {"2024-01-02T18:16:37Z", "2024-01-02T18:17:37Z", false},
"sameTime": {"2024-01-02T18:16:37Z", "2024-01-02T18:16:37Z", false},
"timestamp1BadFormat": {"2024-01-02T18:37Z", "2024-01-02T18:16:37Z", false, true},
"timestamp2BadFormat": {"2024-01-02T18:16:37Z", "2024-01-02T18:37Z", false, true},
"timestamp1After": {"2024-01-02T18:17:37Z", "2024-01-02T18:16:37Z", true, false},
"timestamp2NotAfter": {"2024-01-02T18:16:37Z", "2024-01-02T18:17:37Z", false, false},
"sameTime": {"2024-01-02T18:16:37Z", "2024-01-02T18:16:37Z", false, false},
}

assert := assert.New(t)
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
got := TimestampAfterTimestamp(tc.before, tc.after)
got, err := TimestampAfterTimestamp(tc.timestamp1, tc.timestamp2)
assert.Equal(tc.want, got)
if tc.wantErr {
assert.Error(err)
} else {
assert.NoError(err)
}
})
}
}

0 comments on commit 305ff2f

Please sign in to comment.