Skip to content

Commit

Permalink
feat: use the queue for interests created events
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Aug 2, 2024
1 parent d4f9e6a commit 8a5c69a
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 16 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ tg-bot-token.txt
*.pem
*.key
tg-payment-*.txt
.idea
secret-bot-telegram-tls-server.yaml
17 changes: 6 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ type Config struct {
Uri string `envconfig:"API_ADMIN_URI" default:"api:56789" required:"true"`
}
GroupId string `envconfig:"API_GROUP_ID" default:"default" required:"true"`
Messages struct {
Uri string `envconfig:"API_MESSAGES_URI" default:"messages:50051" required:"true"`
}
Messages MessagesConfig
Telegram struct {
Bot struct {
Port uint16 `envconfig:"API_TELEGRAM_BOT_PORT" default:"50051" required:"true"`
Expand Down Expand Up @@ -70,14 +68,6 @@ type PriceConfig struct {
}
}

type FeedsConfig struct {
Uri string `envconfig:"API_SOURCE_FEEDS_URI" default:"source-feeds:50051" required:"true"`
}

type SitesConfig struct {
Uri string `envconfig:"API_SOURCE_SITES_URI" default:"source-sites:50051" required:"true"`
}

type ReaderConfig struct {
Uri string `envconfig:"API_READER_URI" default:"http://reader:8080/v1" required:"true"`
CallBack struct {
Expand All @@ -98,6 +88,11 @@ type QueueConfig struct {
}
}

type MessagesConfig struct {
Type string `envconfig:"API_MESSAGES_TYPE" default:"com_awakari_bot_telegram_v1" required:"true"`
Uri string `envconfig:"API_MESSAGES_URI" default:"messages:50051" required:"true"`
}

func NewConfigFromEnv() (cfg Config, err error) {
err = envconfig.Process("", &cfg)
return
Expand Down
2 changes: 2 additions & 0 deletions helm/bot-telegram/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ spec:
value: "{{ .Values.api.uri }}"
- name: API_ADMIN_URI
value: "{{ .Values.api.admin.uri }}"
- name: API_MESSAGES_TYPE
value: "{{ .Values.api.messages.type }}"
- name: API_MESSAGES_URI
value: "{{ .Values.api.messages.uri }}"
- name: API_READER_URI
Expand Down
1 change: 1 addition & 0 deletions helm/bot-telegram/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ api:
admin:
uri: "api:56789"
messages:
type: "com_awakari_bot_telegram_v1"
uri: "messages:50051"
reader:
uri: "http://reader:8080/v1"
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func main() {
Writers: map[string]model.Writer[*pb.CloudEvent]{},
Channels: map[string]time.Time{},
ChansLock: &sync.Mutex{},
CfgMsgs: cfg.Api.Messages,
}
defer chanPostHandler.Close()

Expand All @@ -190,7 +191,7 @@ func main() {
replyHandlers := map[string]service.ArgHandlerFunc{
subscriptions.ReqDescribe: subscriptions.DescriptionReplyHandlerFunc(clientAwk, groupId),
subscriptions.ReqSubCreate: subscriptions.CreateBasicReplyHandlerFunc(clientAwk, groupId, svcReader, urlCallbackBase),
messages.ReqMsgPub: messages.PublishBasicReplyHandlerFunc(clientAwk, groupId, svcMsgs, cfg.Payment),
messages.ReqMsgPub: messages.PublishBasicReplyHandlerFunc(clientAwk, groupId, svcMsgs, cfg),
subscriptions.ReqSubExtend: subExtHandler.HandleExtensionReply,
usage.ReqLimitExtend: limitsHandler.HandleExtension,
usage.ReqLimitIncrease: limitsHandler.HandleIncrease,
Expand Down
4 changes: 3 additions & 1 deletion service/messages/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/awakari/bot-telegram/config"
"github.com/awakari/bot-telegram/service"
"github.com/awakari/client-sdk-go/api"
"github.com/awakari/client-sdk-go/api/grpc/limits"
Expand Down Expand Up @@ -40,6 +41,7 @@ type ChanPostHandler struct {
Writers map[string]model.Writer[*pb.CloudEvent]
Channels map[string]time.Time
ChansLock *sync.Mutex
CfgMsgs config.MessagesConfig
}

const tagNoBot = "#nobot"
Expand Down Expand Up @@ -71,7 +73,7 @@ func (cp ChanPostHandler) Publish(tgCtx telebot.Context) (err error) {
Id: ksuid.New().String(),
Source: fmt.Sprintf("https://t.me/%s", chanUserName),
SpecVersion: attrValSpecVersion,
Type: "com.awakari.bot-telegram.v1",
Type: cp.CfgMsgs.Type,
}
err = toCloudEvent(tgMsg, tgCtx.Text(), &evt)
if err == nil {
Expand Down
6 changes: 3 additions & 3 deletions service/messages/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func PublishBasicReplyHandlerFunc(
clientAwk api.Client,
groupId string,
svcMsgs messages.Service,
cfgPayment config.PaymentConfig,
cfg config.Config,
) service.ArgHandlerFunc {
return func(tgCtx telebot.Context, args ...string) (err error) {
groupIdCtx := metadata.AppendToOutgoingContext(context.TODO(), service.KeyGroupId, groupId)
Expand All @@ -79,14 +79,14 @@ func PublishBasicReplyHandlerFunc(
Id: ksuid.New().String(),
Source: "https://t.me/" + tgCtx.Chat().Username,
SpecVersion: attrValSpecVersion,
Type: "com.awakari.bot-telegram.v1",
Type: cfg.Api.Messages.Type,
}
if err == nil {
defer w.Close()
err = toCloudEvent(tgCtx.Message(), args[1], &evt)
}
if err == nil {
err = publish(tgCtx, w, &evt, svcMsgs, cfgPayment)
err = publish(tgCtx, w, &evt, svcMsgs, cfg.Payment)
}
return
}
Expand Down

0 comments on commit 8a5c69a

Please sign in to comment.