Skip to content

Commit

Permalink
Huobi: Add subscription configuration (#1604)
Browse files Browse the repository at this point in the history
* Huobi: Update test config format

* Huobi: Add subscription configuration

* Huobi: Add subscription documentation

* Huobi: Clarify OB sub Levels usage

* Huobi: Enable websocket for tests

* Subscriptions: Rename ErrPrivateChannelName

Rename ErrPrivateChannelName to ErrUseConstChannelName
  • Loading branch information
gbjk authored Nov 11, 2024
1 parent d4c4bf1 commit a2a7fed
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 98 deletions.
20 changes: 16 additions & 4 deletions cmd/documentation/exchanges_templates/huobi.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,24 @@ if err != nil {
}
```

### How to do Websocket public/private calls
### Subscriptions

```go
// Exchanges will be abstracted out in further updates and examples will be
// supplied then
All subscriptions are for spot only.

Default Public Subscriptions:
- Ticker
- Candles ( Interval: 1min )
- Orderbook ( Level: 0 - No aggregation )
- Configure Level: 1-5 for depth aggregation, for example:
```json
{"enabled": true, "channel": "orderbook", "asset": "spot", "levels": 1}
```
- Trades

Default Authenticated Subscriptions:
- Account Trades
- Account Orders
- Account Updates

### Please click GoDocs chevron above to view current GoDoc information for this package
{{template "contributions"}}
Expand Down
20 changes: 16 additions & 4 deletions exchanges/huobi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,24 @@ if err != nil {
}
```

### How to do Websocket public/private calls
### Subscriptions

```go
// Exchanges will be abstracted out in further updates and examples will be
// supplied then
All subscriptions are for spot only.

Default Public Subscriptions:
- Ticker
- Candles ( Interval: 1min )
- Orderbook ( Level: 0 - No aggregation )
- Configure Level: 1-5 for depth aggregation, for example:
```json
{"enabled": true, "channel": "orderbook", "asset": "spot", "levels": 1}
```
- Trades

Default Authenticated Subscriptions:
- Account Trades
- Account Orders
- Account Updates

### Please click GoDocs chevron above to view current GoDoc information for this package

Expand Down
64 changes: 64 additions & 0 deletions exchanges/huobi/huobi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -2927,3 +2929,65 @@ func TestGetCurrencyTradeURL(t *testing.T) {
}
}
}

func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()

h := new(HUOBI)
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")
subs, err := h.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
exp := subscription.List{}
for _, s := range h.Features.Subscriptions {
if s.Authenticated && !h.Websocket.CanUseAuthenticatedEndpoints() {
continue
}
for _, a := range h.GetAssetTypes(true) {
if s.Asset != asset.All && s.Asset != a {
continue
}
pairs, err := h.GetEnabledPairs(a)
require.NoErrorf(t, err, "GetEnabledPairs %s must not error", a)
pairs = common.SortStrings(pairs).Format(currency.PairFormat{Uppercase: false, Delimiter: ""})
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.Asset = a
for i, p := range pairs {
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.QualifiedChannel = channelName(s, p)
switch s.Channel {
case subscription.OrderbookChannel:
s.QualifiedChannel += ".step0"
case subscription.CandlesChannel:
s.QualifiedChannel += ".1min"
}
s.Pairs = pairs[i : i+1]
exp = append(exp, s)
}
}
}
testsubs.EqualLists(t, exp, subs)
}

// TestSubscribe exercises live public subscriptions
func TestSubscribe(t *testing.T) {
t.Parallel()
h := new(HUOBI)
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")
subs, err := h.Features.Subscriptions.ExpandTemplates(h)
require.NoError(t, err, "ExpandTemplates must not error")
testexch.SetupWs(t, h)
err = h.Subscribe(subs)
require.NoError(t, err, "Subscribe must not error")
got := h.Websocket.GetSubscriptions()
require.Equal(t, 4, len(got), "Must get correct number of subscriptions")
for _, s := range got {
assert.Equal(t, subscription.SubscribedState, s.State())
}
}

