diff --git a/connector.go b/connector.go index 2d712fd..abc9c00 100644 --- a/connector.go +++ b/connector.go @@ -49,11 +49,11 @@ func (c *connector) listener(ctx *models.ListenerContext) { var e couchbase.Event switch event := ctx.Event.(type) { case models.DcpMutation: - e = couchbase.NewMutateEvent(event.Key, event.Value, event.CollectionName, event.Cas, event.EventTime) + e = couchbase.NewMutateEvent(event.Key, event.Value, event.CollectionName, event.Cas, event.EventTime, event.VbID) case models.DcpExpiration: - e = couchbase.NewExpireEvent(event.Key, nil, event.CollectionName, event.Cas, event.EventTime) + e = couchbase.NewExpireEvent(event.Key, nil, event.CollectionName, event.Cas, event.EventTime, event.VbID) case models.DcpDeletion: - e = couchbase.NewDeleteEvent(event.Key, nil, event.CollectionName, event.Cas, event.EventTime) + e = couchbase.NewDeleteEvent(event.Key, nil, event.CollectionName, event.Cas, event.EventTime, event.VbID) default: return } diff --git a/couchbase/event.go b/couchbase/event.go index 70bf41f..9e8d62f 100644 --- a/couchbase/event.go +++ b/couchbase/event.go @@ -8,12 +8,13 @@ type Event struct { Key []byte Value []byte Cas uint64 + VbID uint16 IsDeleted bool IsExpired bool IsMutated bool } -func NewDeleteEvent(key []byte, value []byte, collectionName string, cas uint64, eventTime time.Time) Event { +func NewDeleteEvent(key []byte, value []byte, collectionName string, cas uint64, eventTime time.Time, vbID uint16) Event { return Event{ Key: key, Value: value, @@ -21,10 +22,11 @@ func NewDeleteEvent(key []byte, value []byte, collectionName string, cas uint64, CollectionName: collectionName, Cas: cas, EventTime: eventTime, + VbID: vbID, } } -func NewExpireEvent(key []byte, value []byte, collectionName string, cas uint64, eventTime time.Time) Event { +func NewExpireEvent(key []byte, value []byte, collectionName string, cas uint64, eventTime time.Time, vbID uint16) Event { return Event{ Key: key, Value: value, @@ -32,10 +34,11 @@ func NewExpireEvent(key []byte, value []byte, collectionName string, cas uint64, CollectionName: collectionName, Cas: cas, EventTime: eventTime, + VbID: vbID, } } -func NewMutateEvent(key []byte, value []byte, collectionName string, cas uint64, eventTime time.Time) Event { +func NewMutateEvent(key []byte, value []byte, collectionName string, cas uint64, eventTime time.Time, vbID uint16) Event { return Event{ Key: key, Value: value, @@ -43,5 +46,6 @@ func NewMutateEvent(key []byte, value []byte, collectionName string, cas uint64, CollectionName: collectionName, Cas: cas, EventTime: eventTime, + VbID: vbID, } }