Skip to content

Commit

Permalink
feat_: log error and stacktrace when panic in goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
qfrank committed Sep 23, 2024
1 parent 821481f commit a70c582
Show file tree
Hide file tree
Showing 24 changed files with 61 additions and 0 deletions.
2 changes: 2 additions & 0 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/utils"
)

type filterArgument struct {
Expand Down Expand Up @@ -74,6 +75,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m

for _, subscriptionDetails := range subscriptions {
go func(subscriptionDetails *subscription.SubscriptionDetails) {
defer utils.LogOnPanicAndRethrow()
for envelope := range subscriptionDetails.C {
send(instance, "message", toSubscriptionMessage(envelope))
}
Expand Down
2 changes: 2 additions & 0 deletions library/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)

// RelayEnoughPeers determines if there are enough peers to publish a message on a topic
Expand Down Expand Up @@ -66,6 +67,7 @@ func relaySubscribe(instance *WakuInstance, filterJSON string) error {

for _, sub := range subscriptions {
go func(subscription *relay.Subscription) {
defer utils.LogOnPanicAndRethrow()
for envelope := range subscription.Ch {
send(instance, "message", toSubscriptionMessage(envelope))
}
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -213,12 +214,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
for _, subDetails := range subs {
apiSub.subs[subDetails.ID] = subDetails
go func(subDetails *subscription.SubscriptionDetails) {
defer utils.LogOnPanicAndRethrow()
apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID))
for env := range subDetails.C {
apiSub.DataCh <- env
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {
defer utils.LogOnPanicAndRethrow()
select {
case <-apiSub.ctx.Done():
return
Expand Down
4 changes: 4 additions & 0 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -102,6 +103,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
m.C = c

go func() {
defer utils.LogOnPanicAndRethrow()
t := time.NewTicker(m.params.interval)
defer t.Stop()

Expand All @@ -123,6 +125,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
default:
semaphore <- struct{}{}
go func(interest criteriaInterest) {
defer utils.LogOnPanicAndRethrow()
m.fetchHistory(c, interest)
<-semaphore
}(interest)
Expand Down Expand Up @@ -276,6 +279,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,

wg.Add(1)
go func(messageHashes []pb.MessageHash) {
defer utils.LogOnPanicAndRethrow()
defer wg.Wait()

result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/api/publish/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
)

// MessagePriority determines the ordering for the message priority queue
Expand Down Expand Up @@ -182,6 +183,7 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
ch := make(chan *protocol.Envelope)

go func() {
defer utils.LogOnPanicAndRethrow()
defer close(ch)

select {
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (d *DiscoveryV5) listen(ctx context.Context) error {
if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
d.WaitGroup().Add(1)
go func() {
defer utils.LogOnPanicAndRethrow()
defer d.WaitGroup().Done()
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
}()
Expand Down Expand Up @@ -217,6 +218,7 @@ func (d *DiscoveryV5) start() error {
if d.params.autoFindPeers {
d.WaitGroup().Add(1)
go func() {
defer utils.LogOnPanicAndRethrow()
defer d.WaitGroup().Done()
d.runDiscoveryV5Loop(d.Context())
}()
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/discv5/mock_peer_discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"
)

// TestPeerDiscoverer is mock peer discoverer for testing
Expand All @@ -26,6 +27,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer {
// Subscribe is for subscribing to peer discoverer
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan service.PeerData) {
go func() {
defer utils.LogOnPanicAndRethrow()
for {
select {
case <-ctx.Done():
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/node/localnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -358,6 +359,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
}

go func() {
defer utils.LogOnPanicAndRethrow()
defer evtRelaySubscribed.Close()
defer evtRelayUnsubscribed.Close()

Expand Down Expand Up @@ -411,6 +413,7 @@ func (w *WakuNode) registerAndMonitorReachability(ctx context.Context) {
}
w.wg.Add(1)
go func() {
defer utils.LogOnPanicAndRethrow()
defer myEventSub.Close()
defer w.wg.Done()

Expand Down
1 change: 1 addition & 0 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
r := make(chan peer.AddrInfo)
go func() {
defer utils.LogOnPanicAndRethrow()
defer close(r)
for ; numPeers != 0; numPeers-- {
select {
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/peermanager/fastest_peer_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -69,9 +70,11 @@ func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlic
pinged := make(map[peer.ID]struct{})

go func() {
defer utils.LogOnPanicAndRethrow()
// Ping any peer with no latency recorded
for peerToPing := range pingCh {
go func(p peer.ID) {
defer utils.LogOnPanicAndRethrow()
defer wg.Done()
rtt := time.Hour
result, err := r.PingPeer(ctx, p)
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"

"go.uber.org/zap"

Expand Down Expand Up @@ -104,6 +105,7 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan servic
// if running start a goroutine to consume the subscription
c.WaitGroup().Add(1)
go func() {
defer utils.LogOnPanicAndRethrow()
defer c.WaitGroup().Done()
c.consumeSubscription(subscription{ctx, ch})
}()
Expand Down Expand Up @@ -187,6 +189,7 @@ func (c *PeerConnectionStrategy) consumeSubscriptions() {
for _, subs := range c.subscriptions {
c.WaitGroup().Add(1)
go func(s subscription) {
defer utils.LogOnPanicAndRethrow()
defer c.WaitGroup().Done()
c.consumeSubscription(s)
}(subs)
Expand Down
5 changes: 5 additions & 0 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -127,6 +128,7 @@ func (wf *WakuFilterLightNode) Stop() {
wf.h.RemoveStreamHandler(FilterPushID_v20beta1)
if wf.subscriptions.Count() > 0 {
go func() {
defer utils.LogOnPanicAndRethrow()
defer func() {
_ = recover()
}()
Expand Down Expand Up @@ -414,6 +416,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
for i, peerID := range selectedPeers {
wg.Add(1)
go func(index int, ID peer.ID) {
defer utils.LogOnPanicAndRethrow()
defer wg.Done()
err := wf.request(
reqCtx,
Expand Down Expand Up @@ -565,6 +568,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
// send unsubscribe request to all the peers
for peerID := range peers {
go func(peerID peer.ID) {
defer utils.LogOnPanicAndRethrow()
defer func() {
if params.wg != nil {
params.wg.Done()
Expand Down Expand Up @@ -687,6 +691,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
}
for peerId := range peers {
go func(peerID peer.ID) {
defer utils.LogOnPanicAndRethrow()
defer func() {
if params.wg != nil {
params.wg.Done()
Expand Down
1 change: 1 addition & 0 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
logger.Debug("pushing message to light node")
wf.WaitGroup().Add(1)
go func(subscriber peer.ID) {
defer utils.LogOnPanicAndRethrow()
defer wf.WaitGroup().Done()
start := time.Now()
err := wf.pushMessage(ctx, logger, subscriber, envelope)
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/protocol/legacy_store/waku_store_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)

func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*wpb.WakuMessage, *pb.PagingInfo, error) {
Expand Down Expand Up @@ -162,6 +163,7 @@ func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
defer store.wg.Done()
for envelope := range store.MsgC.Ch {
go func(env *protocol.Envelope) {
defer utils.LogOnPanicAndRethrow()
_ = store.storeMessage(env)
}(envelope)
}
Expand Down
1 change: 1 addition & 0 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
for i, peerID := range params.selectedPeers {
wg.Add(1)
go func(index int, id peer.ID) {
defer utils.LogOnPanicAndRethrow()
paramsValue := *params
paramsValue.requestID = protocol.GenerateRequestID()
defer wg.Done()
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/protocol/metadata/waku_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -225,6 +226,7 @@ func (wakuM *WakuMetadata) disconnectPeer(peerID peer.ID, reason error) {
// Connected is called when a connection is opened
func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) {
go func() {
defer utils.LogOnPanicAndRethrow()
wakuM.log.Debug("peer connected", zap.Stringer("peer", cc.RemotePeer()))
// Metadata verification is done only if a clusterID is specified
if wakuM.clusterID == 0 {
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/protocol/noise/pairing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

n "github.com/waku-org/go-noise"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -210,6 +211,7 @@ func (p *Pairing) initiatorHandshake(ctx context.Context, msgCh <-chan *pb.WakuM
doneCh = make(chan error, 1)

go func() {
defer utils.LogOnPanicAndRethrow()
defer close(doneCh)
// The handshake initiator writes a Waku2 payload v2 containing the handshake message
// and the (encrypted) transport message
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/protocol/noise/pairing_relay_messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -67,6 +68,7 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic
}

go func() {
defer utils.LogOnPanicAndRethrow()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -97,6 +99,7 @@ func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-c
r.subscriptionChPerContentTopic[contentTopic] = subscriptionCh

go func() {
defer utils.LogOnPanicAndRethrow()
for {
select {
case <-ctx.Done():
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/protocol/peer_exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -154,6 +155,7 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(discoveredPeers)))
wakuPX.WaitGroup().Add(1)
go func() {
defer utils.LogOnPanicAndRethrow()
defer wakuPX.WaitGroup().Done()

peerCh := make(chan service.PeerData)
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/protocol/relay/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -56,6 +57,7 @@ func newMetrics(reg prometheus.Registerer, logger *zap.Logger) Metrics {
// RecordMessage is used to increase the counter for the number of messages received via waku relay
func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) {
go func() {
defer utils.LogOnPanicAndRethrow()
payloadSizeInBytes := len(envelope.Message().Payload)
payloadSizeInKb := float64(payloadSizeInBytes) / 1000
messageSize.Observe(payloadSizeInKb)
Expand Down
1 change: 1 addition & 0 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont

subscriptions = append(subscriptions, subscription)
go func() {
defer utils.LogOnPanicAndRethrow()
<-ctx.Done()
subscription.Unsubscribe()
}()
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/rendezvous/rendezvous.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -158,6 +159,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string
for _, m := range rendezvousPoints {
r.WaitGroup().Add(1)
go func(m *RendezvousPoint) {
defer utils.LogOnPanicAndRethrow()
r.WaitGroup().Done()

rendezvousClient := rvs.NewRendezvousClient(r.host, m.id)
Expand Down
Loading

0 comments on commit a70c582

Please sign in to comment.