diff --git a/pom.xml b/pom.xml
index 0250003299b..1d03b5371fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -188,6 +188,7 @@
xchange-stream-bitmex
xchange-stream-bitstamp
xchange-stream-btcmarkets
+ xchange-stream-bybit
xchange-stream-cexio
xchange-stream-coinbasepro
xchange-stream-coinjar
diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitDigest.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitDigest.java
index bb1e26e4ff3..e1642efa265 100644
--- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitDigest.java
+++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitDigest.java
@@ -28,7 +28,11 @@ public BybitDigest(String secretKeyBase64) {
}
public static ParamsDigest createInstance(String secretKeyBase64) {
- return new BybitDigest(secretKeyBase64);
+ if (secretKeyBase64 != null) {
+ return new BybitDigest(secretKeyBase64);
+ } else {
+ return null;
+ }
}
@SneakyThrows
diff --git a/xchange-stream-bybit/pom.xml b/xchange-stream-bybit/pom.xml
new file mode 100644
index 00000000000..71b2943843b
--- /dev/null
+++ b/xchange-stream-bybit/pom.xml
@@ -0,0 +1,31 @@
+
+
+
+ 4.0.0
+
+
+ org.knowm.xchange
+ xchange-parent
+ 5.1.2-SNAPSHOT
+
+
+ XChange Bybit Stream
+ xchange-stream-bybit
+
+
+
+ org.knowm.xchange
+ xchange-stream-core
+ ${project.parent.version}
+
+
+ org.knowm.xchange
+ xchange-bybit
+ ${project.parent.version}
+
+
+
+
+
\ No newline at end of file
diff --git a/xchange-stream-bybit/src/main/java/dto/BybitSubscribeMessage.java b/xchange-stream-bybit/src/main/java/dto/BybitSubscribeMessage.java
new file mode 100644
index 00000000000..e3c01595d52
--- /dev/null
+++ b/xchange-stream-bybit/src/main/java/dto/BybitSubscribeMessage.java
@@ -0,0 +1,13 @@
+package dto;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class BybitSubscribeMessage {
+ private final String op;
+ private final List args;
+ }
+
diff --git a/xchange-stream-bybit/src/main/java/dto/marketdata/BybitOrderbook.java b/xchange-stream-bybit/src/main/java/dto/marketdata/BybitOrderbook.java
new file mode 100644
index 00000000000..7ba6df89560
--- /dev/null
+++ b/xchange-stream-bybit/src/main/java/dto/marketdata/BybitOrderbook.java
@@ -0,0 +1,22 @@
+package dto.marketdata;
+
+import java.beans.ConstructorProperties;
+import lombok.Getter;
+
+@Getter
+public class BybitOrderbook {
+
+ private final String topic;
+ private final String dataType;
+ private final String ts;
+ private final BybitOrderbookData data;
+
+ @ConstructorProperties({"topic","type","ts","data"})
+ public BybitOrderbook(String topic, String dataType, String ts, BybitOrderbookData data) {
+ this.topic = topic;
+ this.dataType = dataType;
+ this.ts = ts;
+ this.data = data;
+ }
+
+}
diff --git a/xchange-stream-bybit/src/main/java/dto/marketdata/BybitOrderbookData.java b/xchange-stream-bybit/src/main/java/dto/marketdata/BybitOrderbookData.java
new file mode 100644
index 00000000000..fc613a392ce
--- /dev/null
+++ b/xchange-stream-bybit/src/main/java/dto/marketdata/BybitOrderbookData.java
@@ -0,0 +1,33 @@
+package dto.marketdata;
+
+import java.beans.ConstructorProperties;
+import java.util.List;
+import lombok.Getter;
+
+@Getter
+public class BybitOrderbookData {
+
+ private final String symbolName;
+ private final List bid;
+ private final List ask;
+ // Update ID. Is a sequence. Occasionally, you'll receive "u"=1, which is a snapshot data due to
+ // the restart of the service. So please overwrite your local orderbook
+ private final Integer u;
+ // Cross sequence
+ // You can use this field to compare different levels orderbook data, and for the smaller seq,
+ // then it means the data is generated earlier.
+ // in docs says than it is Integer, but in fact, we get in response bigger numbers
+ private final Long seq;
+
+ @ConstructorProperties({"s", "b", "a", "u", "seq"})
+ public BybitOrderbookData(String symbolName, List bid,
+ List ask,
+ Integer u,
+ Long seq) {
+ this.symbolName = symbolName;
+ this.bid = bid;
+ this.ask = ask;
+ this.u = u;
+ this.seq = seq;
+ }
+}
diff --git a/xchange-stream-bybit/src/main/java/dto/marketdata/BybitPublicOrder.java b/xchange-stream-bybit/src/main/java/dto/marketdata/BybitPublicOrder.java
new file mode 100644
index 00000000000..55954ca82a1
--- /dev/null
+++ b/xchange-stream-bybit/src/main/java/dto/marketdata/BybitPublicOrder.java
@@ -0,0 +1,18 @@
+package dto.marketdata;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import lombok.Getter;
+
+@Getter
+public class BybitPublicOrder {
+
+ private final String price;
+ private final String size;
+
+ @JsonCreator
+ public BybitPublicOrder(String[] data) {
+ this.price = data[0];
+ this.size = data[1];
+ }
+}
+
diff --git a/xchange-stream-bybit/src/main/java/dto/trade/BybitTrade.java b/xchange-stream-bybit/src/main/java/dto/trade/BybitTrade.java
new file mode 100644
index 00000000000..a0ac503f2ef
--- /dev/null
+++ b/xchange-stream-bybit/src/main/java/dto/trade/BybitTrade.java
@@ -0,0 +1,42 @@
+package dto.trade;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.math.BigDecimal;
+import java.util.Date;
+import lombok.Getter;
+import org.knowm.xchange.bybit.dto.trade.BybitSide;
+
+@Getter
+public class BybitTrade {
+
+ //https://bybit-exchange.github.io/docs/v5/websocket/public/trad
+ //The timestamp (ms) that the order is filled
+ private final Date timestamp;
+ //Symbol name
+ private final String instId;
+ //Side of taker. Buy,Sell
+ private final BybitSide side;
+ private final BigDecimal tradeSize;
+ private final BigDecimal tradePrice;
+ //L string Direction of price change. Unique field for future
+ private final String direction;
+ private final String tradeId;
+ //boolean Whether it is a block trade order or not
+ private final boolean bT;
+
+
+ public BybitTrade(@JsonProperty("T") Date timestamp, @JsonProperty("s") String instId,
+ @JsonProperty("S") String side, @JsonProperty("v") BigDecimal tradeSize,
+ @JsonProperty("p") BigDecimal tradePrice, @JsonProperty("L") String direction,
+ @JsonProperty("i") String tradeId, @JsonProperty("BT") boolean bT) {
+ this.timestamp = timestamp;
+ this.instId = instId;
+ this.side = BybitSide.valueOf(side.toUpperCase());
+ this.tradeSize = tradeSize;
+ this.tradePrice = tradePrice;
+ this.direction = direction;
+ this.tradeId = tradeId;
+ this.bT = bT;
+ }
+}
+
diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamAdapters.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamAdapters.java
new file mode 100644
index 00000000000..d5b1808cc01
--- /dev/null
+++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamAdapters.java
@@ -0,0 +1,62 @@
+package info.bitrich.xchangestream.bybit;
+
+import static org.knowm.xchange.bybit.BybitAdapters.getOrderType;
+
+import dto.marketdata.BybitPublicOrder;
+import dto.trade.BybitTrade;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import dto.marketdata.BybitOrderbook;
+import org.knowm.xchange.dto.Order;
+import org.knowm.xchange.dto.Order.OrderType;
+import org.knowm.xchange.dto.marketdata.OrderBook;
+import org.knowm.xchange.dto.marketdata.Trade;
+import org.knowm.xchange.dto.marketdata.Trades;
+import org.knowm.xchange.dto.trade.LimitOrder;
+import org.knowm.xchange.instrument.Instrument;
+
+public class BybitStreamAdapters {
+ public static OrderBook adaptOrderBook(BybitOrderbook bybitOrderbooks, Instrument instrument) {
+ List asks = new ArrayList<>();
+ List bids = new ArrayList<>();
+ Date timestamp = new Date(Long.parseLong(bybitOrderbooks.getTs()));
+ bybitOrderbooks.getData().getAsk()
+ .forEach(bybitAsk -> asks.add(
+ adaptOrderBookOrder(bybitAsk, instrument, OrderType.ASK,timestamp)));
+
+ bybitOrderbooks.getData().getBid()
+ .forEach(bybitBid -> bids.add(
+ adaptOrderBookOrder(bybitBid, instrument, OrderType.BID,timestamp)));
+
+ return new OrderBook(timestamp,asks, bids);
+ }
+
+ public static Trades adaptTrades(List bybitTrades, Instrument instrument) {
+ List trades = new ArrayList<>();
+
+ bybitTrades.forEach(
+ bybitTrade ->
+ trades.add(
+ new Trade.Builder()
+ .id(bybitTrade.getTradeId())
+ .instrument(instrument)
+ .originalAmount(bybitTrade.getTradeSize())
+ .price(bybitTrade.getTradePrice())
+ .timestamp(bybitTrade.getTimestamp())
+ .type(getOrderType(bybitTrade.getSide()))
+ .build()));
+
+ return new Trades(trades);
+ }
+
+ public static LimitOrder adaptOrderBookOrder(BybitPublicOrder bybitPublicOrder,
+ Instrument instrument, Order.OrderType orderType, Date timestamp) {
+
+ return new LimitOrder(orderType, new BigDecimal(bybitPublicOrder.getSize()), instrument, "",
+ timestamp, new BigDecimal(bybitPublicOrder.getPrice()));
+ }
+
+
+}
diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java
new file mode 100644
index 00000000000..5482484bb0e
--- /dev/null
+++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java
@@ -0,0 +1,76 @@
+package info.bitrich.xchangestream.bybit;
+
+import info.bitrich.xchangestream.core.ProductSubscription;
+import info.bitrich.xchangestream.core.StreamingExchange;
+import io.reactivex.Completable;
+import org.knowm.xchange.bybit.BybitExchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BybitStreamingExchange extends BybitExchange implements StreamingExchange {
+
+ private final Logger LOG = LoggerFactory.getLogger(BybitStreamingExchange.class);
+
+ //https://bybit-exchange.github.io/docs/v5/ws/connect
+ public static final String URI = "wss://stream.bybit.com/v5/public";
+ public static final String TESTNET_URI = "wss://stream-testnet.bybit.com/v5/public";
+ public static final String AUTH_URI = "wss://stream.bybit.com/v5/private";
+ public static final String TESTNET_AUTH_URI = "wss://stream-testnet.bybit.com/v5/private";
+
+ //spot, linear, inverse or option
+ public static final String EXCHANGE_TYPE = "EXCHANGE_TYPE";
+
+ private BybitStreamingService streamingService;
+ private BybitStreamingMarketDataService streamingMarketDataService;
+
+ @Override
+ protected void initServices() {
+ super.initServices();
+ this.streamingService = new BybitStreamingService(getApiUrl(),
+ exchangeSpecification.getExchangeSpecificParametersItem(EXCHANGE_TYPE));
+ this.streamingMarketDataService = new BybitStreamingMarketDataService(streamingService);
+ }
+
+ private String getApiUrl() {
+ String apiUrl = null;
+ if (exchangeSpecification.getApiKey() == null) {
+ if (Boolean.TRUE.equals(
+ exchangeSpecification.getExchangeSpecificParametersItem(USE_SANDBOX))) {
+ apiUrl = TESTNET_URI;
+ } else {
+ apiUrl = URI;
+ }
+ apiUrl += "/" + exchangeSpecification.getExchangeSpecificParametersItem(EXCHANGE_TYPE);
+ }
+// TODO auth
+ return apiUrl;
+ }
+
+ @Override
+ public Completable connect(ProductSubscription... args) {
+ LOG.info("Connect to BybitStream");
+ return streamingService.connect();
+ }
+
+ @Override
+ public Completable disconnect() {
+ streamingService.pingPongDisconnectIfConnected();
+ return streamingService.disconnect();
+ }
+
+ @Override
+ public boolean isAlive() {
+ return streamingService != null && streamingService.isSocketOpen();
+ }
+
+ @Override
+ public void useCompressedMessages(boolean compressedMessages) {
+ streamingService.useCompressedMessages(compressedMessages);
+ }
+
+ @Override
+ public BybitStreamingMarketDataService getStreamingMarketDataService() {
+ return streamingMarketDataService;
+ }
+
+}
diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingMarketDataService.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingMarketDataService.java
new file mode 100644
index 00000000000..2fd375b44c5
--- /dev/null
+++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingMarketDataService.java
@@ -0,0 +1,171 @@
+package info.bitrich.xchangestream.bybit;
+
+import static org.knowm.xchange.bybit.BybitAdapters.convertToBybitSymbol;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import dto.marketdata.BybitPublicOrder;
+import info.bitrich.xchangestream.core.StreamingMarketDataService;
+import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
+import io.reactivex.Observable;
+import io.reactivex.subjects.PublishSubject;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import dto.marketdata.BybitOrderbook;
+import dto.trade.BybitTrade;
+import java.util.concurrent.atomic.AtomicLong;
+import org.knowm.xchange.dto.Order;
+import org.knowm.xchange.dto.marketdata.OrderBook;
+import org.knowm.xchange.dto.marketdata.OrderBookUpdate;
+import org.knowm.xchange.dto.marketdata.Trade;
+import org.knowm.xchange.instrument.Instrument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BybitStreamingMarketDataService implements StreamingMarketDataService {
+
+ private final Logger LOG = LoggerFactory.getLogger(BybitStreamingMarketDataService.class);
+ private final BybitStreamingService streamingService;
+ private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
+ public static final String TRADE = "publicTrade.";
+ public static final String ORDERBOOK = "orderbook.";
+ public static final String TICKER = "tickers.";
+
+ private final Map orderBookMap = new HashMap<>();
+ private final Map>>
+ orderBookUpdatesSubscriptions;
+
+ public BybitStreamingMarketDataService(BybitStreamingService streamingService) {
+ this.streamingService = streamingService;
+ this.orderBookUpdatesSubscriptions = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Linear & inverse: Level 1 data, push frequency: 10ms Level 50 data, push frequency: 20ms Level
+ * 200 data, push frequency: 100ms Level 500 data, push frequency: 100ms Spot: Level 1 data, push
+ * frequency: 10ms Level 50 data, push frequency: 20ms Level 200 data, push frequency: 200ms
+ *
+ * @param args - orderbook depth
+ **/
+ @Override
+ public Observable getOrderBook(Instrument instrument, Object... args) {
+ String depth = "50";
+ AtomicLong orderBookUpdateIdPrev = new AtomicLong();
+ if (args.length > 0 && args[0] != null) {
+ depth = args[0].toString();
+ }
+ String channelUniqueId =
+ ORDERBOOK + depth + "." + convertToBybitSymbol(instrument);
+ return streamingService
+ .subscribeChannel(channelUniqueId)
+ .flatMap(
+ jsonNode -> {
+ BybitOrderbook bybitOrderbooks =
+ mapper.treeToValue(jsonNode, BybitOrderbook.class);
+ String type = bybitOrderbooks.getDataType();
+ if (type.equalsIgnoreCase("snapshot")) {
+ OrderBook orderBook = BybitStreamAdapters.adaptOrderBook(bybitOrderbooks, instrument);
+ orderBookUpdateIdPrev.set(bybitOrderbooks.getData().getU());
+ orderBookMap.put(channelUniqueId, orderBook);
+ return Observable.just(orderBook);
+ } else if (type.equalsIgnoreCase("delta")) {
+ return applyDeltaSnapshot(channelUniqueId, instrument, bybitOrderbooks, orderBookUpdateIdPrev);
+ }
+ return Observable.fromIterable(new LinkedList<>());
+ });
+ }
+
+ private Observable applyDeltaSnapshot(String channelUniqueId, Instrument instrument,
+ BybitOrderbook bybitOrderBookUpdate,AtomicLong orderBookUpdateIdPrev) {
+ OrderBook orderBook = orderBookMap.getOrDefault(channelUniqueId, null);
+ if (orderBook == null) {
+ LOG.error("Failed to get orderBook, channelUniqueId= {}", channelUniqueId);
+ return Observable.fromIterable(new LinkedList<>());
+ }
+ if (orderBookUpdateIdPrev.incrementAndGet() == bybitOrderBookUpdate.getData().getU()) {
+ LOG.debug("orderBookUpdate id {}, seq {} ", bybitOrderBookUpdate.getData().getU(),
+ bybitOrderBookUpdate.getData().getSeq());
+ List asks = bybitOrderBookUpdate.getData().getAsk();
+ List bids = bybitOrderBookUpdate.getData().getBid();
+ Date timestamp = new Date(Long.parseLong(bybitOrderBookUpdate.getTs()));
+ asks.forEach(
+ bybitPublicOrder ->
+ orderBook.update(
+ BybitStreamAdapters.adaptOrderBookOrder(
+ bybitPublicOrder, instrument, Order.OrderType.ASK, timestamp)));
+ bids.forEach(
+ bybitPublicOrder ->
+ orderBook.update(
+ BybitStreamAdapters.adaptOrderBookOrder(
+ bybitPublicOrder, instrument, Order.OrderType.BID, timestamp)));
+ if (orderBookUpdatesSubscriptions.get(instrument) != null) {
+ orderBookUpdatesSubscriptions(instrument, asks, bids, timestamp);
+ }
+ return Observable.just(orderBook);
+ } else {
+ LOG.error("orderBookUpdate id sequence failed, expected {}, in fact {}",
+ orderBookUpdateIdPrev,
+ bybitOrderBookUpdate.getData().getU());
+ // resubscribe or what here?
+ return Observable.fromIterable(new LinkedList<>());
+ }
+ }
+
+ @Override
+ public Observable> getOrderBookUpdates(Instrument instrument,Object... args) {
+ return orderBookUpdatesSubscriptions.computeIfAbsent(instrument, v -> PublishSubject.create());
+ }
+
+ private void orderBookUpdatesSubscriptions(
+ Instrument instrument, List asks, List bids, Date date) {
+ List orderBookUpdates = new ArrayList<>();
+ for (BybitPublicOrder ask : asks) {
+ OrderBookUpdate o =
+ new OrderBookUpdate(
+ Order.OrderType.ASK,
+ new BigDecimal(ask.getSize()),
+ instrument,
+ new BigDecimal(ask.getPrice()),
+ date,
+ new BigDecimal(ask.getSize()));
+ orderBookUpdates.add(o);
+ }
+ for (BybitPublicOrder bid : bids) {
+ OrderBookUpdate o =
+ new OrderBookUpdate(
+ Order.OrderType.BID,
+ new BigDecimal(bid.getSize()),
+ instrument,
+ new BigDecimal(bid.getPrice()),
+ date,
+ new BigDecimal(bid.getSize()));
+ orderBookUpdates.add(o);
+ }
+ orderBookUpdatesSubscriptions.get(instrument).onNext(orderBookUpdates);
+ }
+
+ @Override
+ public Observable getTrades(Instrument instrument, Object... args) {
+ String channelUniqueId =
+ TRADE + convertToBybitSymbol(instrument);
+
+ return streamingService
+ .subscribeChannel(channelUniqueId)
+ .filter(message -> message.has("data"))
+ .flatMap(
+ jsonNode -> {
+ List bybitTradeList =
+ mapper.treeToValue(
+ jsonNode.get("data"),
+ mapper.getTypeFactory()
+ .constructCollectionType(List.class, BybitTrade.class));
+ return Observable.fromIterable(
+ BybitStreamAdapters.adaptTrades(bybitTradeList, instrument).getTrades());
+ });
+ }
+}
diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java
new file mode 100644
index 00000000000..d1d0e57006b
--- /dev/null
+++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java
@@ -0,0 +1,107 @@
+package info.bitrich.xchangestream.bybit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import dto.BybitSubscribeMessage;
+import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
+import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler;
+import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
+import io.reactivex.Completable;
+import io.reactivex.CompletableSource;
+import io.reactivex.Observable;
+import io.reactivex.disposables.Disposable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BybitStreamingService extends JsonNettyStreamingService {
+
+ private final Logger LOG = LoggerFactory.getLogger(BybitStreamingService.class);
+ public final String exchange_type;
+ private final Observable pingPongSrc = Observable.interval(15, 20, TimeUnit.SECONDS);
+ private Disposable pingPongSubscription;
+
+ public BybitStreamingService(String apiUrl, Object exchange_type) {
+ super(apiUrl);
+ this.exchange_type = (String) exchange_type;
+// this.setEnableLoggingHandler(true);
+ }
+
+ @Override
+ public Completable connect() {
+ Completable conn = super.connect();
+ return conn.andThen(
+ (CompletableSource)
+ (completable) -> {
+ pingPongDisconnectIfConnected();
+ pingPongSubscription = pingPongSrc.subscribe(
+ o -> this.sendMessage("{\"op\":\"ping\"}"));
+ completable.onComplete();
+ });
+ }
+
+ @Override
+ protected String getChannelNameFromMessage(JsonNode message) {
+ if (message.has("topic")) {
+ return message.get("topic").asText();
+ }
+ return "";
+ }
+
+ @Override
+ public String getSubscribeMessage(String channelName, Object... args) throws IOException {
+ LOG.info(" getSubscribeMessage {}", channelName);
+ return objectMapper.writeValueAsString(
+ new BybitSubscribeMessage("subscribe", Collections.singletonList(channelName)));
+ }
+
+ @Override
+ public String getUnsubscribeMessage(String channelName, Object... args) throws IOException {
+ LOG.info(" getUnsubscribeMessage {}", channelName);
+ return objectMapper.writeValueAsString(
+ new BybitSubscribeMessage("unsubscribe", Collections.singletonList(channelName)));
+ }
+
+ @Override
+ public void messageHandler(String message) {
+ LOG.debug("Received message: {}", message);
+ JsonNode jsonNode;
+ try {
+ jsonNode = objectMapper.readTree(message);
+ } catch (IOException e) {
+ LOG.error("Error parsing incoming message to JSON: {}", message);
+ return;
+ }
+ String op = "";
+ boolean success = false;
+ if (jsonNode.has("op")) {
+ op = jsonNode.get("op").asText();
+ }
+ if (jsonNode.has("success")) {
+ success = jsonNode.get("success").asBoolean();
+ }
+ if (success) {
+ switch (op) {
+ case "pong":
+ case "subscribe":
+ case "unsubscribe": {
+ break;
+ }
+ }
+ return;
+ }
+ handleMessage(jsonNode);
+ }
+
+ public void pingPongDisconnectIfConnected() {
+ if (pingPongSubscription != null && !pingPongSubscription.isDisposed()) {
+ pingPongSubscription.dispose();
+ }
+ }
+
+ @Override
+ protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
+ return WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler.INSTANCE;
+ }
+}
diff --git a/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/BybitStreamExample.java b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/BybitStreamExample.java
new file mode 100644
index 00000000000..5a3dc135d6d
--- /dev/null
+++ b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/BybitStreamExample.java
@@ -0,0 +1,163 @@
+package info.bitrich.xchangestream.bybit;
+
+import static org.knowm.xchange.Exchange.USE_SANDBOX;
+import static org.knowm.xchange.bybit.BybitExchange.SPECIFIC_PARAM_ACCOUNT_TYPE;
+
+import info.bitrich.xchangestream.core.StreamingExchange;
+import info.bitrich.xchangestream.core.StreamingExchangeFactory;
+import io.reactivex.disposables.Disposable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.knowm.xchange.ExchangeSpecification;
+import org.knowm.xchange.bybit.dto.BybitCategory;
+import org.knowm.xchange.bybit.dto.account.walletbalance.BybitAccountType;
+import org.knowm.xchange.currency.CurrencyPair;
+import org.knowm.xchange.derivative.FuturesContract;
+import org.knowm.xchange.derivative.OptionsContract;
+import org.knowm.xchange.instrument.Instrument;
+
+
+public class BybitStreamExample {
+
+ public static void main(String[] args) {
+ try {
+ spot();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ futures();
+ }
+
+ private static final Instrument BTC_PERP = new FuturesContract("BTC/USDT/PERP");
+ private static final Instrument ETH_PERP = new CurrencyPair("ETH/USDT/PERP");
+ private static final Instrument BTC_SPOT = new CurrencyPair("BTC/USDT");
+ private static final Instrument ETH_SPOT = new CurrencyPair("ETH/USDT");
+
+ public static void spot() throws IOException {
+ ExchangeSpecification exchangeSpecification =
+ new ExchangeSpecification(BybitStreamingExchange.class);
+ exchangeSpecification.setExchangeSpecificParametersItem(USE_SANDBOX, true);
+ exchangeSpecification.setExchangeSpecificParametersItem(BybitStreamingExchange.EXCHANGE_TYPE,
+ BybitCategory.SPOT.getValue());
+ exchangeSpecification.setExchangeSpecificParametersItem(SPECIFIC_PARAM_ACCOUNT_TYPE,
+ BybitAccountType.UNIFIED);
+ StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(
+ exchangeSpecification);
+ exchange.connect().blockingAwait();
+ int count_currencyPair = 0;
+ int count_futureContractPerp = 0;
+ int count_futureContractDate = 0;
+ List instruments = new ArrayList<>(exchange
+ .getExchangeMetaData()
+ .getInstruments().keySet());
+ for (Instrument instrument : instruments) {
+ if (instrument instanceof CurrencyPair) {
+ count_currencyPair++;
+ }
+ if (instrument instanceof FuturesContract) {
+ count_futureContractPerp++;
+ }
+ if (instrument instanceof OptionsContract) {
+ count_futureContractDate++;
+ }
+ }
+ System.out.println("Currency pairs: " + count_currencyPair);
+ System.out.println("Futures: " + count_futureContractPerp);
+ System.out.println("Futures date: " + count_futureContractDate);
+
+ System.out.println(exchange.getMarketDataService().getTicker(BTC_SPOT));
+ System.out.println(exchange.getMarketDataService().getTicker(BTC_PERP));
+ List tradesDisposable = new ArrayList<>();
+ Disposable bookDisposable =
+ exchange
+ .getStreamingMarketDataService()
+ .getOrderBook(ETH_SPOT)
+ .subscribe();
+ tradesDisposable.add(exchange
+ .getStreamingMarketDataService().getTrades(ETH_SPOT).subscribe(
+ trade -> System.out.println("trade: " + trade)));
+ tradesDisposable.add(exchange
+ .getStreamingMarketDataService().getTrades(BTC_SPOT).subscribe(
+ trade -> System.out.println("trade: " + trade)));
+ try {
+ Thread.sleep(2000);
+ } catch (
+ InterruptedException ignored) {
+ }
+ bookDisposable.dispose();
+ for (Disposable disposable : tradesDisposable) {
+ disposable.dispose();
+ }
+
+ try {
+ Thread.sleep(5000);
+ } catch (
+ InterruptedException ignored) {
+ }
+ exchange.disconnect().blockingAwait();
+ }
+
+
+ public static void futures() {
+ ExchangeSpecification exchangeSpecification =
+ new ExchangeSpecification(BybitStreamingExchange.class);
+ exchangeSpecification.setExchangeSpecificParametersItem(SPECIFIC_PARAM_ACCOUNT_TYPE,
+ BybitAccountType.UNIFIED);
+ exchangeSpecification.setExchangeSpecificParametersItem(BybitStreamingExchange.EXCHANGE_TYPE,
+ BybitCategory.LINEAR.getValue());
+ StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(
+ exchangeSpecification);
+ exchange.connect().blockingAwait();
+ List tradesDisposable = new ArrayList<>();
+ List booksDisposable = new ArrayList<>();
+ List booksUpdatesDisposable = new ArrayList<>();
+ booksDisposable.add(exchange
+ .getStreamingMarketDataService()
+ .getOrderBook(BTC_PERP)
+ .subscribe());
+ booksDisposable.add(exchange
+ .getStreamingMarketDataService()
+ .getOrderBook(ETH_PERP)
+ .subscribe());
+ booksUpdatesDisposable.add(exchange
+ .getStreamingMarketDataService()
+ .getOrderBookUpdates(BTC_PERP)
+ .subscribe(
+ orderBookUpdates -> System.out.printf("orderBookUpdates: %s\n", orderBookUpdates)
+ ));
+ booksUpdatesDisposable.add(exchange
+ .getStreamingMarketDataService()
+ .getOrderBookUpdates(ETH_PERP)
+ .subscribe(
+ orderBookUpdates -> System.out.printf("orderBookUpdates: %s\n", orderBookUpdates)
+ ));
+ tradesDisposable.add(exchange
+ .getStreamingMarketDataService().getTrades(BTC_PERP).subscribe(
+ trade -> System.out.println("trade: " + trade)));
+ tradesDisposable.add(exchange
+ .getStreamingMarketDataService().getTrades(ETH_PERP).subscribe(
+ trade -> System.out.println("trade: " + trade)));
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {
+
+ }
+ for (Disposable disposable : booksUpdatesDisposable) {
+ disposable.dispose();
+ }
+ for (Disposable disposable : booksDisposable) {
+ disposable.dispose();
+ }
+ for (Disposable disposable : tradesDisposable) {
+ disposable.dispose();
+ }
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ignored) {
+
+ }
+ exchange.disconnect().blockingAwait();
+ }
+
+}
diff --git a/xchange-stream-bybit/src/test/resources/logback.xml b/xchange-stream-bybit/src/test/resources/logback.xml
new file mode 100644
index 00000000000..fabc864484c
--- /dev/null
+++ b/xchange-stream-bybit/src/test/resources/logback.xml
@@ -0,0 +1,27 @@
+
+
+
+
+
+
+
+
+ %d{HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{36} - %msg %xEx%n
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file