Skip to content

Commit

Permalink
feat: implement delegated PUTs
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Feb 15, 2024
1 parent 3b4235d commit fc7e7d2
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 22 deletions.
9 changes: 0 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ func findProviders(ctx context.Context, key cid.Cid, endpoint string, prettyOutp
fmt.Fprintln(os.Stdout, record.ID)
fmt.Fprintln(os.Stdout, "\tProtocols:", record.Protocols)
fmt.Fprintln(os.Stdout, "\tAddresses:", record.Addrs)

//lint:ignore SA1019 // ignore staticcheck
case types.SchemaBitswap:
//lint:ignore SA1019 // ignore staticcheck
record := res.Val.(*types.BitswapRecord)
fmt.Fprintln(os.Stdout, record.ID)
fmt.Fprintln(os.Stdout, "\tProtocol:", record.Protocol)
fmt.Fprintln(os.Stdout, "\tAddresses:", record.Addrs)

default:
// This is an unknown schema. Let's just print it raw.
err := json.NewEncoder(os.Stdout).Encode(res.Val)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
github.com/CAFxX/httpcompression v0.0.9
github.com/felixge/httpsnoop v1.0.4
github.com/ipfs/boxo v0.17.0
github.com/ipfs/boxo v0.17.1-0.20240215103716-813b59fc38e9
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/libp2p/go-libp2p v0.32.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/boxo v0.17.0 h1:fVXAb12dNbraCX1Cdid5BB6Kl62gVLNVA+e0EYMqAU0=
github.com/ipfs/boxo v0.17.0/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80=
github.com/ipfs/boxo v0.17.1-0.20240215103716-813b59fc38e9 h1:UCaiJRJTjpYtDXG9H3M3AdeotBw+KxcU6zj3D8PdnVU=
github.com/ipfs/boxo v0.17.1-0.20240215103716-813b59fc38e9/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
Expand Down
118 changes: 108 additions & 10 deletions server_routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"errors"
"math"
"sync"
"time"

Expand All @@ -24,10 +25,12 @@ type router interface {

type providersRouter interface {
FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error)
Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error)
}

type peersRouter interface {
FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error)
ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error)
}

type ipnsRouter interface {
Expand All @@ -50,13 +53,27 @@ func (r composableRouter) FindProviders(ctx context.Context, key cid.Cid, limit
return r.providers.FindProviders(ctx, key, limit)
}

func (r composableRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
if r.providers == nil {
return 0, nil
}
return r.providers.Provide(ctx, req)

Check warning on line 60 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L56-L60

Added lines #L56 - L60 were not covered by tests
}

func (r composableRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
if r.peers == nil {
return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{})), nil
}
return r.peers.FindPeers(ctx, pid, limit)
}

func (r composableRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
if r.peers == nil {
return 0, nil
}
return r.peers.ProvidePeer(ctx, req)

Check warning on line 74 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L70-L74

Added lines #L70 - L74 were not covered by tests
}

