Skip to content

Commit

Permalink
feat: improve eventbus with lib
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Nov 12, 2023
1 parent 800c885 commit 80a9a6a
Show file tree
Hide file tree
Showing 17 changed files with 79 additions and 63 deletions.
18 changes: 11 additions & 7 deletions couchbase/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/asaskevich/EventBus"

"github.com/Trendyol/go-dcp/config"

"github.com/Trendyol/go-dcp/helpers"
Expand All @@ -23,7 +25,7 @@ import (

type cbMembership struct {
client Client
bus helpers.Bus
bus EventBus.Bus
info *membership.Model
infoChan chan *membership.Model
heartbeatTicker *time.Ticker
Expand Down Expand Up @@ -256,7 +258,7 @@ func (h *cbMembership) rebalance(instances []Instance) {
logger.Log.Error("error while rebalance, self = %v, err: %v", string(h.id), err)
panic(err)
} else {
h.bus.Emit(helpers.MembershipChangedBusEventName, &membership.Model{
h.bus.Publish(helpers.MembershipChangedBusEventName, &membership.Model{
MemberNumber: selfOrder,
TotalMembers: len(instances),
})
Expand Down Expand Up @@ -293,16 +295,14 @@ func (h *cbMembership) Close() {
h.heartbeatTicker.Stop()
}

func (h *cbMembership) membershipChangedListener(event interface{}) {
model := event.(*membership.Model)

func (h *cbMembership) membershipChangedListener(model *membership.Model) {
h.info = model
go func() {
h.infoChan <- model
}()
}

func NewCBMembership(config *config.Dcp, client Client, bus helpers.Bus) membership.Membership {
func NewCBMembership(config *config.Dcp, client Client, bus EventBus.Bus) membership.Membership {
if !config.IsCouchbaseMetadata() {
err := errors.New("unsupported metadata type")
logger.Log.Error("cannot initialize couchbase membership, err: %v", err)
Expand All @@ -328,7 +328,11 @@ func NewCBMembership(config *config.Dcp, client Client, bus helpers.Bus) members
cbm.startHeartbeat()
cbm.startMonitor()

bus.Subscribe(helpers.MembershipChangedBusEventName, cbm.membershipChangedListener)
err := bus.Subscribe(helpers.MembershipChangedBusEventName, cbm.membershipChangedListener)
if err != nil {
logger.Log.Error("error while subscribe membership changed event: %v", err)
panic(err)
}

return cbm
}
21 changes: 15 additions & 6 deletions couchbase/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package couchbase
import (
"time"

"github.com/asaskevich/EventBus"

"github.com/Trendyol/go-dcp/wrapper"

"github.com/Trendyol/go-dcp/logger"
Expand Down Expand Up @@ -60,7 +62,7 @@ func (om *ObserverMetric) AddExpiration() {
}

type observer struct {
bus helpers.Bus
bus EventBus.Bus
metrics *wrapper.ConcurrentSwissMap[uint16, *ObserverMetric]
listenerEndCh models.ListenerEndCh
collectionIDs map[uint32]string
Expand All @@ -79,9 +81,7 @@ func (so *observer) AddCatchup(vbID uint16, seqNo gocbcore.SeqNo) {
so.catchupNeededVbIDCount++
}

func (so *observer) persistSeqNoChangedListener(event interface{}) {
persistSeqNo := event.(models.PersistSeqNo)

func (so *observer) persistSeqNoChangedListener(persistSeqNo models.PersistSeqNo) {
if persistSeqNo.SeqNo != 0 {
currentPersistSeqNo, _ := so.persistSeqNo.Load(persistSeqNo.VbID)

Expand Down Expand Up @@ -391,6 +391,11 @@ func (so *observer) Close() {
}
}()

err := so.bus.Unsubscribe(helpers.PersistSeqNoChangedBusEventName, so.persistSeqNoChangedListener)
if err != nil {
logger.Log.Error("error while unsubscribe: %v", err)
}

so.closed = true
close(so.listenerCh)

Expand Down Expand Up @@ -418,7 +423,7 @@ func (so *observer) CloseEnd() {
func NewObserver(
config *dcp.Dcp,
collectionIDs map[uint32]string,
bus helpers.Bus,
bus EventBus.Bus,
) Observer {
observer := &observer{
currentSnapshots: wrapper.CreateConcurrentSwissMap[uint16, *models.SnapshotMarker](1024),
Expand All @@ -433,7 +438,11 @@ func NewObserver(
config: config,
}

observer.bus.Subscribe(helpers.PersistSeqNoChangedBusEventName, observer.persistSeqNoChangedListener)
err := observer.bus.Subscribe(helpers.PersistSeqNoChangedBusEventName, observer.persistSeqNoChangedListener)
if err != nil {
logger.Log.Error("cannot subscribe to persistSeqNo changed event: %v", err)
panic(err)
}

return observer
}
8 changes: 5 additions & 3 deletions couchbase/rollback_mitigation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"time"

"github.com/asaskevich/EventBus"

"github.com/couchbase/gocbcore/v10"

"github.com/Trendyol/go-dcp/wrapper"
Expand Down Expand Up @@ -50,7 +52,7 @@ func (v *vbUUIDAndSeqNo) SetVbUUID(vbUUID gocbcore.VbUUID) {
type rollbackMitigation struct {
client Client
config *config.Dcp
bus helpers.Bus
bus EventBus.Bus
configSnapshot *gocbcore.ConfigSnapshot
persistedSeqNos *wrapper.ConcurrentSwissMap[uint16, []*vbUUIDAndSeqNo]
vbUUIDMap *wrapper.ConcurrentSwissMap[uint16, gocbcore.VbUUID]
Expand Down Expand Up @@ -289,7 +291,7 @@ func (r *rollbackMitigation) observe(vbID uint16, replica int, groupID int, vbUU
replicas[replica].SetSeqNo(result.PersistSeqNo)
replicas[replica].SetVbUUID(result.VbUUID)

r.bus.Emit(helpers.PersistSeqNoChangedBusEventName, models.PersistSeqNo{
r.bus.Publish(helpers.PersistSeqNoChangedBusEventName, models.PersistSeqNo{
VbID: vbID,
SeqNo: r.getMinSeqNo(vbID),
})
Expand Down Expand Up @@ -380,7 +382,7 @@ func (r *rollbackMitigation) Stop() {
logger.Log.Info("rollback mitigation stopped")
}

func NewRollbackMitigation(client Client, config *config.Dcp, vbIds []uint16, bus helpers.Bus) RollbackMitigation {
func NewRollbackMitigation(client Client, config *config.Dcp, vbIds []uint16, bus EventBus.Bus) RollbackMitigation {
return &rollbackMitigation{
client: client,
config: config,
Expand Down
14 changes: 11 additions & 3 deletions dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"syscall"
"time"

"github.com/asaskevich/EventBus"

"github.com/Trendyol/go-dcp/membership"

"github.com/sirupsen/logrus"

jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -92,7 +96,7 @@ func (s *dcp) SetEventHandler(eventHandler models.EventHandler) {
s.eventHandler = eventHandler
}

func (s *dcp) membershipChangedListener(_ interface{}) {
func (s *dcp) membershipChangedListener(_ *membership.Model) {
s.stream.Rebalance()
}

Expand All @@ -115,7 +119,7 @@ func (s *dcp) Start() {

logger.Log.Info("using %v metadata", reflect.TypeOf(s.metadata))

bus := helpers.NewBus()
bus := EventBus.New()

vBuckets := s.client.GetNumVBuckets()

Expand All @@ -137,7 +141,11 @@ func (s *dcp) Start() {

s.stream.Open()

bus.Subscribe(helpers.MembershipChangedBusEventName, s.membershipChangedListener)
err := bus.Subscribe(helpers.MembershipChangedBusEventName, s.membershipChangedListener)
if err != nil {
logger.Log.Error("cannot subscribe to membership changed event: %v", err)
panic(err)
}

if !s.config.API.Disabled {
go func() {
Expand Down
1 change: 1 addition & 0 deletions example/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require github.com/Trendyol/go-dcp v0.0.0
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/couchbase/gocbcore/v10 v10.2.9 // indirect
Expand Down
2 changes: 2 additions & 0 deletions example/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
github.com/ansrivas/fiberprometheus/v2 v2.6.1/go.mod h1:MloIKvy4yN6hVqlRpJ/jDiR244YnWJaQC0FIqS8A+MY=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/couchbase/gocbcore/v10 v10.2.9
github.com/gofiber/fiber/v2 v2.50.0
github.com/google/uuid v1.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
github.com/ansrivas/fiberprometheus/v2 v2.6.1/go.mod h1:MloIKvy4yN6hVqlRpJ/jDiR244YnWJaQC0FIqS8A+MY=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
Expand Down
28 changes: 0 additions & 28 deletions helpers/bus.go

This file was deleted.

5 changes: 3 additions & 2 deletions kubernetes/ha_membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/Trendyol/go-dcp/config"
"github.com/Trendyol/go-dcp/helpers"
"github.com/Trendyol/go-dcp/membership"
"github.com/asaskevich/EventBus"
)

type haMembership struct {
Expand Down Expand Up @@ -31,12 +32,12 @@ func (h *haMembership) membershipChangedListener(event interface{}) {
}()
}

func NewHaMembership(_ *config.Dcp, bus helpers.Bus) membership.Membership {
func NewHaMembership(_ *config.Dcp, bus EventBus.Bus) membership.Membership {
ham := &haMembership{
infoChan: make(chan *membership.Model),
}

bus.Subscribe(helpers.MembershipChangedBusEventName, ham.membershipChangedListener)
bus.Publish(helpers.MembershipChangedBusEventName, ham.membershipChangedListener)

return ham
}
14 changes: 9 additions & 5 deletions kubernetes/leader_elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/asaskevich/EventBus"

"github.com/Trendyol/go-dcp/config"

"github.com/Trendyol/go-dcp/membership"
Expand Down Expand Up @@ -80,9 +82,7 @@ func (le *leaderElector) Run(ctx context.Context) {
}()
}

func (le *leaderElector) membershipChangedListener(event interface{}) {
model := event.(*membership.Model)

func (le *leaderElector) membershipChangedListener(model *membership.Model) {
le.client.AddLabel(
le.leaseLockNamespace,
"member",
Expand All @@ -95,7 +95,7 @@ func NewLeaderElector(
config *config.Dcp,
myIdentity *models.Identity,
handler leaderelector.Handler,
bus helpers.Bus,
bus EventBus.Bus,
) leaderelector.LeaderElector {
var leaseLockName string
var leaseLockNamespace string
Expand Down Expand Up @@ -124,7 +124,11 @@ func NewLeaderElector(
leaseLockNamespace: leaseLockNamespace,
}

bus.Subscribe(helpers.MembershipChangedBusEventName, le.membershipChangedListener)
err := bus.Subscribe(helpers.MembershipChangedBusEventName, le.membershipChangedListener)
if err != nil {
logger.Log.Error("cannot subscribe to membership changed event: %v", err)
panic(err)
}

return le
}
8 changes: 5 additions & 3 deletions servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sort"
"time"

"github.com/asaskevich/EventBus"

"github.com/Trendyol/go-dcp/wrapper"

"github.com/Trendyol/go-dcp/config"
Expand Down Expand Up @@ -34,7 +36,7 @@ type ServiceDiscovery interface {
}

type serviceDiscovery struct {
bus helpers.Bus
bus EventBus.Bus
leaderService *Service
services *wrapper.ConcurrentSwissMap[string, *Service]
heartbeatTicker *time.Ticker
Expand Down Expand Up @@ -197,11 +199,11 @@ func (s *serviceDiscovery) SetInfo(memberNumber int, totalMembers int) {

logger.Log.Info("new info arrived for member: %v/%v", memberNumber, totalMembers)

s.bus.Emit(helpers.MembershipChangedBusEventName, newInfo)
s.bus.Publish(helpers.MembershipChangedBusEventName, newInfo)
}
}

func NewServiceDiscovery(config *config.Dcp, bus helpers.Bus) ServiceDiscovery {
func NewServiceDiscovery(config *config.Dcp, bus EventBus.Bus) ServiceDiscovery {
return &serviceDiscovery{
services: wrapper.CreateConcurrentSwissMap[string, *Service](0),
bus: bus,
Expand Down
7 changes: 4 additions & 3 deletions stream/leader_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"

"github.com/asaskevich/EventBus"

"github.com/Trendyol/go-dcp/config"

"github.com/Trendyol/go-dcp/leaderelector"
Expand All @@ -14,7 +16,6 @@ import (

"github.com/Trendyol/go-dcp/logger"

"github.com/Trendyol/go-dcp/helpers"
"github.com/Trendyol/go-dcp/servicediscovery"
)

Expand All @@ -30,7 +31,7 @@ type LeaderElection interface {
type leaderElection struct {
rpcServer servicediscovery.Server
serviceDiscovery servicediscovery.ServiceDiscovery
bus helpers.Bus
bus EventBus.Bus
myIdentity *models.Identity
config *config.Dcp
newLeaderLock *sync.Mutex
Expand Down Expand Up @@ -91,7 +92,7 @@ func (l *leaderElection) Stop() {
func NewLeaderElection(
config *config.Dcp,
serviceDiscovery servicediscovery.ServiceDiscovery,
bus helpers.Bus,
bus EventBus.Bus,
) LeaderElection {
return &leaderElection{
config: config,
Expand Down
Loading

0 comments on commit 80a9a6a

Please sign in to comment.