From db183688f0fe81922d11fdc57a61eda37de4b832 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Wed, 25 Sep 2024 12:06:14 +0800 Subject: [PATCH 1/4] feat: get not notify conversationIDs --- go.mod | 2 +- go.sum | 4 +-- internal/rpc/conversation/conversaion.go | 8 ++++++ .../storage/cache/cachekey/conversation.go | 5 ++++ pkg/common/storage/cache/conversation.go | 3 ++- .../storage/cache/redis/conversation.go | 20 +++++++++++++- pkg/common/storage/controller/conversation.go | 27 ++++++++++++++++--- pkg/common/storage/database/conversation.go | 1 + .../storage/database/mgo/conversation.go | 7 +++++ 9 files changed, 69 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 5fab509914..69fed1cdc9 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.25 + github.com/openimsdk/protocol v0.0.72-alpha.27 github.com/openimsdk/tools v0.0.50-alpha.12 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 4c3b4aa21c..3a3637be73 100644 --- a/go.sum +++ b/go.sum @@ -322,8 +322,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.25 h1:W8E6gnwt5V6anr/8lYOf5v/Lcsggf7gIAzJbw7YU6So= -github.com/openimsdk/protocol v0.0.72-alpha.25/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.27 h1:S6n3uj7YhKjo2NCHHSnUijaJ9YYiy8TTMquc4EJOm50= +github.com/openimsdk/protocol v0.0.72-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo= github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 66e85cb8f7..3098d57915 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -710,3 +710,11 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil } + +func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { + conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + return &pbconversation.GetNotNotifyConversationIDsResp{ConversationIDs: conversationIDs}, nil +} diff --git a/pkg/common/storage/cache/cachekey/conversation.go b/pkg/common/storage/cache/cachekey/conversation.go index d19fcc5767..acc9d15cfd 100644 --- a/pkg/common/storage/cache/cachekey/conversation.go +++ b/pkg/common/storage/cache/cachekey/conversation.go @@ -17,6 +17,7 @@ package cachekey const ( ConversationKey = "CONVERSATION:" ConversationIDsKey = "CONVERSATION_IDS:" + NotNotifyConversationIDsKey = "NOT_NOTIFY_CONVERSATION_IDS:" ConversationIDsHashKey = "CONVERSATION_IDS_HASH:" ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" RecvMsgOptKey = "RECV_MSG_OPT:" @@ -34,6 +35,10 @@ func GetConversationIDsKey(ownerUserID string) string { return ConversationIDsKey + ownerUserID } +func GetNotNotifyConversationIDsKey(ownerUserID string) string { + return NotNotifyConversationIDsKey + ownerUserID +} + func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID } diff --git a/pkg/common/storage/cache/conversation.go b/pkg/common/storage/cache/conversation.go index bc17614836..8970db29c8 100644 --- a/pkg/common/storage/cache/conversation.go +++ b/pkg/common/storage/cache/conversation.go @@ -25,6 +25,7 @@ type ConversationCache interface { CloneConversationCache() ConversationCache // get user's conversationIDs from msgCache GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) + GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) DelConversationIDs(userIDs ...string) ConversationCache GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) @@ -54,7 +55,7 @@ type ConversationCache interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache - + DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache DelConversationVersionUserIDs(userIDs ...string) ConversationCache FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 95e680afb4..7c025d50a0 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -71,6 +71,10 @@ func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) strin return cachekey.GetConversationIDsKey(ownerUserID) } +func (c *ConversationRedisCache) getNotNotifyConversationIDsKey(ownerUserID string) string { + return cachekey.GetNotNotifyConversationIDsKey(ownerUserID) +} + func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID) } @@ -100,11 +104,17 @@ func (c *ConversationRedisCache) getConversationUserMaxVersionKey(ownerUserID st } func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { - return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { + return getCache(ctx, c.rcClient, c.getNotNotifyConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) }) } +func (c *ConversationRedisCache) GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) { + return getCache(ctx, c.rcClient, c.getConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) { + return c.conversationDB.FindUserIDAllNotNotifyConversationID(ctx, userID) + }) +} + func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) cache.ConversationCache { keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { @@ -242,6 +252,14 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers return cache } +func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs ...string) cache.ConversationCache { + cache := c.CloneConversationCache() + for _, userID := range userIDs { + cache.AddKeys(c.getNotNotifyConversationIDsKey(userID)) + } + return cache +} + func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache { cache := c.CloneConversationCache() for _, userID := range userIDs { diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index c804d1cc51..46ac9a1f9c 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -69,6 +69,8 @@ type ConversationDatabase interface { FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error) + // GetNotNotifyConversationIDs gets not notify conversationIDs by userID + GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -108,6 +110,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, } if _, ok := fieldMap["recv_msg_opt"]; ok { cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) + cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } cache = cache.DelConversationVersionUserIDs(haveUserIDs...) } @@ -144,6 +147,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...) if _, ok := args["recv_msg_opt"]; ok { cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) + cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } return cache.ChainExecDel(ctx) } @@ -152,14 +156,22 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat if err := c.conversationDB.Create(ctx, conversations); err != nil { return err } - var userIDs []string + var ( + userIDs []string + notNotifyUserIDs []string + ) + cache := c.cache.CloneConversationCache() for _, conversation := range conversations { cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) userIDs = append(userIDs, conversation.OwnerUserID) + if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { + notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) + } } - return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).ChainExecDel(ctx) + return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...). + DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).ChainExecDel(ctx) } func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error { @@ -212,7 +224,8 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() - cache = cache.DelConversationVersionUserIDs(ownerUserID) + cache = cache.DelConversationVersionUserIDs(ownerUserID).DelConversationNotNotifyMessageUserIDs(ownerUserID) + groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { return e.GroupID, e.GroupID != "" })) @@ -353,3 +366,11 @@ func (c *conversationDatabase) GetOwnerConversation(ctx context.Context, ownerUs } return int64(len(conversationIDs)), conversations, nil } + +func (c *conversationDatabase) GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) { + conversationIDs, err := c.cache.GetUserNotNotifyConversationIDs(ctx, userID) + if err != nil { + return nil, err + } + return conversationIDs, nil +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 85f3dd668a..2c20f73bcd 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -27,6 +27,7 @@ type Conversation interface { Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) + FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*model.Conversation, err error) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 3d505f1d34..4c936aedce 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -124,6 +124,13 @@ func (c *ConversationMgo) FindUserIDAllConversationID(ctx context.Context, userI return mongoutil.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) } +func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) { + return mongoutil.Find[string](ctx, c.coll, bson.M{ + "owner_user_id": userID, + "recv_msg_opt": constant.ReceiveNotNotifyMessage, + }, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) +} + func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) { return mongoutil.FindOne[*model.Conversation](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID}) } From bdcef3e5eaa397e775e1a0ee134d7ce30d48e8ad Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Wed, 25 Sep 2024 12:19:18 +0800 Subject: [PATCH 2/4] feat: api --- internal/api/conversation.go | 4 ++++ internal/api/router.go | 1 + 2 files changed, 5 insertions(+) diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 360313ea87..8e1a46d478 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -62,3 +62,7 @@ func (o *ConversationApi) GetIncrementalConversation(c *gin.Context) { func (o *ConversationApi) GetOwnerConversation(c *gin.Context) { a2r.Call(conversation.ConversationClient.GetOwnerConversation, o.Client, c) } + +func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { + a2r.Call(conversation.ConversationClient.GetNotNotifyConversationIDs, o.Client, c) +} diff --git a/internal/api/router.go b/internal/api/router.go index 72d36af112..6b0278864f 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -232,6 +232,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs) conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation) conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) + conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) } statisticsGroup := r.Group("/statistics") From 6ab8873cdc6cb86c7b3c8eee3ad96199641788bf Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Wed, 25 Sep 2024 12:22:26 +0800 Subject: [PATCH 3/4] fix: database --- pkg/common/storage/cache/redis/conversation.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 7c025d50a0..40df1e57a1 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -104,13 +104,13 @@ func (c *ConversationRedisCache) getConversationUserMaxVersionKey(ownerUserID st } func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { - return getCache(ctx, c.rcClient, c.getNotNotifyConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { + return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) }) } func (c *ConversationRedisCache) GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) { - return getCache(ctx, c.rcClient, c.getConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) { + return getCache(ctx, c.rcClient, c.getNotNotifyConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) { return c.conversationDB.FindUserIDAllNotNotifyConversationID(ctx, userID) }) } From 0dccbea6086b53279b3cdc1236aa83a49a77c97a Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Wed, 25 Sep 2024 14:16:39 +0800 Subject: [PATCH 4/4] fix: change name --- internal/rpc/conversation/{conversaion.go => conversation.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename internal/rpc/conversation/{conversaion.go => conversation.go} (100%) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversation.go similarity index 100% rename from internal/rpc/conversation/conversaion.go rename to internal/rpc/conversation/conversation.go