From 9d6ff002ad5aa08565520a2b39dcedad0ab30676 Mon Sep 17 00:00:00 2001 From: chintansakhiya Date: Tue, 30 Jan 2024 19:28:18 +0530 Subject: [PATCH] feat: added sql broker --- .env.example | 17 ++- README.md | 4 +- cli/api.go | 2 +- cli/dead_letter_queue.go | 32 ++++- cli/worker.go | 6 +- cli/workers/worker_handler.go | 6 + config/mq.go | 14 ++- controllers/api/v1/user_controller.go | 4 +- database/watermill_mysql_schema.go | 115 ++++++++++++++++++ database/watermill_postgres_schema.go | 167 ++++++++++++++++++++++++++ go.mod | 8 +- go.sum | 45 ++++--- pkg/watermill/publisher.go | 90 +++++++++++--- pkg/watermill/subscriber.go | 71 ++++++++++- routes/main.go | 4 +- 15 files changed, 530 insertions(+), 55 deletions(-) create mode 100644 database/watermill_mysql_schema.go create mode 100644 database/watermill_postgres_schema.go diff --git a/.env.example b/.env.example index d972cc4..b33489b 100644 --- a/.env.example +++ b/.env.example @@ -24,9 +24,20 @@ SQLITE_FILEPATH=database/golang-api.db JWT_SECRET=ThisIsKey # Message queue config -# MQ_TRACK=true -# MQ_DEBUG=true -# DEAD_LETTER_QUEUE=dead_queue +MQ_TRACK=true +MQ_DEBUG=true +DEAD_LETTER_QUEUE=dead_queue +HANDLER_NAME=handler + +# sql +MQ_DIALECT=sql +MQ_DB_DIALECT=postgres +MQ_DB_HOST=localhost +MQ_DB_PORT=5432 +MQ_DB_USERNAME=golang-api +MQ_DB_PASSWORD=golang-api +MQ_DB_NAME=golang-api +MQ_DB_QUERYSTRING=sslmode=disable # Rabbitmq # MQ_DIALECT=amqp diff --git a/README.md b/README.md index e0626d3..f1fe7c7 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,7 @@ Migrations are like **version control for your database**, allowing your team to - #### Multiple Message Queue Broker Support - - We are supporting 4 types of message queue broker at this time `rabbitmq`, `redis`, `googleCloud` & `kafka` + - We are supporting 5 types of message queue broker at this time `rabbitmq`, `redis`, `googleCloud`,`sql(postges,mysql)` & `kafka` - It allows us to switch to message queue broker without changing too much stuff. - Watermill package allows us to do that. - We have environment variable `MQ_DIALECT` where you can set to message queue broker type. @@ -156,7 +156,7 @@ Migrations are like **version control for your database**, allowing your team to ``` - #### Dead Letter Queue - The `dead letter queue`, also known as the `poison queue` in watermill, is a designated destination for messages that have failed to undergo processing by a consumer. - - The name of this queue is specified in the `DEAD_LETTER_QUEUE` environment variable. + - The name of this queue is specified in the `DEAD_LETTER_QUEUE` environment variable, we are storing failed job into database. - Command to run dead letter queue ```go go run app.go dead-letter-queue diff --git a/cli/api.go b/cli/api.go index 2216b3c..d8bfacf 100644 --- a/cli/api.go +++ b/cli/api.go @@ -44,7 +44,7 @@ func GetAPICommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command { return err } - pub, err := watermill.InitPubliser(cfg) + pub, err := watermill.InitPublisher(cfg,false) if err != nil { return err } diff --git a/cli/dead_letter_queue.go b/cli/dead_letter_queue.go index 10baa12..3c163a9 100644 --- a/cli/dead_letter_queue.go +++ b/cli/dead_letter_queue.go @@ -1,7 +1,8 @@ package cli import ( - "fmt" + "encoding/json" + "log" "go.uber.org/zap" @@ -11,6 +12,13 @@ import ( "github.com/spf13/cobra" ) +type DeadLetterQ struct { + Handler string `json:"handler_poisoned"` + Reason string `json:"reason_poisoned"` + Subscriber string `json:"subscriber_poisoned"` + Topic string `json:"topic_poisoned"` +} + // GetAPICommandDef runs app func GetDeadQueueCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command { @@ -21,13 +29,13 @@ func GetDeadQueueCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Comm RunE: func(cmd *cobra.Command, args []string) error { // Init worker - subscriber, err := watermill.InitSubscriber(cfg) + subscriber, err := watermill.InitSubscriber(cfg, true) if err != nil { return err } // run worker with topic(queue name) and process function - err = subscriber.Run(cfg.MQ.DeadQueue, HandleFailJob) + err = subscriber.Run(cfg.MQ.DeadLetterQ,"dead_letter_queue" ,HandleFailJob) return err }, } @@ -35,7 +43,21 @@ func GetDeadQueueCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Comm } func HandleFailJob(msg *message.Message) error { - fmt.Println("failed job:-", msg.UUID) - // process here + // get fail job details from metadata + var result DeadLetterQ + metadata := make(map[string]string) + for k, v := range msg.Metadata { + metadata[k] = v + } + jsonString, err := json.Marshal(metadata) + if err != nil { + return err + } + err = json.Unmarshal(jsonString, &result) + if err != nil { + return err + } + + log.Printf("Failed job handler: %s, reason: %s, subscriber: %s, topic: %s", result.Handler, result.Reason, result.Subscriber, result.Topic) return nil } diff --git a/cli/worker.go b/cli/worker.go index 6bed939..00f41ae 100644 --- a/cli/worker.go +++ b/cli/worker.go @@ -18,7 +18,7 @@ func GetWorkerCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command Short: "To start worker", Long: `To start worker`, RunE: func(cmd *cobra.Command, args []string) error { - // Get details name from flag + // Get details from flag topic, err := cmd.Flags().GetString("topic") if err != nil { return err @@ -35,7 +35,7 @@ func GetWorkerCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command } // Init subscriber - subscriber, err := watermill.InitSubscriber(cfg) + subscriber, err := watermill.InitSubscriber(cfg, false) if err != nil { return err } @@ -47,7 +47,7 @@ func GetWorkerCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command } // run worker with topic(queue name) and process function - err = router.Run(topic, workers.Process) + err = router.Run(topic, cfg.MQ.HandlerName, workers.Process) return err }, diff --git a/cli/workers/worker_handler.go b/cli/workers/worker_handler.go index c3a4e81..ab430aa 100644 --- a/cli/workers/worker_handler.go +++ b/cli/workers/worker_handler.go @@ -3,6 +3,7 @@ package workers import ( "bytes" "encoding/gob" + "encoding/json" "github.com/ThreeDotsLabs/watermill/message" ) @@ -36,6 +37,11 @@ func Process(msg *message.Message) error { if err != nil { return err } + // Store the JSON payload in the msg so that it can be persisted in the database in case the job fails. + msg.Payload, err = json.Marshal(result) + if err != nil { + return err + } if err := result.Handle(); err != nil { return err } diff --git a/config/mq.go b/config/mq.go index d9b8155..6d55fb7 100644 --- a/config/mq.go +++ b/config/mq.go @@ -4,11 +4,13 @@ type MQConfig struct { Dialect string `envconfig:"MQ_DIALECT"` Debug bool `envconfig:"MQ_DEBUG"` Track bool `envconfig:"MQ_TRACK"` - DeadQueue string `envconfig:"DEAD_LETTER_QUEUE"` + DeadLetterQ string `envconfig:"DEAD_LETTER_QUEUE"` + HandlerName string `envconfig:"HANDLER_NAME"` Redis RedisConfig Amqp AmqpConfig Kafka KafkaConfig GoogleCloud GoogleCloud + Sql Sql } type RedisConfig struct { RedisUrl string `envconfig:"REDIS_URI"` @@ -29,3 +31,13 @@ type GoogleCloud struct { ProjectID string `envconfig:"PROJECT_ID"` SubscriptionId string `envconfig:"SUBSCRIPTION_ID"` } + +type Sql struct { + Dialect string `envconfig:"MQ_DB_DIALECT"` + Host string `envconfig:"MQ_DB_HOST"` + Port int `envconfig:"MQ_DB_PORT"` + Username string `envconfig:"MQ_DB_USERNAME"` + Password string `envconfig:"MQ_DB_PASSWORD"` + Db string `envconfig:"MQ_DB_NAME"` + QueryString string `envconfig:"DB_QUERYSTRING"` +} diff --git a/controllers/api/v1/user_controller.go b/controllers/api/v1/user_controller.go index 43da013..df113eb 100644 --- a/controllers/api/v1/user_controller.go +++ b/controllers/api/v1/user_controller.go @@ -25,11 +25,11 @@ type UserController struct { userService *services.UserService logger *zap.Logger event *events.Events - pub *watermill.WatermillPubliser + pub *watermill.WatermillPublisher } // NewUserController returns a user -func NewUserController(goqu *goqu.Database, logger *zap.Logger, event *events.Events, pub *watermill.WatermillPubliser) (*UserController, error) { +func NewUserController(goqu *goqu.Database, logger *zap.Logger, event *events.Events, pub *watermill.WatermillPublisher) (*UserController, error) { userModel, err := models.InitUserModel(goqu) if err != nil { return nil, err diff --git a/database/watermill_mysql_schema.go b/database/watermill_mysql_schema.go new file mode 100644 index 0000000..703bf98 --- /dev/null +++ b/database/watermill_mysql_schema.go @@ -0,0 +1,115 @@ +package database + +import ( + stdSQL "database/sql" + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/Improwised/golang-api/config" + "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/pkg/errors" +) + +// custom schema for mysql database +// source: https://github.com/ThreeDotsLabs/watermill-sql/blob/master/pkg/sql/schema_adapter_mysql.go +type MySQLSchema struct { + GenerateMessagesTableName func(topic string) string + SubscribeBatchSize int +} +func (s MySQLSchema) SchemaInitializingQueries(topic string) []string { + createMessagesTable := strings.Join([]string{ + "CREATE TABLE IF NOT EXISTS " + s.MessagesTable(topic) + " (", + "`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,", + "`uuid` VARCHAR(36) NOT NULL,", + "`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,", + "`payload` BINARY(255) NULL,", + "`metadata` JSON DEFAULT NULL", + ");", + }, "\n") + + return []string{createMessagesTable} +} + +func (s MySQLSchema) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) { + insertQuery := fmt.Sprintf( + `INSERT INTO %s (uuid, payload, metadata) VALUES %s`, + s.MessagesTable(topic), + strings.TrimRight(strings.Repeat(`(?,?,?),`, len(msgs)), ","), + ) + + args, err := defaultInsertArgs(msgs) + if err != nil { + return "", nil, err + } + + return insertQuery, args, nil +} + +func (s MySQLSchema) batchSize() int { + if s.SubscribeBatchSize == 0 { + return 100 + } + + return s.SubscribeBatchSize +} + +func (s MySQLSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql.OffsetsAdapter) (string, []interface{}) { + nextOffsetQuery, nextOffsetArgs := offsetsAdapter.NextOffsetQuery(topic, consumerGroup) + selectQuery := ` + SELECT offset, uuid, payload, metadata FROM ` + s.MessagesTable(topic) + ` + WHERE + offset > (` + nextOffsetQuery + `) + ORDER BY + offset ASC + LIMIT ` + fmt.Sprintf("%d", s.batchSize()) + + return selectQuery, nextOffsetArgs +} + +func (s MySQLSchema) UnmarshalMessage(row sql.Scanner) (sql.Row, error) { + r := sql.Row{} + err := row.Scan(&r.Offset, &r.UUID, &r.Payload, &r.Metadata) + if err != nil { + return sql.Row{}, errors.Wrap(err, "could not scan message row") + } + + msg := message.NewMessage(string(r.UUID), r.Payload) + + if r.Metadata != nil { + err = json.Unmarshal(r.Metadata, &msg.Metadata) + if err != nil { + return sql.Row{}, errors.Wrap(err, "could not unmarshal metadata as JSON") + } + } + + r.Msg = msg + + return r, nil +} + +func (s MySQLSchema) MessagesTable(topic string) string { + if s.GenerateMessagesTableName != nil { + return s.GenerateMessagesTableName(topic) + } + return fmt.Sprintf("`watermill_%s`", topic) +} + +func (s MySQLSchema) SubscribeIsolationLevel() stdSQL.IsolationLevel { + // MySQL requires serializable isolation level for not losing messages. + return stdSQL.LevelSerializable +} + +func MysqlDBConnection(cfg config.Sql) (*stdSQL.DB, error) { + dbURL := cfg.Username + ":" + cfg.Password + "@tcp(" + cfg.Host + ":" + strconv.Itoa(cfg.Port) + ")/" + cfg.Db + + db, err := stdSQL.Open("mysql", dbURL) + if err != nil { + return nil, err + } + + return db, err +} diff --git a/database/watermill_postgres_schema.go b/database/watermill_postgres_schema.go new file mode 100644 index 0000000..ca8c95c --- /dev/null +++ b/database/watermill_postgres_schema.go @@ -0,0 +1,167 @@ +package database + +import ( + stdSQL "database/sql" + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/Improwised/golang-api/config" + "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" + "github.com/pkg/errors" + + "github.com/ThreeDotsLabs/watermill/message" +) + +// custom schema for postgres database +// source: https://github.com/ThreeDotsLabs/watermill-sql/blob/master/pkg/sql/schema_adapter_postgresql.go + +type PostgreSQLSchema struct { + GenerateMessagesTableName func(topic string) string + SubscribeBatchSize int +} + +func (s PostgreSQLSchema) SchemaInitializingQueries(topic string) []string { + createMessagesTable := ` + CREATE TABLE IF NOT EXISTS ` + s.MessagesTable(topic) + ` ( + "offset" SERIAL, + "uuid" VARCHAR(36) NOT NULL, + "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + "payload" bytea DEFAULT NULL, + "metadata" JSON DEFAULT NULL, + "transaction_id" xid8 NOT NULL, + PRIMARY KEY ("transaction_id", "offset") + ); + ` + + return []string{createMessagesTable} +} +func defaultInsertArgs(msgs message.Messages) ([]interface{}, error) { + var args []interface{} + for _, msg := range msgs { + metadata, err := json.Marshal(msg.Metadata) + if err != nil { + return nil, errors.Wrapf(err, "could not marshal metadata into JSON for message %s", msg.UUID) + } + + args = append(args, msg.UUID, msg.Payload, metadata) + } + + return args, nil +} + +func (s PostgreSQLSchema) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) { + insertQuery := fmt.Sprintf( + `INSERT INTO %s (uuid, payload, metadata, transaction_id) VALUES %s`, + s.MessagesTable(topic), + defaultInsertMarkers(len(msgs)), + ) + + args, err := defaultInsertArgs(msgs) + if err != nil { + return "", nil, err + } + // log.Fatal("insertQuery", args) + return insertQuery, args, nil +} + +func defaultInsertMarkers(count int) string { + result := strings.Builder{} + + index := 1 + for i := 0; i < count; i++ { + result.WriteString(fmt.Sprintf("($%d,$%d,$%d,pg_current_xact_id()),", index, index+1, index+2)) + index += 3 + } + + return strings.TrimRight(result.String(), ",") +} + +func (s PostgreSQLSchema) batchSize() int { + if s.SubscribeBatchSize == 0 { + return 100 + } + + return s.SubscribeBatchSize +} + +func (s PostgreSQLSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql.OffsetsAdapter) (string, []interface{}) { + // Query inspired by https://event-driven.io/en/ordering_in_postgres_outbox/ + + nextOffsetQuery, nextOffsetArgs := offsetsAdapter.NextOffsetQuery(topic, consumerGroup) + selectQuery := ` + WITH last_processed AS ( + ` + nextOffsetQuery + ` + ) + + SELECT "offset", transaction_id, uuid, payload, metadata FROM ` + s.MessagesTable(topic) + ` + + WHERE + ( + ( + transaction_id = (SELECT last_processed_transaction_id FROM last_processed) + AND + "offset" > (SELECT offset_acked FROM last_processed) + ) + OR + (transaction_id > (SELECT last_processed_transaction_id FROM last_processed)) + ) + AND + transaction_id < pg_snapshot_xmin(pg_current_snapshot()) + ORDER BY + transaction_id ASC, + "offset" ASC + LIMIT ` + fmt.Sprintf("%d", s.batchSize()) + + return selectQuery, nextOffsetArgs +} + +func (s PostgreSQLSchema) UnmarshalMessage(row sql.Scanner) (sql.Row, error) { + r := sql.Row{} + var transactionID int64 + + err := row.Scan(&r.Offset, &transactionID, &r.UUID, &r.Payload, &r.Metadata) + if err != nil { + return sql.Row{}, errors.Wrap(err, "could not scan message row") + } + + msg := message.NewMessage(string(r.UUID), r.Payload) + + if r.Metadata != nil { + err = json.Unmarshal(r.Metadata, &msg.Metadata) + if err != nil { + return sql.Row{}, errors.Wrap(err, "could not unmarshal metadata as JSON") + } + } + + r.Msg = msg + r.ExtraData = map[string]any{ + "transaction_id": transactionID, + } + + return r, nil +} + +func (s PostgreSQLSchema) MessagesTable(topic string) string { + if s.GenerateMessagesTableName != nil { + return s.GenerateMessagesTableName(topic) + } + return fmt.Sprintf(`"watermill_%s"`, topic) +} + +func (s PostgreSQLSchema) SubscribeIsolationLevel() stdSQL.IsolationLevel { + // For Postgres Repeatable Read is enough. + return stdSQL.LevelSerializable +} + +func PostgresDBConnection(cfg config.Sql) (*stdSQL.DB, error) { + dbURL := "postgres://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + strconv.Itoa(cfg.Port) + "/" + cfg.Db + "?" + cfg.QueryString + + db, err := stdSQL.Open("postgres", dbURL) + if err != nil { + return nil, err + } + return db, err + +} diff --git a/go.mod b/go.mod index cded92d..fbf3194 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/Improwised/golang-api -go 1.19 +go 1.21 + +toolchain go1.21.6 require ( clevergo.tech/jsend v1.1.3 @@ -45,6 +47,7 @@ require ( cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.5 // indirect github.com/Rican7/retry v0.3.1 // indirect + github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 github.com/andybalholm/brotli v1.0.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect @@ -92,7 +95,7 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect @@ -100,7 +103,6 @@ require ( github.com/rabbitmq/amqp091-go v1.9.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rivo/uniseg v0.4.4 // indirect - github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/sony/gobreaker v0.5.0 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index 9976686..0e85c46 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI= cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= +cloud.google.com/go/kms v1.15.5 h1:pj1sRfut2eRbD9pFRjNnPNg/CzJPuQAzUujMIM1vVeM= +cloud.google.com/go/kms v1.15.5/go.mod h1:cU2H5jnp6G2TDpUGZyqTCoy1n16fbubHZjmVXSMtwDI= cloud.google.com/go/pubsub v1.36.0 h1:cgaJ0mgwEM0YNNATXFXSnLfji6XcMieTc2mRjH1ZYdY= cloud.google.com/go/pubsub v1.36.0/go.mod h1:qQvGW4ANjuYcOpTMTy5+u6HBIoJF00cPfQ/ubMcc/D8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -34,6 +36,8 @@ github.com/ThreeDotsLabs/watermill-kafka/v2 v2.5.0 h1:/KYEjLlLx6nW3jn6AEcwAlWkPW github.com/ThreeDotsLabs/watermill-kafka/v2 v2.5.0/go.mod h1:w+9jhI7x5ZP67ceSUIIpkgLzjAakotfHX4sWyqsKVjs= github.com/ThreeDotsLabs/watermill-redisstream v1.2.2 h1:/fFHagJiObMBbYIDrygRoAq+RxqLPcQZdGi6b0ViG08= github.com/ThreeDotsLabs/watermill-redisstream v1.2.2/go.mod h1:ZRe0VpA0Ho/4MESUrXdqJMaWtiWhi4emxIYpqsxi98Y= +github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 h1:wswlLYY0Jc0tloj3lty4Y+VTEA8AM1vYfrIDwWtqyJk= +github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0/go.mod h1:83l/4sKaLHwoHJlrAsDLaXcHN+QOHHntAAyabNmiuO4= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= @@ -53,6 +57,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -78,6 +84,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= +github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -90,7 +98,6 @@ github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3Bop github.com/go-gorp/gorp/v3 v3.1.0 h1:ItKF/Vbuj31dmV4jxA1qblpSwkl9g1typ24xoe70IGs= github.com/go-gorp/gorp/v3 v3.1.0/go.mod h1:dLEjIyyRNiXvNZ8PSmzpt1GsWAUK8kjVhEpjH8TixEw= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -124,7 +131,6 @@ github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -148,14 +154,12 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= @@ -175,6 +179,22 @@ github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/C github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8= +github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY= +github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgtype v1.4.2 h1:t+6LWm5eWPLX1H5Se702JSBcirq6uWa4jiG4wV1rAWY= +github.com/jackc/pgtype v1.4.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig= +github.com/jackc/pgx/v4 v4.8.1 h1:SUbCLP2pXvf/Sr/25KsuI4aTxiFYIvpfk4l6aTSdyCw= +github.com/jackc/pgx/v4 v4.8.1/go.mod h1:4HOLxrl8wToZJReD04/yB20GDwf4KBYETvlHciCnwW0= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= @@ -329,6 +349,8 @@ github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT github.com/xdg-go/scram v1.1.0/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.einride.tech/aip v0.65.0 h1:aqKEV1g9diXcR6DAxBVZoJn6ho8SuC+TOZFXzuu7kLU= +go.einride.tech/aip v0.65.0/go.mod h1:wcRZ57XFEvERWLPy9VqDBtXc/ZFj7ugsd32F5o8Th+s= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.31.0 h1:J8jI81RCB7U9a3qsTZXM/38XrvbLJCye6J32bfQctYY= @@ -337,13 +359,13 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo= -go.opentelemetry.io/otel v1.6.1 h1:6r1YrcTenBvYa1x491d0GGpTVBsNECmrc/K6b+zDeis= go.opentelemetry.io/otel v1.6.1/go.mod h1:blzUabWHkX6LJewxvadmzafgh/wnvBSDBdOuwkAtrWQ= go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= -go.opentelemetry.io/otel/trace v1.6.1 h1:f8c93l5tboBYZna1nWk0W9DYyMzJXDWdZcJZ0Kb400U= +go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= +go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= go.opentelemetry.io/otel/trace v1.6.1/go.mod h1:RkFRM1m0puWIq10oxImnGEduNBzxiN7TXluRBtE+5j0= go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= @@ -379,7 +401,6 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -430,7 +451,6 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -452,13 +472,12 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.157.0 h1:ORAeqmbrrozeyw5NjnMxh7peHO0UzV4wWYSwZeCUb20= google.golang.org/api v0.157.0/go.mod h1:+z4v4ufbZ1WEpld6yMGHyggs+PmAHiaLNj5ytP3N01g= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= -google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -488,8 +507,6 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/watermill/publisher.go b/pkg/watermill/publisher.go index ca51466..04c3b45 100644 --- a/pkg/watermill/publisher.go +++ b/pkg/watermill/publisher.go @@ -8,9 +8,11 @@ import ( "github.com/Improwised/golang-api/cli/workers" "github.com/Improwised/golang-api/config" + "github.com/Improwised/golang-api/database" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud" + "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" "github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream" @@ -18,12 +20,15 @@ import ( "github.com/redis/go-redis/v9" ) -type WatermillPubliser struct { +type WatermillPublisher struct { publisher message.Publisher } - -func InitPubliser(cfg config.AppConfig) (*WatermillPubliser, error) { +// ? single database or mualtipal database +func InitPublisher(cfg config.AppConfig,isDeadLetterQ bool) (*WatermillPublisher, error) { logger = watermill.NewStdLogger(cfg.MQ.Debug, cfg.MQ.Track) + if isDeadLetterQ { + return initSqlPub(cfg) + } switch cfg.MQ.Dialect { case "amqp": return initAmqpPub(cfg) @@ -36,8 +41,11 @@ func InitPubliser(cfg config.AppConfig) (*WatermillPubliser, error) { case "googlecloud": return initGoogleCloudPub(cfg) + + case "sql": + return initSqlPub(cfg) default: - return &WatermillPubliser{}, nil + return &WatermillPublisher{}, nil } } @@ -45,7 +53,7 @@ func InitPubliser(cfg config.AppConfig) (*WatermillPubliser, error) { // send message into queue using topic name // // struct must from worker package(/cli/workers) -func (wp *WatermillPubliser) Publish(topic string, inputStruct interface{}) error { +func (wp *WatermillPublisher) Publish(topic string, inputStruct interface{}) error { // if broker is not set then return nil if wp.publisher == nil { return nil @@ -69,14 +77,14 @@ func (wp *WatermillPubliser) Publish(topic string, inputStruct interface{}) erro return err } -func initAmqpPub(cfg config.AppConfig) (*WatermillPubliser, error) { +func initAmqpPub(cfg config.AppConfig) (*WatermillPublisher, error) { amqpConfig := amqp.NewDurableQueueConfig(cfg.MQ.Amqp.AmqbUrl) publisher, err := amqp.NewPublisher(amqpConfig, logger) - return &WatermillPubliser{publisher: publisher}, err + return &WatermillPublisher{publisher: publisher}, err } -func initRedisPub(cfg config.AppConfig) (*WatermillPubliser, error) { +func initRedisPub(cfg config.AppConfig) (*WatermillPublisher, error) { pubClient := redis.NewClient(&redis.Options{ Addr: cfg.MQ.Redis.RedisUrl, Username: cfg.MQ.Redis.UserName, @@ -89,10 +97,10 @@ func initRedisPub(cfg config.AppConfig) (*WatermillPubliser, error) { }, logger, ) - return &WatermillPubliser{publisher: publisher}, err + return &WatermillPublisher{publisher: publisher}, err } -func initKafkaPub(cfg config.AppConfig) (*WatermillPubliser, error) { +func initKafkaPub(cfg config.AppConfig) (*WatermillPublisher, error) { publisher, err := kafka.NewPublisher( kafka.PublisherConfig{ Brokers: cfg.MQ.Kafka.KafkaBroker, @@ -101,16 +109,70 @@ func initKafkaPub(cfg config.AppConfig) (*WatermillPubliser, error) { }, logger, ) - return &WatermillPubliser{publisher: publisher}, err + return &WatermillPublisher{publisher: publisher}, err } -func initGoogleCloudPub(cfg config.AppConfig) (*WatermillPubliser, error) { +func initGoogleCloudPub(cfg config.AppConfig) (*WatermillPublisher, error) { publisher, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{ ProjectID: cfg.MQ.GoogleCloud.ProjectID, ConnectTimeout: 10 * time.Second, PublishTimeout: 10 * time.Second, - Marshaler: googlecloud.DefaultMarshalerUnmarshaler{}, + Marshaler: googlecloud.DefaultMarshalerUnmarshaler{}, }, logger) - return &WatermillPubliser{publisher: publisher}, err + return &WatermillPublisher{publisher: publisher}, err +} + +func initSqlPub(cfg config.AppConfig) (*WatermillPublisher, error) { + switch cfg.MQ.Sql.Dialect { + case "postgres": + return initPostgresPub(cfg) + case "mysql": + return initMysqlPub(cfg) + default: + return nil, nil + } + +} + +func initPostgresPub(cfg config.AppConfig) (*WatermillPublisher, error) { + db, err := database.PostgresDBConnection(cfg.MQ.Sql) + if err != nil { + return nil, err + } + publisher, err := sql.NewPublisher( + db, + sql.PublisherConfig{ + // we are customizing schema adapter because default schema adapter has some issue + SchemaAdapter: database.PostgreSQLSchema{}, + AutoInitializeSchema: true, + }, + + logger, + ) + if err != nil { + return nil, err + } + return &WatermillPublisher{publisher: publisher}, nil +} + +func initMysqlPub(cfg config.AppConfig) (*WatermillPublisher, error) { + db, err := database.MysqlDBConnection(cfg.MQ.Sql) + if err != nil { + return nil, err + } + publisher, err := sql.NewPublisher( + db, + sql.PublisherConfig{ + SchemaAdapter: database.MySQLSchema{}, + AutoInitializeSchema: true, + }, + + logger, + ) + if err != nil { + return nil, err + } + + return &WatermillPublisher{publisher: publisher}, nil } diff --git a/pkg/watermill/subscriber.go b/pkg/watermill/subscriber.go index f347217..0210137 100644 --- a/pkg/watermill/subscriber.go +++ b/pkg/watermill/subscriber.go @@ -7,10 +7,12 @@ import ( "cloud.google.com/go/pubsub" "github.com/Improwised/golang-api/config" + "github.com/Improwised/golang-api/database" "github.com/Shopify/sarama" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp" + "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud" "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" @@ -28,8 +30,11 @@ type WatermillSubscriber struct { Router *message.Router } -func InitSubscriber(cfg config.AppConfig) (*WatermillSubscriber, error) { +func InitSubscriber(cfg config.AppConfig, isDeadLetterQ bool) (*WatermillSubscriber, error) { logger = watermill.NewStdLogger(cfg.MQ.Debug, cfg.MQ.Track) + if isDeadLetterQ { + return initSqlSub(cfg) + } switch cfg.MQ.Dialect { case "amqp": return initAmqpSub(cfg) @@ -43,6 +48,8 @@ func InitSubscriber(cfg config.AppConfig) (*WatermillSubscriber, error) { case "googlecloud": return initGoogleCloudSub(cfg) + case "sql": + return initSqlSub(cfg) default: return nil, nil } @@ -55,12 +62,12 @@ func (ws *WatermillSubscriber) InitRouter(cfg config.AppConfig, delayTime, MaxRe return nil, err } - pub, err := InitPubliser(cfg) + pub, err := InitPublisher(cfg, true) if err != nil { return nil, err } - poq, err := middleware.PoisonQueue(pub.publisher, cfg.MQ.DeadQueue) + poq, err := middleware.PoisonQueue(pub.publisher, cfg.MQ.DeadLetterQ) if err != nil { return nil, err } @@ -83,7 +90,7 @@ func (ws *WatermillSubscriber) InitRouter(cfg config.AppConfig, delayTime, MaxRe return ws, nil } -func (ws *WatermillSubscriber) Run(topic string, handlerFunc message.NoPublishHandlerFunc) error { +func (ws *WatermillSubscriber) Run(topic,handlerName string ,handlerFunc message.NoPublishHandlerFunc) error { if ws.Subscriber == nil { return fmt.Errorf("subscriber is nil") } @@ -97,7 +104,7 @@ func (ws *WatermillSubscriber) Run(topic string, handlerFunc message.NoPublishHa } ws.Router.AddNoPublisherHandler( - "handler", + handlerName, topic, ws.Subscriber, handlerFunc, @@ -179,3 +186,57 @@ func initGoogleCloudSub(cfg config.AppConfig) (*WatermillSubscriber, error) { return &WatermillSubscriber{Subscriber: subscriber}, err } + +func initSqlSub(cfg config.AppConfig) (*WatermillSubscriber, error) { + switch cfg.MQ.Sql.Dialect { + case "postgres": + return initPostgresSub(cfg) + + case "mysql": + return initMysqlSub(cfg) + + default: + return nil, nil + } +} + +func initPostgresSub(cfg config.AppConfig) (*WatermillSubscriber, error) { + db, err := database.PostgresDBConnection(cfg.MQ.Sql) + if err != nil { + return nil, err + } + subscriber, err := sql.NewSubscriber( + db, + sql.SubscriberConfig{ + SchemaAdapter: database.PostgreSQLSchema{}, + OffsetsAdapter: sql.DefaultPostgreSQLOffsetsAdapter{}, + InitializeSchema: true, + }, + logger, + ) + if err != nil { + return nil, err + } + return &WatermillSubscriber{Subscriber: subscriber}, err +} + +func initMysqlSub(cfg config.AppConfig) (*WatermillSubscriber, error) { + db, err := database.MysqlDBConnection(cfg.MQ.Sql) + if err != nil { + return nil, err + } + subscriber, err := sql.NewSubscriber( + db, + sql.SubscriberConfig{ + SchemaAdapter: database.MySQLSchema{}, + OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{}, + InitializeSchema: true, + }, + logger, + ) + if err != nil { + return nil, err + } + + return &WatermillSubscriber{Subscriber: subscriber}, err +} diff --git a/routes/main.go b/routes/main.go index 3d2152e..a6a8c63 100644 --- a/routes/main.go +++ b/routes/main.go @@ -20,7 +20,7 @@ import ( var mu sync.Mutex // Setup func -func Setup(app *fiber.App, goqu *goqu.Database, logger *zap.Logger, config config.AppConfig, events *events.Events, pMetrics *pMetrics.PrometheusMetrics, pub *watermill.WatermillPubliser) error { +func Setup(app *fiber.App, goqu *goqu.Database, logger *zap.Logger, config config.AppConfig, events *events.Events, pMetrics *pMetrics.PrometheusMetrics, pub *watermill.WatermillPublisher) error { mu.Lock() app.Use(middlewares.LogHandler(logger, pMetrics)) @@ -69,7 +69,7 @@ func setupAuthController(v1 fiber.Router, goqu *goqu.Database, logger *zap.Logge return nil } -func setupUserController(v1 fiber.Router, goqu *goqu.Database, logger *zap.Logger, middlewares middlewares.Middleware, events *events.Events, pub *watermill.WatermillPubliser) error { +func setupUserController(v1 fiber.Router, goqu *goqu.Database, logger *zap.Logger, middlewares middlewares.Middleware, events *events.Events, pub *watermill.WatermillPublisher) error { userController, err := controller.NewUserController(goqu, logger, events, pub) if err != nil { return err