func TestChannelName(t *testing.T) {
p := currency.NewPair(currency.BTC, currency.USD)
assert.Equal(t, "market.BTCUSD.kline", channelName(&subscription.Subscription{Channel: subscription.CandlesChannel}, p))
assert.Panics(t, func() { channelName(&subscription.Subscription{Channel: wsOrderbookChannel}, p) })
assert.Panics(t, func() { channelName(&subscription.Subscription{Channel: subscription.MyAccountChannel}, p) }, "Should panic on V2 endpoints until implemented")
}
175 changes: 97 additions & 78 deletions exchanges/huobi/huobi_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"strings"
"text/template"
"time"

"github.com/gorilla/websocket"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
Expand All @@ -31,11 +33,13 @@ const (
baseWSURL = "wss://api.huobi.pro"
futuresWSURL = "wss://api.hbdm.com/"

wsMarketURL = baseWSURL + "/ws"
wsMarketKline = "market.%s.kline.1min"
wsMarketDepth = "market.%s.depth.step0"
wsMarketTrade = "market.%s.trade.detail"
wsMarketTicker = "market.%s.detail"
wsMarketURL = baseWSURL + "/ws"
wsCandlesChannel = "market.%s.kline"
wsOrderbookChannel = "market.%s.depth"
wsTradesChannel = "market.%s.trade.detail"
wsMarketDetailChannel = "market.%s.detail"
wsMyOrdersChannel = "orders.%s"
wsMyTradesChannel = "orders.%s.update"

wsAccountsOrdersEndPoint = "/ws/v1"
wsAccountsList = "accounts.list"
Expand All @@ -56,6 +60,28 @@ const (
loginDelay = 50 * time.Millisecond
)

var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Levels: 0}, // Aggregation Levels; 0 is no depth aggregation
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyTradesChannel, Authenticated: true},
{Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true},
}

var subscriptionNames = map[string]string{
subscription.TickerChannel: wsMarketDetailChannel,
subscription.CandlesChannel: wsCandlesChannel,
subscription.OrderbookChannel: wsOrderbookChannel,
subscription.AllTradesChannel: wsTradesChannel,
/* TODO: Pending upcoming V2 support, these are dropped from the translation table so that the sub conf will be correct and not need upgrading, but will error on usage
subscription.MyTradesChannel: wsMyOrdersChannel,
subscription.MyOrdersChannel: wsMyTradesChannel,
subscription.MyAccountChannel: wsMyAccountChannel,
*/
}

// Instantiates a communications channel between websocket connections
var comms = make(chan WsMessage)

Expand Down Expand Up @@ -514,101 +540,66 @@ func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error {
return h.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
}

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (h *HUOBI) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{wsMarketKline,
wsMarketDepth,
wsMarketTrade,
wsMarketTicker}
var subscriptions subscription.List
if h.Websocket.CanUseAuthenticatedEndpoints() {
channels = append(channels, "orders.%v", "orders.%v.update")
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: "accounts",
})
}
enabledCurrencies, err := h.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
for i := range channels {
for j := range enabledCurrencies {
enabledCurrencies[j].Delimiter = ""
channel := fmt.Sprintf(channels[i],
enabledCurrencies[j].Lower().String())
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channel,
Pairs: currency.Pairs{enabledCurrencies[j]},
})
}
}
return subscriptions, nil
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (h *HUOBI) generateSubscriptions() (subscription.List, error) {
return h.Features.Subscriptions.ExpandTemplates(h)
}

// GetSubscriptionTemplate returns a subscription channel template
func (h *HUOBI) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"interval": h.FormatExchangeKlineInterval,
}).Parse(subTplText)
}

