Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Connection ReInitialization (WIP) #4

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 91 additions & 22 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package client

import (
"context"
"errors"
"log"
"net/url"
"sync"

"github.com/pkg/errors"

"github.com/gorilla/websocket"
)

Expand Down Expand Up @@ -35,36 +36,28 @@ type Client struct {
ctx context.Context
cancel context.CancelFunc
initMsg BaseMessage // used to resend the initialization msg if connection drops
apiKey string
opts Opts
history *MsgHistory
}

// New returns a new blocknative websocket client
// New returns a new blocknative websocket client caller must make sure to initialize afterwards
func New(ctx context.Context, opts Opts) (*Client, error) {
ctx, cancel := context.WithCancel(ctx)
u := url.URL{
client := &Client{
ctx: ctx,
cancel: cancel,
opts: opts,
history: &MsgHistory{},
}
if err := client.doConnect(ctx, url.URL{
Scheme: opts.Scheme,
Host: opts.Host,
Path: opts.Path,
}
c, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
if err != nil {
cancel()
return nil, err
}
// this checks out connection to blocknative's api and makes sure that we connected properly
var out ConnectResponse
if err := c.ReadJSON(&out); err != nil {
}); err != nil {
cancel()
return nil, err
}
if out.Status != "ok" {
cancel()
return nil, errors.New("failed to initialize websockets api connection")
}
if opts.PrintConnectResponse {
log.Printf("%+v\n", out)
}
return &Client{conn: c, ctx: ctx, cancel: cancel, apiKey: opts.APIKey}, nil
return client, nil
}

// Initialize is used to handle blocknative websockets api initialization
Expand All @@ -84,22 +77,29 @@ func (c *Client) Initialize(msg BaseMessage) error {
}

// ReadJSON is a wrapper around Conn:ReadJSON
// You should provide a pointer otherwise you will likely
// encounter a nil interface type as the returned value
func (c *Client) ReadJSON(out interface{}) error {
c.mx.RLock()
defer c.mx.RUnlock()
return c.conn.ReadJSON(out)
}

// WriteJSON is a wrapper around Conn:WriteJSON
// Do not provide a pointer as this could cause problems
// with the message history buffer if the provided value
// becomes garbage collected
func (c *Client) WriteJSON(out interface{}) error {
c.mx.Lock()
defer c.mx.Unlock()
// push the message into the history buffer
c.history.Push(out)
return c.conn.WriteJSON(out)
}

// APIKey returns the api key being used by the client
func (c *Client) APIKey() string {
return c.apiKey
return c.opts.APIKey
}

// Close is used to terminate our websocket client
Expand All @@ -113,6 +113,75 @@ func (c *Client) Close() error {
if err != nil {
log.Println("failed to send close message: ", err)
}
// close the underlying connection
c.conn.Close()
c.cancel()
return err
}

// ReInit should only be used in the event that we receive an unexpected
// error and allows us to replay previous messages
func (c *Client) ReInit() error {
c.mx.Lock()
defer c.mx.Unlock()
c.doConnect(c.ctx, url.URL{
Scheme: c.opts.Scheme,
Host: c.opts.Host,
Path: c.opts.Path,
})
// dont empty the buffer such that future errors
// can reuse the message history
msgs := c.history.CopyAll()
// send the initialize messsage
// we do not store this in the message history buffer
if err := c.conn.WriteJSON(c.initMsg); err != nil {
return errors.Wrap(err, "fatal error received")
}
// drain
_ = c.conn.ReadJSON(nil)
for _, msg := range msgs {
if err := c.conn.WriteJSON(&msg); err != nil {
// TODO(bonedaddy): figure out how to properly handle
log.Println("receive error during reinitialization: ", err)
return err
}
// drain
_ = c.conn.ReadJSON(nil)
}
return nil
}

// ShouldReInit is used to check the given error
// and return whether or not we should reinitialize the connection
func (c *Client) ShouldReInit(err error) bool {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
return false
}
return true
}

// doConnect should only be used during creation of the initial client object or during reinitialization
// caller must take care of locking
func (c *Client) doConnect(ctx context.Context, u url.URL) error {
// close the previous connection if it exists
if c.conn != nil {
c.conn.Close()
}
conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
if err != nil {
return err
}
c.conn = conn
// this checks out connection to blocknative's api and makes sure that we connected properly
var out ConnectResponse
if err := c.conn.ReadJSON(&out); err != nil {
return err
}
if out.Status != "ok" {
return errors.New("failed to initialize websockets api connection")
}
if c.opts.PrintConnectResponse {
log.Printf("%+v\n", out)
}
return nil
}
7 changes: 7 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ func TestClient(t *testing.T) {
t.Log("reading message...")
var out interface{}
require.NoError(t, client.ReadJSON(&out))
/*
t.Log("message: ", out)
// test reinitialization
t.Log("testing reinit")
require.NoError(t, client.ReInit())
t.Log("closing")
t.Log(out)
require.NoError(t, client.WriteJSON(
NewConfiguration(
Expand All @@ -38,6 +44,7 @@ func TestClient(t *testing.T) {
)
require.NoError(t, client.ReadJSON(&out))
t.Log(out)
*/
require.NoError(t, client.Close())
}

Expand Down
9 changes: 9 additions & 0 deletions client/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func (mg *MsgHistory) PopAll() []interface{} {
return copied
}

// CopyAll is like PopAll except it does not clear the buffer
func (mg *MsgHistory) CopyAll() []interface{} {
mg.mx.Lock()
defer mg.mx.Unlock()
copied := make([]interface{}, len(mg.buffer))
copy(copied, mg.buffer)
return copied
}

// Len returns the length of the msg history buffewr
func (mg *MsgHistory) Len() int {
mg.mx.RLock()
Expand Down
15 changes: 14 additions & 1 deletion client/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,20 @@ func TestMsgHistory(t *testing.T) {
require.Equal(t, nil, hist.Pop())
// set msg history again
set()
all := hist.PopAll()
all := hist.CopyAll()
require.Equal(t, 3, hist.Len())
for i := 0; i < len(all); i++ {
item = all[i].(arg)
switch i {
case 0:
require.Equal(t, 1, item.num)
case 1:
require.Equal(t, 2, item.num)
case 2:
require.Equal(t, 3, item.num)
}
}
all = hist.PopAll()
require.Equal(t, 0, hist.Len())
for i := 0; i < len(all); i++ {
item = all[i].(arg)
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ go 1.15
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gorilla/websocket v1.4.2
github.com/kr/pretty v0.1.0 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
github.com/urfave/cli/v2 v2.3.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
Expand Down