Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Commit

Permalink
Merge pull request #284 from OpenBazaar/TS_shutdown_gateway
Browse files Browse the repository at this point in the history
REFACTOR: Make API Gateway closable.
  • Loading branch information
cpacia authored Dec 21, 2016
2 parents 7034447 + 7bb5c28 commit cec7f86
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 105 deletions.
95 changes: 41 additions & 54 deletions api/gateway.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
package api

import (
manet "gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
"gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
"net"
"net/http"
"time"

"github.com/OpenBazaar/openbazaar-go/core"
"github.com/OpenBazaar/openbazaar-go/repo"
"github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core/corehttp"
"github.com/op/go-logging"
)

var log = logging.MustGetLogger("api")

func makeHandler(n *core.OpenBazaarNode, ctx commands.Context, authCookie http.Cookie, l net.Listener, config repo.APIConfig, options ...corehttp.ServeOption) (http.Handler, error) {
// Gateway represents an HTTP API gateway
type Gateway struct {
listener net.Listener
handler http.Handler
config repo.APIConfig
shutdownCh chan struct{}
}

// NewGateway instantiates a new `Gateway`
func NewGateway(n *core.OpenBazaarNode, authCookie http.Cookie, l net.Listener, config repo.APIConfig, options ...corehttp.ServeOption) (*Gateway, error) {
topMux := http.NewServeMux()

restAPI, err := newJsonAPIHandler(n, authCookie, config)
if err != nil {
return nil, err
}
wsAPI, err := newWSAPIHandler(n, ctx, config.Authenticated, authCookie, config.Username, config.Password)
wsAPI, err := newWSAPIHandler(n, n.Context, config.Authenticated, authCookie, config.Username, config.Password)
if err != nil {
return nil, err
}
Expand All @@ -35,66 +41,47 @@ func makeHandler(n *core.OpenBazaarNode, ctx commands.Context, authCookie http.C

mux := topMux
for _, option := range options {
var err error
mux, err = option(n.IpfsNode, l, mux)
if err != nil {
return nil, err
}
}
return topMux, nil
}

func Serve(cb chan<- bool, node *core.OpenBazaarNode, ctx commands.Context, authCookie http.Cookie, lis net.Listener, config repo.APIConfig, options ...corehttp.ServeOption) error {
handler, err := makeHandler(node, ctx, authCookie, lis, config, options...)
cb <- true
if err != nil {
return err
}
return &Gateway{
listener: l,
handler: topMux,
config: config,
shutdownCh: make(chan struct{}),
}, nil
}

addr, err := manet.FromNetAddr(lis.Addr())
if err != nil {
return err
}
// Close shutsdown the Gateway listener
func (g *Gateway) Close() error {
log.Infof("server at %s terminating...", g.listener.Addr())

// If the server exits beforehand
var serverError error
serverExited := make(chan struct{})
// Print shutdown message every few seconds if we're taking too long
go func() {
select {
case <-g.shutdownCh:
return
case <-time.After(5 * time.Second):
log.Infof("waiting for server at %s to terminate...", g.listener.Addr())

node.IpfsNode.Process().Go(func(p goprocess.Process) {
if config.SSL {
serverError = http.ListenAndServeTLS(lis.Addr().String(), config.SSLCert, config.SSLKey, handler)
} else {
serverError = http.Serve(lis, handler)
}
close(serverExited)
})

// Wait for server to exit
select {
case <-serverExited:
}()

// If node being closed before server exits, close server
case <-node.IpfsNode.Process().Closing():
log.Infof("server at %s terminating...", addr)
if config.SSL {
close(serverExited)
} else {
lis.Close()
}
// Shutdown the listener
close(g.shutdownCh)
return g.listener.Close()
}

outer:
for {
// Wait until server exits
select {
case <-serverExited:
// If the server exited as we are closing, we really do not care about errors
serverError = nil
break outer
case <-time.After(5 * time.Second):
log.Infof("waiting for server at %s to terminate...", addr)
}
}
// Serve begins listening on the configured address
func (g *Gateway) Serve() error {
var err error
if g.config.SSL {
err = http.ListenAndServeTLS(g.listener.Addr().String(), g.config.SSLCert, g.config.SSLKey, g.handler)
} else {
err = http.Serve(g.listener, g.handler)
}
log.Infof("server at %s terminated", addr)
return serverError
return err
}
97 changes: 46 additions & 51 deletions openbazaard.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ var decryptDatabase DecryptDatabase

var parser = flags.NewParser(nil, flags.Default)

var ErrNoGateways = errors.New("No gateway addresses configured")

func main() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
Expand Down Expand Up @@ -505,46 +507,40 @@ func (x *Start) Execute(args []string) error {
UserAgent: USERAGENT,
}

var gwErrc <-chan error
var cb <-chan bool
if len(cfg.Addresses.Gateway) > 0 {
if (apiConfig.SSL && apiConfig.SSLCert == "") || (apiConfig.SSL && apiConfig.SSLKey == "") {
return errors.New("SSL cert and key files must be set when SSL is enabled")
}
err, cb, gwErrc = serveHTTPGateway(core.Node, authCookie, *apiConfig)
if err != nil {
log.Error(err)
return err
}
if len(cfg.Addresses.Gateway) <= 0 {
return ErrNoGateways
}
if (apiConfig.SSL && apiConfig.SSLCert == "") || (apiConfig.SSL && apiConfig.SSLKey == "") {
return errors.New("SSL cert and key files must be set when SSL is enabled")
}

/* Wait for gateway to start before starting the network service.
This way the websocket channel we pass into the service gets created first.
FIXME: There has to be a better way */
for b := range cb {
if b == true {
core.Node.Service = service.New(core.Node, ctx, sqliteDB)
MR := ret.NewMessageRetriever(sqliteDB, ctx, nd, core.Node.Service, 16, core.Node.SendOfflineAck)
go MR.Run()
core.Node.MessageRetriever = MR
PR := rep.NewPointerRepublisher(nd, sqliteDB)
go PR.Run()
core.Node.PointerRepublisher = PR
if !x.DisableWallet {
MR.Wait()
TL := lis.NewTransactionListener(core.Node.Datastore, core.Node.Broadcast, core.Node.Wallet.Params())
wallet.AddTransactionListener(TL.OnTransactionReceived)
log.Info("Starting bitcoin wallet...")
go wallet.Start()
}
core.Node.UpdateFollow()
core.Node.SeedNode()
}
break
gateway, err := newHTTPGateway(core.Node, authCookie, *apiConfig)
if err != nil {
log.Error(err)
return err
}

for err := range gwErrc {
fmt.Println(err)
core.Node.Service = service.New(core.Node, ctx, sqliteDB)
MR := ret.NewMessageRetriever(sqliteDB, ctx, nd, core.Node.Service, 16, core.Node.SendOfflineAck)
go MR.Run()
core.Node.MessageRetriever = MR
PR := rep.NewPointerRepublisher(nd, sqliteDB)
go PR.Run()
core.Node.PointerRepublisher = PR
if !x.DisableWallet {
MR.Wait()
TL := lis.NewTransactionListener(core.Node.Datastore, core.Node.Broadcast, core.Node.Wallet.Params())
wallet.AddTransactionListener(TL.OnTransactionReceived)
log.Info("Starting bitcoin wallet...")
go wallet.Start()
}
core.Node.UpdateFollow()
core.Node.SeedNode()

// Start gateway
err = gateway.Serve()
if err != nil {
log.Error(err)
}

return nil
Expand Down Expand Up @@ -596,37 +592,40 @@ func (d *DummyListener) Close() error {
}

// Collects options, creates listener, prints status message and starts serving requests
func serveHTTPGateway(node *core.OpenBazaarNode, authCookie http.Cookie, config repo.APIConfig) (error, <-chan bool, <-chan error) {
func newHTTPGateway(node *core.OpenBazaarNode, authCookie http.Cookie, config repo.APIConfig) (*api.Gateway, error) {
// Get API configuration
cfg, err := node.Context.GetConfig()
if err != nil {
return err, nil, nil
return nil, err
}

// Create a network listener
gatewayMaddr, err := ma.NewMultiaddr(cfg.Addresses.Gateway)
if err != nil {
return fmt.Errorf("serveHTTPGateway: invalid gateway address: %q (err: %s)", cfg.Addresses.Gateway, err), nil, nil
return nil, fmt.Errorf("newHTTPGateway: invalid gateway address: %q (err: %s)", cfg.Addresses.Gateway, err)
}
var gwLis manet.Listener
if config.SSL {
netAddr, err := manet.ToNetAddr(gatewayMaddr)
if err != nil {
return err, nil, nil
return nil, err
}
gwLis, err = manet.WrapNetListener(&DummyListener{netAddr})
if err != nil {
return err, nil, nil
return nil, err
}
} else {
gwLis, err = manet.Listen(gatewayMaddr)
if err != nil {
return fmt.Errorf("serveHTTPGateway: manet.Listen(%s) failed: %s", gatewayMaddr, err), nil, nil
return nil, fmt.Errorf("newHTTPGateway: manet.Listen(%s) failed: %s", gatewayMaddr, err)
}
}

// We might have listened to /tcp/0 - let's see what we are listing on
gatewayMaddr = gwLis.Multiaddr()

log.Infof("Gateway/API server listening on %s\n", gatewayMaddr)

// Setup an options slice
var opts = []corehttp.ServeOption{
corehttp.MetricsCollectionOption("gateway"),
corehttp.CommandsROOption(node.Context),
Expand All @@ -640,15 +639,11 @@ func serveHTTPGateway(node *core.OpenBazaarNode, authCookie http.Cookie, config
}

if err != nil {
return fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err), nil, nil
return nil, fmt.Errorf("newHTTPGateway: ConstructNode() failed: %s", err)
}
errc := make(chan error)
cb := make(chan bool)
go func() {
errc <- api.Serve(cb, node, node.Context, authCookie, gwLis.NetListener(), config, opts...)
close(errc)
}()
return nil, cb, errc

// Create and return an API gateway
return api.NewGateway(node, authCookie, gwLis.NetListener(), config, opts...)
}

/* Returns the directory to store repo data in.
Expand Down

0 comments on commit cec7f86

Please sign in to comment.