Skip to content

Commit

Permalink
Merge pull request #1810 from statechannels/scAddr-sign-dht-record
Browse files Browse the repository at this point in the history
Sign dht record with scAddr
  • Loading branch information
bitwiseguy authored Oct 11, 2023
2 parents e7e508f + 6be805d commit a0a1dfc
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 50 deletions.
37 changes: 31 additions & 6 deletions node/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ package engine // import "github.com/statechannels/go-nitro/node/engine"

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"log/slog"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/statechannels/go-nitro/channel"
"github.com/statechannels/go-nitro/channel/consensus_channel"
"github.com/statechannels/go-nitro/internal/logging"
"github.com/statechannels/go-nitro/node/engine/chainservice"
"github.com/statechannels/go-nitro/node/engine/messageservice"
p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
"github.com/statechannels/go-nitro/node/engine/store"
"github.com/statechannels/go-nitro/node/query"
"github.com/statechannels/go-nitro/payments"
Expand Down Expand Up @@ -61,9 +65,10 @@ type Engine struct {
ObjectiveRequestsFromAPI chan protocols.ObjectiveRequest
PaymentRequestsFromAPI chan PaymentRequest

fromChain <-chan chainservice.Event
fromMsg <-chan protocols.Message
fromLedger chan consensus_channel.Proposal
fromChain <-chan chainservice.Event
fromMsg <-chan protocols.Message
fromLedger chan consensus_channel.Proposal
signRequests <-chan p2pms.SignatureRequest

eventHandler func(EngineEvent)

Expand Down Expand Up @@ -136,7 +141,8 @@ func New(vm *payments.VoucherManager, msg messageservice.MessageService, chain c
e.PaymentRequestsFromAPI = make(chan PaymentRequest)

e.fromChain = chain.EventFeed()
e.fromMsg = msg.Out()
e.fromMsg = msg.P2PMessages()
e.signRequests = msg.SignRequests()

e.chain = chain
e.msg = msg
Expand Down Expand Up @@ -191,6 +197,8 @@ func (e *Engine) run(ctx context.Context) {
res, err = e.handleMessage(message)
case proposal := <-e.fromLedger:
res, err = e.handleProposal(proposal)
case signReq := <-e.signRequests:
err = e.handleSignRequest(signReq)
case <-blockTicker.C:
blockNum := e.chain.GetLastConfirmedBlockNum()
err = e.store.SetLastBlockNumSeen(blockNum)
Expand Down Expand Up @@ -225,12 +233,29 @@ func (e *Engine) handleProposal(proposal consensus_channel.Proposal) (EngineEven
return EngineEvent{}, err
}
if obj.GetStatus() == protocols.Completed {
e.logger.Info("Ignoring proposal for complected objective", logging.WithObjectiveIdAttribute(id))
e.logger.Info("Ignoring proposal for completed objective", logging.WithObjectiveIdAttribute(id))
return EngineEvent{}, nil
}
return e.attemptProgress(obj)
}

func (e *Engine) handleSignRequest(sigReq p2pms.SignatureRequest) error {
recordDataBytes, err := json.Marshal(sigReq.Data)
if err != nil {
return err
}

hash := sha256.Sum256(recordDataBytes) // Hash the data before signing it
secretKey := e.store.GetChannelSecretKey()
signature, err := secp256k1.Sign(hash[:], *secretKey)
if err != nil {
return err
}

sigReq.ResponseChan <- signature
return nil
}

// handleMessage handles a Message from a peer go-nitro Wallet.
// It:
// - reads an objective from the store,
Expand Down Expand Up @@ -278,7 +303,7 @@ func (e *Engine) handleMessage(message protocols.Message) (EngineEvent, error) {
}

if objective.GetStatus() == protocols.Completed {
e.logger.Info("Ignoring payload for complected objective", logging.WithObjectiveIdAttribute(objective.Id()))
e.logger.Info("Ignoring payload for completed objective", logging.WithObjectiveIdAttribute(objective.Id()))

continue
}
Expand Down
11 changes: 8 additions & 3 deletions node/engine/messageservice/messageservice.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Package messageservice is a messaging service responsible for routing messages to peers and relaying messages received from peers.
package messageservice // import "github.com/statechannels/go-nitro/node/messageservice"

import "github.com/statechannels/go-nitro/protocols"
import (
p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
"github.com/statechannels/go-nitro/protocols"
)

type MessageService interface {
// Out returns a chan for receiving messages from the message service
Out() <-chan protocols.Message
// P2PMessages returns a chan for receiving messages from the message service
P2PMessages() <-chan protocols.Message
// SignRequests returns a chan for receiving signature requests from the message service
SignRequests() <-chan p2pms.SignatureRequest
// Send is for sending messages with the message service
Send(protocols.Message) error
// Close closes the message service
Expand Down
42 changes: 31 additions & 11 deletions node/engine/messageservice/p2p-message-service/dht-record.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package p2pms

import (
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -23,7 +26,13 @@ type stateChannelAddrToPeerIDValidator struct{}
// dhtRecord represents the data stored in the DHT record
type dhtRecord struct {
Data dhtData
Signature []byte
PeerIdSig []byte
SCAddrSig []byte
}

type SignatureRequest struct {
Data dhtData
ResponseChan chan []byte
}

type dhtData struct {
Expand All @@ -50,30 +59,41 @@ func (v stateChannelAddrToPeerIDValidator) Validate(key string, value []byte) er
return errors.New("record key does not match state channel address")
}

// Check if the value can be parsed into a valid libp2p peer.ID
peerId, err := peer.Decode(dhtRecord.Data.PeerID)
dataBytes, err := json.Marshal(dhtRecord.Data)
if err != nil {
return errors.New("invalid libp2p peer ID")
return err
}

pubKey, err := peerId.ExtractPublicKey()
// Check the scAddr signature to ensure it is the signed hash of dataBytes
hash := sha256.Sum256(dataBytes)
scAddrPubKey, err := secp256k1.RecoverPubkey(hash[:], dhtRecord.SCAddrSig)
if err != nil {
return err
}

dataBytes, err := json.Marshal(dhtRecord.Data)
sigToVerify := dhtRecord.SCAddrSig[:len(dhtRecord.SCAddrSig)-1] // Exclude the 1-byte 'V' field when verifying the signature
valid := crypto.VerifySignature(scAddrPubKey, hash[:], sigToVerify)
if !valid {
return errors.New("invalid scAddr signature")
}

// Check if the value can be parsed into a valid libp2p peer.ID
peerId, err := peer.Decode(dhtRecord.Data.PeerID)
if err != nil {
return err
return errors.New("invalid libp2p peer ID")
}

// Check the signature to ensure it is the signed hash of dataBytes
valid, err := pubKey.Verify(dataBytes, dhtRecord.Signature)
pubKey, err := peerId.ExtractPublicKey()
if err != nil {
return err
}

if !valid {
return errors.New("invalid signature")
// Check the peerId signature to ensure it is the signed hash of dataBytes
valid, err = pubKey.Verify(dataBytes, dhtRecord.PeerIdSig)
if err != nil {
return err
} else if !valid {
return errors.New("invalid peerId signature")
}

return nil
Expand Down
56 changes: 35 additions & 21 deletions node/engine/messageservice/p2p-message-service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ type MessageOpts struct {

// P2PMessageService is a rudimentary message service that uses TCP to send and receive messages.
type P2PMessageService struct {
initComplete chan struct{}
toEngine chan protocols.Message // for forwarding processed messages to the engine
peers *safesync.Map[peer.ID]
initComplete chan struct{}
toEngine chan protocols.Message // for forwarding processed messages to the engine
dhtSignRequests chan SignatureRequest // for forwarding signature requests to the engine
peers *safesync.Map[peer.ID]

scAddr types.Address
privateKey p2pcrypto.PrivKey
p2pHost host.Host
dht *dht.IpfsDHT
newPeerInfo chan basicPeerInfo
Expand All @@ -69,17 +69,15 @@ type P2PMessageService struct {
// NewMessageService returns a running P2PMessageService listening on the given ip, port and message key.
func NewMessageService(opts MessageOpts) *P2PMessageService {
ms := &P2PMessageService{
initComplete: make(chan struct{}, 1),
toEngine: make(chan protocols.Message, BUFFER_SIZE),
newPeerInfo: make(chan basicPeerInfo, BUFFER_SIZE),
peers: &safesync.Map[peer.ID]{},
scAddr: opts.SCAddr,
logger: logging.LoggerWithAddress(slog.Default(), opts.SCAddr),
initComplete: make(chan struct{}, 1),
toEngine: make(chan protocols.Message, BUFFER_SIZE),
dhtSignRequests: make(chan SignatureRequest, 50),
newPeerInfo: make(chan basicPeerInfo, BUFFER_SIZE),
peers: &safesync.Map[peer.ID]{},
scAddr: opts.SCAddr,
logger: logging.LoggerWithAddress(slog.Default(), opts.SCAddr),
}

messageKey, err := p2pcrypto.UnmarshalSecp256k1PrivateKey(opts.PkBytes)
ms.checkError(err)

addressFactory := func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr {
extMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", opts.PublicIp, opts.Port))
if err != nil {
Expand All @@ -90,9 +88,11 @@ func NewMessageService(opts MessageOpts) *P2PMessageService {
return addrs
}

ms.privateKey = messageKey
privateKey, err := p2pcrypto.UnmarshalSecp256k1PrivateKey(opts.PkBytes)
ms.checkError(err)

options := []libp2p.Option{
libp2p.Identity(messageKey),
libp2p.Identity(privateKey),
libp2p.AddrsFactory(addressFactory),
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/%s/tcp/%d", "0.0.0.0", opts.Port)),
libp2p.Transport(tcp.NewTCPTransport),
Expand Down Expand Up @@ -211,8 +211,7 @@ func (ms *P2PMessageService) InitComplete() <-chan struct{} {

// Id returns the libp2p peer ID of the message service.
func (ms *P2PMessageService) Id() peer.ID {
id, _ := peer.IDFromPrivateKey(ms.privateKey)
return id
return ms.p2pHost.ID()
}

// addScaddrDhtRecord adds this node's state channel address to the custom dht namespace
Expand All @@ -227,12 +226,22 @@ func (ms *P2PMessageService) addScaddrDhtRecord(ctx context.Context) {
recordDataBytes, err := json.Marshal(recordData)
ms.checkError(err)

signature, err := ms.privateKey.Sign(recordDataBytes)
sigReq := SignatureRequest{
Data: *recordData,
ResponseChan: make(chan []byte),
}
ms.dhtSignRequests <- sigReq

peerIdSig, err := ms.p2pHost.Peerstore().PrivKey(ms.Id()).Sign(recordDataBytes)
ms.checkError(err)

scAddrSig := <-sigReq.ResponseChan
ms.checkError(err)

fullRecord := &dhtRecord{
Data: *recordData,
Signature: signature,
PeerIdSig: peerIdSig,
SCAddrSig: scAddrSig,
}
fullRecordBytes, err := json.Marshal(fullRecord)
ms.checkError(err)
Expand Down Expand Up @@ -340,11 +349,16 @@ func (ms *P2PMessageService) checkError(err error) {
panic(err)
}

// Out returns a channel that can be used to receive messages from the message service
func (ms *P2PMessageService) Out() <-chan protocols.Message {
// P2PMessages returns a channel that can be used to receive messages from the message service
func (ms *P2PMessageService) P2PMessages() <-chan protocols.Message {
return ms.toEngine
}

// SignRequests returns a channel that can be used to receive signature request messages from the message service
func (ms *P2PMessageService) SignRequests() <-chan SignatureRequest {
return ms.dhtSignRequests
}

// Close closes the P2PMessageService
func (ms *P2PMessageService) Close() error {
ms.p2pHost.RemoveStreamHandler(GENERAL_MSG_PROTOCOL_ID)
Expand Down
21 changes: 14 additions & 7 deletions node/engine/messageservice/test-messageservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
"github.com/statechannels/go-nitro/protocols"
"github.com/statechannels/go-nitro/rand"
"github.com/statechannels/go-nitro/types"
Expand All @@ -22,8 +23,9 @@ type TestMessageService struct {
address types.Address

// connection to Engine:
out chan protocols.Message // for sending message to engine
maxDelay time.Duration // the max delay for messages
out chan protocols.Message // for sending message to engine
signRequests chan p2pms.SignatureRequest // for sending signature requests to engine
maxDelay time.Duration // the max delay for messages

broker Broker
}
Expand All @@ -48,20 +50,25 @@ func NewBroker() Broker {
// Messages will be handled with a random delay between 0 and maxDelay
func NewTestMessageService(address types.Address, broker Broker, maxDelay time.Duration) TestMessageService {
tms := TestMessageService{
address: address,
out: make(chan protocols.Message, 5),
maxDelay: maxDelay,
broker: broker,
address: address,
out: make(chan protocols.Message, 5),
signRequests: make(chan p2pms.SignatureRequest, 5),
maxDelay: maxDelay,
broker: broker,
}

tms.connect(broker)
return tms
}

func (t TestMessageService) Out() <-chan protocols.Message {
func (t TestMessageService) P2PMessages() <-chan protocols.Message {
return t.out
}

func (t TestMessageService) SignRequests() <-chan p2pms.SignatureRequest {
return t.signRequests
}

// dispatchMessage is responsible for dispatching a message to the appropriate peer message service.
// If there is a mean delay it will wait a random amount of time(based on meanDelay) before sending the message.
func (t TestMessageService) dispatchMessage(message protocols.Message) {
Expand Down
2 changes: 1 addition & 1 deletion node/engine/messageservice/test-messageservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var aToB protocols.Message = protocols.CreateSignedProposalMessage(
)

func TestConnect(t *testing.T) {
bobOut := bobMS.Out()
bobOut := bobMS.P2PMessages()

err := aliceMS.Send(aToB)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ Architectural decision records may be viewed [here](./.adr/0000-adrs.md).

## Testing

To run unit tests locally, you will need to generate a TLS certificate. Details are [here](./tls/readme.md).
> Pre-requisite: [generate a TLS certificate](./tls/readme.md)
Run the tests from repo root:

```
go test ./... -count=2 -shuffle=on -timeout 1m -v -failfast
```

## On-chain code

Expand Down

0 comments on commit a0a1dfc

Please sign in to comment.