diff --git a/main.go b/main.go index a61027f..05416de 100644 --- a/main.go +++ b/main.go @@ -121,8 +121,20 @@ func main() { return float64(count) }, ), + prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "awk_bot_telegram_subscribers_total", + Help: "Awakari Telegram Bot: total subscribers count", + }, + func() (v float64) { + count, err := chatStor.CountUsers(context.TODO()) + if err != nil { + log.Error(fmt.Sprintf("Chat storage CountUsers(): err=%s", err)) + } + return float64(count) + }, + ), ) - // init events format, see https://core.telegram.org/bots/api#html-style for details htmlPolicy := bluemonday.NewPolicy() htmlPolicy.AllowStandardURLs() diff --git a/service/chats/storage.go b/service/chats/storage.go index 00ecad8..4ba561e 100644 --- a/service/chats/storage.go +++ b/service/chats/storage.go @@ -13,4 +13,5 @@ type Storage interface { Delete(ctx context.Context, id int64) (count int64, err error) GetBatch(ctx context.Context, idRem, idDiv uint32, limit uint32, cursor string) (page []Chat, err error) Count(ctx context.Context) (count int64, err error) + CountUsers(ctx context.Context) (count int64, err error) } diff --git a/service/chats/storage_mongo.go b/service/chats/storage_mongo.go index 7a75ba3..ba1d605 100644 --- a/service/chats/storage_mongo.go +++ b/service/chats/storage_mongo.go @@ -31,6 +31,8 @@ const attrSubId = "subId" const attrGroupId = "groupId" const attrUserId = "userId" +const keyCountUsers = "countUsers" + var optsSrvApi = options.ServerAPI(options.ServerAPIVersion1) var indices = []mongo.IndexModel{ { @@ -99,6 +101,20 @@ var projGetBatch = bson.D{ }, } +var pipelineCountUsers = mongo.Pipeline{ + bson.D{{ + "$group", + bson.D{{ + "_id", + "$" + attrUserId, + }}, + }}, + bson.D{{ + "$count", + keyCountUsers, + }}, +} + func NewStorage(ctx context.Context, cfgDb config.ChatsDbConfig) (s Storage, err error) { clientOpts := options. Client(). @@ -235,6 +251,27 @@ func (sm storageMongo) Count(ctx context.Context) (count int64, err error) { return sm.coll.EstimatedDocumentCount(ctx) } +func (s storageMongo) CountUsers(ctx context.Context) (count int64, err error) { + var cursor *mongo.Cursor + cursor, err = s.coll.Aggregate(ctx, pipelineCountUsers) + var result bson.M + if err == nil && cursor.Next(ctx) { + err = cursor.Decode(&result) + } + if err == nil { + rawCount := result[keyCountUsers] + switch rawCount.(type) { + case int32: + count = int64(rawCount.(int32)) + case int64: + count = rawCount.(int64) + default: + err = fmt.Errorf("%w: failed to convert result to int: %+v", ErrInternal, rawCount) + } + } + return +} + func decodeMongoError(src error) (dst error) { switch { case src == nil: diff --git a/service/chats/storage_mongo_test.go b/service/chats/storage_mongo_test.go index f0ba39b..d320d76 100644 --- a/service/chats/storage_mongo_test.go +++ b/service/chats/storage_mongo_test.go @@ -273,3 +273,62 @@ func TestStorageMongo_Count(t *testing.T) { }) } } + +func TestStorageMongo_CountUsers(t *testing.T) { + // + collName := fmt.Sprintf("chats-test-%d", time.Now().UnixMicro()) + dbCfg := config.ChatsDbConfig{ + Uri: dbUri, + Name: "bot-telegram", + } + dbCfg.Table.Name = collName + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + s, err := NewStorage(ctx, dbCfg) + require.NotNil(t, s) + require.Nil(t, err) + sm := s.(storageMongo) + defer clear(ctx, t, sm) + // + cases := map[string]struct { + stored []Chat + out int64 + err error + }{ + "ok": { + stored: []Chat{ + { + Id: -1001875128866, + SubId: "sub1", + GroupId: "group1", + UserId: "user1", + }, + { + Id: -1001778619305, + SubId: "sub2", + GroupId: "group1", + UserId: "user2", + }, + { + Id: -1001733378662, + SubId: "sub3", + GroupId: "group2", + UserId: "user2", + }, + }, + out: 2, + }, + } + // + for k, c := range cases { + t.Run(k, func(t *testing.T) { + for _, chat := range c.stored { + err = s.LinkSubscription(ctx, chat) + require.Nil(t, err) + } + count, err := s.CountUsers(ctx) + assert.Equal(t, c.out, count) + assert.Nil(t, err) + }) + } +}