Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove all mutexes and store in subscription struct #133

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions internal/nostr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,31 @@ const (
)

type Subscription struct {
ID uint
RelayUrl string
WebhookUrl string
PushToken string
IsIOS bool
Open bool
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Since time.Time
Until time.Time
Limit int
Search string
CreatedAt time.Time
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *RequestEvent `gorm:"-"`

IdsJson json.RawMessage `gorm:"type:jsonb"`
KindsJson json.RawMessage `gorm:"type:jsonb"`
AuthorsJson json.RawMessage `gorm:"type:jsonb"`
TagsJson json.RawMessage `gorm:"type:jsonb"`
ID uint
RelayUrl string
WebhookUrl string
PushToken string
IsIOS bool
Open bool
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Since time.Time
Until time.Time
Limit int
Search string
CreatedAt time.Time
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *RequestEvent `gorm:"-"`
RelaySubscription *nostr.Subscription `gorm:"-"`

IdsJson json.RawMessage `gorm:"type:jsonb"`
KindsJson json.RawMessage `gorm:"type:jsonb"`
AuthorsJson json.RawMessage `gorm:"type:jsonb"`
TagsJson json.RawMessage `gorm:"type:jsonb"`
}

func (s *Subscription) BeforeSave(tx *gorm.DB) error {
Expand Down
65 changes: 35 additions & 30 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ type Service struct {
Relay *nostr.Relay
Cfg *Config
Logger *logrus.Logger
subscriptions map[string]*nostr.Subscription
subscriptionsMutex sync.Mutex
relayMutex sync.Mutex
client *expo.PushClient
subCancelFnMap map[string]context.CancelFunc
}

func NewService(ctx context.Context) (*Service, error) {
Expand Down Expand Up @@ -116,8 +116,6 @@ func NewService(ctx context.Context) (*Service, error) {
return nil, err
}

subscriptions := make(map[string]*nostr.Subscription)

client := expo.NewPushClient(&expo.ClientConfig{
Host: "https://api.expo.dev",
APIURL: "/v2",
Expand All @@ -131,7 +129,6 @@ func NewService(ctx context.Context) (*Service, error) {
Wg: &wg,
Logger: logger,
Relay: relay,
subscriptions: subscriptions,
client: client,
}

Expand All @@ -142,7 +139,7 @@ func NewService(ctx context.Context) (*Service, error) {
logger.WithError(err).Error("Failed to query open subscriptions")
return nil, err
}

cancelFnMap := make(map[string]context.CancelFunc)
for _, sub := range openSubscriptions {
// Create a copy of the loop variable to
// avoid passing address of the same variable
Expand All @@ -151,8 +148,11 @@ func NewService(ctx context.Context) (*Service, error) {
if sub.PushToken != "" {
handleEvent = svc.handleSubscribedEventForPushNotification
}
go svc.startSubscription(svc.Ctx, &subscription, nil, handleEvent)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
cancelFnMap[subscription.Uuid] = subCancelFn
go svc.startSubscription(subCtx, &subscription, nil, handleEvent)
}
svc.subCancelFnMap = cancelFnMap

return svc, nil
}
Expand Down Expand Up @@ -569,7 +569,11 @@ func (svc *Service) NIP47NotificationHandler(c echo.Context) error {
})
}

go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
svc.subscriptionsMutex.Lock()
svc.subCancelFnMap[subscription.Uuid] = subCancelFn
svc.subscriptionsMutex.Unlock()
go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEvent)

return c.JSON(http.StatusOK, SubscriptionResponse{
SubscriptionId: subscription.Uuid,
Expand Down Expand Up @@ -628,7 +632,11 @@ func (svc *Service) SubscriptionHandler(c echo.Context) error {
})
}

go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
svc.subscriptionsMutex.Lock()
svc.subCancelFnMap[subscription.Uuid] = subCancelFn
svc.subscriptionsMutex.Unlock()
go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEvent)

