From 65f317896821037f912d3a534053fb028018624e Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 25 Aug 2022 13:46:29 +0300 Subject: [PATCH 1/9] header: add stream deadlines --- header/p2p/exchange.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 55abceaec4..52f219719f 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -154,6 +154,9 @@ func (ex *Exchange) performRequest( } // read responses headers := make([]*header.ExtendedHeader, req.Amount) + if err = stream.SetReadDeadline(time.Now().Add(readDeadline * time.Duration(req.Amount))); err != nil { + log.Warn(err) + } for i := 0; i < int(req.Amount); i++ { resp := new(header_pb.ExtendedHeader) if err = stream.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { From 18992edda241167b3510a5e6487f6a89440d2ced Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 29 Aug 2022 13:33:31 +0300 Subject: [PATCH 2/9] chore: reset deadlines --- header/p2p/exchange.go | 3 --- header/p2p/server.go | 4 ++++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 52f219719f..55abceaec4 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -154,9 +154,6 @@ func (ex *Exchange) performRequest( } // read responses headers := make([]*header.ExtendedHeader, req.Amount) - if err = stream.SetReadDeadline(time.Now().Add(readDeadline * time.Duration(req.Amount))); err != nil { - log.Warn(err) - } for i := 0; i < int(req.Amount); i++ { resp := new(header_pb.ExtendedHeader) if err = stream.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { diff --git a/header/p2p/server.go b/header/p2p/server.go index 44bde05b35..671fa954eb 100644 --- a/header/p2p/server.go +++ b/header/p2p/server.go @@ -160,5 +160,9 @@ func (serv *ExchangeServer) handleRequest(from, to uint64, stream network.Stream stream.Reset() //nolint:errcheck return } + + if err = stream.SetWriteDeadline(time.Time{}); err != nil { + log.Warnf("error resetting deadline: %s", err) + } } } From 91999487fb5843c2b8c025b114577d266b06ac36 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Tue, 6 Sep 2022 13:29:01 +0300 Subject: [PATCH 3/9] chore: change log level --- header/p2p/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/header/p2p/server.go b/header/p2p/server.go index 671fa954eb..5b6b790947 100644 --- a/header/p2p/server.go +++ b/header/p2p/server.go @@ -162,7 +162,7 @@ func (serv *ExchangeServer) handleRequest(from, to uint64, stream network.Stream } if err = stream.SetWriteDeadline(time.Time{}); err != nil { - log.Warnf("error resetting deadline: %s", err) + log.Debugf("error resetting deadline: %s", err) } } } From 023f287af0ab148c64a24f9379bb9ff749c8437b Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 19 Sep 2022 13:34:38 +0300 Subject: [PATCH 4/9] chore: do not reset stream deadline after read/write operation --- header/p2p/server.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/header/p2p/server.go b/header/p2p/server.go index 5b6b790947..44bde05b35 100644 --- a/header/p2p/server.go +++ b/header/p2p/server.go @@ -160,9 +160,5 @@ func (serv *ExchangeServer) handleRequest(from, to uint64, stream network.Stream stream.Reset() //nolint:errcheck return } - - if err = stream.SetWriteDeadline(time.Time{}); err != nil { - log.Debugf("error resetting deadline: %s", err) - } } } From 6de5aa0b7bbcd590d0fffbb1dbfa5768f9bca3fc Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 21 Sep 2022 16:48:03 +0300 Subject: [PATCH 5/9] header/p2p: request head from multiple peers --- header/p2p/exchange.go | 72 ++++++++++++++++++++++++++++++++++--- header/p2p/exchange_test.go | 60 +++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 5 deletions(-) diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 55abceaec4..70c90413ac 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "math/rand" + "sort" "time" logging "github.com/ipfs/go-log/v2" @@ -61,11 +62,34 @@ func (ex *Exchange) Head(ctx context.Context) (*header.ExtendedHeader, error) { Data: &p2p_pb.ExtendedHeaderRequest_Origin{Origin: uint64(0)}, Amount: 1, } - headers, err := ex.performRequest(ctx, req) - if err != nil { - return nil, err + + headerCh := make(chan *header.ExtendedHeader) + for _, from := range ex.trustedPeers { + go func(from peer.ID) { + headers, err := doRequest(ctx, from, ex.host, req) + if err != nil { + log.Errorw("head from trusted peer failed", "trustedPeer", from, "err", err) + headerCh <- nil + return + } + // doRequest ensures that the result slice will have at least one ExtendedHeader + headerCh <- headers[0] + }(from) + } + + result := make([]*header.ExtendedHeader, 0) + for range ex.trustedPeers { + select { + case h := <-headerCh: + if h != nil { + result = append(result, h) + } + case <-ctx.Done(): + return nil, ctx.Err() + } } - return headers[0], nil + + return parseHeads(result) } // GetByHeight performs a request for the ExtendedHeader at the given @@ -135,7 +159,17 @@ func (ex *Exchange) performRequest( //nolint:gosec // G404: Use of weak random number generator index := rand.Intn(len(ex.trustedPeers)) - stream, err := ex.host.NewStream(ctx, ex.trustedPeers[index], exchangeProtocolID) + return doRequest(ctx, ex.trustedPeers[index], ex.host, req) +} + +// doRequest sends the ExtendedHeaderRequest to a remote peer. +func doRequest( + ctx context.Context, + to peer.ID, + host host.Host, + req *p2p_pb.ExtendedHeaderRequest, +) ([]*header.ExtendedHeader, error) { + stream, err := host.NewStream(ctx, to, exchangeProtocolID) if err != nil { return nil, err } @@ -178,3 +212,31 @@ func (ex *Exchange) performRequest( } return headers, stream.Close() } + +// parseHeads chooses ExtendedHeader that matches the conditions: +// * should have max height among received; +// * should be received at least from 2 peers; +// If both conditions are not met, then ExtendedHeader with the biggest Height will be returned. +func parseHeads(result []*header.ExtendedHeader) (*header.ExtendedHeader, error) { + if len(result) == 0 { + return nil, header.ErrNotFound + } + counter := make(map[string]int) + // go through all of ExtendedHeaders and count the number of headers with a specific hash + for i := 0; i < len(result); i++ { + counter[result[i].Hash().String()]++ + } + // sort results in a decreasing order + sort.Slice(result, func(i, j int) bool { + return result[i].Height > result[j].Height + }) + + // try to find ExtendedHeader with the maximum height that was received at least from 2 peers + for _, res := range result { + if counter[res.Hash().String()] >= 2 { + return res, nil + } + } + // otherwise return header with the max height + return result[0], nil +} diff --git a/header/p2p/exchange_test.go b/header/p2p/exchange_test.go index a2c28d6194..a8a64eadb9 100644 --- a/header/p2p/exchange_test.go +++ b/header/p2p/exchange_test.go @@ -96,6 +96,66 @@ func TestExchange_RequestByHash(t *testing.T) { assert.Equal(t, store.headers[reqHeight].Hash(), eh.Hash()) } +func Test_parseHeads(t *testing.T) { + gen := func() []*header.ExtendedHeader { + suite := header.NewTestSuite(t, 3) + res := make([]*header.ExtendedHeader, 0) + for i := 0; i < 3; i++ { + res = append(res, suite.GenExtendedHeader()) + } + return res + } + testCases := []struct { + precondition func() []*header.ExtendedHeader + expectedHeight int64 + }{ + /* + Height -> Amount + headerHeight[0]=1 -> 1 + headerHeight[1]=2 -> 1 + headerHeight[2]=3 -> 1 + result -> headerHeight[2] + */ + {precondition: gen, expectedHeight: 3}, + /* + Height -> Amount + headerHeight[0]=1 -> 2 + headerHeight[1]=2 -> 1 + headerHeight[2]=3 -> 1 + result -> headerHeight[0] + */ + {precondition: func() []*header.ExtendedHeader { + res := gen() + res = append(res, res[0]) + return res + }, + expectedHeight: 1, + }, + /* + Height -> Amount + headerHeight[0]=1 -> 3 + headerHeight[1]=2 -> 2 + headerHeight[2]=3 -> 1 + result -> headerHeight[1] + */ + {precondition: func() []*header.ExtendedHeader { + res := gen() + res = append(res, res[0]) + res = append(res, res[0]) + res = append(res, res[1]) + return res + }, + expectedHeight: 2, + }, + } + for _, tt := range testCases { + res := tt.precondition() + header, err := parseHeads(res) + require.NoError(t, err) + require.True(t, header.Height == tt.expectedHeight) + } +} + func createMocknet(t *testing.T) (libhost.Host, libhost.Host) { net, err := mocknet.FullMeshConnected(2) require.NoError(t, err) From fe14cbe212c20c8e17c8ddc8f7f7273764714aca Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 22 Sep 2022 12:28:57 +0300 Subject: [PATCH 6/9] chore: rename methods --- header/p2p/exchange.go | 26 +++++++++++++++----------- header/p2p/exchange_test.go | 33 +++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 70c90413ac..97d91577d7 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -9,7 +9,6 @@ import ( "time" logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -64,11 +63,12 @@ func (ex *Exchange) Head(ctx context.Context) (*header.ExtendedHeader, error) { } headerCh := make(chan *header.ExtendedHeader) + // request head from each trusted peer for _, from := range ex.trustedPeers { go func(from peer.ID) { - headers, err := doRequest(ctx, from, ex.host, req) + headers, err := request(ctx, from, ex.host, req) if err != nil { - log.Errorw("head from trusted peer failed", "trustedPeer", from, "err", err) + log.Errorw("head request to trusted peer failed", "trustedPeer", from, "err", err) headerCh <- nil return } @@ -89,7 +89,7 @@ func (ex *Exchange) Head(ctx context.Context) (*header.ExtendedHeader, error) { } } - return parseHeads(result) + return bestHead(result) } // GetByHeight performs a request for the ExtendedHeader at the given @@ -159,11 +159,11 @@ func (ex *Exchange) performRequest( //nolint:gosec // G404: Use of weak random number generator index := rand.Intn(len(ex.trustedPeers)) - return doRequest(ctx, ex.trustedPeers[index], ex.host, req) + return request(ctx, ex.trustedPeers[index], ex.host, req) } -// doRequest sends the ExtendedHeaderRequest to a remote peer. -func doRequest( +// request sends the ExtendedHeaderRequest to a remote peer. +func request( ctx context.Context, to peer.ID, host host.Host, @@ -206,18 +206,21 @@ func doRequest( headers[i] = header } + if err = stream.Close(); err != nil { + log.Errorw("while closing stream", err) + } // ensure at least one header was retrieved if len(headers) == 0 { return nil, header.ErrNotFound } - return headers, stream.Close() + return headers, nil } -// parseHeads chooses ExtendedHeader that matches the conditions: +// bestHead chooses ExtendedHeader that matches the conditions: // * should have max height among received; // * should be received at least from 2 peers; -// If both conditions are not met, then ExtendedHeader with the biggest Height will be returned. -func parseHeads(result []*header.ExtendedHeader) (*header.ExtendedHeader, error) { +// If neither condition is met, then latest ExtendedHeader will be returned (header of the highest height). +func bestHead(result []*header.ExtendedHeader) (*header.ExtendedHeader, error) { if len(result) == 0 { return nil, header.ErrNotFound } @@ -237,6 +240,7 @@ func parseHeads(result []*header.ExtendedHeader) (*header.ExtendedHeader, error) return res, nil } } + log.Debug("could not find header received from at least two peers.Returning header with the max height") // otherwise return header with the max height return result[0], nil } diff --git a/header/p2p/exchange_test.go b/header/p2p/exchange_test.go index a8a64eadb9..d5e63e0b27 100644 --- a/header/p2p/exchange_test.go +++ b/header/p2p/exchange_test.go @@ -116,7 +116,10 @@ func Test_parseHeads(t *testing.T) { headerHeight[2]=3 -> 1 result -> headerHeight[2] */ - {precondition: gen, expectedHeight: 3}, + { + precondition: gen, + expectedHeight: 3, + }, /* Height -> Amount headerHeight[0]=1 -> 2 @@ -124,11 +127,12 @@ func Test_parseHeads(t *testing.T) { headerHeight[2]=3 -> 1 result -> headerHeight[0] */ - {precondition: func() []*header.ExtendedHeader { - res := gen() - res = append(res, res[0]) - return res - }, + { + precondition: func() []*header.ExtendedHeader { + res := gen() + res = append(res, res[0]) + return res + }, expectedHeight: 1, }, /* @@ -138,19 +142,20 @@ func Test_parseHeads(t *testing.T) { headerHeight[2]=3 -> 1 result -> headerHeight[1] */ - {precondition: func() []*header.ExtendedHeader { - res := gen() - res = append(res, res[0]) - res = append(res, res[0]) - res = append(res, res[1]) - return res - }, + { + precondition: func() []*header.ExtendedHeader { + res := gen() + res = append(res, res[0]) + res = append(res, res[0]) + res = append(res, res[1]) + return res + }, expectedHeight: 2, }, } for _, tt := range testCases { res := tt.precondition() - header, err := parseHeads(res) + header, err := bestHead(res) require.NoError(t, err) require.True(t, header.Height == tt.expectedHeight) } From 81c029e23c7c845033ac06ed7aad186be757fe10 Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Thu, 29 Sep 2022 13:52:50 +0300 Subject: [PATCH 7/9] chore: apply suggestions Co-authored-by: rene <41963722+renaynay@users.noreply.github.com> --- header/p2p/exchange.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 97d91577d7..085bd838cd 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -207,7 +207,7 @@ func request( headers[i] = header } if err = stream.Close(); err != nil { - log.Errorw("while closing stream", err) + log.Errorw("closing stream", "err", err) } // ensure at least one header was retrieved if len(headers) == 0 { @@ -240,7 +240,7 @@ func bestHead(result []*header.ExtendedHeader) (*header.ExtendedHeader, error) { return res, nil } } - log.Debug("could not find header received from at least two peers.Returning header with the max height") + log.Debug("could not find latest header received from at least two peers, returning header with the max height") // otherwise return header with the max height return result[0], nil } From 514e326971b02c5b8e648e7eb9975cee9714efb4 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 10 Oct 2022 14:00:36 +0300 Subject: [PATCH 8/9] chore: apply @Wondertan suggestions --- header/p2p/exchange.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 085bd838cd..db221e0aa6 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -29,6 +29,8 @@ const ( writeDeadline = time.Second * 5 // readDeadline sets timeout for reading messages from the stream readDeadline = time.Minute + // minimum amount of responses with the same ExtendedHeader + minResponses = 2 ) // PubSubTopic hardcodes the name of the ExtendedHeader @@ -77,7 +79,7 @@ func (ex *Exchange) Head(ctx context.Context) (*header.ExtendedHeader, error) { }(from) } - result := make([]*header.ExtendedHeader, 0) + result := make([]*header.ExtendedHeader, 0, len(ex.trustedPeers)) for range ex.trustedPeers { select { case h := <-headerCh: @@ -226,8 +228,8 @@ func bestHead(result []*header.ExtendedHeader) (*header.ExtendedHeader, error) { } counter := make(map[string]int) // go through all of ExtendedHeaders and count the number of headers with a specific hash - for i := 0; i < len(result); i++ { - counter[result[i].Hash().String()]++ + for _, res := range result { + counter[res.Hash().String()]++ } // sort results in a decreasing order sort.Slice(result, func(i, j int) bool { @@ -236,7 +238,7 @@ func bestHead(result []*header.ExtendedHeader) (*header.ExtendedHeader, error) { // try to find ExtendedHeader with the maximum height that was received at least from 2 peers for _, res := range result { - if counter[res.Hash().String()] >= 2 { + if counter[res.Hash().String()] >= minResponses { return res, nil } } From ccf6d62cea85aa927edf09238551175348fc90f3 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Tue, 11 Oct 2022 12:07:30 +0300 Subject: [PATCH 9/9] chore: apply @renaynay suggestions --- header/p2p/exchange.go | 2 +- header/p2p/exchange_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index db221e0aa6..a488fc6f97 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -29,7 +29,7 @@ const ( writeDeadline = time.Second * 5 // readDeadline sets timeout for reading messages from the stream readDeadline = time.Minute - // minimum amount of responses with the same ExtendedHeader + // the target minimum amount of responses with the same chain head minResponses = 2 ) diff --git a/header/p2p/exchange_test.go b/header/p2p/exchange_test.go index d5e63e0b27..af07e6ac12 100644 --- a/header/p2p/exchange_test.go +++ b/header/p2p/exchange_test.go @@ -96,7 +96,7 @@ func TestExchange_RequestByHash(t *testing.T) { assert.Equal(t, store.headers[reqHeight].Hash(), eh.Hash()) } -func Test_parseHeads(t *testing.T) { +func Test_bestHead(t *testing.T) { gen := func() []*header.ExtendedHeader { suite := header.NewTestSuite(t, 3) res := make([]*header.ExtendedHeader, 0)