func (r composableRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
if r.ipns == nil {
return nil, routing.ErrNotFound
Expand All @@ -71,11 +88,6 @@ func (r composableRouter) PutIPNS(ctx context.Context, name ipns.Name, record *i
return r.ipns.PutIPNS(ctx, name, record)
}

//lint:ignore SA1019 // ignore staticcheck
func (r composableRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
return 0, routing.ErrNotSupported
}

var _ server.ContentRouter = parallelRouter{}

type parallelRouter struct {
Expand Down Expand Up @@ -206,6 +218,57 @@ func (mi *manyIter[T]) Close() error {
return err
}

func (r parallelRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
return provide(ctx, r.routers, func(ctx context.Context, r router) (time.Duration, error) {
return r.Provide(ctx, req)
})

Check warning on line 224 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L221-L224

Added lines #L221 - L224 were not covered by tests
}

func (r parallelRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
return provide(ctx, r.routers, func(ctx context.Context, r router) (time.Duration, error) {
return r.ProvidePeer(ctx, req)
})

Check warning on line 230 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L227-L230

Added lines #L227 - L230 were not covered by tests
}

func provide(ctx context.Context, routers []router, call func(context.Context, router) (time.Duration, error)) (time.Duration, error) {
switch len(routers) {
case 0:
return 0, nil
case 1:
return call(ctx, routers[0])

Check warning on line 238 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L233-L238

Added lines #L233 - L238 were not covered by tests
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

var wg sync.WaitGroup
resultsTTL := make([]time.Duration, len(routers))
resultsErr := make([]error, len(routers))
wg.Add(len(routers))
for i, ri := range routers {
go func(ri router, i int) {
resultsTTL[i], resultsErr[i] = call(ctx, ri)
wg.Done()
}(ri, i)

Check warning on line 252 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L241-L252

Added lines #L241 - L252 were not covered by tests
}
wg.Wait()

var err error
for _, e := range resultsErr {
err = errors.Join(err, e)
}

Check warning on line 259 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L254-L259

Added lines #L254 - L259 were not covered by tests

// Choose lowest TTL to return.
var ttl time.Duration = math.MaxInt64
for _, t := range resultsTTL {
if t < ttl {
ttl = t
}

Check warning on line 266 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L262-L266

Added lines #L262 - L266 were not covered by tests
}

return ttl, err

Check warning on line 269 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L269

Added line #L269 was not covered by tests
}

func (r parallelRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
switch len(r.routers) {
case 0:
Expand Down Expand Up @@ -296,11 +359,6 @@ func (r parallelRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipn
return errs
}

//lint:ignore SA1019 // ignore staticcheck
func (r parallelRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
return 0, routing.ErrNotSupported
}

var _ router = libp2pRouter{}

type libp2pRouter struct {
Expand All @@ -316,6 +374,12 @@ func (d libp2pRouter) FindProviders(ctx context.Context, key cid.Cid, limit int)
}), nil
}

func (d libp2pRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
// NOTE: this router cannot provide further to the DHT, since we can only
// announce CIDs that our own node has, which is not the case.
return 0, routing.ErrNotSupported

Check warning on line 380 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L377-L380

Added lines #L377 - L380 were not covered by tests
}

func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -337,6 +401,10 @@ func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it
return iter.ToResultIter[*types.PeerRecord](iter.FromSlice[*types.PeerRecord]([]*types.PeerRecord{rec})), nil
}

func (r libp2pRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
return 0, routing.ErrNotSupported

Check warning on line 405 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L404-L405

Added lines #L404 - L405 were not covered by tests
}

func (d libp2pRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -409,6 +477,36 @@ func (d clientRouter) FindProviders(ctx context.Context, cid cid.Cid, limit int)
return d.Client.FindProviders(ctx, cid)
}

func (d clientRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
return d.provide(func() (iter.ResultIter[*types.AnnouncementRecord], error) {
return d.Client.ProvideRecords(ctx, req)
})

Check warning on line 483 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L480-L483

Added lines #L480 - L483 were not covered by tests
}

func (d clientRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
return d.Client.FindPeers(ctx, pid)
}

func (d clientRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
return d.provide(func() (iter.ResultIter[*types.AnnouncementRecord], error) {
return d.Client.ProvidePeerRecords(ctx, req)
})

Check warning on line 493 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L490-L493

Added lines #L490 - L493 were not covered by tests
}

func (d clientRouter) provide(do func() (iter.ResultIter[*types.AnnouncementRecord], error)) (time.Duration, error) {
resIter, err := do()
if err != nil {
return 0, err
}

Check warning on line 500 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L496-L500

Added lines #L496 - L500 were not covered by tests

records, err := iter.ReadAllResults(resIter)
if err != nil {
return 0, err
}

Check warning on line 505 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L502-L505

Added lines #L502 - L505 were not covered by tests

if len(records) != 1 {
return 0, errors.New("invalid number of records returned")
}

Check warning on line 509 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L507-L509

Added lines #L507 - L509 were not covered by tests

return records[0].Payload.TTL, nil

Check warning on line 511 in server_routers.go

View check run for this annotation

Codecov / codecov/patch

server_routers.go#L511

Added line #L511 was not covered by tests
}
10 changes: 10 additions & 0 deletions server_routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func (m *mockRouter) FindProviders(ctx context.Context, key cid.Cid, limit int)
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
}

func (m *mockRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
args := m.Called(ctx, req)
return args.Get(0).(time.Duration), args.Error(1)
}

func (m *mockRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
args := m.Called(ctx, pid, limit)
if arg0 := args.Get(0); arg0 == nil {
Expand All @@ -40,6 +45,11 @@ func (m *mockRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (ite
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
}

func (m *mockRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) {
args := m.Called(ctx, req)
return args.Get(0).(time.Duration), args.Error(1)
}

func (m *mockRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
args := m.Called(ctx, name)
if arg0 := args.Get(0); arg0 == nil {
Expand Down

0 comments on commit fc7e7d2

Please sign in to comment.