From 05efc9685be5e6e62639c064e525b5bb7fbb8247 Mon Sep 17 00:00:00 2001 From: Yingxin-Jiang Date: Tue, 12 Feb 2019 17:17:12 -0800 Subject: [PATCH] sdkserver: fix race condition in setLabel, setAnnotation, and setState --- pkg/sdkserver/sdkserver.go | 124 ++++++++++++++++---------------- pkg/sdkserver/sdkserver_test.go | 46 +++++++++--- 2 files changed, 101 insertions(+), 69 deletions(-) diff --git a/pkg/sdkserver/sdkserver.go b/pkg/sdkserver/sdkserver.go index cbe72e8249..e9a3230c7c 100644 --- a/pkg/sdkserver/sdkserver.go +++ b/pkg/sdkserver/sdkserver.go @@ -78,6 +78,10 @@ type SDKServer struct { connectedStreams []sdk.SDK_WatchGameServerServer stop <-chan struct{} recorder record.EventRecorder + gsLabels map[string]string + gsAnnotations map[string]string + gsState stablev1alpha1.GameServerState + gsUpdateMutex sync.RWMutex } // NewSDKServer creates a SDKServer that sets up an @@ -107,6 +111,9 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf healthMutex: sync.RWMutex{}, healthFailureCount: 0, streamMutex: sync.RWMutex{}, + gsLabels: map[string]string{}, + gsAnnotations: map[string]string{}, + gsUpdateMutex: sync.RWMutex{}, } s.informerFactory = factory @@ -202,38 +209,30 @@ func (s *SDKServer) Run(stop <-chan struct{}) error { return nil } -// syncGameServer synchronises the GameServer with the -// requested operations -// takes a key in the format of {operation}/{data} +// syncGameServer synchronises the GameServer with the requested operations. +// The format of the key is {operation}. To prevent old operation data from +// overwriting the new one, the operation data is persisted in SDKServer. func (s *SDKServer) syncGameServer(key string) error { - op := strings.Split(key, "/") - rest := op[1:] - - switch Operation(op[0]) { + switch Operation(key) { case updateState: - return s.syncState(rest) + return s.updateState() case updateLabel: - return s.syncLabel(rest) + return s.updateLabels() case updateAnnotation: - return s.syncAnnotation(rest) + return s.updateAnnotations() } return errors.Errorf("could not sync game server key: %s", key) } -// syncState converts the string array into values for updateState -func (s *SDKServer) syncState(rest []string) error { - if len(rest) == 0 { - return errors.New("could not sync state, as not state provided") +// updateState sets the GameServer Status's state to the one persisted in SDKServer, +// i.e. SDKServer.gsState. +func (s *SDKServer) updateState() error { + s.logger.WithField("state", s.gsState).Info("Updating state") + if len(s.gsState) == 0 { + return errors.Errorf("could not update GameServer %s/%s to empty state", s.namespace, s.gameServerName) } - return s.updateState(stablev1alpha1.GameServerState(rest[0])) -} - -// updateState sets the GameServer Status's state to the state -// that has been passed through -func (s *SDKServer) updateState(state stablev1alpha1.GameServerState) error { - s.logger.WithField("state", state).Info("Updating state") gameServers := s.gameServerGetter.GameServers(s.namespace) gs, err := s.gameServer() if err != nil { @@ -246,7 +245,9 @@ func (s *SDKServer) updateState(state stablev1alpha1.GameServerState) error { return nil } - gs.Status.State = state + s.gsUpdateMutex.RLock() + gs.Status.State = s.gsState + s.gsUpdateMutex.RUnlock() _, err = gameServers.Update(gs) // state specific work here @@ -254,7 +255,7 @@ func (s *SDKServer) updateState(state stablev1alpha1.GameServerState) error { s.recorder.Event(gs, corev1.EventTypeWarning, string(gs.Status.State), "No longer healthy") } - return errors.Wrapf(err, "could not update GameServer %s/%s to state %s", s.namespace, s.gameServerName, state) + return errors.Wrapf(err, "could not update GameServer %s/%s to state %s", s.namespace, s.gameServerName, gs.Status.State) } func (s *SDKServer) gameServer() (*stablev1alpha1.GameServer, error) { @@ -262,58 +263,49 @@ func (s *SDKServer) gameServer() (*stablev1alpha1.GameServer, error) { return gs, errors.Wrapf(err, "could not retrieve GameServer %s/%s", s.namespace, s.gameServerName) } -// syncLabel converts the string array values into values for -// updateLabel -func (s *SDKServer) syncLabel(rest []string) error { - if len(rest) < 2 { - return errors.Errorf("could not sync label: %#v", rest) - } - - return s.updateLabel(rest[0], rest[1]) -} - -// updateLabel updates the label on this GameServer, with the prefix of -// "stable.agones.dev/sdk-" -func (s *SDKServer) updateLabel(key, value string) error { - s.logger.WithField("key", key).WithField("value", value).Info("updating label") +// updateLabels updates the labels on this GameServer to the ones persisted in SDKServer, +// i.e. SDKServer.gsLabels, with the prefix of "stable.agones.dev/sdk-" +func (s *SDKServer) updateLabels() error { + s.logger.WithField("labels", s.gsLabels).Info("updating label") gs, err := s.gameServer() if err != nil { return err } gsCopy := gs.DeepCopy() - if gsCopy.ObjectMeta.Labels == nil { + + s.gsUpdateMutex.RLock() + if len(s.gsLabels) > 0 && gsCopy.ObjectMeta.Labels == nil { gsCopy.ObjectMeta.Labels = map[string]string{} } - gsCopy.ObjectMeta.Labels[metadataPrefix+key] = value + for k, v := range s.gsLabels { + gsCopy.ObjectMeta.Labels[metadataPrefix+k] = v + } + s.gsUpdateMutex.RUnlock() _, err = s.gameServerGetter.GameServers(s.namespace).Update(gsCopy) return err } -// syncAnnotation converts the string array values into values for -// updateAnnotation -func (s *SDKServer) syncAnnotation(rest []string) error { - if len(rest) < 2 { - return errors.Errorf("could not sync annotation: %#v", rest) - } - - return s.updateAnnotation(rest[0], rest[1]) -} - -// updateAnnotation updates the Annotation on this GameServer, with the prefix of -// "stable.agones.dev/sdk-" -func (s *SDKServer) updateAnnotation(key, value string) error { +// updateAnnotations updates the Annotations on this GameServer to the ones persisted in SDKServer, +// i.e. SDKServer.gsAnnotations, with the prefix of "stable.agones.dev/sdk-" +func (s *SDKServer) updateAnnotations() error { + s.logger.WithField("annotations", s.gsAnnotations).Info("updating annotation") gs, err := s.gameServer() if err != nil { return err } gsCopy := gs.DeepCopy() - if gsCopy.ObjectMeta.Annotations == nil { + + s.gsUpdateMutex.RLock() + if len(s.gsAnnotations) > 0 && gsCopy.ObjectMeta.Annotations == nil { gsCopy.ObjectMeta.Annotations = map[string]string{} } - gsCopy.ObjectMeta.Annotations[metadataPrefix+key] = value + for k, v := range s.gsAnnotations { + gsCopy.ObjectMeta.Annotations[metadataPrefix+k] = v + } + s.gsUpdateMutex.RUnlock() _, err = s.gameServerGetter.GameServers(s.namespace).Update(gsCopy) return err @@ -322,8 +314,10 @@ func (s *SDKServer) updateAnnotation(key, value string) error { // enqueueState enqueue a State change request into the // workerqueue func (s *SDKServer) enqueueState(state stablev1alpha1.GameServerState) { - key := string(updateState) + "/" + string(state) - s.workerqueue.Enqueue(cache.ExplicitKey(key)) + s.gsUpdateMutex.Lock() + s.gsState = state + s.gsUpdateMutex.Unlock() + s.workerqueue.Enqueue(cache.ExplicitKey(string(updateState))) } // Ready enters the RequestReady state change for this GameServer into @@ -363,17 +357,25 @@ func (s *SDKServer) Health(stream sdk.SDK_HealthServer) error { // metdata func (s *SDKServer) SetLabel(_ context.Context, kv *sdk.KeyValue) (*sdk.Empty, error) { s.logger.WithField("values", kv).Info("Adding SetLabel to queue") - key := string(updateLabel) + "/" + kv.Key + "/" + kv.Value - s.workerqueue.Enqueue(cache.ExplicitKey(key)) + + s.gsUpdateMutex.Lock() + s.gsLabels[kv.Key] = kv.Value + s.gsUpdateMutex.Unlock() + + s.workerqueue.Enqueue(cache.ExplicitKey(string(updateLabel))) return &sdk.Empty{}, nil } // SetAnnotation adds the Key/Value to be used to set the annotations with the metadataPrefix to the `GameServer` // metdata func (s *SDKServer) SetAnnotation(_ context.Context, kv *sdk.KeyValue) (*sdk.Empty, error) { - s.logger.WithField("values", kv).Info("Adding SetLabel to queue") - key := string(updateAnnotation) + "/" + kv.Key + "/" + kv.Value - s.workerqueue.Enqueue(cache.ExplicitKey(key)) + s.logger.WithField("values", kv).Info("Adding SetAnnotation to queue") + + s.gsUpdateMutex.Lock() + s.gsAnnotations[kv.Key] = kv.Value + s.gsUpdateMutex.Unlock() + + s.workerqueue.Enqueue(cache.ExplicitKey(string(updateAnnotation))) return &sdk.Empty{}, nil } diff --git a/pkg/sdkserver/sdkserver_test.go b/pkg/sdkserver/sdkserver_test.go index 24a8b22ce4..49a3440331 100644 --- a/pkg/sdkserver/sdkserver_test.go +++ b/pkg/sdkserver/sdkserver_test.go @@ -77,20 +77,28 @@ func TestSidecarRun(t *testing.T) { }, "label": { f: func(sc *SDKServer, ctx context.Context) { - _, err := sc.SetLabel(ctx, &sdk.KeyValue{Key: "foo", Value: "bar"}) + _, err := sc.SetLabel(ctx, &sdk.KeyValue{Key: "foo", Value: "value-foo"}) + assert.Nil(t, err) + _, err = sc.SetLabel(ctx, &sdk.KeyValue{Key: "bar", Value: "value-bar"}) assert.Nil(t, err) }, expected: expected{ - labels: map[string]string{metadataPrefix + "foo": "bar"}, + labels: map[string]string{ + metadataPrefix + "foo": "value-foo", + metadataPrefix + "bar": "value-bar"}, }, }, "annotation": { f: func(sc *SDKServer, ctx context.Context) { - _, err := sc.SetAnnotation(ctx, &sdk.KeyValue{Key: "test", Value: "annotation"}) + _, err := sc.SetAnnotation(ctx, &sdk.KeyValue{Key: "test-1", Value: "annotation-1"}) + assert.Nil(t, err) + _, err = sc.SetAnnotation(ctx, &sdk.KeyValue{Key: "test-2", Value: "annotation-2"}) assert.Nil(t, err) }, expected: expected{ - annotations: map[string]string{metadataPrefix + "test": "annotation"}, + annotations: map[string]string{ + metadataPrefix + "test-1": "annotation-1", + metadataPrefix + "test-2": "annotation-2"}, }, }, } @@ -176,24 +184,40 @@ func TestSDKServerSyncGameServer(t *testing.T) { annotations map[string]string } + type scData struct { + gsState v1alpha1.GameServerState + gsLabels map[string]string + gsAnnotations map[string]string + } + fixtures := map[string]struct { expected expected key string + scData scData }{ "ready": { - key: string(updateState) + "/" + string(v1alpha1.GameServerStateReady), + key: string(updateState), + scData: scData{ + gsState: v1alpha1.GameServerStateReady, + }, expected: expected{ state: v1alpha1.GameServerStateReady, }, }, "label": { - key: string(updateLabel) + "/foo/bar", + key: string(updateLabel), + scData: scData{ + gsLabels: map[string]string{"foo": "bar"}, + }, expected: expected{ labels: map[string]string{metadataPrefix + "foo": "bar"}, }, }, "annotation": { - key: string(updateAnnotation) + "/test/annotation", + key: string(updateAnnotation), + scData: scData{ + gsAnnotations: map[string]string{"test": "annotation"}, + }, expected: expected{ annotations: map[string]string{metadataPrefix + "test": "annotation"}, }, @@ -205,6 +229,11 @@ func TestSDKServerSyncGameServer(t *testing.T) { m := agtesting.NewMocks() sc, err := defaultSidecar(m) assert.Nil(t, err) + + sc.gsState = v.scData.gsState + sc.gsLabels = v.scData.gsLabels + sc.gsAnnotations = v.scData.gsAnnotations + updated := false m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { @@ -253,6 +282,7 @@ func TestSidecarUpdateState(t *testing.T) { m := agtesting.NewMocks() sc, err := defaultSidecar(m) assert.Nil(t, err) + sc.gsState = v1alpha1.GameServerStateReady updated := false @@ -275,7 +305,7 @@ func TestSidecarUpdateState(t *testing.T) { sc.informerFactory.Start(stop) assert.True(t, cache.WaitForCacheSync(stop, sc.gameServerSynced)) - err = sc.updateState(v1alpha1.GameServerStateReady) + err = sc.updateState() assert.Nil(t, err) assert.False(t, updated) })