More robust progress bar.
New setting: Optional skipping of inactive pairs.
User options are printed when program runs.
nup002 committed Mar 5, 2021
1 parent 1dcf8ce commit b6f4df9
Showing 2 changed files with 39 additions and 22 deletions.
32 changes: 25 additions & 7 deletions main.py
@@ -31,6 +31,7 @@
SHAVE_OFF_TODAY = False # Whether to shave off candles after last midnight to equalize end-time of all datasets.
CIRCUMVENT_CSV = True # Whether to use the parquet files directly when updating data.
UPLOAD_TO_KAGGLE = False # Whether to upload the parquet files to kaggle after updating.
SKIP_DELISTED = True # Whether to skip updating pairs that are no longer actively traded (delisted).
COMPRESSED_PATH = r'C:\Users\magla\Documents\Datasets\binance_pairs'
CSV_PATH = 'data'

@@ -154,15 +155,15 @@ def gather_new_candles(base, quote, last_timestamp, interval='1m'):
if in_pycharm: time.sleep(0.2)
first_read = False
if total_minutes_of_data >= BATCH_SIZE*2:
- bar = ProgressBar(max_value=total_minutes_of_data)
+ bar = ProgressBar(max_value=total_minutes_of_data).start()

if bar is not None:
time_covered = datetime.fromtimestamp(last_timestamp / 1000) - start_datetime
minutes_covered = int(time_covered.total_seconds()/60)
bar.max_value = max(int((datetime.now() - start_datetime).total_seconds()/60), minutes_covered)
bar.update(minutes_covered)
if bar is not None:
- bar.finish()
+ bar.finish(dirty=True)
if in_pycharm: time.sleep(0.2)
return batches
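The progress-bar changes above lean on the progressbar2 API: `start()` draws the bar immediately (and returns the bar, so it can be chained onto the constructor), while `finish(dirty=True)` closes the bar without forcing it to jump to 100% when the download ends short of the estimate. A minimal sketch of that pattern, assuming the progressbar2 package; the loop, batch size, and totals are made-up stand-ins for the real candle download:

```python
# Illustrative only: assumes the progressbar2 package; numbers and loop are stand-ins.
import time
from progressbar import ProgressBar

total_minutes = 500                                   # rough estimate of minutes of data expected
bar = ProgressBar(max_value=total_minutes).start()    # .start() renders the bar right away

covered = 0
while covered < total_minutes:
    covered = min(covered + 120, total_minutes)       # stand-in for one batch of candles
    bar.max_value = max(bar.max_value, covered)       # grow the bar if the estimate was too low
    bar.update(covered)
    time.sleep(0.1)

bar.finish(dirty=True)  # dirty=True: stop without snapping the bar to 100%
```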

@@ -227,9 +228,9 @@ def write_to_parquet(file, batches, base, quote, append=False):
df = pd.concat(batches, ignore_index=True)
df = pp.quick_clean(df)
if append:
- pp.append_raw_to_parquet(df, file)
+ pp.append_raw_to_parquet(df, file, SHAVE_OFF_TODAY)
else:
- pp.write_raw_to_parquet(df, file)
+ pp.write_raw_to_parquet(df, file, SHAVE_OFF_TODAY)
METADATA['data'].append({
'description': f'All trade history for the pair {base} and {quote} at 1 minute intervals. '
f'Counts {df.index.size} records.',
@@ -269,16 +270,33 @@ def csv_to_parquet(base, quote):
data = [pd.read_csv(csv_filepath)]
write_to_parquet(parquet_filepath, data, base, quote)

def print_opts():
print("\n#########################################################################################################")
print(f"Candle request batch size: {BATCH_SIZE}")
print(f"Equalize dataset ends to midnight: {SHAVE_OFF_TODAY}")
print(f"Directly update parquets: {CIRCUMVENT_CSV}")
print(f"Upload parquets to kaggle: {UPLOAD_TO_KAGGLE}")
print(f"Skip update of delisted pairs: {SKIP_DELISTED}")
print(f"Parquet files path: {COMPRESSED_PATH}")
print(f"CSV files path: {CSV_PATH}")
print("#########################################################################################################\n")

def main():
"""Main loop; loop over all currency pairs that exist on the exchange. Once done upload the
compressed (Parquet) dataset to Kaggle.
"""

print_opts()
# get all pairs currently available
all_symbols = pd.DataFrame(requests.get(f'{API_BASE}exchangeInfo').json()['symbols'])
- all_pairs = [tuple(x) for x in all_symbols[['baseAsset', 'quoteAsset']].to_records(index=False)]
- print(f'{datetime.now()} Got {len(all_pairs)} pairs from binance.')
+ active_symbols = all_symbols.loc[all_symbols["status"] == "TRADING"]
+ if SKIP_DELISTED:
+ all_pairs = [tuple(x) for x in active_symbols[['baseAsset', 'quoteAsset']].to_records(index=False)]
+ n_inactive = len(all_symbols) - len(active_symbols)
+ print(f'{datetime.now()} Got {len(all_pairs)} active pairs from binance. Dropped {n_inactive} inactive pairs.')
+ else:
+ all_pairs = [tuple(x) for x in all_symbols[['baseAsset', 'quoteAsset']].to_records(index=False)]
+ print(f'{datetime.now()} Got {len(all_pairs)} pairs from binance, of which {len(active_symbols)} are active.')


# randomising order helps during testing and doesn't make any difference in production
random.shuffle(all_pairs)
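For orientation, the new filtering step works off the `symbols` list returned by Binance's `exchangeInfo` endpoint, where each entry carries `baseAsset`, `quoteAsset`, and a `status` field that is `"TRADING"` only for active pairs. A small self-contained sketch of the same idea, using a made-up payload instead of a live request:

```python
# Sketch of the SKIP_DELISTED filtering; the payload below is invented for illustration.
import pandas as pd

sample_response = {
    "symbols": [
        {"baseAsset": "BTC", "quoteAsset": "USDT", "status": "TRADING"},
        {"baseAsset": "ETH", "quoteAsset": "BTC", "status": "TRADING"},
        {"baseAsset": "BCC", "quoteAsset": "BTC", "status": "BREAK"},   # no longer trading
    ]
}

all_symbols = pd.DataFrame(sample_response["symbols"])
active_symbols = all_symbols.loc[all_symbols["status"] == "TRADING"]

all_pairs = [tuple(x) for x in active_symbols[["baseAsset", "quoteAsset"]].to_records(index=False)]
print(all_pairs)                                # [('BTC', 'USDT'), ('ETH', 'BTC')]
print(len(all_symbols) - len(active_symbols))   # 1 inactive pair dropped
```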
29 changes: 14 additions & 15 deletions preprocessing.py
@@ -2,7 +2,6 @@
from datetime import date
import pyarrow.parquet as pq
import pandas as pd
- from main import SHAVE_OFF_TODAY, CSV_PATH, COMPRESSED_PATH


def set_dtypes(df):
@@ -76,22 +75,22 @@ def quick_clean(df):
return df


- def append_raw_to_parquet(df, full_path):
+ def append_raw_to_parquet(df, full_path, limit_to_today=True):
"""Takes raw df and appends it to an existing parquet file. If the file does not exist, it is created."""
- df = polish_df(df)
+ df = polish_df(df, limit_to_today)
try:
df = pd.concat([pq.read_pandas(full_path).to_pandas(), df])
except OSError:
pass
df.to_parquet(full_path)

- def write_raw_to_parquet(df, full_path):
+ def write_raw_to_parquet(df, full_path, limit_to_today=True):
"""Takes raw df and writes it to an existing parquet file, overwriting existin data. If the file does not exist,
it is created."""
- df = polish_df(df)
+ df = polish_df(df, limit_to_today)
df.to_parquet(full_path)

- def polish_df(df):
+ def polish_df(df, limit_to_today=True):
# some candlesticks do not span a full minute
# these points are not reliable and thus filtered
df = df[~(df['open_time'] - df['close_time'] != -59999)]
@@ -102,29 +101,29 @@ def polish_df(df):
df = set_dtypes_compressed(df)

# give all pairs the same nice cut-off
- if SHAVE_OFF_TODAY:
+ if limit_to_today:
df = df[df.index < str(date.today())]
return df

- def groom_data(dirname='data'):
+ def groom_data(csv_dir):
"""go through data folder and perform a quick clean on all csv files"""

- for filename in os.listdir(dirname):
+ for filename in os.listdir(csv_dir):
if filename.endswith('.csv'):
- full_path = f'{dirname}/{filename}'
+ full_path = csv_dir + f'/{filename}'
quick_clean(pd.read_csv(full_path)).to_csv(full_path)


- def compress_data(dirname=f'{CSV_PATH}'):
+ def compress_data(csv_dir, parquet_path):
"""go through data folder and rewrite csv files to parquets"""

- os.makedirs(f'{COMPRESSED_PATH}', exist_ok=True)
- for filename in os.listdir(dirname):
+ os.makedirs(parquet_path, exist_ok=True)
+ for filename in os.listdir(csv_dir):
if filename.endswith('.csv'):
- full_path = f'{dirname}/{filename}'
+ full_path = f'{csv_dir}/{filename}'

df = pd.read_csv(full_path)

new_filename = filename.replace('.csv', '.parquet')
- new_full_path = f'{COMPRESSED_PATH}/{new_filename}'
+ new_full_path = parquet_path + f'/{new_filename}'
write_raw_to_parquet(df, new_full_path)
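Taken together, the preprocessing.py changes remove the module's import of settings from main (and with it the circular dependency between the two files); callers now pass the paths and the midnight cut-off flag explicitly. A hedged sketch of how the refactored helpers might be driven from main.py, reusing the constants defined at its top; the exact call sites in the repository may differ:

```python
# Sketch only: how the parameterised helpers could be called; actual usage may differ.
import preprocessing as pp

SHAVE_OFF_TODAY = False
COMPRESSED_PATH = r'C:\Users\magla\Documents\Datasets\binance_pairs'
CSV_PATH = 'data'

pp.groom_data(CSV_PATH)                          # quick-clean every CSV in place
pp.compress_data(CSV_PATH, COMPRESSED_PATH)      # rewrite the CSVs as parquet files

# Per-pair updates thread the cut-off flag through instead of importing it from main:
# pp.append_raw_to_parquet(df, parquet_file, SHAVE_OFF_TODAY)
```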
