Skip to content

Commit

Permalink
Add persistent message map
Browse files Browse the repository at this point in the history
Resolves 42wim#541
  • Loading branch information
yousefmansy1 committed Mar 14, 2023
1 parent fb62e33 commit 84644ef
Show file tree
Hide file tree
Showing 106 changed files with 22,940 additions and 8 deletions.
54 changes: 50 additions & 4 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/d5/tengo/v2/stdlib"
lru "github.com/hashicorp/golang-lru"
"github.com/kyokomi/emoji/v2"
"github.com/philippgille/gokv"
"github.com/sirupsen/logrus"
)

Expand All @@ -29,14 +30,17 @@ type Gateway struct {
Message chan config.Message
Name string
Messages *lru.Cache
MessageStore gokv.Store
CanonicalStore gokv.Store

logger *logrus.Entry
}

type BrMsgID struct {
br *bridge.Bridge
ID string
Protocol string
DestName string
ChannelID string
ID string
}

const apiProtocol = "api"
Expand All @@ -59,12 +63,41 @@ func New(rootLogger *logrus.Logger, cfg *config.Gateway, r *Router) *Gateway {
if err := gw.AddConfig(cfg); err != nil {
logger.Errorf("Failed to add configuration to gateway: %#v", err)
}

persistentMessageStorePath, usePersistent := gw.Config.GetString("PersistentMessageStorePath")
if usePersistent {
rootPath := fmt.Sprintf("%s/%s", persistentMessageStorePath, gw.Name)
os.MkdirAll(rootPath, os.ModePerm)

gw.MessageStore = gw.getMessageMapStore(fmt.Sprintf("%s/Messages", rootPath))
gw.CanonicalStore = gw.getMessageMapStore(fmt.Sprintf("%s/Canonical", rootPath))
}

return gw
}

func (gw *Gateway) SetMessageMap(canonicalMsgID string, msgIDs []*BrMsgID) {
_, usePersistent := gw.Config.GetString("PersistentMessageStorePath")
if usePersistent {
gw.setDestMessagesToStore(canonicalMsgID, msgIDs)
} else {
gw.Messages.Add(canonicalMsgID, msgIDs)
}
}

// FindCanonicalMsgID returns the ID under which a message was stored in the cache.
func (gw *Gateway) FindCanonicalMsgID(protocol string, mID string) string {
ID := protocol + " " + mID

_, usePersistent := gw.Config.GetString("PersistentMessageStorePath")
if usePersistent {
return gw.getCanonicalMessageFromStore(ID)
} else {
return gw.getCanonicalMessageFromMemCache(ID)
}
}

func (gw *Gateway) getCanonicalMessageFromMemCache(ID string) string {
if gw.Messages.Contains(ID) {
return ID
}
Expand Down Expand Up @@ -259,13 +292,26 @@ func (gw *Gateway) getDestChannel(msg *config.Message, dest bridge.Bridge) []con
}

func (gw *Gateway) getDestMsgID(msgID string, dest *bridge.Bridge, channel *config.ChannelInfo) string {
var destID string

_, usePersistent := gw.Config.GetString("PersistentMessageStorePath")
if usePersistent {
destID = gw.getDestMessagesFromStore(msgID, dest, channel)
} else {
destID = gw.getDestMessageFromMemCache(msgID, dest, channel)
}

return strings.Replace(destID, dest.Protocol+" ", "", 1)
}

