Skip to content

Commit

Permalink
Merge pull request #93 from getAlby/task-response-received
Browse files Browse the repository at this point in the history
feat: add response received column
  • Loading branch information
im-adithya authored Jul 31, 2024
2 parents 58669eb + 2080d8c commit 341aada
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 34 deletions.
19 changes: 10 additions & 9 deletions internal/nostr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ type Subscription struct {
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *nostr.Event `gorm:"-"`
RequestEventDB RequestEvent `gorm:"-"`
RequestEvent *RequestEvent `gorm:"-"`

// TODO: fix an elegant solution to store datatypes
IdsString string
Expand Down Expand Up @@ -130,13 +129,15 @@ type OnReceiveEOSFunc func(ctx context.Context, subscription *Subscription)
type HandleEventFunc func(event *nostr.Event, subscription *Subscription)

type RequestEvent struct {
ID uint
SubscriptionId *uint
NostrId string `validate:"required"`
Content string
State string
CreatedAt time.Time
UpdatedAt time.Time
ID uint
SubscriptionId *uint
NostrId string `validate:"required"`
Content string
State string
ResponseReceivedAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
SignedEvent *nostr.Event `gorm:"-"`
}

type ResponseEvent struct {
Expand Down
51 changes: 26 additions & 25 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,9 @@ func (svc *Service) NIP47Handler(c echo.Context) error {
}

requestEvent := RequestEvent{
NostrId: requestData.SignedEvent.ID,
Content: requestData.SignedEvent.Content,
NostrId: requestData.SignedEvent.ID,
Content: requestData.SignedEvent.Content,
SignedEvent: requestData.SignedEvent,
}

if err := svc.db.Create(&requestEvent).Error; err != nil {
Expand All @@ -348,11 +349,7 @@ func (svc *Service) NIP47Handler(c echo.Context) error {
})
}

subscription := svc.prepareNIP47Subscription(NIP47WebhookRequest{
RelayUrl: requestData.RelayUrl,
WalletPubkey: requestData.WalletPubkey,
SignedEvent: requestData.SignedEvent,
}, requestEvent)
subscription := svc.prepareNIP47Subscription(requestData.RelayUrl, requestData.WalletPubkey, "", requestEvent)

ctx, cancel := context.WithTimeout(c.Request().Context(), 90*time.Second)
defer cancel()
Expand Down Expand Up @@ -434,8 +431,9 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
}

requestEvent := RequestEvent{
NostrId: requestData.SignedEvent.ID,
Content: requestData.SignedEvent.Content,
NostrId: requestData.SignedEvent.ID,
Content: requestData.SignedEvent.Content,
SignedEvent: requestData.SignedEvent,
}

if err := svc.db.Create(&requestEvent).Error; err != nil {
Expand All @@ -451,7 +449,7 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
})
}

subscription := svc.prepareNIP47Subscription(requestData, requestEvent)
subscription := svc.prepareNIP47Subscription(requestData.RelayUrl, requestData.WalletPubkey, requestData.WebhookUrl, requestEvent)

ctx, cancel := context.WithTimeout(svc.Ctx, 90*time.Second)
defer cancel()
Expand All @@ -462,18 +460,17 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
})
}

