diff --git a/couchbase/membership.go b/couchbase/membership.go index 6cf04ad..89c38a0 100644 --- a/couchbase/membership.go +++ b/couchbase/membership.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/asaskevich/EventBus" + "github.com/Trendyol/go-dcp/config" "github.com/Trendyol/go-dcp/helpers" @@ -23,7 +25,7 @@ import ( type cbMembership struct { client Client - bus helpers.Bus + bus EventBus.Bus info *membership.Model infoChan chan *membership.Model heartbeatTicker *time.Ticker @@ -256,7 +258,7 @@ func (h *cbMembership) rebalance(instances []Instance) { logger.Log.Error("error while rebalance, self = %v, err: %v", string(h.id), err) panic(err) } else { - h.bus.Emit(helpers.MembershipChangedBusEventName, &membership.Model{ + h.bus.Publish(helpers.MembershipChangedBusEventName, &membership.Model{ MemberNumber: selfOrder, TotalMembers: len(instances), }) @@ -293,16 +295,14 @@ func (h *cbMembership) Close() { h.heartbeatTicker.Stop() } -func (h *cbMembership) membershipChangedListener(event interface{}) { - model := event.(*membership.Model) - +func (h *cbMembership) membershipChangedListener(model *membership.Model) { h.info = model go func() { h.infoChan <- model }() } -func NewCBMembership(config *config.Dcp, client Client, bus helpers.Bus) membership.Membership { +func NewCBMembership(config *config.Dcp, client Client, bus EventBus.Bus) membership.Membership { if !config.IsCouchbaseMetadata() { err := errors.New("unsupported metadata type") logger.Log.Error("cannot initialize couchbase membership, err: %v", err) @@ -328,7 +328,11 @@ func NewCBMembership(config *config.Dcp, client Client, bus helpers.Bus) members cbm.startHeartbeat() cbm.startMonitor() - bus.Subscribe(helpers.MembershipChangedBusEventName, cbm.membershipChangedListener) + err := bus.Subscribe(helpers.MembershipChangedBusEventName, cbm.membershipChangedListener) + if err != nil { + logger.Log.Error("error while subscribe membership changed event: %v", err) + panic(err) + } return cbm } diff --git a/couchbase/observer.go b/couchbase/observer.go index 46c5abf..b849668 100644 --- a/couchbase/observer.go +++ b/couchbase/observer.go @@ -3,6 +3,8 @@ package couchbase import ( "time" + "github.com/asaskevich/EventBus" + "github.com/Trendyol/go-dcp/wrapper" "github.com/Trendyol/go-dcp/logger" @@ -60,7 +62,7 @@ func (om *ObserverMetric) AddExpiration() { } type observer struct { - bus helpers.Bus + bus EventBus.Bus metrics *wrapper.ConcurrentSwissMap[uint16, *ObserverMetric] listenerEndCh models.ListenerEndCh collectionIDs map[uint32]string @@ -79,9 +81,7 @@ func (so *observer) AddCatchup(vbID uint16, seqNo gocbcore.SeqNo) { so.catchupNeededVbIDCount++ } -func (so *observer) persistSeqNoChangedListener(event interface{}) { - persistSeqNo := event.(models.PersistSeqNo) - +func (so *observer) persistSeqNoChangedListener(persistSeqNo models.PersistSeqNo) { if persistSeqNo.SeqNo != 0 { currentPersistSeqNo, _ := so.persistSeqNo.Load(persistSeqNo.VbID) @@ -391,6 +391,11 @@ func (so *observer) Close() { } }() + err := so.bus.Unsubscribe(helpers.PersistSeqNoChangedBusEventName, so.persistSeqNoChangedListener) + if err != nil { + logger.Log.Error("error while unsubscribe: %v", err) + } + so.closed = true close(so.listenerCh) @@ -418,7 +423,7 @@ func (so *observer) CloseEnd() { func NewObserver( config *dcp.Dcp, collectionIDs map[uint32]string, - bus helpers.Bus, + bus EventBus.Bus, ) Observer { observer := &observer{ currentSnapshots: wrapper.CreateConcurrentSwissMap[uint16, *models.SnapshotMarker](1024), @@ -433,7 +438,11 @@ func NewObserver( config: config, } - observer.bus.Subscribe(helpers.PersistSeqNoChangedBusEventName, observer.persistSeqNoChangedListener) + err := observer.bus.Subscribe(helpers.PersistSeqNoChangedBusEventName, observer.persistSeqNoChangedListener) + if err != nil { + logger.Log.Error("cannot subscribe to persistSeqNo changed event: %v", err) + panic(err) + } return observer } diff --git a/couchbase/rollback_mitigation.go b/couchbase/rollback_mitigation.go index 5e0954c..a8b2ba9 100644 --- a/couchbase/rollback_mitigation.go +++ b/couchbase/rollback_mitigation.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "github.com/asaskevich/EventBus" + "github.com/couchbase/gocbcore/v10" "github.com/Trendyol/go-dcp/wrapper" @@ -50,7 +52,7 @@ func (v *vbUUIDAndSeqNo) SetVbUUID(vbUUID gocbcore.VbUUID) { type rollbackMitigation struct { client Client config *config.Dcp - bus helpers.Bus + bus EventBus.Bus configSnapshot *gocbcore.ConfigSnapshot persistedSeqNos *wrapper.ConcurrentSwissMap[uint16, []*vbUUIDAndSeqNo] vbUUIDMap *wrapper.ConcurrentSwissMap[uint16, gocbcore.VbUUID] @@ -289,7 +291,7 @@ func (r *rollbackMitigation) observe(vbID uint16, replica int, groupID int, vbUU replicas[replica].SetSeqNo(result.PersistSeqNo) replicas[replica].SetVbUUID(result.VbUUID) - r.bus.Emit(helpers.PersistSeqNoChangedBusEventName, models.PersistSeqNo{ + r.bus.Publish(helpers.PersistSeqNoChangedBusEventName, models.PersistSeqNo{ VbID: vbID, SeqNo: r.getMinSeqNo(vbID), }) @@ -380,7 +382,7 @@ func (r *rollbackMitigation) Stop() { logger.Log.Info("rollback mitigation stopped") } -func NewRollbackMitigation(client Client, config *config.Dcp, vbIds []uint16, bus helpers.Bus) RollbackMitigation { +func NewRollbackMitigation(client Client, config *config.Dcp, vbIds []uint16, bus EventBus.Bus) RollbackMitigation { return &rollbackMitigation{ client: client, config: config, diff --git a/dcp.go b/dcp.go index 0d05821..7c0f187 100644 --- a/dcp.go +++ b/dcp.go @@ -9,6 +9,10 @@ import ( "syscall" "time" + "github.com/asaskevich/EventBus" + + "github.com/Trendyol/go-dcp/membership" + "github.com/sirupsen/logrus" jsoniter "github.com/json-iterator/go" @@ -92,7 +96,7 @@ func (s *dcp) SetEventHandler(eventHandler models.EventHandler) { s.eventHandler = eventHandler } -func (s *dcp) membershipChangedListener(_ interface{}) { +func (s *dcp) membershipChangedListener(_ *membership.Model) { s.stream.Rebalance() } @@ -115,7 +119,7 @@ func (s *dcp) Start() { logger.Log.Info("using %v metadata", reflect.TypeOf(s.metadata)) - bus := helpers.NewBus() + bus := EventBus.New() vBuckets := s.client.GetNumVBuckets() @@ -137,7 +141,11 @@ func (s *dcp) Start() { s.stream.Open() - bus.Subscribe(helpers.MembershipChangedBusEventName, s.membershipChangedListener) + err := bus.Subscribe(helpers.MembershipChangedBusEventName, s.membershipChangedListener) + if err != nil { + logger.Log.Error("cannot subscribe to membership changed event: %v", err) + panic(err) + } if !s.config.API.Disabled { go func() { diff --git a/example/go.mod b/example/go.mod index f922a04..9b4c9f4 100644 --- a/example/go.mod +++ b/example/go.mod @@ -9,6 +9,7 @@ require github.com/Trendyol/go-dcp v0.0.0 require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect + github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/couchbase/gocbcore/v10 v10.2.9 // indirect diff --git a/example/go.sum b/example/go.sum index 560e644..034d02e 100644 --- a/example/go.sum +++ b/example/go.sum @@ -6,6 +6,8 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/ github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= github.com/ansrivas/fiberprometheus/v2 v2.6.1/go.mod h1:MloIKvy4yN6hVqlRpJ/jDiR244YnWJaQC0FIqS8A+MY= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= diff --git a/go.mod b/go.mod index 3896da9..908c7b0 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/ansrivas/fiberprometheus/v2 v2.6.1 + github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/couchbase/gocbcore/v10 v10.2.9 github.com/gofiber/fiber/v2 v2.50.0 github.com/google/uuid v1.4.0 diff --git a/go.sum b/go.sum index 7f856b9..9323f8c 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/ github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= github.com/ansrivas/fiberprometheus/v2 v2.6.1/go.mod h1:MloIKvy4yN6hVqlRpJ/jDiR244YnWJaQC0FIqS8A+MY= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= diff --git a/helpers/bus.go b/helpers/bus.go deleted file mode 100644 index d871e41..0000000 --- a/helpers/bus.go +++ /dev/null @@ -1,28 +0,0 @@ -package helpers - -type Listener = func(event interface{}) - -type Bus interface { - Emit(eventName string, event interface{}) - Subscribe(eventName string, listener Listener) -} - -type bus struct { - listeners map[string]Listener -} - -func (i *bus) Emit(eventName string, event interface{}) { - if listener, ok := i.listeners[eventName]; ok { - listener(event) - } -} - -func (i *bus) Subscribe(eventName string, listener Listener) { - i.listeners[eventName] = listener -} - -func NewBus() Bus { - return &bus{ - listeners: map[string]Listener{}, - } -} diff --git a/kubernetes/ha_membership.go b/kubernetes/ha_membership.go index 39a2a06..a0981fc 100644 --- a/kubernetes/ha_membership.go +++ b/kubernetes/ha_membership.go @@ -4,6 +4,7 @@ import ( "github.com/Trendyol/go-dcp/config" "github.com/Trendyol/go-dcp/helpers" "github.com/Trendyol/go-dcp/membership" + "github.com/asaskevich/EventBus" ) type haMembership struct { @@ -31,12 +32,12 @@ func (h *haMembership) membershipChangedListener(event interface{}) { }() } -func NewHaMembership(_ *config.Dcp, bus helpers.Bus) membership.Membership { +func NewHaMembership(_ *config.Dcp, bus EventBus.Bus) membership.Membership { ham := &haMembership{ infoChan: make(chan *membership.Model), } - bus.Subscribe(helpers.MembershipChangedBusEventName, ham.membershipChangedListener) + bus.Publish(helpers.MembershipChangedBusEventName, ham.membershipChangedListener) return ham } diff --git a/kubernetes/leader_elector.go b/kubernetes/leader_elector.go index 6c70eb1..399dab8 100644 --- a/kubernetes/leader_elector.go +++ b/kubernetes/leader_elector.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/asaskevich/EventBus" + "github.com/Trendyol/go-dcp/config" "github.com/Trendyol/go-dcp/membership" @@ -80,9 +82,7 @@ func (le *leaderElector) Run(ctx context.Context) { }() } -func (le *leaderElector) membershipChangedListener(event interface{}) { - model := event.(*membership.Model) - +func (le *leaderElector) membershipChangedListener(model *membership.Model) { le.client.AddLabel( le.leaseLockNamespace, "member", @@ -95,7 +95,7 @@ func NewLeaderElector( config *config.Dcp, myIdentity *models.Identity, handler leaderelector.Handler, - bus helpers.Bus, + bus EventBus.Bus, ) leaderelector.LeaderElector { var leaseLockName string var leaseLockNamespace string @@ -124,7 +124,11 @@ func NewLeaderElector( leaseLockNamespace: leaseLockNamespace, } - bus.Subscribe(helpers.MembershipChangedBusEventName, le.membershipChangedListener) + err := bus.Subscribe(helpers.MembershipChangedBusEventName, le.membershipChangedListener) + if err != nil { + logger.Log.Error("cannot subscribe to membership changed event: %v", err) + panic(err) + } return le } diff --git a/servicediscovery/service_discovery.go b/servicediscovery/service_discovery.go index 4060690..d1be683 100644 --- a/servicediscovery/service_discovery.go +++ b/servicediscovery/service_discovery.go @@ -5,6 +5,8 @@ import ( "sort" "time" + "github.com/asaskevich/EventBus" + "github.com/Trendyol/go-dcp/wrapper" "github.com/Trendyol/go-dcp/config" @@ -34,7 +36,7 @@ type ServiceDiscovery interface { } type serviceDiscovery struct { - bus helpers.Bus + bus EventBus.Bus leaderService *Service services *wrapper.ConcurrentSwissMap[string, *Service] heartbeatTicker *time.Ticker @@ -197,11 +199,11 @@ func (s *serviceDiscovery) SetInfo(memberNumber int, totalMembers int) { logger.Log.Info("new info arrived for member: %v/%v", memberNumber, totalMembers) - s.bus.Emit(helpers.MembershipChangedBusEventName, newInfo) + s.bus.Publish(helpers.MembershipChangedBusEventName, newInfo) } } -func NewServiceDiscovery(config *config.Dcp, bus helpers.Bus) ServiceDiscovery { +func NewServiceDiscovery(config *config.Dcp, bus EventBus.Bus) ServiceDiscovery { return &serviceDiscovery{ services: wrapper.CreateConcurrentSwissMap[string, *Service](0), bus: bus, diff --git a/stream/leader_election.go b/stream/leader_election.go index 87f9222..f1bbaba 100644 --- a/stream/leader_election.go +++ b/stream/leader_election.go @@ -4,6 +4,8 @@ import ( "context" "sync" + "github.com/asaskevich/EventBus" + "github.com/Trendyol/go-dcp/config" "github.com/Trendyol/go-dcp/leaderelector" @@ -14,7 +16,6 @@ import ( "github.com/Trendyol/go-dcp/logger" - "github.com/Trendyol/go-dcp/helpers" "github.com/Trendyol/go-dcp/servicediscovery" ) @@ -30,7 +31,7 @@ type LeaderElection interface { type leaderElection struct { rpcServer servicediscovery.Server serviceDiscovery servicediscovery.ServiceDiscovery - bus helpers.Bus + bus EventBus.Bus myIdentity *models.Identity config *config.Dcp newLeaderLock *sync.Mutex @@ -91,7 +92,7 @@ func (l *leaderElection) Stop() { func NewLeaderElection( config *config.Dcp, serviceDiscovery servicediscovery.ServiceDiscovery, - bus helpers.Bus, + bus EventBus.Bus, ) LeaderElection { return &leaderElection{ config: config, diff --git a/stream/stream.go b/stream/stream.go index a7c4007..d4bd7cd 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/asaskevich/EventBus" + "github.com/couchbase/gocbcore/v10" "github.com/Trendyol/go-dcp/wrapper" @@ -48,7 +50,7 @@ type stream struct { rollbackMitigation couchbase.RollbackMitigation observer couchbase.Observer vBucketDiscovery VBucketDiscovery - bus helpers.Bus + bus EventBus.Bus eventHandler models.EventHandler stopCh chan struct{} finishStreamWithCloseCh chan struct{} @@ -356,7 +358,7 @@ func NewStream(client couchbase.Client, listener models.Listener, collectionIDs map[uint32]string, stopCh chan struct{}, - bus helpers.Bus, + bus EventBus.Bus, eventHandler models.EventHandler, ) Stream { return &stream{ diff --git a/stream/vbucket_discovery.go b/stream/vbucket_discovery.go index 6c8d18a..583ca8b 100644 --- a/stream/vbucket_discovery.go +++ b/stream/vbucket_discovery.go @@ -3,6 +3,8 @@ package stream import ( "errors" + "github.com/asaskevich/EventBus" + "github.com/Trendyol/go-dcp/config" "github.com/Trendyol/go-dcp/kubernetes" @@ -75,7 +77,7 @@ func (s *vBucketDiscovery) GetMetric() *VBucketDiscoveryMetric { func NewVBucketDiscovery(client couchbase.Client, config *config.Dcp, vBucketNumber int, - bus helpers.Bus, + bus EventBus.Bus, ) VBucketDiscovery { var ms membership.Membership diff --git a/test/integration/go.mod b/test/integration/go.mod index fd5bf15..036c24b 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -9,6 +9,7 @@ require github.com/Trendyol/go-dcp v0.0.0 require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect + github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/couchbase/gocbcore/v10 v10.2.9 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 560e644..034d02e 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -6,6 +6,8 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/ github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= github.com/ansrivas/fiberprometheus/v2 v2.6.1/go.mod h1:MloIKvy4yN6hVqlRpJ/jDiR244YnWJaQC0FIqS8A+MY= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=