Skip to content

Commit

Permalink
feat: move events to transactions service (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
rolznz committed Aug 19, 2024
1 parent 360be80 commit a5bc8c9
Show file tree
Hide file tree
Showing 14 changed files with 202 additions and 153 deletions.
22 changes: 11 additions & 11 deletions alby/alby_oauth_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,6 @@ func (svc *albyOAuthService) LinkAccount(ctx context.Context, lnClient lnclient.
}

func (svc *albyOAuthService) ConsumeEvent(ctx context.Context, event *events.Event, globalProperties map[string]interface{}) {
// run non-blocking
go svc.consumeEvent(ctx, event, globalProperties)
}

func (svc *albyOAuthService) consumeEvent(ctx context.Context, event *events.Event, globalProperties map[string]interface{}) {
// TODO: rename this config option to be specific to the alby API
if !svc.cfg.GetEnv().LogEvents {
logger.Logger.WithField("event", event).Debug("Skipped sending to alby events API")
Expand All @@ -478,6 +473,11 @@ func (svc *albyOAuthService) consumeEvent(ctx context.Context, event *events.Eve
return
}

if strings.HasPrefix(event.Event, "nwc_lnclient_") {
// don't consume internal LNClient events
return
}

if event.Event == "nwc_payment_received" {
type paymentReceivedEventProperties struct {
PaymentHash string `json:"payment_hash"`
Expand Down Expand Up @@ -507,24 +507,24 @@ func (svc *albyOAuthService) consumeEvent(ctx context.Context, event *events.Eve
}
}

if event.Event == "nwc_payment_failed_async" {
paymentFailedAsyncProperties, ok := event.Properties.(*events.PaymentFailedAsyncProperties)
if event.Event == "nwc_payment_failed" {
transaction, ok := event.Properties.(*db.Transaction)
if !ok {
logger.Logger.WithField("event", event).Error("Failed to cast event")
return
}

type paymentSentEventProperties struct {
type paymentFailedEventProperties struct {
PaymentHash string `json:"payment_hash"`
Reason string `json:"reason"`
}

// pass a new custom event with less detail
event = &events.Event{
Event: event.Event,
Properties: &paymentSentEventProperties{
PaymentHash: paymentFailedAsyncProperties.Transaction.PaymentHash,
Reason: paymentFailedAsyncProperties.Reason,
Properties: &paymentFailedEventProperties{
PaymentHash: transaction.PaymentHash,
Reason: transaction.FailureReason,
},
}
}
Expand Down
26 changes: 26 additions & 0 deletions db/migrations/202408191242_transaction_failure_reason.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package migrations

import (
_ "embed"

"github.com/go-gormigrate/gormigrate/v2"
"gorm.io/gorm"
)

// This migration removes old app permissions for request methods (now we use scopes)
var _202408191242_transaction_failure_reason = &gormigrate.Migration{
ID: "202408191242_transaction_failure_reason",
Migrate: func(tx *gorm.DB) error {

if err := tx.Exec(`
ALTER TABLE transactions ADD failure_reason string;
`).Error; err != nil {
return err
}

return nil
},
Rollback: func(tx *gorm.DB) error {
return nil
},
}
1 change: 1 addition & 0 deletions db/migrations/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func Migrate(gormDB *gorm.DB) error {
_202407201604_transactions_indexes,
_202407262257_remove_invalid_scopes,
_202408061737_add_boostagrams_and_use_json,
_202408191242_transaction_error,

Check failure on line 23 in db/migrations/migrate.go

View workflow job for this annotation

GitHub Actions / build (x86_64, linux-amd64, x86_64-unknown-linux-gnu)

undefined: _202408191242_transaction_error

Check failure on line 23 in db/migrations/migrate.go

View workflow job for this annotation

GitHub Actions / build

undefined: _202408191242_transaction_error

Check failure on line 23 in db/migrations/migrate.go

View workflow job for this annotation

GitHub Actions / build (armv6, arm-unknown-linux-gnueabihf)

undefined: _202408191242_transaction_error

Check failure on line 23 in db/migrations/migrate.go

View workflow job for this annotation

GitHub Actions / build (aarch64, linux-aarch64, aarch64-unknown-linux-gnu)

undefined: _202408191242_transaction_error
})

return m.Migrate()
Expand Down
1 change: 1 addition & 0 deletions db/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Transaction struct {
Metadata datatypes.JSON
SelfPayment bool
Boostagram datatypes.JSON
FailureReason string
}

type DBService interface {
Expand Down
5 changes: 2 additions & 3 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ func (ep *eventPublisher) Publish(event *Event) {
defer ep.subscriberMtx.Unlock()
logger.Logger.WithFields(logrus.Fields{"event": event, "global": ep.globalProperties}).Debug("Publishing event")
for _, listener := range ep.listeners {
// events are consumed in sequence as some listeners depend on earlier consumers
// (e.g. NIP-47 notifier depends on transactions service updating transactions)
listener.ConsumeEvent(context.Background(), event, ep.globalProperties)
// consume event without blocking thread
go listener.ConsumeEvent(context.Background(), event, ep.globalProperties)
}
}

Expand Down
7 changes: 0 additions & 7 deletions events/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package events

import (
"context"

"github.com/getAlby/hub/lnclient"
)

type EventSubscriber interface {
Expand Down Expand Up @@ -34,8 +32,3 @@ type ChannelBackupInfo struct {
FundingTxID string `json:"funding_tx_id"`
FundingTxVout uint32 `json:"funding_tx_vout"`
}

type PaymentFailedAsyncProperties struct {
Transaction *lnclient.Transaction
Reason string
}
8 changes: 4 additions & 4 deletions lnclient/ldk/ldk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ func (ls *LDKService) handleLdkEvent(event *ldk_node.Event) {
}

ls.eventPublisher.Publish(&events.Event{
Event: "nwc_payment_received",
Event: "nwc_lnclient_payment_received",
Properties: transaction,
})
case ldk_node.EventPaymentSuccessful:
Expand All @@ -1330,7 +1330,7 @@ func (ls *LDKService) handleLdkEvent(event *ldk_node.Event) {
}

ls.eventPublisher.Publish(&events.Event{
Event: "nwc_payment_sent",
Event: "nwc_lnclient_payment_sent",
Properties: transaction,
})
case ldk_node.EventPaymentFailed:
Expand All @@ -1353,8 +1353,8 @@ func (ls *LDKService) handleLdkEvent(event *ldk_node.Event) {
reason := ls.getPaymentFailReason(&eventType)

ls.eventPublisher.Publish(&events.Event{
Event: "nwc_payment_failed_async",
Properties: &events.PaymentFailedAsyncProperties{
Event: "nwc_lnclient_payment_failed",
Properties: &lnclient.PaymentFailedEventProperties{
Transaction: transaction,
Reason: reason,
},
Expand Down
8 changes: 4 additions & 4 deletions lnclient/lnd/lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,8 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln
continue
}
eventPublisher.Publish(&events.Event{
Event: "nwc_payment_failed_async",
Properties: &events.PaymentFailedAsyncProperties{
Event: "nwc_lnclient_payment_failed",
Properties: &lnclient.PaymentFailedEventProperties{
Transaction: transaction,
Reason: payment.FailureReason.String(),
},
Expand All @@ -492,7 +492,7 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln
continue
}
eventPublisher.Publish(&events.Event{
Event: "nwc_payment_sent",
Event: "nwc_lnclient_payment_sent",
Properties: transaction,
})
default:
Expand Down Expand Up @@ -542,7 +542,7 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln
}).Info("Received new invoice")

eventPublisher.Publish(&events.Event{
Event: "nwc_payment_received",
Event: "nwc_lnclient_payment_received",
Properties: lndInvoiceToTransaction(invoice),
})
}
Expand Down
5 changes: 5 additions & 0 deletions lnclient/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ type BalancesResponse struct {

type NetworkGraphResponse = interface{}

type PaymentFailedEventProperties struct {
Transaction *Transaction
Reason string
}

// default invoice expiry in seconds (1 day)
const DEFAULT_INVOICE_EXPIRY = 86400

Expand Down
8 changes: 0 additions & 8 deletions nip47/controllers/pay_invoice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,6 @@ func (controller *nip47Controller) pay(ctx context.Context, bolt11 string, payme
return
}

controller.eventPublisher.Publish(&events.Event{
Event: "nwc_payment_succeeded",
Properties: map[string]interface{}{
"bolt11": bolt11,
"amount": paymentRequest.MSatoshi / 1000,
},
})

publishResponse(&models.Response{
ResultType: nip47Request.Method,
Result: payResponse{
Expand Down
8 changes: 1 addition & 7 deletions nip47/controllers/pay_keysend_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,7 @@ func (controller *nip47Controller) payKeysend(ctx context.Context, payKeysendPar
}, tags)
return
}
controller.eventPublisher.Publish(&events.Event{
Event: "nwc_payment_succeeded",
Properties: map[string]interface{}{
"keysend": true,
"amount": payKeysendParams.Amount / 1000,
},
})

publishResponse(&models.Response{
ResultType: nip47Request.Method,
Result: payResponse{
Expand Down
5 changes: 1 addition & 4 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,11 @@ func NewService(ctx context.Context) (*service, error) {
eventPublisher: eventPublisher,
albyOAuthSvc: alby.NewAlbyOAuthService(gormDB, cfg, keys, eventPublisher),
nip47Service: nip47.NewNip47Service(gormDB, cfg, keys, eventPublisher),
transactionsService: transactions.NewTransactionsService(gormDB),
transactionsService: transactions.NewTransactionsService(gormDB, eventPublisher),
db: gormDB,
keys: keys,
}

// Note: order is important here: transactions service will update transactions
// from payment events, which will then be consumed by the NIP-47 service to send notifications
// TODO: transactions service should fire its own events
eventPublisher.RegisterSubscriber(svc.transactionsService)
eventPublisher.RegisterSubscriber(svc.nip47Service)
eventPublisher.RegisterSubscriber(svc.albyOAuthSvc)
Expand Down
14 changes: 7 additions & 7 deletions transactions/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestNotifications_ReceivedKnownPayment(t *testing.T) {
transactionsService := NewTransactionsService(svc.DB)

transactionsService.ConsumeEvent(ctx, &events.Event{
Event: "nwc_payment_received",
Event: "nwc_lnclient_payment_received",
Properties: tests.MockLNClientTransaction,
}, map[string]interface{}{})

Expand All @@ -59,7 +59,7 @@ func TestNotifications_ReceivedUnknownPayment(t *testing.T) {
transactionsService := NewTransactionsService(svc.DB)

transactionsService.ConsumeEvent(ctx, &events.Event{
Event: "nwc_payment_received",
Event: "nwc_lnclient_payment_received",
Properties: tests.MockLNClientTransaction,
}, map[string]interface{}{})

Expand Down Expand Up @@ -108,7 +108,7 @@ func TestNotifications_ReceivedKeysend(t *testing.T) {
}

transactionsService.ConsumeEvent(ctx, &events.Event{
Event: "nwc_payment_received",
Event: "nwc_lnclient_payment_received",
Properties: transaction,
}, map[string]interface{}{})

Expand Down Expand Up @@ -161,7 +161,7 @@ func TestNotifications_SentKnownPayment(t *testing.T) {
transactionsService := NewTransactionsService(svc.DB)

transactionsService.ConsumeEvent(ctx, &events.Event{
Event: "nwc_payment_sent",
Event: "nwc_lnclient_payment_sent",
Properties: tests.MockLNClientTransaction,
}, map[string]interface{}{})

Expand Down Expand Up @@ -192,7 +192,7 @@ func TestNotifications_SentUnknownPayment(t *testing.T) {
assert.Equal(t, int64(0), result.RowsAffected)

transactionsService.ConsumeEvent(ctx, &events.Event{
Event: "nwc_payment_sent",
Event: "nwc_lnclient_payment_sent",
Properties: tests.MockLNClientTransaction,
}, map[string]interface{}{})

Expand Down Expand Up @@ -221,8 +221,8 @@ func TestNotifications_FailedKnownPayment(t *testing.T) {
transactionsService := NewTransactionsService(svc.DB)

transactionsService.ConsumeEvent(ctx, &events.Event{
Event: "nwc_payment_failed_async",
Properties: &events.PaymentFailedAsyncProperties{
Event: "nwc_lnclient_payment_failed",
Properties: &events.LNClientPaymentFailedAsyncProperties{
Transaction: tests.MockLNClientTransaction,
Reason: "Some failure reason",
},
Expand Down
Loading

0 comments on commit a5bc8c9

Please sign in to comment.