Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ccxt implementation #11

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions app/default.env

This file was deleted.

57 changes: 57 additions & 0 deletions app/exchange_harness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from json import loads
from datetime import datetime
import settings
import logging
import requests
import utils
import ccxt
from time import sleep
class ExchangeHarness(object):
"""Poloniex Market Data"""
def __init__(self,exchange_id):
self.symbols = ['BTC/USD','ETH/USD']
self.exchange_id = exchange_id.lower()
self.products = {'ETH/USD': 'eth.{}.ticker'.format(self.exchange_id),
'BTC/USD': 'btc.{}.ticker'.format(self.exchange_id)}
self.exchange = getattr(ccxt,self.exchange_id)({
'enableRateLimit': True, # this option enables the built-in rate limiter
})
# self.markets = self.exchange.load_markets()
def clean_ticker(self,data):
clean_data = dict()
now = datetime.utcnow()
clean_data['tracker_time'] = now
clean_data["ask"] = float(data["ask"])
clean_data["bid"] = float(data["bid"])
clean_data["price"] = float(data["last"])
clean_data["exchange"] = self.exchange_id
clean_data["product"] = data['symbol']
clean_data['info'] = data['info']
clean_data["size"] = float(data['baseVolume'])
clean_data["volume"] = float(data['quoteVolume'])
clean_data['time'] = data['timestamp']
for k,v in data.items():
if k not in clean_data.keys():
clean_data[k]=v
# if not clean_data["last"]:
# clean_data['last'] =
return clean_data

def get_ticker(self,symbol):
ticker = self.exchange.fetch_ticker(symbol)
clean = self.clean_ticker(ticker)
return clean

def record_ticker(self, es):
"""Record current tick"""
for product in self.products.keys():
es_body=self.get_ticker(product)
# print(es_body)
# if 'price' in es_body:
try:
es.create(index=self.products[product], id=utils.generate_nonce(), doc_type='ticker', body=es_body)
except:
raise ValueError("Misformed Body for Elastic Search on " + self.exchange_id)

2 changes: 2 additions & 0 deletions app/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ six==1.10.0
urllib3==1.21.1
websocket==0.2.1
websocket-client==0.40.0
futures
ccxt
48 changes: 21 additions & 27 deletions app/tracker.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,52 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
@authors: dconroy
avelkoski
"""
from elasticsearch import Elasticsearch, helpers
from public.bitfinex import BitFinex_Market
from public.bitmex import BitMex_Market
from public.bittrex import BitTrex_Market
from public.gdax import GDAX_Market
from public.gemini import Gemini_Market
from public.kraken import Kraken_Market
from public.okcoin import OKCoin_Market
from public.poloniex import Poloniex_Market
from dotenv import Dotenv
from time import sleep
from exchange_harness import ExchangeHarness
import logging
import schedule
# import schedule
import settings
import utils
import random
import time
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def main():
logging.basicConfig(format='%(levelname)s:%(asctime)s %(message)s',level=settings.LOGLEVEL)
es = Elasticsearch(settings.ELASTICSEARCH_CONNECT_STRING)

logging.info('Market Refresh Rate: ' + str(settings.MARKET_REFRESH_RATE) + ' seconds.')
logging.info('Initial Sleep: ' + str(settings.INITIAL_SLEEP) + ' seconds.')
logging.info('Initial Sleep: ' + str(5) + ' seconds.')


sleep(settings.INITIAL_SLEEP)
logging.info('Application Started.')
#supported_exchanges = [BitFinex_Market(), BitMex_Market(), BitTrex_Market(), GDAX_Market(), Gemini_Market(), Kraken_Market(), OKCoin_Market(), Poloniex_Market()]
exchanges = [BitFinex_Market(), BitMex_Market(), BitTrex_Market(), GDAX_Market(), Gemini_Market(), Kraken_Market(), OKCoin_Market(), Poloniex_Market()]
tmp = ['bitstamp', 'gdax', 'kraken', 'gemini']
exchanges = [ExchangeHarness(x) for x in tmp]


#print active exchanges and create indexes in kibana based on products listed in each market
for exchange in exchanges:
logging.info(exchange.exchange + ': activated and indexed.')
for product, kibana_index in exchange.products.iteritems():
logging.info(exchange.exchange_id + ': activated and indexed.')
for product, kibana_index in exchange.products.items():
utils.create_index(es, kibana_index)

logging.warn('Initiating Market Tracking.')

#Record Ticks
while True:
sleep(settings.MARKET_REFRESH_RATE)
try:
for exchange in exchanges:
exchange.record_ticker(es)
with ThreadPoolExecutor(max_workers=5) as executor:
try:
sleep(settings.MARKET_REFRESH_RATE)
executor.map(lambda ex: ex.record_ticker(es), exchanges)
logging.info("added another ticker record")
except Exception as e:
logging.warning(e)
sleep(settings.RETRY_RATE)



except Exception as e:
logging.warning(e)
sleep(settings.RETRY_RATE)

if __name__ == '__main__':

main()