diff --git a/main.py b/main.py
index 1261540..fc79121 100755
--- a/main.py
+++ b/main.py
@@ -2,10 +2,14 @@
 # coding: utf-8
 
 """Download historical candlestick data for all trading pairs on Binance.com.
 
-All trading pair data is checked for integrity, sorted and saved as both a CSV
-and a Parquet file. The CSV files act as a raw buffer on every update round.
+All trading pair data is checked for integrity, sorted and saved as a Parquet
+file and optionally as a CSV file. The Parquet files are much more space efficient (roughly 10GB as Parquet versus 50GB as CSV) and are therefore the files uploaded to Kaggle after each run.
+
+It is possible to circumvent the need for CSV buffers and update the Parquet
+files directly by setting CIRCUMVENT_CSV to True. This makes it easy to keep
+the Parquet files up to date after you have downloaded them from Kaggle.
 """
 
 __author__ = 'GOSUTO.AI'
 
@@ -16,11 +20,20 @@
 import subprocess
 import time
 from datetime import date, datetime, timedelta
-
+from progressbar import ProgressBar
+import pyarrow.parquet as pq
 import requests
 import pandas as pd
-
 import preprocessing as pp
+
+in_pycharm = "PYCHARM_HOSTED" in os.environ
+
+BATCH_SIZE = 1000  # Number of candles to ask for in each API request.
+SHAVE_OFF_TODAY = False  # Whether to drop candles after the last midnight so all datasets share the same end time.
+CIRCUMVENT_CSV = True  # Whether to update the Parquet files directly, skipping the CSV buffers.
+UPLOAD_TO_KAGGLE = False  # Whether to upload the Parquet files to Kaggle after updating.
+SKIP_DELISTED = True  # Whether to skip pairs that are no longer actively trading.
+COMPRESSED_PATH = r'C:\Users\magla\Documents\Datasets\binance_pairs'
+CSV_PATH = 'data'
 
 
 API_BASE = 'https://api.binance.com/api/v3/'
@@ -60,7 +73,7 @@
 def write_metadata(n_count):
     METADATA['subtitle'] = f'1 minute candlesticks for all {n_count} cryptocurrency pairs'
     METADATA['description'] = f"""### Introduction\n\nThis is a collection of all 1 minute candlesticks of all cryptocurrency pairs on [Binance.com](https://binance.com). All {n_count} of them are included. Both retrieval and uploading the data is fully automated—see [this GitHub repo](https://github.com/gosuto-ai/candlestick_retriever).\n\n### Content\n\nFor every trading pair, the following fields from [Binance's official API endpoint for historical candlestick data](https://github.com/binance-exchange/binance-official-api-docs/blob/master/rest-api.md#klinecandlestick-data) are saved into a Parquet file:\n\n```\n #   Column                        Dtype         \n---  ------                        -----         \n 0   open_time                     datetime64[ns]\n 1   open                          float32       \n 2   high                          float32       \n 3   low                           float32       \n 4   close                         float32       \n 5   volume                        float32       \n 6   quote_asset_volume            float32       \n 7   number_of_trades              uint16        \n 8   taker_buy_base_asset_volume   float32       \n 9   taker_buy_quote_asset_volume  float32       \ndtypes: datetime64[ns](1), float32(8), uint16(1)\n```\n\nThe dataframe is indexed by `open_time` and sorted from oldest to newest. The first row starts at the first timestamp available on the exchange, which is July 2017 for the longest running pairs.\n\nHere are two simple plots based on a single file; one of the opening price with an added indicator (MA50) and one of the volume and number of trades:\n\n![](https://www.googleapis.com/download/storage/v1/b/kaggle-user-content/o/inbox%2F2234678%2Fb8664e6f26dc84e9a40d5a3d915c9640%2Fdownload.png?generation=1582053879538546&alt=media)\n![](https://www.googleapis.com/download/storage/v1/b/kaggle-user-content/o/inbox%2F2234678%2Fcd04ed586b08c1576a7b67d163ad9889%2Fdownload-1.png?generation=1582053899082078&alt=media)\n\n### Inspiration\n\nOne obvious use-case for this data could be technical analysis by adding indicators such as moving averages, MACD, RSI, etc. Other approaches could include backtesting trading algorithms or computing arbitrage potential with other exchanges.\n\n### License\n\nThis data is being collected automatically from crypto exchange Binance."""
-    with open('compressed/dataset-metadata.json', 'w') as file:
+    with open(f'{COMPRESSED_PATH}/dataset-metadata.json', 'w') as file:
         json.dump(METADATA, file, indent=4)
 
 
@@ -76,121 +89,246 @@ def get_batch(symbol, interval='1m', start_time=0, limit=1000):
         'limit': limit
     }
     try:
-        response = requests.get(f'{API_BASE}klines', params)
+        # TODO: the timeout should also be passed in as a function parameter
+        response = requests.get(f'{API_BASE}klines', params, timeout=30)
     except requests.exceptions.ConnectionError:
-        print('Cooling down for 5 mins...')
+        print('Connection error, cooling down for 5 mins...')
         time.sleep(5 * 60)
         return get_batch(symbol, interval, start_time, limit)
+
+    except requests.exceptions.Timeout:
+        print('Timeout, cooling down for 5 mins...')
+        time.sleep(5 * 60)
+        return get_batch(symbol, interval, start_time, limit)
+
+    except ConnectionResetError:
+        # ConnectionResetError is a builtin, not part of requests.exceptions
+        print('Connection reset by peer, cooling down for 5 mins...')
+        time.sleep(5 * 60)
+        return get_batch(symbol, interval, start_time, limit)
+
     if response.status_code == 200:
         return pd.DataFrame(response.json(), columns=LABELS)
     print(f'Got erroneous response back: {response}')
     return pd.DataFrame([])
 
 
-def all_candles_to_csv(base, quote, interval='1m'):
-    """Collect a list of candlestick batches with all candlesticks of a trading pair,
-    concat into a dataframe and write it to CSV.
+def gather_new_candles(base, quote, last_timestamp, interval='1m'):
+    """
+    Gather all candlesticks available, starting from the last timestamp loaded from disk or from the beginning of time.
+    Stop if the timestamp that comes back from the API is the same as the last one.
""" - - # see if there is any data saved on disk already - try: - batches = [pd.read_csv(f'data/{base}-{quote}.csv')] - last_timestamp = batches[-1]['open_time'].max() - except FileNotFoundError: - batches = [pd.DataFrame([], columns=LABELS)] - last_timestamp = 0 - old_lines = len(batches[-1].index) - - # gather all candlesticks available, starting from the last timestamp loaded from disk or 0 - # stop if the timestamp that comes back from the api is the same as the last one previous_timestamp = None + batches = [pd.DataFrame([], columns=LABELS)] + first_read = True + start_datetime = None + bar = None + print(f"Last timestamp of pair {base}-{quote} was {datetime.fromtimestamp(last_timestamp / 1000)}.") while previous_timestamp != last_timestamp: - # stop if we reached data from today - if date.fromtimestamp(last_timestamp / 1000) >= date.today(): - break - previous_timestamp = last_timestamp new_batch = get_batch( symbol=base+quote, interval=interval, - start_time=last_timestamp+1 + start_time=last_timestamp+1, + limit=BATCH_SIZE ) - # requesting candles from the future returns empty # also stop in case response code was not 200 if new_batch.empty: break last_timestamp = new_batch['open_time'].max() - # sometimes no new trades took place yet on date.today(); # in this case the batch is nothing new if previous_timestamp == last_timestamp: break batches.append(new_batch) - last_datetime = datetime.fromtimestamp(last_timestamp / 1000) - covering_spaces = 20 * ' ' - print(datetime.now(), base, quote, interval, str(last_datetime)+covering_spaces, end='\r', flush=True) + #Get info for progressbar + if first_read: + start_datetime = datetime.fromtimestamp(new_batch['open_time'][0] / 1000) + missing_data_timedelta = datetime.now() - start_datetime + total_minutes_of_data = int(missing_data_timedelta.total_seconds()/60) + print(f"Will fetch {missing_data_timedelta} ({total_minutes_of_data} minutes) worth of candles.") + if in_pycharm: time.sleep(0.2) + first_read = False + if total_minutes_of_data >= BATCH_SIZE*2: + bar = ProgressBar(max_value=total_minutes_of_data).start() + + if bar is not None: + time_covered = datetime.fromtimestamp(last_timestamp / 1000) - start_datetime + minutes_covered = int(time_covered.total_seconds()/60) + bar.max_value = max(int((datetime.now() - start_datetime).total_seconds()/60), minutes_covered) + bar.update(minutes_covered) + if bar is not None: + bar.finish(dirty=True) + if in_pycharm: time.sleep(0.2) + return batches + + +def all_candles_to_csv(base, quote, interval='1m'): + """Collect a list of candlestick batches with all candlesticks of a trading pair, + concat into a dataframe and write it to CSV. + """ + filepath = f'{CSV_PATH}/{base}-{quote}.csv' + + last_timestamp, old_lines = get_csv_info(filepath) + new_candle_batches = gather_new_candles(base, quote, last_timestamp, interval) + return write_to_csv(filepath, new_candle_batches, old_lines) + + +def all_candles_to_parquet(base, quote, interval='1m'): + """Collect a list of candlestick batches with all candlesticks of a trading pair, + concat into a dataframe and write it to parquet. 
+    """
+    filepath = f'{COMPRESSED_PATH}/{base}-{quote}.parquet'
+
+    last_timestamp, old_lines = get_parquet_info(filepath)
+    new_candle_batches = gather_new_candles(base, quote, last_timestamp, interval)
+    return write_to_parquet(filepath, new_candle_batches, base, quote, append=True)
 
-    # write clean version of csv to parquet
-    parquet_name = f'{base}-{quote}.parquet'
-    full_path = f'compressed/{parquet_name}'
+
+def get_csv_info(filepath):
+    """
+    Reads and returns the last timestamp and the number of candles in a CSV file.
+    """
+    last_timestamp = 0
+    old_lines = 0
+    try:
+        batches = [pd.read_csv(filepath)]
+        last_timestamp = batches[-1]['open_time'].max()
+        old_lines = len(batches[-1].index)
+    except FileNotFoundError:
+        pass
+    return last_timestamp, old_lines
+
+
+def get_parquet_info(filepath):
+    """
+    Reads and returns the last timestamp and the number of candles in a parquet file.
+    """
+    last_timestamp = 0
+    old_lines = 0
+    try:
+        existing_data = pq.read_pandas(filepath).to_pandas()
+        if not existing_data.empty:
+            last_timestamp = int(existing_data.index.max().timestamp() * 1000)
+            old_lines = len(existing_data.index)
+    except OSError:
+        pass
+    return last_timestamp, old_lines
+
+
+def write_to_parquet(file, batches, base, quote, append=False):
+    """
+    Writes batches of candle data to a parquet file.
+    """
     df = pd.concat(batches, ignore_index=True)
     df = pp.quick_clean(df)
-    pp.write_raw_to_parquet(df, full_path)
+    if append:
+        pp.append_raw_to_parquet(df, file, SHAVE_OFF_TODAY)
+    else:
+        pp.write_raw_to_parquet(df, file, SHAVE_OFF_TODAY)
     METADATA['data'].append({
-        'description': f'All trade history for the pair {base} and {quote} at 1 minute intervals. Counts {df.index.size} records.',
-        'name': parquet_name,
-        'totalBytes': os.stat(full_path).st_size,
+        'description': f'All trade history for the pair {base} and {quote} at 1 minute intervals. '
+                       f'Counts {df.index.size} records.',
+        'name': f"{base}-{quote}",
+        'totalBytes': os.stat(file).st_size,
         'columns': []
     })
+    return len(df.index)
 
-    # in the case that new data was gathered write it to disk
-    if len(batches) > 1:
-        df.to_csv(f'data/{base}-{quote}.csv', index=False)
+
+def write_to_csv(file, batches, old_lines):
+    """
+    Writes batches of candle data to a CSV file.
+    """
+    if len(batches) > 0:
+        df = batches_to_pd(batches)
+        header = not os.path.isfile(file)
+        df.to_csv(file, index=False, mode='a', header=header)
         return len(df.index) - old_lines
     return 0
+
+
+def batches_to_pd(batches):
+    """
+    Converts batches of candle data to a single cleaned pandas dataframe.
+    """
+    df = pd.concat(batches, ignore_index=True)
+    return pp.quick_clean(df)
+
+
+def csv_to_parquet(base, quote):
+    """
+    Saves the CSV file for a given base and quote to a parquet file.
+    """
+    csv_filepath = f'{CSV_PATH}/{base}-{quote}.csv'
+    parquet_filepath = f'{COMPRESSED_PATH}/{base}-{quote}.parquet'
+    data = [pd.read_csv(csv_filepath)]
+    write_to_parquet(parquet_filepath, data, base, quote)
+
+
+def print_opts():
+    print("\n#########################################################################################################")
+    print(f"Candle request batch size: {BATCH_SIZE}")
+    print(f"Equalize dataset ends to midnight: {SHAVE_OFF_TODAY}")
+    print(f"Directly update parquets: {CIRCUMVENT_CSV}")
+    print(f"Upload parquets to Kaggle: {UPLOAD_TO_KAGGLE}")
+    print(f"Skip update of delisted pairs: {SKIP_DELISTED}")
+    print(f"Parquet files path: {COMPRESSED_PATH}")
+    print(f"CSV files path: {CSV_PATH}")
+    print("#########################################################################################################\n")
+
 
 def main():
     """Main loop; loop over all currency pairs that exist on the exchange.
     Once done upload the compressed (Parquet) dataset to Kaggle.
     """
-
+    print_opts()
     # get all pairs currently available
     all_symbols = pd.DataFrame(requests.get(f'{API_BASE}exchangeInfo').json()['symbols'])
-    all_pairs = [tuple(x) for x in all_symbols[['baseAsset', 'quoteAsset']].to_records(index=False)]
+    active_symbols = all_symbols.loc[all_symbols["status"] == "TRADING"]
+    if SKIP_DELISTED:
+        all_pairs = [tuple(x) for x in active_symbols[['baseAsset', 'quoteAsset']].to_records(index=False)]
+        n_inactive = len(all_symbols) - len(active_symbols)
+        print(f'{datetime.now()} Got {len(all_pairs)} active pairs from Binance. Dropped {n_inactive} inactive pairs.')
+    else:
+        all_pairs = [tuple(x) for x in all_symbols[['baseAsset', 'quoteAsset']].to_records(index=False)]
+        print(f'{datetime.now()} Got {len(all_pairs)} pairs from Binance, of which {len(active_symbols)} are active.')
+
     # randomising order helps during testing and doesn't make any difference in production
     random.shuffle(all_pairs)
 
     # make sure data folders exist
-    os.makedirs('data', exist_ok=True)
-    os.makedirs('compressed', exist_ok=True)
+    os.makedirs(f'{CSV_PATH}', exist_ok=True)
+    os.makedirs(f'{COMPRESSED_PATH}', exist_ok=True)
 
     # do a full update on all pairs
     n_count = len(all_pairs)
     for n, pair in enumerate(all_pairs, 1):
         base, quote = pair
-        new_lines = all_candles_to_csv(base=base, quote=quote)
+        print(f'{datetime.now()} {n}/{n_count} Updating {base}-{quote}')
+        if CIRCUMVENT_CSV:
+            new_lines = all_candles_to_parquet(base=base, quote=quote)
+        else:
+            new_lines = all_candles_to_csv(base=base, quote=quote)
         if new_lines > 0:
             print(f'{datetime.now()} {n}/{n_count} Wrote {new_lines} new lines to file for {base}-{quote}')
         else:
             print(f'{datetime.now()} {n}/{n_count} Already up to date with {base}-{quote}')
 
-    # clean the data folder and upload a new version of the dataset to kaggle
-    try:
-        os.remove('compressed/.DS_Store')
-    except FileNotFoundError:
-        pass
-    write_metadata(n_count)
-    yesterday = date.today() - timedelta(days=1)
-    subprocess.run(['kaggle', 'datasets', 'version', '-p', 'compressed/', '-m', f'full update of all {n_count} pairs up to {str(yesterday)}'])
-    os.remove('compressed/dataset-metadata.json')
+    if UPLOAD_TO_KAGGLE:
+        # clean the data folder and upload a new version of the dataset to kaggle
+        try:
+            os.remove(f'{COMPRESSED_PATH}/.DS_Store')
+        except FileNotFoundError:
+            pass
+        write_metadata(n_count)
+        yesterday = date.today() - timedelta(days=1)
+        subprocess.run(['kaggle', 'datasets', 'version', '-p', f'{COMPRESSED_PATH}/', '-m', f'full update of all {n_count} pairs up to {str(yesterday)}'])
+        os.remove(f'{COMPRESSED_PATH}/dataset-metadata.json')
 
 
 if __name__ == '__main__':
diff --git a/preprocessing.py b/preprocessing.py
index 7ee3660..0949e8c 100644
--- a/preprocessing.py
+++ b/preprocessing.py
@@ -1,8 +1,9 @@
 import os
 from datetime import date
-
+import pyarrow.parquet as pq
 import pandas as pd
+
 
 def set_dtypes(df):
     """
     set datetimeindex and convert all columns in pd.df to their proper dtype
@@ -74,9 +75,22 @@ def quick_clean(df):
     return df
 
 
-def write_raw_to_parquet(df, full_path):
-    """takes raw df and writes a parquet to disk"""
+def append_raw_to_parquet(df, full_path, limit_to_today=True):
+    """Takes a raw df and appends it to an existing parquet file. If the file does not exist, it is created."""
+    df = polish_df(df, limit_to_today)
+    try:
+        df = pd.concat([pq.read_pandas(full_path).to_pandas(), df])
+    except OSError:
+        pass
+    df.to_parquet(full_path)
+
+
+def write_raw_to_parquet(df, full_path, limit_to_today=True):
+    """Takes a raw df and writes it to a parquet file, overwriting existing data. If the file does not exist,
+    it is created."""
+    df = polish_df(df, limit_to_today)
+    df.to_parquet(full_path)
+
+
+def polish_df(df, limit_to_today=True):
+    """Filter out unreliable candles, compress dtypes and optionally cut the data off at the last midnight."""
     # some candlesticks do not span a full minute
     # these points are not reliable and thus filtered
     df = df[~(df['open_time'] - df['close_time'] != -59999)]
@@ -87,30 +101,29 @@ def write_raw_to_parquet(df, full_path):
     df = set_dtypes_compressed(df)
 
     # give all pairs the same nice cut-off
-    df = df[df.index < str(date.today())]
-
-    df.to_parquet(full_path)
-
+    if limit_to_today:
+        df = df[df.index < str(date.today())]
+    return df
+
 
-def groom_data(dirname='data'):
+def groom_data(csv_dir):
     """go through data folder and perform a quick clean on all csv files"""
-    for filename in os.listdir(dirname):
+    for filename in os.listdir(csv_dir):
         if filename.endswith('.csv'):
-            full_path = f'{dirname}/{filename}'
+            full_path = f'{csv_dir}/{filename}'
             quick_clean(pd.read_csv(full_path)).to_csv(full_path)
 
 
-def compress_data(dirname='data'):
+def compress_data(csv_dir, parquet_path):
     """go through data folder and rewrite csv files to parquets"""
-    os.makedirs('compressed', exist_ok=True)
-    for filename in os.listdir(dirname):
+    os.makedirs(parquet_path, exist_ok=True)
+    for filename in os.listdir(csv_dir):
         if filename.endswith('.csv'):
-            full_path = f'{dirname}/{filename}'
+            full_path = f'{csv_dir}/{filename}'
             df = pd.read_csv(full_path)
             new_filename = filename.replace('.csv', '.parquet')
-            new_full_path = f'compressed/{new_filename}'
+            new_full_path = f'{parquet_path}/{new_filename}'
             write_raw_to_parquet(df, new_full_path)
diff --git a/requirements.txt b/requirements.txt
index 04dd54d..91e56e1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@ requests
 pandas
 pyarrow
 kaggle
+progressbar2
\ No newline at end of file
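
Reviewer note: for anyone who wants to exercise the new CIRCUMVENT_CSV code path in isolation, a minimal sketch along the lines below should work once this branch is checked out. It simply chains the new helpers the same way `main()` does for each pair; the `BTC`/`USDT` pair, the `import main` module name and the assumption that `COMPRESSED_PATH` points at a writable folder are illustrative, not part of the patch.

```python
# Sketch: update a single pair's Parquet file in place, mirroring what main()
# does per pair when CIRCUMVENT_CSV is True.
import main  # assumes the patched main.py is importable from the repo root

base, quote = 'BTC', 'USDT'  # illustrative pair, not part of the patch
filepath = f'{main.COMPRESSED_PATH}/{base}-{quote}.parquet'

# last stored open_time in milliseconds (0 if the Parquet file does not exist yet)
last_timestamp, old_lines = main.get_parquet_info(filepath)

# fetch everything newer than that from the Binance API, BATCH_SIZE candles at a time
batches = main.gather_new_candles(base, quote, last_timestamp)

# clean the batches and append them to the existing Parquet file
new_lines = main.write_to_parquet(filepath, batches, base, quote, append=True)
print(f'{base}-{quote}: wrote {new_lines} new candles')
```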
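Also worth flagging for downstream users: `preprocessing.groom_data` and `preprocessing.compress_data` no longer default to the `data`/`compressed` folders; both now take the directories explicitly. A one-off migration sketch under that assumption follows (the folder names are just the pre-patch defaults, adjust them to wherever your buffers actually live):

```python
# Sketch: convert the existing CSV buffers to Parquet files using the updated
# helper signatures from this patch. Folder names are the pre-patch defaults.
import preprocessing as pp

CSV_DIR = 'data'            # folder holding the old CSV buffers
PARQUET_DIR = 'compressed'  # destination folder for the Parquet files

pp.groom_data(CSV_DIR)                  # quick-clean every CSV in place
pp.compress_data(CSV_DIR, PARQUET_DIR)  # rewrite each CSV as a Parquet file
```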