Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: get not notify conversationIDs #2658

Merged
merged 5 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.26
github.com/openimsdk/protocol v0.0.72-alpha.27
github.com/openimsdk/tools v0.0.50-alpha.12
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.26 h1:vgRtw8uuCCD6FQqTCVN5i32I9uh8SGjz8AxHGDWjtKU=
github.com/openimsdk/protocol v0.0.72-alpha.26/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.27 h1:S6n3uj7YhKjo2NCHHSnUijaJ9YYiy8TTMquc4EJOm50=
github.com/openimsdk/protocol v0.0.72-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo=
github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
Expand Down
4 changes: 4 additions & 0 deletions internal/api/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ func (o *ConversationApi) GetIncrementalConversation(c *gin.Context) {
func (o *ConversationApi) GetOwnerConversation(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetOwnerConversation, o.Client, c)
}

func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetNotNotifyConversationIDs, o.Client, c)
}
1 change: 1 addition & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs)
conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation)
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
}

statisticsGroup := r.Group("/statistics")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,3 +710,11 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex

return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
}

func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
return &pbconversation.GetNotNotifyConversationIDsResp{ConversationIDs: conversationIDs}, nil
}
5 changes: 5 additions & 0 deletions pkg/common/storage/cache/cachekey/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cachekey
const (
ConversationKey = "CONVERSATION:"
ConversationIDsKey = "CONVERSATION_IDS:"
NotNotifyConversationIDsKey = "NOT_NOTIFY_CONVERSATION_IDS:"
ConversationIDsHashKey = "CONVERSATION_IDS_HASH:"
ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
RecvMsgOptKey = "RECV_MSG_OPT:"
Expand All @@ -34,6 +35,10 @@ func GetConversationIDsKey(ownerUserID string) string {
return ConversationIDsKey + ownerUserID
}

func GetNotNotifyConversationIDsKey(ownerUserID string) string {
return NotNotifyConversationIDsKey + ownerUserID
}

func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/common/storage/cache/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ConversationCache interface {
CloneConversationCache() ConversationCache
// get user's conversationIDs from msgCache
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
DelConversationIDs(userIDs ...string) ConversationCache

GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
Expand Down Expand Up @@ -54,7 +55,7 @@ type ConversationCache interface {

GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache

DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
DelConversationVersionUserIDs(userIDs ...string) ConversationCache

FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
Expand Down
18 changes: 18 additions & 0 deletions pkg/common/storage/cache/redis/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) strin
return cachekey.GetConversationIDsKey(ownerUserID)
}

func (c *ConversationRedisCache) getNotNotifyConversationIDsKey(ownerUserID string) string {
return cachekey.GetNotNotifyConversationIDsKey(ownerUserID)
}

func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID)
}
Expand Down Expand Up @@ -105,6 +109,12 @@ func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, own
})
}

func (c *ConversationRedisCache) GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) {
return getCache(ctx, c.rcClient, c.getNotNotifyConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllNotNotifyConversationID(ctx, userID)
})
}

func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) cache.ConversationCache {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
Expand Down Expand Up @@ -242,6 +252,14 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers
return cache
}

func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
cache.AddKeys(c.getNotNotifyConversationIDsKey(userID))
}
return cache
}

func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
Expand Down
27 changes: 24 additions & 3 deletions pkg/common/storage/controller/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type ConversationDatabase interface {
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error)
FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error)
GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error)
// GetNotNotifyConversationIDs gets not notify conversationIDs by userID
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
}

func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
Expand Down Expand Up @@ -108,6 +110,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
}
if _, ok := fieldMap["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
}
Expand Down Expand Up @@ -144,6 +147,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...)
if _, ok := args["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
return cache.ChainExecDel(ctx)
}
Expand All @@ -152,14 +156,22 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
if err := c.conversationDB.Create(ctx, conversations); err != nil {
return err
}
var userIDs []string
var (
userIDs []string
notNotifyUserIDs []string
)

cache := c.cache.CloneConversationCache()
for _, conversation := range conversations {
cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID)
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
userIDs = append(userIDs, conversation.OwnerUserID)
if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID)
}
}
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).ChainExecDel(ctx)
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).ChainExecDel(ctx)
}

func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error {
Expand Down Expand Up @@ -212,7 +224,8 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.CloneConversationCache()
cache = cache.DelConversationVersionUserIDs(ownerUserID)
cache = cache.DelConversationVersionUserIDs(ownerUserID).DelConversationNotNotifyMessageUserIDs(ownerUserID)

groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
return e.GroupID, e.GroupID != ""
}))
Expand Down Expand Up @@ -353,3 +366,11 @@ func (c *conversationDatabase) GetOwnerConversation(ctx context.Context, ownerUs
}
return int64(len(conversationIDs)), conversations, nil
}

func (c *conversationDatabase) GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) {
conversationIDs, err := c.cache.GetUserNotNotifyConversationIDs(ctx, userID)
if err != nil {
return nil, err
}
return conversationIDs, nil
}
1 change: 1 addition & 0 deletions pkg/common/storage/database/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Conversation interface {
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error)
FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error)
FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error)
FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error)
Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error)
FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error)
FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*model.Conversation, err error)
Expand Down
7 changes: 7 additions & 0 deletions pkg/common/storage/database/mgo/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ func (c *ConversationMgo) FindUserIDAllConversationID(ctx context.Context, userI
return mongoutil.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
}

func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) {
return mongoutil.Find[string](ctx, c.coll, bson.M{
"owner_user_id": userID,
"recv_msg_opt": constant.ReceiveNotNotifyMessage,
}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
}

func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) {
return mongoutil.FindOne[*model.Conversation](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID})
}
Expand Down
Loading