func (svc *Service) prepareNIP47Subscription(requestData NIP47WebhookRequest, requestEvent RequestEvent) (Subscription) {
func (svc *Service) prepareNIP47Subscription(relayUrl, walletPubkey, webhookUrl string, requestEvent RequestEvent) (Subscription) {
return Subscription{
RelayUrl: requestData.RelayUrl,
WebhookUrl: requestData.WebhookUrl,
RelayUrl: relayUrl,
WebhookUrl: webhookUrl,
Open: true,
Authors: &[]string{requestData.WalletPubkey},
Authors: &[]string{walletPubkey},
Kinds: &[]int{NIP_47_RESPONSE_KIND},
Tags: &nostr.TagMap{"e": []string{requestData.SignedEvent.ID}},
Tags: &nostr.TagMap{"e": []string{requestEvent.NostrId}},
Since: time.Now(),
Limit: 1,
RequestEvent: requestData.SignedEvent,
RequestEventDB: requestEvent,
RequestEvent: &requestEvent,
EventChan: make(chan *nostr.Event, 1),
Uuid: uuid.New().String(),
}
Expand Down Expand Up @@ -742,10 +739,10 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
// Save the request event state and stop the
// subscription if it's an NIP47 request
if (subscription.RequestEvent != nil) {
if (subscription.RequestEventDB.State == "") {
subscription.RequestEventDB.State = REQUEST_EVENT_PUBLISH_FAILED
if (subscription.RequestEvent.State == "") {
subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_FAILED
}
svc.db.Save(&subscription.RequestEventDB)
svc.db.Save(&subscription.RequestEvent)
// stop the subscription as it is one time
svc.stopSubscription(subscription)
}
Expand All @@ -764,7 +761,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc
svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
err := sub.Relay.Publish(ctx, *subscription.RequestEvent)
err := sub.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent)
if err != nil {
// TODO: notify user about publish failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
Expand All @@ -781,7 +778,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Info("Published request event successfully")
subscription.RequestEventDB.State = REQUEST_EVENT_PUBLISH_CONFIRMED
subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_CONFIRMED
}
}

Expand All @@ -795,11 +792,15 @@ func (svc *Service) handleResponseEvent(event *nostr.Event, subscription *Subscr
"client_pubkey": clientPubkey,
"relay_url": subscription.RelayUrl,
}).Info("Received response event")
if (subscription.RequestEvent != nil) {
subscription.RequestEvent.ResponseReceivedAt = time.Now()
svc.db.Save(&subscription.RequestEvent)
}
responseEvent := ResponseEvent{
NostrId: event.ID,
Content: event.Content,
RepliedAt: event.CreatedAt.Time(),
RequestId: &subscription.RequestEventDB.ID,
RequestId: &subscription.RequestEvent.ID,
}
svc.db.Save(&responseEvent)
if subscription.WebhookUrl != "" {
Expand Down Expand Up @@ -952,8 +953,8 @@ func getPubkeys(subscription *Subscription) (string, string) {
clientPubkey := ""

if (subscription.RequestEvent != nil) {
walletPubkey = getWalletPubkey(&subscription.RequestEvent.Tags)
clientPubkey = subscription.RequestEvent.PubKey
walletPubkey = getWalletPubkey(&subscription.RequestEvent.SignedEvent.Tags)
clientPubkey = subscription.RequestEvent.SignedEvent.PubKey
}

return walletPubkey, clientPubkey
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package migrations

import (
"github.com/go-gormigrate/gormigrate/v2"
"gorm.io/gorm"
)

// Add response_received_at column to request_events table
var _202407171220_add_response_received_at_to_request_events = &gormigrate.Migration{
ID: "202407171220_add_response_received_at_to_request_events",
Migrate: func(tx *gorm.DB) error {
if err := tx.Exec("ALTER TABLE request_events ADD COLUMN response_received_at TIMESTAMP NULL").Error; err != nil {
return err
}

// Update response_received_at if there is a corresponding row in response_events
if err := tx.Exec(`
UPDATE request_events SET response_received_at = created_at
`).Error; err != nil {
return err
}

return nil
},
Rollback: func(tx *gorm.DB) error {
if err := tx.Exec("ALTER TABLE request_events DROP COLUMN response_received_at").Error; err != nil {
return err
}
return nil
},
}
1 change: 1 addition & 0 deletions migrations/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ func Migrate(db *gorm.DB) error {
_202402161653_initial_migration,
_202404021628_add_uuid_to_subscriptions,
_202404031539_add_indexes,
_202407171220_add_response_received_at_to_request_events,
})

return m.Migrate()
Expand Down

0 comments on commit 341aada

Please sign in to comment.