return c.JSON(http.StatusOK, SubscriptionResponse{
SubscriptionId: subscription.Uuid,
Expand Down Expand Up @@ -686,14 +694,17 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error {

func (svc *Service) stopSubscription(subscription *Subscription) error {
svc.subscriptionsMutex.Lock()
sub, exists := svc.subscriptions[subscription.Uuid]
cancelFn, exists := svc.subCancelFnMap[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
if exists {
sub.Unsub()
delete(svc.subscriptions, subscription.Uuid)
cancelFn()
}

if subscription.RelaySubscription != nil {
subscription.RelaySubscription.Unsub()
}
svc.subscriptionsMutex.Unlock()

if (!exists && !subscription.Open) {
if (!subscription.Open) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious, why is this check not at the start of the method?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it because it only applies for .e.g subscribing to notifications?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes ✅

return errors.New(SUBSCRIPTION_ALREADY_CLOSED)
}

Expand Down Expand Up @@ -739,7 +750,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
continue
}

sub, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
relaySubscription, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
if err != nil {
// TODO: notify user about subscription failure
waitToReconnectSeconds = max(waitToReconnectSeconds, 1)
Expand All @@ -751,9 +762,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
continue
}

svc.subscriptionsMutex.Lock()
svc.subscriptions[subscription.Uuid] = sub
svc.subscriptionsMutex.Unlock()
subscription.RelaySubscription = relaySubscription

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
Expand Down Expand Up @@ -791,10 +800,8 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subscription) {
walletPubkey, clientPubkey := getPubkeys(subscription)

svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
err := sub.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent)
relaySubscription := subscription.RelaySubscription
err := relaySubscription.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent)
if err != nil {
// TODO: notify user about publish failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
Expand All @@ -804,7 +811,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc
"client_pubkey": clientPubkey,
}).Error("Failed to publish to relay")
subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_FAILED
sub.Unsub()
relaySubscription.Unsub()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should stopSubscription be called here? or not?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did that because it's directly available, but good to keep it consistent 👍 will change to stopSubscription

} else {
svc.Logger.WithFields(logrus.Fields{
"request_event_id": subscription.RequestEvent.NostrId,
Expand Down Expand Up @@ -860,15 +867,13 @@ func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subs
}

func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) error {
svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
relaySubscription := subscription.RelaySubscription

go func(){
// block till EOS is received for nip 47 handlers
// only if request event is not yet published
if (onReceiveEOS != nil && subscription.RequestEvent.State != REQUEST_EVENT_PUBLISH_CONFIRMED) {
<-sub.EndOfStoredEvents
<-relaySubscription.EndOfStoredEvents
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
Expand All @@ -878,7 +883,7 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
}

// loop through incoming events
for event := range sub.Events {
for event := range relaySubscription.Events {
go handleEvent(event, subscription)
}

Expand All @@ -889,11 +894,11 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
}()

select {
case <-sub.Relay.Context().Done():
return sub.Relay.ConnectionError
case <-relaySubscription.Relay.Context().Done():
return relaySubscription.Relay.ConnectionError
case <-ctx.Done():
return nil
case <-sub.Context.Done():
case <-relaySubscription.Context.Done():
return nil
}
}
Expand Down
7 changes: 6 additions & 1 deletion internal/nostr/push.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nostr

import (
"context"
"net/http"
"time"

Expand Down Expand Up @@ -107,7 +108,11 @@ func (svc *Service) NIP47PushNotificationHandler(c echo.Context) error {
})
}

go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEventForPushNotification)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
svc.subscriptionsMutex.Lock()
svc.subCancelFnMap[subscription.Uuid] = subCancelFn
svc.subscriptionsMutex.Unlock()
go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEventForPushNotification)

return c.JSON(http.StatusOK, PushSubscriptionResponse{
SubscriptionId: subscription.Uuid,
Expand Down
Loading