Skip to content

Commit

Permalink
U online (#111)
Browse files Browse the repository at this point in the history
* add online

* update NewDHT

* update node
  • Loading branch information
AstaFrode authored Apr 18, 2024
1 parent 283f36f commit 029ddb6
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 32 deletions.
90 changes: 58 additions & 32 deletions core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type P2P interface {
// GetProtocolVersion returns the ProtocolVersion of the host
GetProtocolVersion() string

//
GetProtocolPrefix() string

// GetDhtProtocolVersion returns the host's DHT ProtocolVersion
GetDhtProtocolVersion() string

Expand Down Expand Up @@ -333,12 +336,27 @@ func NewPeerNode(ctx context.Context, cfg *config.Config) (*PeerNode, error) {
return nil, fmt.Errorf("[NewDHT] %v", err)
}

peer_node.dir, err = mkdir(cfg.Workspace)
if err != nil {
return nil, err
}
if len(boots) > 0 {
peer_node.dir, err = mkdir(cfg.Workspace)
if err != nil {
return nil, err
}

peer_node.initProtocol(cfg.ProtocolPrefix)
peer_node.initProtocol(cfg.ProtocolPrefix)

for _, v := range boots {
bootstrapAddr, err := ma.NewMultiaddr(v)
if err != nil {
continue
}
peerinfo, err := peer.AddrInfoFromP2pAddr(bootstrapAddr)
if err != nil {
continue
}
peer_node.Connect(ctx, *peerinfo)
peer_node.OnlineAction(peerinfo.ID)
}
}

return peer_node, nil
}
Expand Down Expand Up @@ -438,6 +456,10 @@ func (n *PeerNode) GetProtocolVersion() string {
return n.protocolVersion
}

func (n *PeerNode) GetProtocolPrefix() string {
return n.protocolPrefix
}

func (n *PeerNode) GetDhtProtocolVersion() string {
return n.dhtProtocolVersion
}
Expand Down Expand Up @@ -641,6 +663,7 @@ func (n *PeerNode) initProtocol(protocolPrefix string) {
n.ReadFileProtocol = n.NewReadFileProtocol()
n.ReadDataProtocol = n.NewReadDataProtocol()
n.ReadDataStatProtocol = n.NewReadDataStatProtocol()
n.OnlineProtocol = n.NewOnlineProtocol()
}

func NewDHT(ctx context.Context, h host.Host, bucketsize int, version string, boot_nodes []string, protocolPrefix, dhtProtocol string) (*dht.IpfsDHT, string, string, error) {
Expand Down Expand Up @@ -669,36 +692,39 @@ func NewDHT(ctx context.Context, h host.Host, bucketsize int, version string, bo
return nil, "", "", err
}

netenv := ""
for _, peerAddr := range boot_nodes {
bootstrapAddr, err := ma.NewMultiaddr(peerAddr)
if err != nil {
continue
}
peerinfo, err := peer.AddrInfoFromP2pAddr(bootstrapAddr)
if err != nil {
continue
}
err = h.Connect(ctx, *peerinfo)
if err == nil {
out.Ok(fmt.Sprintf("Connect to the boot node: %s", peerinfo.ID.String()))
switch peerinfo.ID.String() {
case "12D3KooWS8a18xoBzwkmUsgGBctNo6QCr6XCpUDR946mTBBUTe83",
"12D3KooWDWeiiqbpNGAqA5QbDTdKgTtwX8LCShWkTpcyxpRf2jA9",
"12D3KooWNcTWWuUWKhjTVDF1xZ38yCoHXoF4aDjnbjsNpeVwj33U":
netenv = "testnet"
case "12D3KooWGDk9JJ5F6UPNuutEKSbHrTXnF5eSn3zKaR27amgU6o9S",
"12D3KooWEGeAp1MvvUrBYQtb31FE1LPg7aHsd1LtTXn6cerZTBBd",
"12D3KooWRm2sQg65y2ZgCUksLsjWmKbBtZ4HRRsGLxbN76XTtC8T":
netenv = "devnet"
default:
netenv = "mainnet"
if len(boot_nodes) > 0 {
netenv := ""
for _, peerAddr := range boot_nodes {
bootstrapAddr, err := ma.NewMultiaddr(peerAddr)
if err != nil {
continue
}
peerinfo, err := peer.AddrInfoFromP2pAddr(bootstrapAddr)
if err != nil {
continue
}
err = h.Connect(ctx, *peerinfo)
if err == nil {
out.Ok(fmt.Sprintf("Connect to the boot node: %s", peerinfo.ID.String()))
switch peerinfo.ID.String() {
case "12D3KooWS8a18xoBzwkmUsgGBctNo6QCr6XCpUDR946mTBBUTe83",
"12D3KooWDWeiiqbpNGAqA5QbDTdKgTtwX8LCShWkTpcyxpRf2jA9",
"12D3KooWNcTWWuUWKhjTVDF1xZ38yCoHXoF4aDjnbjsNpeVwj33U":
netenv = "testnet"
case "12D3KooWGDk9JJ5F6UPNuutEKSbHrTXnF5eSn3zKaR27amgU6o9S",
"12D3KooWEGeAp1MvvUrBYQtb31FE1LPg7aHsd1LtTXn6cerZTBBd",
"12D3KooWRm2sQg65y2ZgCUksLsjWmKbBtZ4HRRsGLxbN76XTtC8T":
netenv = "devnet"
default:
netenv = "mainnet"
}
return kademliaDHT, bootstrapAddr.String(), netenv, nil
}

return kademliaDHT, bootstrapAddr.String(), netenv, nil
}
} else {
return kademliaDHT, "", "", nil
}
return kademliaDHT, "", netenv, fmt.Errorf("failed to connect to all boot nodes")
return kademliaDHT, "", "", fmt.Errorf("failed to connect to all boot nodes")
}

func buildPrimaryResourceManager() (network.ResourceManager, error) {
Expand Down
133 changes: 133 additions & 0 deletions core/online.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
Copyright (C) CESS. All rights reserved.
Copyright (C) Cumulus Encrypted Storage System. All rights reserved.
SPDX-License-Identifier: Apache-2.0
*/

package core

import (
"errors"
"io"
"sync"
"time"

"github.com/CESSProject/p2p-go/pb"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"google.golang.org/protobuf/proto"
)

// pattern: /protocol-name/request-or-response-message/version
const OnlineRequest = "/online/req/v0"
const OnlineResponse = "/online/resp/v0"

type onlineResp struct {
ch chan bool
*pb.MessageData
}

type OnlineProtocol struct { // local host
*PeerNode
*sync.Mutex
requests map[string]*onlineResp // determine whether it is your own response
}

func (n *PeerNode) NewOnlineProtocol() *OnlineProtocol {
e := OnlineProtocol{PeerNode: n, Mutex: new(sync.Mutex), requests: make(map[string]*onlineResp)}
n.SetStreamHandler(protocol.ID(n.protocolPrefix+OnlineResponse), e.onOnlineResponse)
return &e
}

func (e *protocols) OnlineAction(id peer.ID) error {
var err error
var ok bool
// create message data
req := &pb.MessageData{
Id: uuid.New().String(),
NodeId: e.OnlineProtocol.ID().String(),
}

// store request so response handler has access to it
respChan := make(chan bool, 1)

e.OnlineProtocol.Lock()
for {
if _, ok := e.OnlineProtocol.requests[req.Id]; ok {
req.Id = uuid.New().String()
continue
}
e.OnlineProtocol.requests[req.Id] = &onlineResp{
ch: respChan,
MessageData: &pb.MessageData{
Id: req.Id,
},
}
break
}
e.OnlineProtocol.Unlock()

defer func() {
e.OnlineProtocol.Lock()
delete(e.OnlineProtocol.requests, req.Id)
close(respChan)
e.OnlineProtocol.Unlock()
}()

timeout := time.NewTicker(P2PWriteReqRespTime)
defer timeout.Stop()

err = e.OnlineProtocol.SendProtoMessage(id, protocol.ID(e.ProtocolPrefix+OnlineRequest), req)
if err != nil {
return err
}

// wait response
timeout.Reset(P2PWriteReqRespTime)
select {
case ok = <-respChan:
if !ok {
return errors.New(ERR_RespFailure)
}
return nil
case <-timeout.C:
return errors.New(ERR_RespTimeOut)
}
}

// remote peer response handler
func (e *OnlineProtocol) onOnlineResponse(s network.Stream) {
defer s.Close()

data := &pb.MessageData{}
buf, err := io.ReadAll(s)
if err != nil {
s.Reset()
return
}

// unmarshal it
err = proto.Unmarshal(buf, data)
if err != nil {
s.Reset()
return
}

if data.NodeId == "" {
s.Reset()
return
}

// locate request data and remove it if found
e.OnlineProtocol.Lock()
defer e.OnlineProtocol.Unlock()

_, ok := e.requests[data.Id]
if ok {
e.requests[data.Id].ch <- true
}
}
2 changes: 2 additions & 0 deletions core/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Protocol interface {
ReadFileAction(id peer.ID, roothash, datahash, path string, size int64) error
ReadDataAction(id peer.ID, roothash, datahash, path string, size int64) error
ReadDataStatAction(id peer.ID, roothash string, datahash string) (uint64, error)
OnlineAction(id peer.ID) error
}

type protocols struct {
Expand All @@ -24,6 +25,7 @@ type protocols struct {
*ReadFileProtocol
*ReadDataProtocol
*ReadDataStatProtocol
*OnlineProtocol
}

func NewProtocol() *protocols {
Expand Down
58 changes: 58 additions & 0 deletions examples/online/example_online.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright (C) CESS. All rights reserved.
Copyright (C) Cumulus Encrypted Storage System. All rights reserved.
SPDX-License-Identifier: Apache-2.0
*/

package main

import (
"context"
"flag"
"fmt"
"log"
"time"

p2pgo "github.com/CESSProject/p2p-go"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
)

func main() {
ctx := context.Background()
sourcePort1 := flag.Int("p1", 15000, "Source port number")
// To construct a simple host with all the default settings, just use `New`
h1, err := p2pgo.New(
ctx,
p2pgo.PrivatekeyFile(".private1"),
p2pgo.ListenPort(*sourcePort1), // regular tcp connections
p2pgo.Workspace("."),
p2pgo.BootPeers([]string{"_dnsaddr.boot-bucket-devnet.cess.cloud"}),
)
if err != nil {
log.Println("[p2pgo.New]", err)
return
}
defer h1.Close()

log.Println("node1:", h1.Addrs(), h1.ID())

remote := "/ip4/127.0.0.1/tcp/8001/p2p/12D3KooWGDk9JJ5F6UPNuutEKSbHrTXnF5eSn3zKaR27amgU6o9S"

maddr, err := ma.NewMultiaddr(remote)
if err != nil {
log.Println("[ma.NewMultiaddr]", err)
return
}
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(maddr)
if err != nil {
log.Println("[peer.AddrInfoFromP2pAddr]", err)
return
}
h1.Peerstore().AddAddr(info.ID, maddr, time.Hour)

err = h1.OnlineAction(info.ID)
fmt.Println(err)
}

0 comments on commit 029ddb6

Please sign in to comment.