Skip to content

Commit

Permalink
ensure correct order of Stop
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Dec 12, 2024
1 parent 5ece33e commit 6ae4ae4
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,27 +83,29 @@ func NewSubscriber[H header.Header[H]](
// Start starts the Subscriber and joins the instance's topic. SetVerifier must
// be called separately to ensure a validator is mounted on the topic.
func (s *Subscriber[H]) Start(context.Context) (err error) {
log.Infow("joining topic", "topic ID", s.pubsubTopicID)
log.Debugf("joining topic", "topic ID", s.pubsubTopicID)
err = s.pubsub.RegisterTopicValidator(s.pubsubTopicID, s.verifyMessage)
if err != nil {
return err
}

s.topic, err = s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID))
topic, err := s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID))
if err != nil {
return err
}

s.topic = topic
return err
}

// Stop closes the topic and unregisters its validator.
func (s *Subscriber[H]) Stop(context.Context) error {
regErr := s.pubsub.UnregisterTopicValidator(s.pubsubTopicID)
if regErr != nil {
// do not return this error as it is non-critical and usually
// means that a validator was not mounted.
log.Warnf("unregistering validator: %s", regErr)
}

err := s.topic.Close()
return errors.Join(err, s.metrics.Close())
func (s *Subscriber[H]) Stop(context.Context) (err error) {
err = errors.Join(err, s.metrics.Close())
// we must close the topic first and then unregister the validator
// this ensures we never get a message after the validator is unregistered
err = errors.Join(err, s.topic.Close())
err = errors.Join(err, s.pubsub.UnregisterTopicValidator(s.pubsubTopicID))
return err
}

// SetVerifier set given verification func as Header PubSub topic validator.
Expand Down

0 comments on commit 6ae4ae4

Please sign in to comment.