From abd9c669b7bce04ac6493dc9e5a2967f2cdb7a43 Mon Sep 17 00:00:00 2001 From: kiwiidb Date: Tue, 18 Apr 2023 11:07:49 +0200 Subject: [PATCH 1/5] add program for republishing invoices --- rabbitmq/rabbitmq.go | 7 ++-- republish_invoices/main.go | 84 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 3 deletions(-) create mode 100644 republish_invoices/main.go diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go index 47d378dd..486fd6ca 100644 --- a/rabbitmq/rabbitmq.go +++ b/rabbitmq/rabbitmq.go @@ -39,6 +39,7 @@ type ( type Client interface { SubscribeToLndInvoices(context.Context, IncomingInvoiceHandler) error StartPublishInvoices(context.Context, SubscribeToInvoicesFunc, EncodeOutgoingInvoiceFunc) error + PublishToLndhubExchange(ctx context.Context, invoice models.Invoice, payloadFunc EncodeOutgoingInvoiceFunc) error // Close will close all connections to rabbitmq Close() error } @@ -274,13 +275,13 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS case <-ctx.Done(): return context.Canceled case incomingInvoice := <-in: - err = client.publishToLndhubExchange(ctx, incomingInvoice, payloadFunc) + err = client.PublishToLndhubExchange(ctx, incomingInvoice, payloadFunc) if err != nil { captureErr(client.logger, err) } case outgoing := <-out: - err = client.publishToLndhubExchange(ctx, outgoing, payloadFunc) + err = client.PublishToLndhubExchange(ctx, outgoing, payloadFunc) if err != nil { captureErr(client.logger, err) @@ -289,7 +290,7 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS } } -func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoice models.Invoice, payloadFunc EncodeOutgoingInvoiceFunc) error { +func (client *DefaultClient) PublishToLndhubExchange(ctx context.Context, invoice models.Invoice, payloadFunc EncodeOutgoingInvoiceFunc) error { payload := bufPool.Get().(*bytes.Buffer) err := payloadFunc(ctx, payload, invoice) if err != nil { diff --git a/republish_invoices/main.go b/republish_invoices/main.go new file mode 100644 index 00000000..28bcd324 --- /dev/null +++ b/republish_invoices/main.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "strconv" + + "github.com/getAlby/lndhub.go/db" + "github.com/getAlby/lndhub.go/db/models" + "github.com/getAlby/lndhub.go/lib/service" + "github.com/getAlby/lndhub.go/rabbitmq" + "github.com/joho/godotenv" + "github.com/kelseyhightower/envconfig" + "github.com/sirupsen/logrus" +) + +func main() { + + c := &service.Config{} + // Load configruation from environment variables + err := godotenv.Load(".env") + if err != nil { + fmt.Println("Failed to load .env file") + } + startId, endId, err := loadStartAndEndIdFromEnv() + if err != nil { + log.Fatalf("Could not load start and end id from env %v", err) + } + err = envconfig.Process("", c) + if err != nil { + log.Fatalf("Error loading environment variables: %v", err) + } + // Open a DB connection based on the configured DATABASE_URI + dbConn, err := db.Open(c) + if err != nil { + log.Fatalf("Error initializing db connection: %v", err) + } + rabbitmqClient, err := rabbitmq.Dial(c.RabbitMQUri, + rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange), + rabbitmq.WithLndHubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange), + rabbitmq.WithLndInvoiceConsumerQueueName(c.RabbitMQInvoiceConsumerQueueName), + ) + if err != nil { + log.Fatal(err) + } + + // close the connection gently at the end of the runtime + defer rabbitmqClient.Close() + + result := []models.Invoice{} + err = dbConn.NewSelect().Model(&result).Where("id > ?", startId).Where("id < ?", endId).Scan(context.Background()) + if err != nil { + log.Fatal(err) + } + logrus.Infof("Found %d invoices", len(result)) + svc := &service.LndhubService{ + Config: c, + DB: dbConn, + RabbitMQClient: rabbitmqClient, + InvoicePubSub: service.NewPubsub(), + } + for _, inv := range result { + logrus.Infof("Publishing invoice with hash %s", inv.RHash) + err = svc.RabbitMQClient.PublishToLndhubExchange(context.Background(), inv, svc.EncodeInvoiceWithUserLogin) + if err != nil { + logrus.Error(err) + } + } + +} + +func loadStartAndEndIdFromEnv() (start, end int, err error) { + start, err = strconv.Atoi(os.Getenv("START_ID")) + if err != nil { + return 0, 0, err + } + end, err = strconv.Atoi(os.Getenv("END_ID")) + if err != nil { + return 0, 0, err + } + return start, end, nil +} From a933371bf667842db5d4cf31a5dfa34c694736c7 Mon Sep 17 00:00:00 2001 From: kiwiidb Date: Tue, 18 Apr 2023 11:10:51 +0200 Subject: [PATCH 2/5] add dryrun --- republish_invoices/main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/republish_invoices/main.go b/republish_invoices/main.go index 28bcd324..3dbfd931 100644 --- a/republish_invoices/main.go +++ b/republish_invoices/main.go @@ -61,8 +61,12 @@ func main() { RabbitMQClient: rabbitmqClient, InvoicePubSub: service.NewPubsub(), } + dryRun := os.Getenv("DRY_RUN") == "true" for _, inv := range result { logrus.Infof("Publishing invoice with hash %s", inv.RHash) + if dryRun { + continue + } err = svc.RabbitMQClient.PublishToLndhubExchange(context.Background(), inv, svc.EncodeInvoiceWithUserLogin) if err != nil { logrus.Error(err) From 99ef9a06c290f859f02ff0075e9ff0a8b77c2d04 Mon Sep 17 00:00:00 2001 From: kiwiidb Date: Tue, 18 Apr 2023 11:46:57 +0200 Subject: [PATCH 3/5] use date instead of id --- republish_invoices/main.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/republish_invoices/main.go b/republish_invoices/main.go index 3dbfd931..33548e76 100644 --- a/republish_invoices/main.go +++ b/republish_invoices/main.go @@ -5,7 +5,7 @@ import ( "fmt" "log" "os" - "strconv" + "time" "github.com/getAlby/lndhub.go/db" "github.com/getAlby/lndhub.go/db/models" @@ -24,7 +24,7 @@ func main() { if err != nil { fmt.Println("Failed to load .env file") } - startId, endId, err := loadStartAndEndIdFromEnv() + startDate, endDate, err := loadStartAndEndIdFromEnv() if err != nil { log.Fatalf("Could not load start and end id from env %v", err) } @@ -50,7 +50,7 @@ func main() { defer rabbitmqClient.Close() result := []models.Invoice{} - err = dbConn.NewSelect().Model(&result).Where("id > ?", startId).Where("id < ?", endId).Scan(context.Background()) + err = dbConn.NewSelect().Model(&result).Where("settled_at > ?", startDate).Where("settled_at < ?", endDate).Scan(context.Background()) if err != nil { log.Fatal(err) } @@ -75,14 +75,11 @@ func main() { } -func loadStartAndEndIdFromEnv() (start, end int, err error) { - start, err = strconv.Atoi(os.Getenv("START_ID")) +func loadStartAndEndIdFromEnv() (start, end time.Time, err error) { + start, err = time.Parse(time.RFC3339, os.Getenv("START_DATE")) if err != nil { - return 0, 0, err + return } - end, err = strconv.Atoi(os.Getenv("END_ID")) - if err != nil { - return 0, 0, err - } - return start, end, nil + end, err = time.Parse(time.RFC3339, os.Getenv("END_DATE")) + return } From aaf886fa3094cfdaebe68519060ed15635e6faae Mon Sep 17 00:00:00 2001 From: kiwiidb Date: Tue, 18 Apr 2023 11:53:01 +0200 Subject: [PATCH 4/5] add log line --- republish_invoices/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/republish_invoices/main.go b/republish_invoices/main.go index 33548e76..f384ce0f 100644 --- a/republish_invoices/main.go +++ b/republish_invoices/main.go @@ -72,6 +72,7 @@ func main() { logrus.Error(err) } } + logrus.Infof("Published %d invoices", len(result)) } From 232b5b5740d0f6c6bd4669106d92a2ef008ac23a Mon Sep 17 00:00:00 2001 From: kiwiidb Date: Tue, 18 Apr 2023 13:47:29 +0200 Subject: [PATCH 5/5] json logger / count errors --- republish_invoices/main.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/republish_invoices/main.go b/republish_invoices/main.go index f384ce0f..51e517ac 100644 --- a/republish_invoices/main.go +++ b/republish_invoices/main.go @@ -3,12 +3,12 @@ package main import ( "context" "fmt" - "log" "os" "time" "github.com/getAlby/lndhub.go/db" "github.com/getAlby/lndhub.go/db/models" + "github.com/getAlby/lndhub.go/lib" "github.com/getAlby/lndhub.go/lib/service" "github.com/getAlby/lndhub.go/rabbitmq" "github.com/joho/godotenv" @@ -24,18 +24,19 @@ func main() { if err != nil { fmt.Println("Failed to load .env file") } + logger := lib.Logger(c.LogFilePath) startDate, endDate, err := loadStartAndEndIdFromEnv() if err != nil { - log.Fatalf("Could not load start and end id from env %v", err) + logger.Fatalf("Could not load start and end id from env %v", err) } err = envconfig.Process("", c) if err != nil { - log.Fatalf("Error loading environment variables: %v", err) + logger.Fatalf("Error loading environment variables: %v", err) } // Open a DB connection based on the configured DATABASE_URI dbConn, err := db.Open(c) if err != nil { - log.Fatalf("Error initializing db connection: %v", err) + logger.Fatalf("Error initializing db connection: %v", err) } rabbitmqClient, err := rabbitmq.Dial(c.RabbitMQUri, rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange), @@ -43,7 +44,7 @@ func main() { rabbitmq.WithLndInvoiceConsumerQueueName(c.RabbitMQInvoiceConsumerQueueName), ) if err != nil { - log.Fatal(err) + logger.Fatal(err) } // close the connection gently at the end of the runtime @@ -52,27 +53,30 @@ func main() { result := []models.Invoice{} err = dbConn.NewSelect().Model(&result).Where("settled_at > ?", startDate).Where("settled_at < ?", endDate).Scan(context.Background()) if err != nil { - log.Fatal(err) + logger.Fatal(err) } logrus.Infof("Found %d invoices", len(result)) svc := &service.LndhubService{ Config: c, DB: dbConn, + Logger: logger, RabbitMQClient: rabbitmqClient, InvoicePubSub: service.NewPubsub(), } dryRun := os.Getenv("DRY_RUN") == "true" + errCount := 0 for _, inv := range result { - logrus.Infof("Publishing invoice with hash %s", inv.RHash) + logger.Infof("Publishing invoice with hash %s", inv.RHash) if dryRun { continue } err = svc.RabbitMQClient.PublishToLndhubExchange(context.Background(), inv, svc.EncodeInvoiceWithUserLogin) if err != nil { - logrus.Error(err) + errCount += 1 + logger.Error(err) } } - logrus.Infof("Published %d invoices", len(result)) + logger.Infof("Published %d invoices, # errors %d", len(result), errCount) }