// Subscribe sends a websocket message to receive data from the channel
func (h *HUOBI) Subscribe(channelsToSubscribe subscription.List) error {
func (h *HUOBI) Subscribe(subs subscription.List) error {
ctx := context.Background()
var errs error
var creds *account.Credentials
if h.Websocket.CanUseAuthenticatedEndpoints() {
var err error
creds, err = h.GetCredentials(context.TODO())
if err != nil {
return err
if len(subs.Private()) > 0 {
if creds, errs = h.GetCredentials(ctx); errs != nil {
return errs
}
}
var errs error
for i := range channelsToSubscribe {
for _, s := range subs {
var err error
if (strings.Contains(channelsToSubscribe[i].Channel, "orders.") ||
strings.Contains(channelsToSubscribe[i].Channel, "accounts")) && creds != nil {
err = h.wsAuthenticatedSubscribe(creds,
"sub",
wsAccountsOrdersEndPoint+channelsToSubscribe[i].Channel,
channelsToSubscribe[i].Channel)
if s.Authenticated {
if err = h.wsAuthenticatedSubscribe(creds, "sub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel); err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.Conn, s)
}
} else {
err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, WsRequest{
Subscribe: channelsToSubscribe[i].Channel,
})
}
if err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.Conn, channelsToSubscribe[i])
}
if err != nil {
errs = common.AppendError(errs, err)
if err = h.Websocket.Conn.SendJSONMessage(ctx, request.Unset, WsRequest{Subscribe: s.QualifiedChannel}); err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.AuthConn, s)
}
}
errs = common.AppendError(errs, err)
}
return nil
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
func (h *HUOBI) Unsubscribe(channelsToUnsubscribe subscription.List) error {
func (h *HUOBI) Unsubscribe(subs subscription.List) error {
ctx := context.Background()
var errs error
var creds *account.Credentials
if h.Websocket.CanUseAuthenticatedEndpoints() {
var err error
creds, err = h.GetCredentials(context.TODO())
if err != nil {
return err
if len(subs.Private()) > 0 {
if creds, errs = h.GetCredentials(ctx); errs != nil {
return errs
}
}
var errs error
for i := range channelsToUnsubscribe {
for _, s := range subs {
var err error
if (strings.Contains(channelsToUnsubscribe[i].Channel, "orders.") ||
strings.Contains(channelsToUnsubscribe[i].Channel, "accounts")) && creds != nil {
err = h.wsAuthenticatedSubscribe(creds,
"unsub",
wsAccountsOrdersEndPoint+channelsToUnsubscribe[i].Channel,
channelsToUnsubscribe[i].Channel)
if s.Authenticated {
err = h.wsAuthenticatedSubscribe(creds, "unsub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel)
} else {
err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, WsRequest{
Unsubscribe: channelsToUnsubscribe[i].Channel,
})
err = h.Websocket.Conn.SendJSONMessage(ctx, request.Unset, WsRequest{Unsubscribe: s.QualifiedChannel})
}
if err == nil {
err = h.Websocket.RemoveSubscriptions(h.Websocket.Conn, channelsToUnsubscribe[i])
}
if err != nil {
errs = common.AppendError(errs, err)
err = h.Websocket.RemoveSubscriptions(h.Websocket.Conn, s)
}
errs = common.AppendError(errs, err)
}
return errs
}
Expand Down Expand Up @@ -810,3 +801,31 @@ func (h *HUOBI) wsGetOrderDetails(ctx context.Context, orderID string) (*WsAuthe
}
return &response, nil
}

// channelName converts global channel Names used in config of channel input into exchange channel names
// returns the name unchanged if no match is found
func channelName(s *subscription.Subscription, p currency.Pair) string {
if n, ok := subscriptionNames[s.Channel]; ok {
return fmt.Sprintf(n, p)
}
if s.Authenticated {
panic(fmt.Errorf("%w: Private endpoints not currently supported", common.ErrNotYetImplemented))
}
panic(subscription.ErrUseConstChannelName)
}

const subTplText = `
{{- if $.S.Asset }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs }}
{{- channelName $.S $p -}}
{{- if eq $.S.Channel "candles" -}} . {{- interval $.S.Interval }}{{ end }}
{{- if eq $.S.Channel "orderbook" -}} .step {{- $.S.Levels }}{{ end }}
{{ $.PairSeparator }}
{{- end }}
{{ $.AssetSeparator }}
{{- end }}
{{- else -}}
{{ channelName $.S nil }}
{{- end }}
`
3 changes: 2 additions & 1 deletion exchanges/huobi/huobi_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (h *HUOBI) SetDefaults() {
GlobalResultLimit: 2000,
},
},
Subscriptions: defaultSubscriptions.Clone(),
}

h.Requester, err = request.New(h.Name,
Expand Down Expand Up @@ -213,7 +214,7 @@ func (h *HUOBI) Setup(exch *config.Exchange) error {
Connector: h.WsConnect,
Subscriber: h.Subscribe,
Unsubscriber: h.Unsubscribe,
GenerateSubscriptions: h.GenerateDefaultSubscriptions,
GenerateSubscriptions: h.generateSubscriptions,
Features: &h.Features.Supports.WebsocketCapabilities,
})
if err != nil {
Expand Down
Loading

0 comments on commit a2a7fed

Please sign in to comment.