diff --git a/client/client.go b/client/client.go index 6d0c916..b58a086 100644 --- a/client/client.go +++ b/client/client.go @@ -2,11 +2,12 @@ package client import ( "context" - "errors" "log" "net/url" "sync" + "github.com/pkg/errors" + "github.com/gorilla/websocket" ) @@ -35,36 +36,28 @@ type Client struct { ctx context.Context cancel context.CancelFunc initMsg BaseMessage // used to resend the initialization msg if connection drops - apiKey string + opts Opts + history *MsgHistory } -// New returns a new blocknative websocket client +// New returns a new blocknative websocket client caller must make sure to initialize afterwards func New(ctx context.Context, opts Opts) (*Client, error) { ctx, cancel := context.WithCancel(ctx) - u := url.URL{ + client := &Client{ + ctx: ctx, + cancel: cancel, + opts: opts, + history: &MsgHistory{}, + } + if err := client.doConnect(ctx, url.URL{ Scheme: opts.Scheme, Host: opts.Host, Path: opts.Path, - } - c, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil) - if err != nil { - cancel() - return nil, err - } - // this checks out connection to blocknative's api and makes sure that we connected properly - var out ConnectResponse - if err := c.ReadJSON(&out); err != nil { + }); err != nil { cancel() return nil, err } - if out.Status != "ok" { - cancel() - return nil, errors.New("failed to initialize websockets api connection") - } - if opts.PrintConnectResponse { - log.Printf("%+v\n", out) - } - return &Client{conn: c, ctx: ctx, cancel: cancel, apiKey: opts.APIKey}, nil + return client, nil } // Initialize is used to handle blocknative websockets api initialization @@ -84,6 +77,8 @@ func (c *Client) Initialize(msg BaseMessage) error { } // ReadJSON is a wrapper around Conn:ReadJSON +// You should provide a pointer otherwise you will likely +// encounter a nil interface type as the returned value func (c *Client) ReadJSON(out interface{}) error { c.mx.RLock() defer c.mx.RUnlock() @@ -91,15 +86,20 @@ func (c *Client) ReadJSON(out interface{}) error { } // WriteJSON is a wrapper around Conn:WriteJSON +// Do not provide a pointer as this could cause problems +// with the message history buffer if the provided value +// becomes garbage collected func (c *Client) WriteJSON(out interface{}) error { c.mx.Lock() defer c.mx.Unlock() + // push the message into the history buffer + c.history.Push(out) return c.conn.WriteJSON(out) } // APIKey returns the api key being used by the client func (c *Client) APIKey() string { - return c.apiKey + return c.opts.APIKey } // Close is used to terminate our websocket client @@ -113,6 +113,75 @@ func (c *Client) Close() error { if err != nil { log.Println("failed to send close message: ", err) } + // close the underlying connection + c.conn.Close() c.cancel() return err } + +// ReInit should only be used in the event that we receive an unexpected +// error and allows us to replay previous messages +func (c *Client) ReInit() error { + c.mx.Lock() + defer c.mx.Unlock() + c.doConnect(c.ctx, url.URL{ + Scheme: c.opts.Scheme, + Host: c.opts.Host, + Path: c.opts.Path, + }) + // dont empty the buffer such that future errors + // can reuse the message history + msgs := c.history.CopyAll() + // send the initialize messsage + // we do not store this in the message history buffer + if err := c.conn.WriteJSON(c.initMsg); err != nil { + return errors.Wrap(err, "fatal error received") + } + // drain + _ = c.conn.ReadJSON(nil) + for _, msg := range msgs { + if err := c.conn.WriteJSON(&msg); err != nil { + // TODO(bonedaddy): figure out how to properly handle + log.Println("receive error during reinitialization: ", err) + return err + } + // drain + _ = c.conn.ReadJSON(nil) + } + return nil +} + +// ShouldReInit is used to check the given error +// and return whether or not we should reinitialize the connection +func (c *Client) ShouldReInit(err error) bool { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + return false + } + return true +} + +// doConnect should only be used during creation of the initial client object or during reinitialization +// caller must take care of locking +func (c *Client) doConnect(ctx context.Context, u url.URL) error { + // close the previous connection if it exists + if c.conn != nil { + c.conn.Close() + } + conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil) + if err != nil { + return err + } + c.conn = conn + // this checks out connection to blocknative's api and makes sure that we connected properly + var out ConnectResponse + if err := c.conn.ReadJSON(&out); err != nil { + return err + } + if out.Status != "ok" { + return errors.New("failed to initialize websockets api connection") + } + if c.opts.PrintConnectResponse { + log.Printf("%+v\n", out) + } + return nil +} diff --git a/client/client_test.go b/client/client_test.go index cfff32b..7068342 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -22,6 +22,12 @@ func TestClient(t *testing.T) { t.Log("reading message...") var out interface{} require.NoError(t, client.ReadJSON(&out)) + /* + t.Log("message: ", out) + // test reinitialization + t.Log("testing reinit") + require.NoError(t, client.ReInit()) + t.Log("closing") t.Log(out) require.NoError(t, client.WriteJSON( NewConfiguration( @@ -38,6 +44,7 @@ func TestClient(t *testing.T) { ) require.NoError(t, client.ReadJSON(&out)) t.Log(out) + */ require.NoError(t, client.Close()) } diff --git a/client/history.go b/client/history.go index 7e8effc..04b2529 100644 --- a/client/history.go +++ b/client/history.go @@ -44,6 +44,15 @@ func (mg *MsgHistory) PopAll() []interface{} { return copied } +// CopyAll is like PopAll except it does not clear the buffer +func (mg *MsgHistory) CopyAll() []interface{} { + mg.mx.Lock() + defer mg.mx.Unlock() + copied := make([]interface{}, len(mg.buffer)) + copy(copied, mg.buffer) + return copied +} + // Len returns the length of the msg history buffewr func (mg *MsgHistory) Len() int { mg.mx.RLock() diff --git a/client/history_test.go b/client/history_test.go index 51ba9c2..8959060 100644 --- a/client/history_test.go +++ b/client/history_test.go @@ -43,7 +43,20 @@ func TestMsgHistory(t *testing.T) { require.Equal(t, nil, hist.Pop()) // set msg history again set() - all := hist.PopAll() + all := hist.CopyAll() + require.Equal(t, 3, hist.Len()) + for i := 0; i < len(all); i++ { + item = all[i].(arg) + switch i { + case 0: + require.Equal(t, 1, item.num) + case 1: + require.Equal(t, 2, item.num) + case 2: + require.Equal(t, 3, item.num) + } + } + all = hist.PopAll() require.Equal(t, 0, hist.Len()) for i := 0; i < len(all); i++ { item = all[i].(arg) diff --git a/go.mod b/go.mod index 8b0b181..756a6d9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,8 @@ go 1.15 require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gorilla/websocket v1.4.2 - github.com/kr/pretty v0.1.0 // indirect + github.com/kr/pretty v0.2.1 // indirect + github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.7.0 github.com/urfave/cli/v2 v2.3.0 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect diff --git a/go.sum b/go.sum index 787c806..b005980 100644 --- a/go.sum +++ b/go.sum @@ -7,11 +7,13 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=