Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1560 from OpenBazaar/proper-network-lookup-fix
Browse files Browse the repository at this point in the history
Proper network lookup fix
  • Loading branch information
cpacia authored Apr 29, 2019
2 parents 35844a3 + b478fd7 commit 97d21ff
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 30 deletions.
16 changes: 11 additions & 5 deletions ipfs/api_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
routing "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing"
ropts "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing/options"
pstore "gx/ipfs/QmaCTz9RkrU13bm9kMB54f7atgqM4qkjDZpRwRoJiWXEqs/go-libp2p-peerstore"
record "gx/ipfs/QmbeHtaBy9nZsW4cHRcvgVY4CnDhXudE2Dr6qDxS7yg9rX/go-libp2p-record"
)

var apiRouterHTTPClient = &http.Client{
Expand All @@ -32,13 +33,14 @@ var ErrNotStarted = errors.New("API router not started")
// provides the features offerened by routing.ValueStore and marks the others as
// unsupported.
type APIRouter struct {
uri string
started chan (struct{})
uri string
started chan (struct{})
validator record.Validator
}

// NewAPIRouter creates a new APIRouter backed by the given URI.
func NewAPIRouter(uri string) APIRouter {
return APIRouter{uri: uri, started: make(chan (struct{}))}
func NewAPIRouter(uri string, validator record.Validator) APIRouter {
return APIRouter{uri: uri, started: make(chan (struct{})), validator: validator}
}

func (r *APIRouter) Start(proxyDialer proxy.Dialer) {
Expand Down Expand Up @@ -79,7 +81,11 @@ func (r APIRouter) GetValue(ctx context.Context, key string, opts ...ropts.Optio
defer resp.Body.Close()

log.Debugf("read value from %s", path)
return ioutil.ReadAll(resp.Body)
value, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return value, r.validator.Validate(key, value)
}

// GetValues reads the value for the given key. The API does not return multiple
Expand Down
69 changes: 47 additions & 22 deletions ipfs/caching_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package ipfs

import (
"context"
"encoding/hex"
"errors"
routinghelpers "gx/ipfs/QmRCrPXk2oUwpK1Cj2FXrUotRpddUxz56setkny2gz13Cx/go-libp2p-routing-helpers"
dht "gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht"
routing "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing"
ropts "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing/options"
record "gx/ipfs/QmbeHtaBy9nZsW4cHRcvgVY4CnDhXudE2Dr6qDxS7yg9rX/go-libp2p-record"

"gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht"
ci "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto"
"gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
"gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing"
"gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing/options"
)

var (
Expand All @@ -17,14 +19,12 @@ var (
type CachingRouter struct {
apiRouter *APIRouter
routing.IpfsRouting
RecordValidator record.Validator
}

func NewCachingRouter(dht *dht.IpfsDHT, apiRouter *APIRouter) *CachingRouter {
return &CachingRouter{
apiRouter: apiRouter,
IpfsRouting: dht,
RecordValidator: dht.Validator,
apiRouter: apiRouter,
IpfsRouting: dht,
}
}

Expand All @@ -43,28 +43,53 @@ func (r *CachingRouter) APIRouter() *APIRouter {
func (r *CachingRouter) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) error {
// Write to the tiered router in the background then write to the caching
// router and return
go r.IpfsRouting.PutValue(ctx, key, value, opts...)
return r.apiRouter.PutValue(ctx, key, value, opts...)
var err error
if err = r.IpfsRouting.PutValue(ctx, key, value, opts...); err != nil {
log.Errorf("ipfs dht put (%s): %s", hex.EncodeToString([]byte(key)), err)
return err
}
if err = r.apiRouter.PutValue(ctx, key, value, opts...); err != nil {
log.Errorf("api cache put (%s): %s", hex.EncodeToString([]byte(key)), err)
}
return err
}

func (r *CachingRouter) GetValue(ctx context.Context, key string, opts ...ropts.Option) ([]byte, error) {
// First check the DHT router. If it's successful return the value otherwise
// continue on to check the other routers.
val, err := r.IpfsRouting.GetValue(ctx, key, opts...)
if err == nil {
return val, r.apiRouter.PutValue(ctx, key, val, opts...)
if err != nil && len(val) == 0 {
// No values from the DHT, check the API cache
log.Warningf("ipfs dht lookup was empty: %s", err.Error())
if val, err = r.apiRouter.GetValue(ctx, key, opts...); err != nil && len(val) == 0 {
// No values still, report NotFound
return nil, routing.ErrNotFound
}
}
if err := r.apiRouter.PutValue(ctx, key, val, opts...); err != nil {
log.Errorf("api cache put found dht value (%s): %s", hex.EncodeToString([]byte(key)), err.Error())
}
return val, nil
}

// Value miss; Check API router
return r.apiRouter.GetValue(ctx, key, opts...)
func (r *CachingRouter) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
if dht, ok := r.IpfsRouting.(routing.PubKeyFetcher); ok {
return dht.GetPublicKey(ctx, p)
}
return nil, routing.ErrNotSupported
}

func (r *CachingRouter) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
return routinghelpers.Parallel{
Routers: []routing.IpfsRouting{
r.IpfsRouting,
r.apiRouter,
},
Validator: r.RecordValidator,
}.SearchValue(ctx, key, opts...)
// TODO: Restore parallel lookup once validation is properly applied to
// the apiRouter results ensuring it doesn't return invalid records before the
// IpfsRouting object can. For some reason the validation is not being considered
// on returned results.
return r.IpfsRouting.SearchValue(ctx, key, opts...)
//return routinghelpers.Parallel{
//Routers: []routing.IpfsRouting{
//r.IpfsRouting,
//r.apiRouter,
//},
//Validator: r.RecordValidator,
//}.SearchValue(ctx, key, opts...)
}
4 changes: 2 additions & 2 deletions ipfs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func constructRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching
if err != nil {
return nil, err
}
apiRouter := NewAPIRouter(routerCacheURI)
apiRouter := NewAPIRouter(routerCacheURI, dhtRouting.Validator)
cachingRouter := NewCachingRouter(dhtRouting, &apiRouter)
return cachingRouter, nil
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func constructTestnetRouting(ctx context.Context, host p2phost.Host, dstore ds.B
if err != nil {
return nil, err
}
apiRouter := NewAPIRouter(routerCacheURI)
apiRouter := NewAPIRouter(routerCacheURI, dhtRouting.Validator)
cachingRouter := NewCachingRouter(dhtRouting, &apiRouter)
return cachingRouter, nil
}
2 changes: 1 addition & 1 deletion mobile/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func constructMobileRouting(ctx context.Context, host p2phost.Host, dstore ds.Ba
if err != nil {
return nil, err
}
apiRouter := ipfs.NewAPIRouter(schema.IPFSCachingRouterDefaultURI)
apiRouter := ipfs.NewAPIRouter(schema.IPFSCachingRouterDefaultURI, dhtRouting.Validator)
apiRouter.Start(nil)
cachingRouter := ipfs.NewCachingRouter(dhtRouting, &apiRouter)
return cachingRouter, nil
Expand Down

0 comments on commit 97d21ff

Please sign in to comment.