Skip to content

Commit

Permalink
Implement mailserver Deliver() with MessagesRequest (#1653)
Browse files Browse the repository at this point in the history
The main difference is that it takes `whisper.MessagesRequest` as an argument instead of `whisper.Envelope`.
  • Loading branch information
Adam Babik authored Nov 4, 2019
1 parent c19f07f commit 183d626
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 89 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/status-im/migrate/v4 v4.6.2-status.2
github.com/status-im/rendezvous v1.3.0
github.com/status-im/status-protocol-go v0.4.4-0.20191102185313-c99310ece510
github.com/status-im/whisper v1.5.1
github.com/status-im/whisper v1.5.2
github.com/stretchr/testify v1.4.0
github.com/syndtr/goleveldb v1.0.0
go.uber.org/zap v1.10.0
Expand Down
47 changes: 2 additions & 45 deletions go.sum

Large diffs are not rendered by default.

173 changes: 155 additions & 18 deletions mailserver/mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type WMailServer struct {
cleaner *dbCleaner // removes old envelopes
}

var _ whisper.MailServer = (*WMailServer)(nil)

// Init initializes mailServer.
func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) error {
if len(config.DataDir) == 0 {
Expand Down Expand Up @@ -195,19 +197,19 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
return
}

requestID := request.Hash().String()
requestID := request.Hash()
peerID := peerIDString(peer)

log.Info("[mailserver:DeliverMail] delivering mail",
"peerID", peerID,
"requestID", requestID)
"requestID", requestID.String())

if s.exceedsPeerRequests(peer.ID()) {
deliveryFailuresCounter.WithLabelValues("peer_req_limit").Inc()
log.Error("[mailserver:DeliverMail] peer exceeded the limit",
"peerID", peerID,
"requestID", requestID)
s.trySendHistoricMessageErrorResponse(peer, request, fmt.Errorf("rate limit exceeded"))
"requestID", requestID.String())
s.trySendHistoricMessageErrorResponse(peer, requestID, fmt.Errorf("rate limit exceeded"))
return
}

Expand All @@ -231,7 +233,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
log.Debug("[mailserver:DeliverMail] failed to decode request",
"err", err,
"peerID", peerID,
"requestID", requestID)
"requestID", requestID.String())
lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request)
}

Expand All @@ -243,15 +245,15 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
deliveryFailuresCounter.WithLabelValues("validation").Inc()
log.Error("[mailserver:DeliverMail] request failed validaton",
"peerID", peerID,
"requestID", requestID,
"requestID", requestID.String(),
"err", err)
s.trySendHistoricMessageErrorResponse(peer, request, err)
s.trySendHistoricMessageErrorResponse(peer, requestID, err)
return
}

log.Info("[mailserver:DeliverMail] processing request",
"peerID", peerID,
"requestID", requestID,
"requestID", requestID.String(),
"lower", lower,
"upper", upper,
"bloom", bloom,
Expand Down Expand Up @@ -292,7 +294,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
close(errCh)
log.Info("[mailserver:DeliverMail] finished sending bundles",
"peerID", peerID,
"requestID", requestID,
"requestID", requestID.String(),
"counter", counter)
}()

Expand All @@ -301,7 +303,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
bloom,
int(limit),
processRequestTimeout,
requestID,
requestID.String(),
bundles,
cancelProcessing,
)
Expand All @@ -313,7 +315,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
"err", err,
"peerID", peerID,
"requestID", requestID)
s.trySendHistoricMessageErrorResponse(peer, request, err)
s.trySendHistoricMessageErrorResponse(peer, requestID, err)
return
}

Expand All @@ -324,7 +326,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
"err", err,
"peerID", peerID,
"requestID", requestID)
s.trySendHistoricMessageErrorResponse(peer, request, err)
s.trySendHistoricMessageErrorResponse(peer, requestID, err)
return
}

Expand All @@ -334,14 +336,149 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
"last", lastEnvelopeHash,
"next", nextPageCursor)

if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil {
if err := s.sendHistoricMessageResponse(peer, request.Hash(), lastEnvelopeHash, nextPageCursor); err != nil {
deliveryFailuresCounter.WithLabelValues("historic_msg_resp").Inc()
log.Error("[mailserver:DeliverMail] error sending historic message response",
"err", err,
"peerID", peerID,
"requestID", requestID)
// we still want to try to report error even it it is a p2p error and it is unlikely
s.trySendHistoricMessageErrorResponse(peer, request, err)
s.trySendHistoricMessageErrorResponse(peer, requestID, err)
}
}

