Big query script #72
Merged · 4 commits · Jun 7, 2024
73 changes: 63 additions & 10 deletions data_collection_scripts/big_query_balance_data.py
@@ -9,15 +9,17 @@
 data_collection_scripts directory of the project under the name 'google-service-account-key-0.json'. Any additional
 keys should be named 'google-service-account-key-1.json', 'google-service-account-key-2.json', etc.
 """
+import json
 import google.cloud.bigquery as bq
 import csv
 from yaml import safe_load
 import logging
 import argparse
 import tokenomics_decentralization.helper as hlp
+from datetime import datetime


-def collect_data(ledgers, snapshot_dates, force_query):
+def collect_data(ledger_snapshot_dates, force_query):
     input_dir = hlp.get_input_directories()[0]
     root_dir = hlp.ROOT_DIR
     if not input_dir.is_dir():
@@ -28,15 +30,17 @@ def collect_data(ledgers, snapshot_dates, force_query):

     i = 0
     all_quota_exceeded = False
+    ledger_last_updates = dict.fromkeys(ledger_snapshot_dates.keys())

-    for ledger in ledgers:
+    for ledger, snapshot_dates in ledger_snapshot_dates.items():
         for date in snapshot_dates:
             if all_quota_exceeded:
-                break
+                return ledger_last_updates
             file = input_dir / f'{ledger}_{date}_raw_data.csv'
             if not force_query and file.is_file():
                 logging.info(f'{ledger} data for {date} already exists locally. '
                              f'For querying {ledger} anyway please run the script using the flag --force-query')
+                ledger_last_updates[ledger] = date
                 continue
             logging.info(f"Querying {ledger} at snapshot {date}..")

@@ -49,7 +53,7 @@ def collect_data(ledgers, snapshot_dates, force_query):
             except FileNotFoundError:
                 logging.info(f'Exhausted all {i} service account keys. Aborting..')
                 all_quota_exceeded = True
-                break
+                return ledger_last_updates
             query_job = client.query(query)
             try:
                 rows = query_job.result()
@@ -72,24 +76,73 @@
                 writer.writerow([field.name for field in rows.schema])
                 writer.writerows(rows)
             logging.info(f'Done writing {ledger} data to file.\n')
+            ledger_last_updates[ledger] = date
+    return ledger_last_updates
+
+
+def get_from_dates(granularity):
+    """
+    Gets the date from which to start querying each ledger, which corresponds to the ledger's last updated date plus
+    one unit of the granularity (e.g. the month following the last update if granularity is 'month').
+    :param granularity: The granularity of the data collection. Can be 'day', 'week', 'month', or 'year'.
+    :return: A dictionary with ledgers as keys and the corresponding start dates (or None if no date is set) as values.
+    """
+    with open(hlp.ROOT_DIR / "data_collection_scripts/last_update.json") as f:
Review comment (Member): If the file doesn't exist, this will throw an error. If that's intended, OK; otherwise I suggest you create a separate function which checks if the file exists and, if not, initializes it somehow (e.g. by reading the files in the input folder).
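A minimal sketch of such an initializer (hypothetical: the function name is invented, and the reuse of this script's '{ledger}_{date}_raw_data.csv' naming scheme is an assumption, not part of this PR):

import json
import tokenomics_decentralization.helper as hlp


def init_last_update_file(filepath, granularities=('day', 'week', 'month', 'year')):
    # Hypothetical helper: create last_update.json if it is missing, so that
    # get_from_dates() can always open it. Every entry starts as None, which
    # makes the main block fall back to the default start date.
    if filepath.is_file():
        return
    # Recover the set of known ledgers from raw data files already present in the
    # input directory; this script names them '{ledger}_{date}_raw_data.csv'.
    input_dir = hlp.get_input_directories()[0]
    ledgers = {f.name.rsplit('_', 3)[0] for f in input_dir.glob('*_raw_data.csv')}
    last_update = {g: dict.fromkeys(sorted(ledgers)) for g in granularities}
    with open(filepath, 'w') as f:
        json.dump(last_update, f, indent=4)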

+        last_update = json.load(f)
+    last_update = last_update[granularity]
+    from_dates = {}
+    for ledger in last_update:
+        ledger_from_date = last_update[ledger]
+        if ledger_from_date is None:
+            from_dates[ledger] = None
+        else:
+            from_dates[ledger] = hlp.increment_date(date=hlp.get_date_beginning(last_update[ledger]), by=granularity)
+    return from_dates
+
+
+def update_last_update(ledger_last_updates, granularity):
+    """
+    Updates the last_update.json file with the last date for which data was collected for each ledger.
+    :param ledger_last_updates: A dictionary with the ledgers for which data was collected as keys and the last date
+    for which data was collected for each of them as values.
+    :param granularity: The granularity under which the new dates should be recorded. Can be 'day', 'week', 'month',
+    or 'year'.
+    """
+    filepath = hlp.ROOT_DIR / "data_collection_scripts/last_update.json"
+    with open(filepath) as f:
+        last_update = json.load(f)
+    for ledger, date in ledger_last_updates.items():
+        if date is not None:
+            last_update[granularity][ledger] = date
+    with open(filepath, 'w') as f:
+        json.dump(last_update, f)


 if __name__ == '__main__':
     logging.basicConfig(format='[%(asctime)s] %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p', level=logging.INFO)

     default_ledgers = hlp.get_ledgers()
     default_snapshot_dates = hlp.get_snapshot_dates()

     parser = argparse.ArgumentParser()
     parser.add_argument('--ledgers', nargs="*", type=str.lower, default=default_ledgers,
                         choices=[ledger for ledger in default_ledgers], help='The ledgers to collect data for.')
-    parser.add_argument('--snapshot_dates', nargs="*", type=hlp.valid_date, default=default_snapshot_dates,
-                        help='The dates to collect data for.')
+    parser.add_argument('--to_date', type=hlp.valid_date,
+                        default=datetime.today().strftime('%Y-%m-%d'),
+                        help='The date until which to get data for (YYYY-MM-DD format). Defaults to today.')
     parser.add_argument('--force-query', action='store_true',
                         help='Flag to specify whether to query for project data regardless of whether the relevant '
                              'data already exist.')
     args = parser.parse_args()

-    snapshot_dates = [hlp.get_date_string_from_date(hlp.get_date_beginning(date)) for date in args.snapshot_dates]
-
-    collect_data(ledgers=args.ledgers, snapshot_dates=snapshot_dates, force_query=args.force_query)
+    to_date = hlp.get_date_beginning(args.to_date)
+    ledgers = args.ledgers
+    granularity = hlp.get_granularity()
+    if granularity is None:
+        # if no granularity is set, only the given snapshot date (--to_date) is queried
+        ledger_snapshot_dates = {ledger: [hlp.get_date_string_from_date(to_date)] for ledger in ledgers}
+    else:
+        default_from_date = hlp.get_date_beginning(hlp.get_snapshot_dates()[0])
+        ledger_from_dates = get_from_dates(granularity=granularity)
+        ledger_snapshot_dates = dict()
+        for ledger in ledgers:
+            from_date = ledger_from_dates[ledger] if ledger in ledger_from_dates and ledger_from_dates[ledger] is not None else default_from_date
+            ledger_snapshot_dates[ledger] = hlp.get_dates_between(from_date, to_date, granularity)
+    ledger_last_updates = collect_data(ledger_snapshot_dates=ledger_snapshot_dates, force_query=args.force_query)
+    if granularity is not None:
+        update_last_update(ledger_last_updates=ledger_last_updates, granularity=granularity)
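To make the new flow concrete, here is a rough trace under 'month' granularity (illustrative only; the exact return value of hlp.get_dates_between is an assumption inferred from how it is used above):

# Suppose last_update.json contains {"month": {"bitcoin": "2023-11-01", ...}}
# and the script is run with: --ledgers bitcoin --to_date 2024-02-15

from_dates = get_from_dates(granularity='month')
# -> {'bitcoin': datetime.date(2023, 12, 1)}, i.e. the last update plus one month

# hlp.get_dates_between(datetime.date(2023, 12, 1), datetime.date(2024, 2, 15), 'month')
# is assumed to return one 'YYYY-MM-DD' snapshot per month in that interval:
#   ['2023-12-01', '2024-01-01', '2024-02-01']

# collect_data queries bitcoin at those three snapshots and returns
# {'bitcoin': '2024-02-01'}; update_last_update writes that back to
# last_update.json, so the next run resumes from 2024-03-01.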
38 changes: 38 additions & 0 deletions data_collection_scripts/last_update.json
@@ -0,0 +1,38 @@
{
"day": {
"bitcoin": null,
"bitcoin_cash": null,
"cardano": null,
"dogecoin": null,
"ethereum": null,
"litecoin": null,
"tezos": null
},
"week": {
"bitcoin": null,
"bitcoin_cash": null,
"cardano": null,
"dogecoin": null,
"ethereum": null,
"litecoin": null,
"tezos": null
},
"month": {
"bitcoin": "2023-11-01",
"bitcoin_cash": "2023-11-01",
"cardano": "2023-03-01",
"dogecoin": "2023-11-01",
"ethereum": "2023-08-01",
"litecoin": "2023-11-01",
"tezos": "2023-11-01"
},
"year": {
"bitcoin": null,
"bitcoin_cash": null,
"cardano": null,
"dogecoin": null,
"ethereum": null,
"litecoin": null,
"tezos": null
}
}
20 changes: 20 additions & 0 deletions tokenomics_decentralization/helper.py
@@ -120,6 +120,26 @@ def get_date_string_from_date(date_object):
     return date_object.strftime('%Y-%m-%d')


+def increment_date(date, by):
+    """
+    Increments a date by one unit of a given time period
+    :param date: a datetime.date object
+    :param by: a string in ['day', 'week', 'month', 'year']
+    :returns: a datetime.date object that corresponds to the given date incremented by one unit of the given granularity
+    :raises ValueError: if the granularity is not one of the allowed values
+    """
+    if by == 'day':
+        return date + datetime.timedelta(days=1)
+    elif by == 'week':
+        return date + datetime.timedelta(weeks=1)
+    elif by == 'month':
+        return date + datetime.timedelta(days=calendar.monthrange(date.year, date.month)[1])
+    elif by == 'year':
+        return datetime.date(date.year + 1, date.month, date.day)
+    else:
+        raise ValueError(f'Invalid granularity: {by}')
+
+
 def get_output_directories():
     """
     Reads the config file and retrieves the output directories
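A quick usage sketch of the new helper (illustrative; note that the 'month' branch adds the length of the current month in days, so month-end dates can drift):

import datetime
from tokenomics_decentralization.helper import increment_date

increment_date(datetime.date(2023, 11, 1), 'day')    # datetime.date(2023, 11, 2)
increment_date(datetime.date(2023, 11, 1), 'week')   # datetime.date(2023, 11, 8)
increment_date(datetime.date(2023, 11, 1), 'month')  # datetime.date(2023, 12, 1), i.e. +30 days
increment_date(datetime.date(2023, 1, 31), 'month')  # datetime.date(2023, 3, 3), i.e. +31 days: month-ends drift
increment_date(datetime.date(2023, 11, 1), 'year')   # datetime.date(2024, 11, 1)
# increment_date(datetime.date(2023, 11, 1), 'decade')  # would raise ValueError: Invalid granularity: decade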