diff --git a/node/engine/engine.go b/node/engine/engine.go index 042b26ba3..8b8fa1230 100644 --- a/node/engine/engine.go +++ b/node/engine/engine.go @@ -3,6 +3,8 @@ package engine // import "github.com/statechannels/go-nitro/node/engine" import ( "context" + "crypto/sha256" + "encoding/json" "errors" "fmt" "log/slog" @@ -10,11 +12,13 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/statechannels/go-nitro/channel" "github.com/statechannels/go-nitro/channel/consensus_channel" "github.com/statechannels/go-nitro/internal/logging" "github.com/statechannels/go-nitro/node/engine/chainservice" "github.com/statechannels/go-nitro/node/engine/messageservice" + p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" "github.com/statechannels/go-nitro/node/engine/store" "github.com/statechannels/go-nitro/node/query" "github.com/statechannels/go-nitro/payments" @@ -61,9 +65,10 @@ type Engine struct { ObjectiveRequestsFromAPI chan protocols.ObjectiveRequest PaymentRequestsFromAPI chan PaymentRequest - fromChain <-chan chainservice.Event - fromMsg <-chan protocols.Message - fromLedger chan consensus_channel.Proposal + fromChain <-chan chainservice.Event + fromMsg <-chan protocols.Message + fromLedger chan consensus_channel.Proposal + signRequests <-chan p2pms.SignatureRequest eventHandler func(EngineEvent) @@ -136,7 +141,8 @@ func New(vm *payments.VoucherManager, msg messageservice.MessageService, chain c e.PaymentRequestsFromAPI = make(chan PaymentRequest) e.fromChain = chain.EventFeed() - e.fromMsg = msg.Out() + e.fromMsg = msg.P2PMessages() + e.signRequests = msg.SignRequests() e.chain = chain e.msg = msg @@ -191,6 +197,8 @@ func (e *Engine) run(ctx context.Context) { res, err = e.handleMessage(message) case proposal := <-e.fromLedger: res, err = e.handleProposal(proposal) + case signReq := <-e.signRequests: + err = e.handleSignRequest(signReq) case <-blockTicker.C: blockNum := e.chain.GetLastConfirmedBlockNum() err = e.store.SetLastBlockNumSeen(blockNum) @@ -225,12 +233,29 @@ func (e *Engine) handleProposal(proposal consensus_channel.Proposal) (EngineEven return EngineEvent{}, err } if obj.GetStatus() == protocols.Completed { - e.logger.Info("Ignoring proposal for complected objective", logging.WithObjectiveIdAttribute(id)) + e.logger.Info("Ignoring proposal for completed objective", logging.WithObjectiveIdAttribute(id)) return EngineEvent{}, nil } return e.attemptProgress(obj) } +func (e *Engine) handleSignRequest(sigReq p2pms.SignatureRequest) error { + recordDataBytes, err := json.Marshal(sigReq.Data) + if err != nil { + return err + } + + hash := sha256.Sum256(recordDataBytes) // Hash the data before signing it + secretKey := e.store.GetChannelSecretKey() + signature, err := secp256k1.Sign(hash[:], *secretKey) + if err != nil { + return err + } + + sigReq.ResponseChan <- signature + return nil +} + // handleMessage handles a Message from a peer go-nitro Wallet. // It: // - reads an objective from the store, @@ -278,7 +303,7 @@ func (e *Engine) handleMessage(message protocols.Message) (EngineEvent, error) { } if objective.GetStatus() == protocols.Completed { - e.logger.Info("Ignoring payload for complected objective", logging.WithObjectiveIdAttribute(objective.Id())) + e.logger.Info("Ignoring payload for completed objective", logging.WithObjectiveIdAttribute(objective.Id())) continue } diff --git a/node/engine/messageservice/messageservice.go b/node/engine/messageservice/messageservice.go index 92349d36a..9b0ce50b3 100644 --- a/node/engine/messageservice/messageservice.go +++ b/node/engine/messageservice/messageservice.go @@ -1,11 +1,16 @@ // Package messageservice is a messaging service responsible for routing messages to peers and relaying messages received from peers. package messageservice // import "github.com/statechannels/go-nitro/node/messageservice" -import "github.com/statechannels/go-nitro/protocols" +import ( + p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" + "github.com/statechannels/go-nitro/protocols" +) type MessageService interface { - // Out returns a chan for receiving messages from the message service - Out() <-chan protocols.Message + // P2PMessages returns a chan for receiving messages from the message service + P2PMessages() <-chan protocols.Message + // SignRequests returns a chan for receiving signature requests from the message service + SignRequests() <-chan p2pms.SignatureRequest // Send is for sending messages with the message service Send(protocols.Message) error // Close closes the message service diff --git a/node/engine/messageservice/p2p-message-service/dht-record.go b/node/engine/messageservice/p2p-message-service/dht-record.go index 5336584b2..11b290ab9 100644 --- a/node/engine/messageservice/p2p-message-service/dht-record.go +++ b/node/engine/messageservice/p2p-message-service/dht-record.go @@ -1,6 +1,7 @@ package p2pms import ( + "crypto/sha256" "encoding/json" "errors" "fmt" @@ -8,6 +9,8 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/libp2p/go-libp2p/core/peer" ) @@ -23,7 +26,13 @@ type stateChannelAddrToPeerIDValidator struct{} // dhtRecord represents the data stored in the DHT record type dhtRecord struct { Data dhtData - Signature []byte + PeerIdSig []byte + SCAddrSig []byte +} + +type SignatureRequest struct { + Data dhtData + ResponseChan chan []byte } type dhtData struct { @@ -50,30 +59,41 @@ func (v stateChannelAddrToPeerIDValidator) Validate(key string, value []byte) er return errors.New("record key does not match state channel address") } - // Check if the value can be parsed into a valid libp2p peer.ID - peerId, err := peer.Decode(dhtRecord.Data.PeerID) + dataBytes, err := json.Marshal(dhtRecord.Data) if err != nil { - return errors.New("invalid libp2p peer ID") + return err } - pubKey, err := peerId.ExtractPublicKey() + // Check the scAddr signature to ensure it is the signed hash of dataBytes + hash := sha256.Sum256(dataBytes) + scAddrPubKey, err := secp256k1.RecoverPubkey(hash[:], dhtRecord.SCAddrSig) if err != nil { return err } - dataBytes, err := json.Marshal(dhtRecord.Data) + sigToVerify := dhtRecord.SCAddrSig[:len(dhtRecord.SCAddrSig)-1] // Exclude the 1-byte 'V' field when verifying the signature + valid := crypto.VerifySignature(scAddrPubKey, hash[:], sigToVerify) + if !valid { + return errors.New("invalid scAddr signature") + } + + // Check if the value can be parsed into a valid libp2p peer.ID + peerId, err := peer.Decode(dhtRecord.Data.PeerID) if err != nil { - return err + return errors.New("invalid libp2p peer ID") } - // Check the signature to ensure it is the signed hash of dataBytes - valid, err := pubKey.Verify(dataBytes, dhtRecord.Signature) + pubKey, err := peerId.ExtractPublicKey() if err != nil { return err } - if !valid { - return errors.New("invalid signature") + // Check the peerId signature to ensure it is the signed hash of dataBytes + valid, err = pubKey.Verify(dataBytes, dhtRecord.PeerIdSig) + if err != nil { + return err + } else if !valid { + return errors.New("invalid peerId signature") } return nil diff --git a/node/engine/messageservice/p2p-message-service/service.go b/node/engine/messageservice/p2p-message-service/service.go index 5f80ce0fe..1617f9bab 100644 --- a/node/engine/messageservice/p2p-message-service/service.go +++ b/node/engine/messageservice/p2p-message-service/service.go @@ -52,12 +52,12 @@ type MessageOpts struct { // P2PMessageService is a rudimentary message service that uses TCP to send and receive messages. type P2PMessageService struct { - initComplete chan struct{} - toEngine chan protocols.Message // for forwarding processed messages to the engine - peers *safesync.Map[peer.ID] + initComplete chan struct{} + toEngine chan protocols.Message // for forwarding processed messages to the engine + dhtSignRequests chan SignatureRequest // for forwarding signature requests to the engine + peers *safesync.Map[peer.ID] scAddr types.Address - privateKey p2pcrypto.PrivKey p2pHost host.Host dht *dht.IpfsDHT newPeerInfo chan basicPeerInfo @@ -69,17 +69,15 @@ type P2PMessageService struct { // NewMessageService returns a running P2PMessageService listening on the given ip, port and message key. func NewMessageService(opts MessageOpts) *P2PMessageService { ms := &P2PMessageService{ - initComplete: make(chan struct{}, 1), - toEngine: make(chan protocols.Message, BUFFER_SIZE), - newPeerInfo: make(chan basicPeerInfo, BUFFER_SIZE), - peers: &safesync.Map[peer.ID]{}, - scAddr: opts.SCAddr, - logger: logging.LoggerWithAddress(slog.Default(), opts.SCAddr), + initComplete: make(chan struct{}, 1), + toEngine: make(chan protocols.Message, BUFFER_SIZE), + dhtSignRequests: make(chan SignatureRequest, 50), + newPeerInfo: make(chan basicPeerInfo, BUFFER_SIZE), + peers: &safesync.Map[peer.ID]{}, + scAddr: opts.SCAddr, + logger: logging.LoggerWithAddress(slog.Default(), opts.SCAddr), } - messageKey, err := p2pcrypto.UnmarshalSecp256k1PrivateKey(opts.PkBytes) - ms.checkError(err) - addressFactory := func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { extMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", opts.PublicIp, opts.Port)) if err != nil { @@ -90,9 +88,11 @@ func NewMessageService(opts MessageOpts) *P2PMessageService { return addrs } - ms.privateKey = messageKey + privateKey, err := p2pcrypto.UnmarshalSecp256k1PrivateKey(opts.PkBytes) + ms.checkError(err) + options := []libp2p.Option{ - libp2p.Identity(messageKey), + libp2p.Identity(privateKey), libp2p.AddrsFactory(addressFactory), libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/%s/tcp/%d", "0.0.0.0", opts.Port)), libp2p.Transport(tcp.NewTCPTransport), @@ -211,8 +211,7 @@ func (ms *P2PMessageService) InitComplete() <-chan struct{} { // Id returns the libp2p peer ID of the message service. func (ms *P2PMessageService) Id() peer.ID { - id, _ := peer.IDFromPrivateKey(ms.privateKey) - return id + return ms.p2pHost.ID() } // addScaddrDhtRecord adds this node's state channel address to the custom dht namespace @@ -227,12 +226,22 @@ func (ms *P2PMessageService) addScaddrDhtRecord(ctx context.Context) { recordDataBytes, err := json.Marshal(recordData) ms.checkError(err) - signature, err := ms.privateKey.Sign(recordDataBytes) + sigReq := SignatureRequest{ + Data: *recordData, + ResponseChan: make(chan []byte), + } + ms.dhtSignRequests <- sigReq + + peerIdSig, err := ms.p2pHost.Peerstore().PrivKey(ms.Id()).Sign(recordDataBytes) + ms.checkError(err) + + scAddrSig := <-sigReq.ResponseChan ms.checkError(err) fullRecord := &dhtRecord{ Data: *recordData, - Signature: signature, + PeerIdSig: peerIdSig, + SCAddrSig: scAddrSig, } fullRecordBytes, err := json.Marshal(fullRecord) ms.checkError(err) @@ -340,11 +349,16 @@ func (ms *P2PMessageService) checkError(err error) { panic(err) } -// Out returns a channel that can be used to receive messages from the message service -func (ms *P2PMessageService) Out() <-chan protocols.Message { +// P2PMessages returns a channel that can be used to receive messages from the message service +func (ms *P2PMessageService) P2PMessages() <-chan protocols.Message { return ms.toEngine } +// SignRequests returns a channel that can be used to receive signature request messages from the message service +func (ms *P2PMessageService) SignRequests() <-chan SignatureRequest { + return ms.dhtSignRequests +} + // Close closes the P2PMessageService func (ms *P2PMessageService) Close() error { ms.p2pHost.RemoveStreamHandler(GENERAL_MSG_PROTOCOL_ID) diff --git a/node/engine/messageservice/test-messageservice.go b/node/engine/messageservice/test-messageservice.go index 0bef212a0..dc87649c0 100644 --- a/node/engine/messageservice/test-messageservice.go +++ b/node/engine/messageservice/test-messageservice.go @@ -5,6 +5,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" "github.com/statechannels/go-nitro/protocols" "github.com/statechannels/go-nitro/rand" "github.com/statechannels/go-nitro/types" @@ -22,8 +23,9 @@ type TestMessageService struct { address types.Address // connection to Engine: - out chan protocols.Message // for sending message to engine - maxDelay time.Duration // the max delay for messages + out chan protocols.Message // for sending message to engine + signRequests chan p2pms.SignatureRequest // for sending signature requests to engine + maxDelay time.Duration // the max delay for messages broker Broker } @@ -48,20 +50,25 @@ func NewBroker() Broker { // Messages will be handled with a random delay between 0 and maxDelay func NewTestMessageService(address types.Address, broker Broker, maxDelay time.Duration) TestMessageService { tms := TestMessageService{ - address: address, - out: make(chan protocols.Message, 5), - maxDelay: maxDelay, - broker: broker, + address: address, + out: make(chan protocols.Message, 5), + signRequests: make(chan p2pms.SignatureRequest, 5), + maxDelay: maxDelay, + broker: broker, } tms.connect(broker) return tms } -func (t TestMessageService) Out() <-chan protocols.Message { +func (t TestMessageService) P2PMessages() <-chan protocols.Message { return t.out } +func (t TestMessageService) SignRequests() <-chan p2pms.SignatureRequest { + return t.signRequests +} + // dispatchMessage is responsible for dispatching a message to the appropriate peer message service. // If there is a mean delay it will wait a random amount of time(based on meanDelay) before sending the message. func (t TestMessageService) dispatchMessage(message protocols.Message) { diff --git a/node/engine/messageservice/test-messageservice_test.go b/node/engine/messageservice/test-messageservice_test.go index a907789e4..90aab0858 100644 --- a/node/engine/messageservice/test-messageservice_test.go +++ b/node/engine/messageservice/test-messageservice_test.go @@ -26,7 +26,7 @@ var aToB protocols.Message = protocols.CreateSignedProposalMessage( ) func TestConnect(t *testing.T) { - bobOut := bobMS.Out() + bobOut := bobMS.P2PMessages() err := aliceMS.Send(aToB) if err != nil { diff --git a/readme.md b/readme.md index 01ae73e04..4a5db7d15 100644 --- a/readme.md +++ b/readme.md @@ -31,7 +31,13 @@ Architectural decision records may be viewed [here](./.adr/0000-adrs.md). ## Testing -To run unit tests locally, you will need to generate a TLS certificate. Details are [here](./tls/readme.md). +> Pre-requisite: [generate a TLS certificate](./tls/readme.md) + +Run the tests from repo root: + +``` +go test ./... -count=2 -shuffle=on -timeout 1m -v -failfast +``` ## On-chain code