Skip to content

Commit

Permalink
feat: added sql broker
Browse files Browse the repository at this point in the history
  • Loading branch information
chintansakhiya committed Jan 30, 2024
1 parent 59abc65 commit 9d6ff00
Show file tree
Hide file tree
Showing 15 changed files with 530 additions and 55 deletions.
17 changes: 14 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,20 @@ SQLITE_FILEPATH=database/golang-api.db
JWT_SECRET=ThisIsKey

# Message queue config
# MQ_TRACK=true
# MQ_DEBUG=true
# DEAD_LETTER_QUEUE=dead_queue
MQ_TRACK=true
MQ_DEBUG=true
DEAD_LETTER_QUEUE=dead_queue
HANDLER_NAME=handler

# sql
MQ_DIALECT=sql
MQ_DB_DIALECT=postgres
MQ_DB_HOST=localhost
MQ_DB_PORT=5432
MQ_DB_USERNAME=golang-api
MQ_DB_PASSWORD=golang-api
MQ_DB_NAME=golang-api
MQ_DB_QUERYSTRING=sslmode=disable

# Rabbitmq
# MQ_DIALECT=amqp
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Migrations are like **version control for your database**, allowing your team to


- #### Multiple Message Queue Broker Support
- We are supporting 4 types of message queue broker at this time `rabbitmq`, `redis`, `googleCloud` & `kafka`
- We are supporting 5 types of message queue broker at this time `rabbitmq`, `redis`, `googleCloud`,`sql(postges,mysql)` & `kafka`
- It allows us to switch to message queue broker without changing too much stuff.
- Watermill package allows us to do that.
- We have environment variable `MQ_DIALECT` where you can set to message queue broker type.
Expand Down Expand Up @@ -156,7 +156,7 @@ Migrations are like **version control for your database**, allowing your team to
```
- #### Dead Letter Queue
- The `dead letter queue`, also known as the `poison queue` in watermill, is a designated destination for messages that have failed to undergo processing by a consumer.
- The name of this queue is specified in the `DEAD_LETTER_QUEUE` environment variable.
- The name of this queue is specified in the `DEAD_LETTER_QUEUE` environment variable, we are storing failed job into database.
- Command to run dead letter queue
```go
go run app.go dead-letter-queue
Expand Down
2 changes: 1 addition & 1 deletion cli/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func GetAPICommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command {
return err
}

pub, err := watermill.InitPubliser(cfg)
pub, err := watermill.InitPublisher(cfg,false)
if err != nil {
return err
}
Expand Down
32 changes: 27 additions & 5 deletions cli/dead_letter_queue.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package cli

import (
"fmt"
"encoding/json"
"log"

"go.uber.org/zap"

Expand All @@ -11,6 +12,13 @@ import (
"github.com/spf13/cobra"
)

type DeadLetterQ struct {
Handler string `json:"handler_poisoned"`
Reason string `json:"reason_poisoned"`
Subscriber string `json:"subscriber_poisoned"`
Topic string `json:"topic_poisoned"`
}

// GetAPICommandDef runs app
func GetDeadQueueCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command {

Expand All @@ -21,21 +29,35 @@ func GetDeadQueueCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Comm
RunE: func(cmd *cobra.Command, args []string) error {

// Init worker
subscriber, err := watermill.InitSubscriber(cfg)
subscriber, err := watermill.InitSubscriber(cfg, true)
if err != nil {
return err
}

// run worker with topic(queue name) and process function
err = subscriber.Run(cfg.MQ.DeadQueue, HandleFailJob)
err = subscriber.Run(cfg.MQ.DeadLetterQ,"dead_letter_queue" ,HandleFailJob)
return err
},
}
return workerCommand
}

func HandleFailJob(msg *message.Message) error {
fmt.Println("failed job:-", msg.UUID)
// process here
// get fail job details from metadata
var result DeadLetterQ
metadata := make(map[string]string)
for k, v := range msg.Metadata {
metadata[k] = v
}
jsonString, err := json.Marshal(metadata)
if err != nil {
return err
}
err = json.Unmarshal(jsonString, &result)
if err != nil {
return err
}

log.Printf("Failed job handler: %s, reason: %s, subscriber: %s, topic: %s", result.Handler, result.Reason, result.Subscriber, result.Topic)
return nil
}
6 changes: 3 additions & 3 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func GetWorkerCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command
Short: "To start worker",
Long: `To start worker`,
RunE: func(cmd *cobra.Command, args []string) error {
// Get details name from flag
// Get details from flag
topic, err := cmd.Flags().GetString("topic")
if err != nil {
return err
Expand All @@ -35,7 +35,7 @@ func GetWorkerCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command
}

// Init subscriber
subscriber, err := watermill.InitSubscriber(cfg)
subscriber, err := watermill.InitSubscriber(cfg, false)
if err != nil {
return err
}
Expand All @@ -47,7 +47,7 @@ func GetWorkerCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command
}

