From bd0a92a724bdf07fd4470a3e88fa054b09216461 Mon Sep 17 00:00:00 2001 From: AstaFrode Date: Wed, 19 Jul 2023 16:08:07 +0800 Subject: [PATCH] Base readf (#61) * add ReadDataAction * add example_readdata --- core/node.go | 1 + core/protocol.go | 4 +- core/readdata.go | 299 ++++++++++++++++++++++++++ core/readfile.go | 12 ++ core/utils.go | 20 ++ examples/readdata/example_readdata.go | 73 +++++++ 6 files changed, 407 insertions(+), 2 deletions(-) create mode 100644 core/readdata.go create mode 100644 examples/readdata/example_readdata.go diff --git a/core/node.go b/core/node.go index 25a91c2..e2fecc0 100644 --- a/core/node.go +++ b/core/node.go @@ -714,6 +714,7 @@ func (n *Node) initProtocol(protocolPrefix string) { n.FileProtocol = n.NewFileProtocol() n.AggrProofProtocol = n.NewAggrProofProtocol() n.PushTagProtocol = n.NewPushTagProtocol() + n.ReadDataProtocol = n.NewReadDataProtocol() } func (n *Node) initDHT() error { diff --git a/core/protocol.go b/core/protocol.go index dda5b87..12f5f6a 100644 --- a/core/protocol.go +++ b/core/protocol.go @@ -15,12 +15,12 @@ import ( type Protocol interface { WriteFileAction(id peer.ID, roothash, path string) error ReadFileAction(id peer.ID, roothash, datahash, path string, size int64) error + ReadDataAction(id peer.ID, roothash, datahash, path string, size int64) error TagPushReq(peerid peer.ID) (uint32, error) IdleReq(peerId peer.ID, filesize, blocknum uint64, pubkey, sign []byte) (uint32, error) TagReq(peerId peer.ID, filename, customdata string, blocknum uint64) (uint32, error) FileReq(peerId peer.ID, filehash string, filetype pb.FileType, fpath string) (uint32, error) AggrProofReq(peerId peer.ID, ihash, shash []byte, qslice []*pb.Qslice, puk, sign []byte) (uint32, error) - // add other protocols here... } type protocols struct { @@ -32,7 +32,7 @@ type protocols struct { *FileProtocol *AggrProofProtocol *PushTagProtocol - // add other protocols here... + *ReadDataProtocol } func NewProtocol() *protocols { diff --git a/core/readdata.go b/core/readdata.go new file mode 100644 index 0000000..da73d58 --- /dev/null +++ b/core/readdata.go @@ -0,0 +1,299 @@ +/* + Copyright (C) CESS. All rights reserved. + Copyright (C) Cumulus Encrypted Storage System. All rights reserved. + + SPDX-License-Identifier: Apache-2.0 +*/ + +package core + +import ( + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "sync" + "time" + + "github.com/CESSProject/p2p-go/pb" + "github.com/pkg/errors" + + "github.com/gogo/protobuf/proto" + "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" +) + +// pattern: /protocol-name/request-or-response-message/version +const readDataRequest = "/data/readreq/v0" +const readDataResponse = "/data/readresp/v0" + +type readDataResp struct { + ch chan bool + *pb.ReadfileResponse +} + +type ReadDataProtocol struct { + *Node // local host + *sync.Mutex + requests map[string]*readDataResp // determine whether it is your own response +} + +func (n *Node) NewReadDataProtocol() *ReadDataProtocol { + e := ReadDataProtocol{Node: n, Mutex: new(sync.Mutex), requests: make(map[string]*readDataResp)} + n.SetStreamHandler(protocol.ID(n.protocolPrefix+readDataRequest), e.onReadDataRequest) + n.SetStreamHandler(protocol.ID(n.protocolPrefix+readDataResponse), e.onReadDataResponse) + return &e +} + +func (e *protocols) ReadDataAction(id peer.ID, roothash, datahash, path string, size int64) error { + var ok bool + var err error + var offset int64 + var num int + var fstat fs.FileInfo + var f *os.File + var req pb.ReadfileRequest + + if size <= 0 { + return errors.New("invalid size") + } + + fstat, err = os.Stat(path) + if err == nil { + if fstat.IsDir() { + return fmt.Errorf("%s is a directory", path) + } + if fstat.Size() < size { + offset = fstat.Size() + } else if fstat.Size() == size { + return nil + } else { + f, err := os.Open(path) + if err != nil { + return err + } + defer func() { + if f != nil { + f.Close() + } + }() + newpath := filepath.Join(filepath.Dir(path), uuid.New().String()) + new_f, err := os.Create(newpath) + if err != nil { + return err + } + defer func() { + if new_f != nil { + new_f.Close() + } + }() + + _, err = io.CopyN(new_f, f, size) + if err != nil { + return err + } + f.Close() + f = nil + new_f.Close() + new_f = nil + return nil + } + } + f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm) + if err != nil { + return errors.Wrapf(err, "[open file]") + } + defer f.Close() + + req.Roothash = roothash + req.Datahash = datahash + req.MessageData = e.ReadDataProtocol.NewMessageData(uuid.New().String(), false) + + // store request so response handler has access to it + var respChan = make(chan bool, 1) + e.ReadDataProtocol.Lock() + for { + if _, ok := e.ReadDataProtocol.requests[req.MessageData.Id]; ok { + req.MessageData.Id = uuid.New().String() + continue + } + e.ReadDataProtocol.requests[req.MessageData.Id] = &readDataResp{ + ch: respChan, + } + break + } + e.ReadDataProtocol.Unlock() + defer func() { + e.ReadDataProtocol.Lock() + delete(e.ReadDataProtocol.requests, req.MessageData.Id) + close(respChan) + e.ReadDataProtocol.Unlock() + }() + timeout := time.NewTicker(P2PReadReqRespTime) + defer timeout.Stop() + + for { + req.Offset = offset + + err = e.ReadDataProtocol.SendProtoMessage(id, protocol.ID(e.ProtocolPrefix+readDataRequest), &req) + if err != nil { + return errors.Wrapf(err, "[SendProtoMessage]") + } + + // + timeout.Reset(P2PReadReqRespTime) + select { + case ok = <-respChan: + if !ok { + return errors.New(ERR_RespFailure) + } + case <-timeout.C: + return errors.New(ERR_RespTimeOut) + } + + e.ReadDataProtocol.Lock() + resp, ok := e.ReadDataProtocol.requests[req.MessageData.Id] + if !ok { + e.ReadDataProtocol.Unlock() + return errors.New(ERR_RespFailure) + } + e.ReadDataProtocol.Unlock() + + if resp.ReadfileResponse == nil { + return errors.New(ERR_RespFailure) + } + + if len(resp.ReadfileResponse.Data) == 0 || resp.Length == 0 { + if resp.Code == P2PResponseFinish { + err = f.Sync() + if err != nil { + return errors.Wrapf(err, "[sync file]") + } + return nil + } + return errors.New(ERR_RespFailure) + } + + num, err = f.Write(resp.Data[:resp.Length]) + if err != nil { + return errors.Wrapf(err, "[write file]") + } + + if resp.Code == P2PResponseFinish { + err = f.Sync() + if err != nil { + return errors.Wrapf(err, "[sync file]") + } + return nil + } + + offset = req.Offset + int64(num) + } +} + +// remote peer requests handler +func (e *ReadDataProtocol) onReadDataRequest(s network.Stream) { + defer s.Close() + + var code = P2PResponseOK + // get request data + data := &pb.ReadfileRequest{} + buf, err := io.ReadAll(s) + if err != nil { + s.Reset() + return + } + + // unmarshal it + err = proto.Unmarshal(buf, data) + if err != nil { + s.Reset() + return + } + + fpath := FindFile(e.ReadDataProtocol.GetDirs().FileDir, data.Datahash) + if fpath == "" { + fpath = filepath.Join(e.ReadDataProtocol.GetDirs().TmpDir, data.Datahash) + } + + _, err = os.Stat(fpath) + if err != nil { + s.Reset() + return + } + + f, err := os.Open(fpath) + if err != nil { + s.Reset() + return + } + defer f.Close() + + fstat, err := f.Stat() + if err != nil { + s.Reset() + return + } + + _, err = f.Seek(data.Offset, 0) + if err != nil { + s.Reset() + return + } + + var readBuf = make([]byte, FileProtocolBufSize) + num, err := f.Read(readBuf) + if err != nil { + s.Reset() + return + } + + if num+int(data.Offset) >= int(fstat.Size()) { + code = P2PResponseFinish + } + + // send response to the request using the message string he provided + resp := &pb.ReadfileResponse{ + MessageData: e.ReadDataProtocol.NewMessageData(data.MessageData.Id, false), + Code: code, + Offset: data.Offset, + Length: uint32(num), + Data: readBuf[:num], + } + + e.ReadDataProtocol.SendProtoMessage(s.Conn().RemotePeer(), protocol.ID(e.ProtocolPrefix+readDataResponse), resp) +} + +// remote peer requests handler +func (e *ReadDataProtocol) onReadDataResponse(s network.Stream) { + defer s.Close() + data := &pb.ReadfileResponse{} + buf, err := io.ReadAll(s) + if err != nil { + s.Reset() + return + } + + // unmarshal it + err = proto.Unmarshal(buf, data) + if err != nil { + s.Reset() + return + } + + e.ReadDataProtocol.Lock() + defer e.ReadDataProtocol.Unlock() + // locate request data and remove it if found + _, ok := e.requests[data.MessageData.Id] + if ok { + if data.Code == P2PResponseOK || data.Code == P2PResponseFinish { + e.requests[data.MessageData.Id].ch <- true + e.requests[data.MessageData.Id].ReadfileResponse = data + } else { + e.requests[data.MessageData.Id].ch <- false + } + } +} diff --git a/core/readfile.go b/core/readfile.go index 73c397c..a37d0c4 100644 --- a/core/readfile.go +++ b/core/readfile.go @@ -171,6 +171,13 @@ func (e *protocols) ReadFileAction(id peer.ID, roothash, datahash, path string, } if len(resp.ReadfileResponse.Data) == 0 || resp.Length == 0 { + if resp.Code == P2PResponseFinish { + err = f.Sync() + if err != nil { + return errors.Wrapf(err, "[sync file]") + } + return nil + } return errors.New(ERR_RespFailure) } @@ -200,12 +207,14 @@ func (e *ReadFileProtocol) onReadFileRequest(s network.Stream) { data := &pb.ReadfileRequest{} buf, err := io.ReadAll(s) if err != nil { + s.Reset() return } // unmarshal it err = proto.Unmarshal(buf, data) if err != nil { + s.Reset() return } @@ -218,17 +227,20 @@ func (e *ReadFileProtocol) onReadFileRequest(s network.Stream) { f, err := os.Open(fpath) if err != nil { + s.Reset() return } defer f.Close() fstat, err := f.Stat() if err != nil { + s.Reset() return } _, err = f.Seek(data.Offset, 0) if err != nil { + s.Reset() return } diff --git a/core/utils.go b/core/utils.go index ad59f8e..9c2b372 100644 --- a/core/utils.go +++ b/core/utils.go @@ -18,6 +18,7 @@ import ( "net/http" "os" "os/exec" + "path/filepath" "strings" "time" @@ -210,3 +211,22 @@ func FreeLocalPort(port uint32) bool { conn.Close() return false } + +func FindFile(dir, name string) string { + var result string + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.Name() == name { + result = path + return nil + } + return nil + }) + + if err != nil { + fmt.Println(err) + } + return result +} diff --git a/examples/readdata/example_readdata.go b/examples/readdata/example_readdata.go new file mode 100644 index 0000000..5c3f9bd --- /dev/null +++ b/examples/readdata/example_readdata.go @@ -0,0 +1,73 @@ +/* + Copyright (C) CESS. All rights reserved. + Copyright (C) Cumulus Encrypted Storage System. All rights reserved. + + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "flag" + "fmt" + "os" + "time" + + p2pgo "github.com/CESSProject/p2p-go" + "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" +) + +func main() { + file := "readfile" + ctx := context.Background() + sourcePort1 := flag.Int("p1", 15000, "Source port number") + sourcePort2 := flag.Int("p2", 15001, "Source port number") + + // To construct a simple host with all the default settings, just use `New` + h1, err := p2pgo.New( + ctx, + p2pgo.PrivatekeyFile(".private1"), + p2pgo.ListenPort(*sourcePort1), // regular tcp connections + p2pgo.Workspace("."), + ) + if err != nil { + panic(err) + } + defer h1.Close() + + fmt.Println("node1:", h1.Addrs(), h1.ID()) + + // To construct a simple host with all the default settings, just use `New` + h2, err := p2pgo.New( + ctx, + p2pgo.PrivatekeyFile(".private2"), + p2pgo.ListenPort(*sourcePort2), // regular tcp connections + p2pgo.Workspace("."), + ) + if err != nil { + panic(err) + } + defer h2.Close() + + fmt.Println("node2:", h2.Addrs(), h2.ID()) + + remote := fmt.Sprintf("/ip4/0.0.0.0/tcp/15001/p2p/%v", h2.ID()) + + maddr, err := ma.NewMultiaddr(remote) + if err != nil { + fmt.Println("NewMultiaddr err: ", err) + os.Exit(1) + } + // Extract the peer ID from the multiaddr. + info, err := peer.AddrInfoFromP2pAddr(maddr) + if err != nil { + fmt.Println("AddrInfoFromP2pAddr err: ", err) + os.Exit(1) + } + h1.Peerstore().AddAddr(info.ID, maddr, time.Hour) + + err = h1.ReadDataAction(info.ID, "example_readdata.go", "example_readdata.go", file, 1717) + fmt.Println("err: ", err) +}