Skip to content

Commit

Permalink
add read file action extension (#110)
Browse files Browse the repository at this point in the history
* update read file server

* use extension data to read file action

* fix a bug
  • Loading branch information
jiuquxzy authored Apr 19, 2024
1 parent 029ddb6 commit 0e91f8f
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 98 deletions.
174 changes: 110 additions & 64 deletions core/readfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
const readFileRequest = "/file/readreq/v0"
const readFileResponse = "/file/readresp/v0"

type ReadFileServerHandle func(req *pb.ReadfileRequest) (*pb.ReadfileResponse, error)

type readMsgResp struct {
ch chan bool
*pb.ReadfileResponse
Expand All @@ -38,6 +40,7 @@ type readMsgResp struct {
type ReadFileProtocol struct {
*PeerNode // local host
*sync.Mutex
handle ReadFileServerHandle
requests map[string]*readMsgResp // determine whether it is your own response
}

Expand All @@ -49,14 +52,11 @@ func (n *PeerNode) NewReadFileProtocol() *ReadFileProtocol {
}

func (e *protocols) ReadFileAction(id peer.ID, roothash, datahash, path string, size int64) error {
var ok bool
var err error
var hash string
var offset int64
var num int
var fstat fs.FileInfo
var f *os.File
var req pb.ReadfileRequest

if size != FragmentSize {
return errors.New("invalid size")
Expand Down Expand Up @@ -111,11 +111,59 @@ func (e *protocols) ReadFileAction(id peer.ID, roothash, datahash, path string,
return errors.Wrapf(err, "[open file]")
}
defer f.Close()
defer f.Sync()
req := &pb.ReadfileRequest{
Roothash: roothash,
Datahash: datahash,
Offset: offset,
MessageData: e.ReadFileProtocol.NewMessageData(uuid.New().String(), false),
}
return e.readFileAction(id, req, func(req *pb.ReadfileRequest, resp *readMsgResp) error {
num, err := f.Write(resp.Data[:resp.Length])
if err != nil {
return errors.Wrapf(err, "[write file]")
}
req.Offset += int64(num)
return nil
})
}

req.Roothash = roothash
req.Datahash = datahash
req.MessageData = e.ReadFileProtocol.NewMessageData(uuid.New().String(), false)
func (e *protocols) ReadFileActionWithExtension(id peer.ID, roothash, datahash, path string, size int64, extData []byte) error {
var err error
var offset int64
var fstat fs.FileInfo
var f *os.File

if fstat, err = os.Stat(path); err == nil && fstat.IsDir() {
return fmt.Errorf("%s is a directory", path)
}
f, err = os.Create(path)
if err != nil {
return errors.Wrapf(err, "[create file]")
}
defer f.Close()
defer f.Sync()
req := &pb.ReadfileRequest{
Roothash: roothash,
Datahash: datahash,
Offset: offset,
//add extension data
ExtendData: extData,
MessageData: e.ReadFileProtocol.NewMessageData(uuid.New().String(), false),
}
return e.readFileAction(id, req, func(req *pb.ReadfileRequest, resp *readMsgResp) error {
num, err := f.Write(resp.Data[:resp.Length])
if err != nil {
return errors.Wrapf(err, "[write file]")
}
req.Offset += int64(num)
return nil
})
}

func (e *ReadFileProtocol) readFileAction(
id peer.ID, req *pb.ReadfileRequest, callback func(req *pb.ReadfileRequest, resp *readMsgResp,
) error) error {
// store request so response handler has access to it
var respChan = make(chan bool, 1)
e.ReadFileProtocol.Lock()
Expand All @@ -138,19 +186,16 @@ func (e *protocols) ReadFileAction(id peer.ID, roothash, datahash, path string,
}()
timeout := time.NewTicker(P2PReadReqRespTime)
defer timeout.Stop()

for {
req.Offset = offset

err = e.ReadFileProtocol.SendProtoMessage(id, protocol.ID(e.ProtocolPrefix+readFileRequest), &req)
err := e.ReadFileProtocol.SendProtoMessage(id, protocol.ID(e.ProtocolPrefix+readFileRequest), req)
if err != nil {
return errors.Wrapf(err, "[SendProtoMessage]")
}

//
timeout.Reset(P2PReadReqRespTime)
select {
case ok = <-respChan:
case ok := <-respChan:
if !ok {
return errors.New(ERR_RespFailure)
}
Expand All @@ -169,101 +214,102 @@ func (e *protocols) ReadFileAction(id peer.ID, roothash, datahash, path string,
if resp.ReadfileResponse == nil {
return errors.New(ERR_RespFailure)
}

if len(resp.ReadfileResponse.Data) == 0 || resp.Length == 0 {
if resp.Code == P2PResponseFinish {
err = f.Sync()
if err != nil {
return errors.Wrapf(err, "[sync file]")
}
return nil
}
return errors.New(ERR_RespFailure)
}

num, err = f.Write(resp.Data[:resp.Length])
err = callback(req, resp)
if err != nil {
return errors.Wrapf(err, "[write file]")
return err
}

if resp.Code == P2PResponseFinish {
err = f.Sync()
if err != nil {
return errors.Wrapf(err, "[sync file]")
}
return nil
}

offset = req.Offset + int64(num)
}
}

// remote peer requests handler
func (e *ReadFileProtocol) onReadFileRequest(s network.Stream) {
defer s.Close()
func (e *ReadFileProtocol) SetReadFileServiceHandle(handle ReadFileServerHandle) {
e.handle = handle
}

func (e *ReadFileProtocol) DefaultReadFileServerHandle(req *pb.ReadfileRequest) (*pb.ReadfileResponse, error) {
var code = P2PResponseOK
// get request data
data := &pb.ReadfileRequest{}
buf, err := io.ReadAll(s)
if err != nil {
s.Reset()
return
}
fpath := filepath.Join(e.ReadFileProtocol.GetDirs().TmpDir, req.Roothash, req.Datahash)

// unmarshal it
err = proto.Unmarshal(buf, data)
if err != nil {
s.Reset()
return
}

fpath := filepath.Join(e.ReadFileProtocol.GetDirs().TmpDir, data.Roothash, data.Datahash)

_, err = os.Stat(fpath)
if err != nil {
fpath = filepath.Join(e.ReadFileProtocol.GetDirs().FileDir, data.Roothash, data.Datahash)
if _, err := os.Stat(fpath); err != nil {
fpath = filepath.Join(e.ReadFileProtocol.GetDirs().FileDir, req.Roothash, req.Datahash)
}

f, err := os.Open(fpath)
if err != nil {
s.Reset()
return
return &pb.ReadfileResponse{Code: P2PResponseFailed}, err
}
defer f.Close()

fstat, err := f.Stat()
if err != nil {
s.Reset()
return
return &pb.ReadfileResponse{Code: P2PResponseRemoteFailed}, err
}

_, err = f.Seek(data.Offset, 0)
if err != nil {
s.Reset()
return
if _, err = f.Seek(req.Offset, 0); err != nil {
return &pb.ReadfileResponse{Code: P2PResponseRemoteFailed}, err
}

var readBuf = make([]byte, FileProtocolBufSize)
num, err := f.Read(readBuf)
if err != nil {
return
return &pb.ReadfileResponse{Code: P2PResponseRemoteFailed}, err
}

if num+int(data.Offset) >= int(fstat.Size()) {
if num+int(req.Offset) >= int(fstat.Size()) {
code = P2PResponseFinish
}

// send response to the request using the message string he provided
resp := &pb.ReadfileResponse{
MessageData: e.ReadFileProtocol.NewMessageData(data.MessageData.Id, false),
Code: code,
Offset: data.Offset,
Length: uint32(num),
Data: readBuf[:num],
Code: code,
Offset: req.Offset,
Length: uint32(num),
Data: readBuf[:num],
}
return resp, nil
}

// remote peer requests handler
func (e *ReadFileProtocol) onReadFileRequest(s network.Stream) {
defer s.Close()

// get request data
data := &pb.ReadfileRequest{}
buf, err := io.ReadAll(s)
if err != nil {
s.Reset()
return
}

e.ReadFileProtocol.SendProtoMessage(s.Conn().RemotePeer(), protocol.ID(e.ProtocolPrefix+readFileResponse), resp)
// unmarshal it
err = proto.Unmarshal(buf, data)
if err != nil {
s.Reset()
return
}
handle := e.handle
if handle == nil {
handle = e.DefaultReadFileServerHandle
}
resp, err := handle(data)
if err != nil && resp == nil {
s.Reset()
return
}
resp.MessageData = e.ReadFileProtocol.NewMessageData(data.MessageData.Id, false)
e.ReadFileProtocol.SendProtoMessage(
s.Conn().RemotePeer(),
protocol.ID(e.ProtocolPrefix+readFileResponse),
resp,
)
}

// remote peer requests handler
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
Expand Down Expand Up @@ -89,6 +89,7 @@ require (
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/protocolbuffers/protobuf v5.26.1+incompatible // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-20 v0.3.4 // indirect
github.com/quic-go/quic-go v0.39.4 // indirect
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down Expand Up @@ -97,6 +98,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down Expand Up @@ -162,10 +165,13 @@ github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPw
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand Down Expand Up @@ -240,6 +246,7 @@ github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dz
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
Expand Down Expand Up @@ -267,6 +274,7 @@ github.com/multiformats/go-multistream v0.5.0/go.mod h1:n6tMZiwiP2wUsR8DgfDWw1dy
github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY=
Expand Down Expand Up @@ -300,6 +308,8 @@ github.com/prometheus/common v0.47.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5E
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/protocolbuffers/protobuf v5.26.1+incompatible h1:rsMX3S/GhXD3gZe9doksoUYFpcMP67si8Jf+cEz3KYE=
github.com/protocolbuffers/protobuf v5.26.1+incompatible/go.mod h1:DdhgU1nye99PVSHQwKVPGBaTs902wvndr/KhlFhJxmw=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/qtls-go1-20 v0.3.4 h1:MfFAPULvst4yoMgY9QmtpYmfij/em7O8UUi+bNVm7Cg=
Expand Down Expand Up @@ -569,6 +579,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
Loading

0 comments on commit 0e91f8f

Please sign in to comment.