func (s *WMailServer) Deliver(peer *whisper.Peer, r whisper.MessagesRequest) {
timer := prom.NewTimer(mailDeliveryDuration)
defer timer.ObserveDuration()

deliveryAttemptsCounter.Inc()

var (
requestIDHash = common.BytesToHash(r.ID)
requestIDStr = requestIDHash.String()
peerID = peerIDString(peer)
err error
)

defer func() {
if err != nil {
log.Error("[mailserver:DeliverMail] failed to process",
"err", err,
"peerID", peerID,
"requestID", requestIDStr,
)
s.trySendHistoricMessageErrorResponse(peer, requestIDHash, err)
}
}()

log.Info("[mailserver:Deliver] delivering mail", "peerID", peerID, "requestID", requestIDStr)

if peer == nil {
deliveryFailuresCounter.WithLabelValues("no_peer_set").Inc()
log.Error("[mailserver:Deliver] peer is nil")
return
}

if s.exceedsPeerRequests(peer.ID()) {
deliveryFailuresCounter.WithLabelValues("peer_req_limit").Inc()
err = errors.New("exceeded the number of requests limit")
return
}

err = r.Validate()
if err != nil {
deliveryFailuresCounter.WithLabelValues("validation").Inc()
err = fmt.Errorf("invalid request: %v", err)
return
}

var (
lower, upper = r.From, r.To
bloom = r.Bloom
limit = r.Limit
cursor = r.Cursor
batch = true // batch requests are default
)

log.Info("[mailserver:Deliver] processing request",
"peerID", peerID,
"requestID", requestIDStr,
"lower", lower,
"upper", upper,
"bloom", bloom,
"limit", limit,
"cursor", cursor,
"batch", batch,
)
requestsBatchedCounter.Inc()

iter, err := s.createIterator(lower, upper, cursor, bloom, limit)
if err != nil {
err = fmt.Errorf("failed to create an iterator: %v", err)
return
}
defer iter.Release()

bundles := make(chan []rlp.RawValue, 5)
errCh := make(chan error, 1)
cancelProcessing := make(chan struct{})

go func() {
counter := 0
for bundle := range bundles {
if err := s.sendRawEnvelopes(peer, bundle, batch); err != nil {
close(cancelProcessing)
errCh <- err
break
}
counter++
}
close(errCh)
log.Info("[mailserver:DeliverMail] finished sending bundles",
"peerID", peerID,
"requestID", requestIDStr,
"counter", counter,
)
}()

nextPageCursor, lastEnvelopeHash := s.processRequestInBundles(
iter,
bloom,
int(limit),
processRequestTimeout,
requestIDStr,
bundles,
cancelProcessing,
)

// Wait for the goroutine to finish the work. It may return an error.
err = <-errCh
if err != nil {
deliveryFailuresCounter.WithLabelValues("process").Inc()
err = fmt.Errorf("failed to send envelopes: %v", err)
return
}

// Processing of the request could be finished earlier due to iterator error.
err = iter.Error()
if err != nil {
deliveryFailuresCounter.WithLabelValues("iterator").Inc()
err = fmt.Errorf("failed to read all envelopes: %v", err)
return
}

log.Info("[mailserver:Deliver] sending historic message response",
"peerID", peerID,
"requestID", requestIDStr,
"last", lastEnvelopeHash,
"next", nextPageCursor,
)

err = s.sendHistoricMessageResponse(peer, requestIDHash, lastEnvelopeHash, nextPageCursor)
if err != nil {
deliveryFailuresCounter.WithLabelValues("historic_msg_resp").Inc()
err = fmt.Errorf("failed to send response: %v", err)
return
}
}

Expand Down Expand Up @@ -620,14 +757,14 @@ func (s *WMailServer) sendRawEnvelopes(peer *whisper.Peer, envelopes []rlp.RawVa
return nil
}

func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor []byte) error {
payload := whisper.CreateMailServerRequestCompletedPayload(request.Hash(), lastEnvelopeHash, cursor)
func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, requestID, lastEnvelopeHash common.Hash, cursor []byte) error {
payload := whisper.CreateMailServerRequestCompletedPayload(requestID, lastEnvelopeHash, cursor)
return s.w.SendHistoricMessageResponse(peer, payload)
}

// this method doesn't return an error because it is already in the error handling chain
func (s *WMailServer) trySendHistoricMessageErrorResponse(peer *whisper.Peer, request *whisper.Envelope, errorToReport error) {
payload := whisper.CreateMailServerRequestFailedPayload(request.Hash(), errorToReport)
func (s *WMailServer) trySendHistoricMessageErrorResponse(peer *whisper.Peer, requestID common.Hash, errorToReport error) {
payload := whisper.CreateMailServerRequestFailedPayload(requestID, errorToReport)

err := s.w.SendHistoricMessageResponse(peer, payload)
// if we can't report an error, probably something is wrong with p2p connection,
Expand Down
8 changes: 0 additions & 8 deletions mailserver/mailserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,6 @@ func (s *MailserverSuite) TestInit() {
expectedError: errDirectoryNotProvided,
info: "config with empty DataDir",
},
{
config: params.WhisperConfig{
DataDir: "/invalid-path",
MailServerPassword: "pwd",
},
expectedError: errors.New("open DB: mkdir /invalid-path: permission denied"),
info: "config with an unexisting DataDir",
},
{
config: params.WhisperConfig{
DataDir: s.config.DataDir,
Expand Down
57 changes: 52 additions & 5 deletions vendor/github.com/status-im/whisper/whisperv6/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 183d626

Please sign in to comment.