Skip to content

Commit

Permalink
compare status update event sequence ID
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Sep 14, 2024
1 parent 889250d commit bb9c49a
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 1 deletion.
4 changes: 4 additions & 0 deletions pkg/cloudevents/work/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const (
// This annotation is used for tracing the ManifestWork specific changes, the value of this annotation
// should be a sequence number representing the ManifestWork specific generation.
CloudEventsResourceVersionAnnotationKey = "cloudevents.open-cluster-management.io/resourceversion"

// CloudEventsSequenceIDAnnotationKey is the key of the status update event sequence ID.
// The sequence id represents the order in which status update events occur on a single agent.
CloudEventsSequenceIDAnnotationKey = "cloudevents.open-cluster-management.io/sequenceid"
)

// CloudEventsOriginalSourceLabelKey is the key of the cloudevents original source label.
Expand Down
10 changes: 10 additions & 0 deletions pkg/cloudevents/work/source/codec/manifestbundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
)

Expand Down Expand Up @@ -95,6 +96,11 @@ func (c *ManifestBundleCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWo
return nil, fmt.Errorf("failed to get resourceversion extension: %v", err)
}

sequenceID, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionStatusUpdateSequenceID])
if err != nil {
return nil, fmt.Errorf("failed to get sequenceid extension: %v", err)
}

metaObj := metav1.ObjectMeta{}

// the agent sends the work meta data back, restore the meta to the received work, otherwise only set the
Expand All @@ -113,6 +119,10 @@ func (c *ManifestBundleCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWo

metaObj.UID = kubetypes.UID(resourceID)
metaObj.ResourceVersion = fmt.Sprintf("%d", resourceVersion)
if metaObj.Annotations == nil {
metaObj.Annotations = map[string]string{}
}
metaObj.Annotations[common.CloudEventsSequenceIDAnnotationKey] = sequenceID

work := &workv1.ManifestWork{
TypeMeta: metav1.TypeMeta{},
Expand Down
18 changes: 18 additions & 0 deletions pkg/cloudevents/work/store/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,31 @@ func (b *workProcessor) handleWork(work *workv1.ManifestWork) error {
return nil
}

if updatedWork.Annotations == nil {
updatedWork.Annotations = map[string]string{}
}
lastSequenceID := lastWork.Annotations[common.CloudEventsSequenceIDAnnotationKey]
sequenceID := work.Annotations[common.CloudEventsSequenceIDAnnotationKey]
greater, err := utils.CompareSnowflakeSequenceIDs(lastSequenceID, sequenceID)
if err != nil {
klog.Errorf("invalid resource version for work %s/%s, %v", lastWork.Namespace, lastWork.Name, err)
return nil
}

if !greater {
klog.Warningf("the work %s/%s current sequenceID %s is less than its last %s, ignore",
lastWork.Namespace, lastWork.Name, sequenceID, lastSequenceID)
return nil
}

// no status change
if equality.Semantic.DeepEqual(lastWork.Status, work.Status) {
return nil
}

// the work has been handled by agent, we ensure a finalizer on the work
updatedWork.Finalizers = ensureFinalizers(updatedWork.Finalizers)
updatedWork.Annotations[common.CloudEventsSequenceIDAnnotationKey] = sequenceID
updatedWork.Status = work.Status
// update the work with status in the local cache.
return b.store.Update(updatedWork)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudevents/work/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
for _, work := range works {
if errs := utils.Validate(work); len(errs) != 0 {
return nil, fmt.Errorf(errs.ToAggregate().Error())
return nil, fmt.Errorf("%s", errs.ToAggregate().Error())
}

if err := store.Add(work.DeepCopy()); err != nil {
Expand Down
30 changes: 30 additions & 0 deletions pkg/cloudevents/work/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"

"github.com/bwmarrin/snowflake"
jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid"

Expand Down Expand Up @@ -176,3 +177,32 @@ func Encode(work *workv1.ManifestWork) error {

return nil
}

// CompareSnowflakeSequenceIDs compares two snowflake sequence IDs.
// Returns true if the current ID is greater than the last.
// If the last sequence ID is empty, then the current is greater.
func CompareSnowflakeSequenceIDs(last, current string) (bool, error) {
if current != "" && last == "" {
return true, nil
}

lastSID, err := snowflake.ParseString(last)
if err != nil {
return false, fmt.Errorf("unable to parse last sequence ID: %s, %v", last, err)
}

currentSID, err := snowflake.ParseString(current)
if err != nil {
return false, fmt.Errorf("unable to parse current sequence ID: %s %v", current, err)
}

if currentSID.Node() != lastSID.Node() {
return false, fmt.Errorf("sequence IDs (%s,%s) are not from the same node", last, current)
}

if currentSID.Time() != lastSID.Time() {
return currentSID.Time() > lastSID.Time(), nil
}

return currentSID.Step() > lastSID.Step(), nil
}
36 changes: 36 additions & 0 deletions pkg/cloudevents/work/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,42 @@ func TestEncode(t *testing.T) {
}
}

func TestCompareSnowflakeSequenceIDs(t *testing.T) {
cases := []struct {
name string
lastSID string
currentSID string
expected bool
}{
{
name: "last sid is empty",
lastSID: "",
currentSID: "1834773391719010304",
expected: true,
},
{
name: "compare two sids",
lastSID: "1834773391719010304",
currentSID: "1834773613329256448",
expected: true,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
actual, err := CompareSnowflakeSequenceIDs(c.lastSID, c.currentSID)
if err != nil {
t.Fatal(err)
}

if actual != c.expected {
t.Errorf("expected %v, but %v", c.expected, actual)
}

})
}
}

func configMap() *corev1.ConfigMap {
return &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Expand Down

0 comments on commit bb9c49a

Please sign in to comment.