// run worker with topic(queue name) and process function
err = router.Run(topic, workers.Process)
err = router.Run(topic, cfg.MQ.HandlerName, workers.Process)
return err

},
Expand Down
6 changes: 6 additions & 0 deletions cli/workers/worker_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workers
import (
"bytes"
"encoding/gob"
"encoding/json"

"github.com/ThreeDotsLabs/watermill/message"
)
Expand Down Expand Up @@ -36,6 +37,11 @@ func Process(msg *message.Message) error {
if err != nil {
return err
}
// Store the JSON payload in the msg so that it can be persisted in the database in case the job fails.
msg.Payload, err = json.Marshal(result)
if err != nil {
return err
}
if err := result.Handle(); err != nil {
return err
}
Expand Down
14 changes: 13 additions & 1 deletion config/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ type MQConfig struct {
Dialect string `envconfig:"MQ_DIALECT"`
Debug bool `envconfig:"MQ_DEBUG"`
Track bool `envconfig:"MQ_TRACK"`
DeadQueue string `envconfig:"DEAD_LETTER_QUEUE"`
DeadLetterQ string `envconfig:"DEAD_LETTER_QUEUE"`
HandlerName string `envconfig:"HANDLER_NAME"`
Redis RedisConfig
Amqp AmqpConfig
Kafka KafkaConfig
GoogleCloud GoogleCloud
Sql Sql
}
type RedisConfig struct {
RedisUrl string `envconfig:"REDIS_URI"`
Expand All @@ -29,3 +31,13 @@ type GoogleCloud struct {
ProjectID string `envconfig:"PROJECT_ID"`
SubscriptionId string `envconfig:"SUBSCRIPTION_ID"`
}

type Sql struct {
Dialect string `envconfig:"MQ_DB_DIALECT"`
Host string `envconfig:"MQ_DB_HOST"`
Port int `envconfig:"MQ_DB_PORT"`
Username string `envconfig:"MQ_DB_USERNAME"`
Password string `envconfig:"MQ_DB_PASSWORD"`
Db string `envconfig:"MQ_DB_NAME"`
QueryString string `envconfig:"DB_QUERYSTRING"`
}
4 changes: 2 additions & 2 deletions controllers/api/v1/user_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type UserController struct {
userService *services.UserService
logger *zap.Logger
event *events.Events
pub *watermill.WatermillPubliser
pub *watermill.WatermillPublisher
}

// NewUserController returns a user
func NewUserController(goqu *goqu.Database, logger *zap.Logger, event *events.Events, pub *watermill.WatermillPubliser) (*UserController, error) {
func NewUserController(goqu *goqu.Database, logger *zap.Logger, event *events.Events, pub *watermill.WatermillPublisher) (*UserController, error) {
userModel, err := models.InitUserModel(goqu)
if err != nil {
return nil, err
Expand Down
115 changes: 115 additions & 0 deletions database/watermill_mysql_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package database

import (
stdSQL "database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/Improwised/golang-api/config"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
)

// custom schema for mysql database
// source: https://github.com/ThreeDotsLabs/watermill-sql/blob/master/pkg/sql/schema_adapter_mysql.go
type MySQLSchema struct {
GenerateMessagesTableName func(topic string) string
SubscribeBatchSize int
}
func (s MySQLSchema) SchemaInitializingQueries(topic string) []string {
createMessagesTable := strings.Join([]string{
"CREATE TABLE IF NOT EXISTS " + s.MessagesTable(topic) + " (",
"`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,",
"`uuid` VARCHAR(36) NOT NULL,",
"`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,",
"`payload` BINARY(255) NULL,",
"`metadata` JSON DEFAULT NULL",
");",
}, "\n")

return []string{createMessagesTable}
}

func (s MySQLSchema) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) {
insertQuery := fmt.Sprintf(
`INSERT INTO %s (uuid, payload, metadata) VALUES %s`,
s.MessagesTable(topic),
strings.TrimRight(strings.Repeat(`(?,?,?),`, len(msgs)), ","),
)

args, err := defaultInsertArgs(msgs)
if err != nil {
return "", nil, err
}

return insertQuery, args, nil
}

func (s MySQLSchema) batchSize() int {
if s.SubscribeBatchSize == 0 {
return 100
}

return s.SubscribeBatchSize
}

func (s MySQLSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql.OffsetsAdapter) (string, []interface{}) {
nextOffsetQuery, nextOffsetArgs := offsetsAdapter.NextOffsetQuery(topic, consumerGroup)
selectQuery := `
SELECT offset, uuid, payload, metadata FROM ` + s.MessagesTable(topic) + `
WHERE
offset > (` + nextOffsetQuery + `)
ORDER BY
offset ASC
LIMIT ` + fmt.Sprintf("%d", s.batchSize())

return selectQuery, nextOffsetArgs
}

func (s MySQLSchema) UnmarshalMessage(row sql.Scanner) (sql.Row, error) {
r := sql.Row{}
err := row.Scan(&r.Offset, &r.UUID, &r.Payload, &r.Metadata)
if err != nil {
return sql.Row{}, errors.Wrap(err, "could not scan message row")
}

msg := message.NewMessage(string(r.UUID), r.Payload)

if r.Metadata != nil {
err = json.Unmarshal(r.Metadata, &msg.Metadata)
if err != nil {
return sql.Row{}, errors.Wrap(err, "could not unmarshal metadata as JSON")
}
}

r.Msg = msg

return r, nil
}

func (s MySQLSchema) MessagesTable(topic string) string {
if s.GenerateMessagesTableName != nil {
return s.GenerateMessagesTableName(topic)
}
return fmt.Sprintf("`watermill_%s`", topic)
}

func (s MySQLSchema) SubscribeIsolationLevel() stdSQL.IsolationLevel {
// MySQL requires serializable isolation level for not losing messages.
return stdSQL.LevelSerializable
}

func MysqlDBConnection(cfg config.Sql) (*stdSQL.DB, error) {
dbURL := cfg.Username + ":" + cfg.Password + "@tcp(" + cfg.Host + ":" + strconv.Itoa(cfg.Port) + ")/" + cfg.Db

db, err := stdSQL.Open("mysql", dbURL)
if err != nil {
return nil, err
}

return db, err
}
Loading

0 comments on commit 9d6ff00

Please sign in to comment.