Skip to content

Commit

Permalink
Fix scenarios where error messages aren't propagated correctly (#424) (
Browse files Browse the repository at this point in the history
…#425)

* Fix scenarios where error messages aren't propagated correctly (#424)

* Improve tests for agent and service/remote to expose bad behaviors
* Breakdown test into multiple ones to make them more clear
* Fix scenarios where error messages aren't propagated correctly

---------

Co-authored-by: Felippe Durán <[email protected]>

* fix(groups): add cancelFunc and Close callback

When running tests we need to ensure a graceful cleanup by
terminating/closing the context by calling the Close() method,
which should call the cancelFunc of the group service context.
This should graceful handle goroutines like groupTTLCleanup
with a sane context cancellation

---------

Co-authored-by: Felippe Durán <[email protected]>
Co-authored-by: Felippe Durán <[email protected]>
  • Loading branch information
3 people authored Dec 18, 2024
1 parent 7a8c904 commit afce0c5
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 56 deletions.
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ github.com/onsi/ginkgo/v2 v2.1.6/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7
github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
Expand Down
134 changes: 119 additions & 15 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package agent

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
Expand All @@ -33,6 +34,8 @@ import (
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/topfreegames/pitaya/v3/pkg/conn/codec"
codecmocks "github.com/topfreegames/pitaya/v3/pkg/conn/codec/mocks"
"github.com/topfreegames/pitaya/v3/pkg/conn/message"
messagemocks "github.com/topfreegames/pitaya/v3/pkg/conn/message/mocks"
Expand All @@ -45,6 +48,7 @@ import (
metricsmocks "github.com/topfreegames/pitaya/v3/pkg/metrics/mocks"
"github.com/topfreegames/pitaya/v3/pkg/mocks"
"github.com/topfreegames/pitaya/v3/pkg/protos"
"github.com/topfreegames/pitaya/v3/pkg/serialize"
serializemocks "github.com/topfreegames/pitaya/v3/pkg/serialize/mocks"
"github.com/topfreegames/pitaya/v3/pkg/session"
)
Expand Down Expand Up @@ -520,6 +524,7 @@ func TestAgentResponseMID(t *testing.T) {
expected := pendingWrite{ctx: ctx, data: []byte("ok!"), err: nil}
var err error
if table.msgErr {
mockSerializer.EXPECT().Unmarshal(gomock.Any(), gomock.Any()).Return(nil)
err = ag.ResponseMID(ctx, table.mid, table.data, table.msgErr)
} else {
err = ag.ResponseMID(ctx, table.mid, table.data)
Expand Down Expand Up @@ -837,19 +842,39 @@ func TestAgentSendHandshakeResponse(t *testing.T) {
}

func TestAnswerWithError(t *testing.T) {
tables := []struct {
unknownError := e.NewError(errors.New(""), e.ErrUnknownCode)
table := []struct {
name string
answeredErr error
encoderErr error
getPayloadErr error
resErr error
err error
expectedErr error
}{
{"success", nil, nil, nil},
{"failure_get_payload", errors.New("serialize err"), nil, errors.New("serialize err")},
{"failure_response_mid", nil, errors.New("responsemid err"), errors.New("responsemid err")},
{
name: "should succeed with unknown error",
answeredErr: assert.AnError,
encoderErr: nil,
getPayloadErr: nil,
expectedErr: unknownError,
},
{
name: "should not answer if fails to get payload",
answeredErr: assert.AnError,
encoderErr: nil,
getPayloadErr: errors.New("serialize err"),
expectedErr: nil,
},
{
name: "should not answer if fails to send",
answeredErr: assert.AnError,
encoderErr: assert.AnError,
getPayloadErr: nil,
expectedErr: nil,
},
}

for _, table := range tables {
t.Run(table.name, func(t *testing.T) {
for _, row := range table {
t.Run(row.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -862,18 +887,97 @@ func TestAnswerWithError(t *testing.T) {
ag := newAgent(nil, nil, mockEncoder, mockSerializer, time.Second, 1, nil, messageEncoder, nil, sessionPool).(*agentImpl)
assert.NotNil(t, ag)

mockSerializer.EXPECT().Marshal(gomock.Any()).Return(nil, table.getPayloadErr)
if table.getPayloadErr == nil {
mockEncoder.EXPECT().Encode(packet.Type(packet.Data), gomock.Any())
}
ag.AnswerWithError(nil, uint(rand.Int()), errors.New("something went wrong"))
if table.err == nil {
helpers.ShouldEventuallyReceive(t, ag.chSend)
mockSerializer.EXPECT().Marshal(gomock.Any()).Return(nil, row.getPayloadErr).AnyTimes()
mockSerializer.EXPECT().Unmarshal(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
mockEncoder.EXPECT().Encode(packet.Type(packet.Data), gomock.Any()).Return(nil, row.encoderErr).AnyTimes()

ag.AnswerWithError(nil, uint(rand.Int()), row.answeredErr)
if row.expectedErr != nil {
pWrite := helpers.ShouldEventuallyReceive(t, ag.chSend)
assert.Equal(t, pendingWrite{err: row.expectedErr}, pWrite)
}
})
}
}

type customSerializer struct{}

func (*customSerializer) Marshal(obj interface{}) ([]byte, error) { return json.Marshal(obj) }
func (*customSerializer) Unmarshal(data []byte, obj interface{}) error {
return json.Unmarshal(data, obj)
}
func (*customSerializer) GetName() string { return "custom" }

func TestAgentAnswerWithError(t *testing.T) {
jsonSerializer, err := serialize.NewSerializer(serialize.JSON)
require.NoError(t, err)

protobufSerializer, err := serialize.NewSerializer(serialize.PROTOBUF)
require.NoError(t, err)

customSerializer := &customSerializer{}

table := []struct {
name string
answeredErr error
serializer serialize.Serializer
expectedErr error
}{
{
name: "should return unknown code for generic error and JSON serializer",
answeredErr: assert.AnError,
serializer: jsonSerializer,
expectedErr: e.NewError(assert.AnError, e.ErrUnknownCode),
},
{
name: "should return custom code for pitaya error and JSON serializer",
answeredErr: e.NewError(assert.AnError, "CUSTOM-123"),
serializer: jsonSerializer,
expectedErr: e.NewError(assert.AnError, "CUSTOM-123"),
},
{
name: "should return unknown code for generic error and Protobuf serializer",
answeredErr: assert.AnError,
serializer: protobufSerializer,
expectedErr: e.NewError(assert.AnError, e.ErrUnknownCode),
},
{
name: "should return custom code for pitaya error and Protobuf serializer",
answeredErr: e.NewError(assert.AnError, "CUSTOM-123"),
serializer: protobufSerializer,
expectedErr: e.NewError(assert.AnError, "CUSTOM-123"),
},
{
name: "should return unknown code for generic error and custom serializer",
answeredErr: assert.AnError,
serializer: customSerializer,
expectedErr: e.NewError(assert.AnError, e.ErrUnknownCode),
},
{
name: "should return custom code for pitaya error and custom serializer",
answeredErr: e.NewError(assert.AnError, "CUSTOM-123"),
serializer: customSerializer,
expectedErr: e.NewError(assert.AnError, "CUSTOM-123"),
},
}

for _, row := range table {
t.Run(row.name, func(t *testing.T) {
encoder := codec.NewPomeloPacketEncoder()

messageEncoder := message.NewMessagesEncoder(false)
sessionPool := session.NewSessionPool()
ag := newAgent(nil, nil, encoder, row.serializer, time.Second, 1, nil, messageEncoder, nil, sessionPool).(*agentImpl)
assert.NotNil(t, ag)

ag.AnswerWithError(nil, uint(rand.Int()), row.answeredErr)

pWrite := helpers.ShouldEventuallyReceive(t, ag.chSend)
assert.Equal(t, row.expectedErr, pWrite.(pendingWrite).err)
})
}
}

func TestAgentHeartbeat(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
7 changes: 5 additions & 2 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package errors

import "errors"

// ErrUnknownCode is a string code representing an unknown error
// This will be used when no error code is sent by the handler
const ErrUnknownCode = "PIT-000"
Expand All @@ -46,9 +48,10 @@ type Error struct {
Metadata map[string]string
}

//NewError ctor
// NewError ctor
func NewError(err error, code string, metadata ...map[string]string) *Error {
if pitayaErr, ok := err.(*Error); ok {
var pitayaErr *Error
if ok := errors.As(err, &pitayaErr); ok {
if len(metadata) > 0 {
mergeMetadatas(pitayaErr, metadata[0])
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/groups/etcd_group_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"sync"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/namespace"
"go.etcd.io/etcd/api/v3/mvccpb"
"github.com/topfreegames/pitaya/v3/pkg/config"
"github.com/topfreegames/pitaya/v3/pkg/constants"
"github.com/topfreegames/pitaya/v3/pkg/logger"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/namespace"
)

var (
Expand All @@ -22,6 +22,7 @@ var (

// EtcdGroupService base ETCD struct solution
type EtcdGroupService struct {
cancelFunc context.CancelFunc
}

// NewEtcdGroupService returns a new group instance
Expand Down Expand Up @@ -282,3 +283,9 @@ func (c *EtcdGroupService) GroupRenewTTL(ctx context.Context, groupName string)
}
return constants.ErrEtcdLeaseNotFound
}

func (c *EtcdGroupService) Close() {
if c.cancelFunc != nil {
c.cancelFunc()
}
}
11 changes: 11 additions & 0 deletions pkg/groups/etcd_group_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,65 +41,76 @@ func setup(t *testing.T) (*integration.ClusterV3, GroupService) {
func TestEtcdCreateDuplicatedGroup(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testCreateDuplicatedGroup(etcdGroupService, t)
}

func TestEtcdCreateGroup(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testCreateGroup(etcdGroupService, t)
}

func TestEtcdCreateGroupWithTTL(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testCreateGroupWithTTL(etcdGroupService, t)
}

func TestEtcdGroupAddMember(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testGroupAddMember(etcdGroupService, t)
}

func TestEtcdGroupAddDuplicatedMember(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testGroupAddDuplicatedMember(etcdGroupService, t)
}

func TestEtcdGroupContainsMember(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testGroupContainsMember(etcdGroupService, t)
}

func TestEtcdRemove(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testRemove(etcdGroupService, t)
}

func TestEtcdDelete(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testDelete(etcdGroupService, t)
}

func TestEtcdRemoveAll(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testRemoveAll(etcdGroupService, t)
}

func TestEtcdCount(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testCount(etcdGroupService, t)
}

func TestEtcdMembers(t *testing.T) {
cluster, etcdGroupService := setup(t)
defer cluster.Terminate(t)
defer etcdGroupService.Close()
testMembers(etcdGroupService, t)
}
1 change: 1 addition & 0 deletions pkg/groups/group_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type (
GroupRemoveAll(ctx context.Context, groupName string) error
GroupRemoveMember(ctx context.Context, groupName, uid string) error
GroupRenewTTL(ctx context.Context, groupName string) error
Close()
}
)

Expand Down
39 changes: 31 additions & 8 deletions pkg/groups/memory_group_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (

// MemoryGroupService base in server memory solution
type MemoryGroupService struct {
cancelFunc context.CancelFunc
}

// MemoryGroup is the struct stored in each group key(which is the name of the group)
Expand All @@ -28,22 +29,38 @@ type MemoryGroup struct {

// NewMemoryGroupService returns a new group instance
func NewMemoryGroupService(config config.MemoryGroupConfig) *MemoryGroupService {
ctx, cancel := context.WithCancel(context.Background())

service := &MemoryGroupService{cancelFunc: cancel}
memoryOnce.Do(func() {
memoryGroups = make(map[string]*MemoryGroup)
go groupTTLCleanup(config.TickDuration)
go groupTTLCleanup(ctx, config.TickDuration)
})
return &MemoryGroupService{}
return service
}

func groupTTLCleanup(duration time.Duration) {
for now := range time.Tick(duration) {
memoryGroupsMu.Lock()
for groupName, mg := range memoryGroups {
if mg.TTL != 0 && now.UnixNano()-mg.LastRefresh > mg.TTL {
func groupTTLCleanup(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case now := <-ticker.C:
memoryGroupsMu.Lock()
for groupName, mg := range memoryGroups {
if mg.TTL != 0 && now.UnixNano()-mg.LastRefresh > mg.TTL {
delete(memoryGroups, groupName)
}
}
memoryGroupsMu.Unlock()
case <-ctx.Done():
memoryGroupsMu.Lock()
for groupName := range memoryGroups {
delete(memoryGroups, groupName)
}
memoryGroupsMu.Unlock()
return
}
memoryGroupsMu.Unlock()
}
}

Expand Down Expand Up @@ -200,3 +217,9 @@ func (c *MemoryGroupService) GroupRenewTTL(ctx context.Context, groupName string
}
return constants.ErrMemoryTTLNotFound
}

func (c *MemoryGroupService) Close() {
if c.cancelFunc != nil {
c.cancelFunc()
}
}
Loading

0 comments on commit afce0c5

Please sign in to comment.