func (gw *Gateway) getDestMessageFromMemCache(msgID string, dest *bridge.Bridge, channel *config.ChannelInfo) string {
if res, ok := gw.Messages.Get(msgID); ok {
IDs := res.([]*BrMsgID)
for _, id := range IDs {
// check protocol, bridge name and channelname
// for people that reuse the same bridge multiple times. see #342
if dest.Protocol == id.br.Protocol && dest.Name == id.br.Name && channel.ID == id.ChannelID {
return strings.Replace(id.ID, dest.Protocol+" ", "", 1)
if dest.Protocol == id.Protocol && dest.Name == id.DestName && channel.ID == id.ChannelID {
return id.ID
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion gateway/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,13 @@ func (gw *Gateway) handleMessage(rmsg *config.Message, dest *bridge.Bridge) []*B
if msgID == "" {
continue
}
brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + msgID, channel.ID})
brMsgIDs = append(brMsgIDs,
&BrMsgID{
Protocol: dest.Protocol,
DestName: dest.Name,
ChannelID: channel.ID,
ID: msgID,
})
}
return brMsgIDs
}
Expand Down
83 changes: 83 additions & 0 deletions gateway/persistent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package gateway

import (
"github.com/42wim/matterbridge/bridge"
"github.com/42wim/matterbridge/bridge/config"
"github.com/philippgille/gokv"
"github.com/philippgille/gokv/badgerdb"
"github.com/philippgille/gokv/encoding"
)

func (gw *Gateway) getMessageMapStore(path string) gokv.Store {
options := badgerdb.Options{
Dir: path,
Codec: encoding.Gob,
}

store, err := badgerdb.NewStore(options)
if err != nil {
gw.logger.Error(err)
gw.logger.Errorf("Could not connect to db: %s", path)
}

return store
}

func (gw *Gateway) getCanonicalMessageFromStore(messageID string) string {
if messageID == "" {
return ""
}

canonicalMsgID := new(string)
found, err := gw.CanonicalStore.Get(messageID, canonicalMsgID)
if err != nil {
gw.logger.Error(err)
}

if found {
return *canonicalMsgID
}

return ""
}

func (gw *Gateway) setCanonicalMessageToStore(messageID string, canonicalMsgID string) {
err := gw.CanonicalStore.Set(messageID, canonicalMsgID)
if err != nil {
gw.logger.Error(err)
}
}

func (gw *Gateway) getDestMessagesFromStore(canonicalMsgID string, dest *bridge.Bridge, channel *config.ChannelInfo) string {
if canonicalMsgID == "" {
return ""
}

destMessageIds := new([]BrMsgID)
found, err := gw.MessageStore.Get(canonicalMsgID, destMessageIds)
if err != nil {
gw.logger.Error(err)
}

if found {
for _, id := range *destMessageIds {
// check protocol, bridge name and channelname
// for people that reuse the same bridge multiple times. see #342
if dest.Protocol == id.Protocol && dest.Name == id.DestName && channel.ID == id.ChannelID {
return id.ID
}
}
}
return ""
}

func (gw *Gateway) setDestMessagesToStore(canonicalMsgID string, msgIDs []*BrMsgID) {
for _, msgID := range msgIDs {
gw.setCanonicalMessageToStore(msgID.Protocol+" "+msgID.ID, canonicalMsgID)
}

err := gw.MessageStore.Set(canonicalMsgID, msgIDs)
if err != nil {
gw.logger.Error(err)
}
}
12 changes: 9 additions & 3 deletions gateway/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,15 @@ func (r *Router) handleReceive() {
// we're adding the original message as a "dest message"
// as when we get the dest messages for a delete the source message isnt in the list
// therefore the delete doesnt happen on the source platform.
msgIDs = append(msgIDs, &BrMsgID{srcBridge, srcBridge.Protocol + " " + msg.ID, msg.Channel + srcBridge.Account})

gw.Messages.Add(msg.Protocol+" "+msg.ID, msgIDs)
msgIDs = append(msgIDs,
&BrMsgID{
Protocol: srcBridge.Protocol,
DestName: srcBridge.Name,
ChannelID: msg.Channel + srcBridge.Account,
ID: msg.ID,
})

gw.SetMessageMap(msg.Protocol+" "+msg.ID, msgIDs)
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ require (

require (
filippo.io/edwards25519 v1.0.0 // indirect
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/Benau/go_rlottie v0.0.0-20210807002906-98c1b2421989 // indirect
github.com/Jeffail/gabs v1.4.0 // indirect
github.com/apex/log v1.9.0 // indirect
github.com/av-elier/go-decimal-to-rational v0.0.0-20191127152832-89e6aad02ecf // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/dgraph-io/badger v1.6.0 // indirect
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dyatlov/go-opengraph v0.0.0-20210112100619-dae8665a5b09 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
Expand Down Expand Up @@ -107,6 +110,10 @@ require (
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/philippgille/gokv v0.6.0 // indirect
github.com/philippgille/gokv/badgerdb v0.6.0 // indirect
github.com/philippgille/gokv/encoding v0.0.0-20191011213304-eb77f15b9c61 // indirect
github.com/philippgille/gokv/util v0.0.0-20191011213304-eb77f15b9c61 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ git.apache.org/thrift.git v0.12.0/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqbl
github.com/42wim/go-gitter v0.0.0-20170828205020-017310c2d557 h1:IZtuWGfzQnKnCSu+vl8WGLhpVQ5Uvy3rlSwqXSg+sQg=
github.com/42wim/go-gitter v0.0.0-20170828205020-017310c2d557/go.mod h1:jL0YSXMs/txjtGJ4PWrmETOk6KUHMDPMshgQZlTeB3Y=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v26.5.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
Expand Down Expand Up @@ -459,10 +461,12 @@ github.com/dchote/go-openal v0.0.0-20171116030048-f4a9a141d372/go.mod h1:74z+CYu
github.com/denisenkom/go-mssqldb v0.0.0-20200620013148-b91950f658ec/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/dgoogauth v0.0.0-20190221195224-5a805980a5f3/go.mod h1:hEfFauPHz7+NnjR/yHJGhrKo1Za+zStgwUETx3yzqgY=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
Expand Down Expand Up @@ -1344,6 +1348,17 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/philippgille/gokv v0.0.0-20191001201555-5ac9a20de634/go.mod h1:OCoWPt+mbYuTO1FUVrQ2SxQU0oaaHBsn6lRhFX3JHOc=
github.com/philippgille/gokv v0.5.1-0.20191011213304-eb77f15b9c61/go.mod h1:OCoWPt+mbYuTO1FUVrQ2SxQU0oaaHBsn6lRhFX3JHOc=
github.com/philippgille/gokv v0.6.0 h1:fNEx/tSwV73nzlYd3iRYB8F+SEVJNNFzH1gsaT8SK2c=
github.com/philippgille/gokv v0.6.0/go.mod h1:tjXRFw9xDHgxLS8WJdfYotKGWp8TWqu4RdXjMDG/XBo=
github.com/philippgille/gokv/badgerdb v0.6.0 h1:4Qigf2SpyXLF8KaM5nA5/D/0aD/bZevuAnrW4ZsDsjA=
github.com/philippgille/gokv/badgerdb v0.6.0/go.mod h1:3u2avs8gtmCc0R0Bw4jKV8aaDfLb5V9JToSASyhpFGM=
github.com/philippgille/gokv/encoding v0.0.0-20191011213304-eb77f15b9c61 h1:IgQDuUPuEFVf22mBskeCLAtvd5c9XiiJG2UYud6eGHI=
github.com/philippgille/gokv/encoding v0.0.0-20191011213304-eb77f15b9c61/go.mod h1:SjxSrCoeYrYn85oTtroyG1ePY8aE72nvLQlw8IYwAN8=
github.com/philippgille/gokv/test v0.0.0-20191011213304-eb77f15b9c61/go.mod h1:EUc+s9ONc1+VOr9NUEd8S0YbGRrQd/gz/p+2tvwt12s=
github.com/philippgille/gokv/util v0.0.0-20191011213304-eb77f15b9c61 h1:ril/jI0JgXNjPWwDkvcRxlZ09kgHXV2349xChjbsQ4o=
github.com/philippgille/gokv/util v0.0.0-20191011213304-eb77f15b9c61/go.mod h1:2dBhsJgY/yVIkjY5V3AnDUxUbEPzT6uQ3LvoVT8TR20=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
Expand Down Expand Up @@ -1916,6 +1931,7 @@ golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191011234655-491137f69257/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -2049,6 +2065,7 @@ golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
1 change: 1 addition & 0 deletions vendor/github.com/AndreasBriese/bbloom/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions vendor/github.com/AndreasBriese/bbloom/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 84644ef

Please sign in to comment.