# Import Starknet transactions and token transfers from Voyager and export them to CSV.
import os, argparse, requests, csv, re
from datetime import datetime, timezone  # timezone: datetime.utcfromtimestamp is deprecated since Python 3.12, so aware datetimes are used instead
def get_stark_domain(domain):
    # Resolve a .stark domain to a wallet address via the starknet.id API.
    try:
        json_page = requests.get("https://api.starknet.id/domain_to_addr?domain=" + domain).json()
        if "addr" in json_page:
            return json_page["addr"]
    except requests.RequestException:
        pass
    raise Exception("no address found for " + domain + " using starknet.id api")
def check_api_valid(page_data):
    # The Voyager API returns a "message" key when a request is rejected (e.g. a bad API key).
    if "message" in page_data:
        raise Exception('Voyager API not valid, error returned "' + page_data['message'] + '".\nRequest a free API key from the team here: https://forms.gle/34RE6d4aiiv16HoW6\nDocs here: https://docs.voyager.online/#overview')
def convert_wei_to_eth(wei_val):
    return int(wei_val) / 10**18  # 1 ETH = 10^18 wei
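# Quick check (illustrative): convert_wei_to_eth("1000000000000000000") == 1.0,
# and a fee of 250000000000000 wei converts to 0.00025 ETH.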
def get_transactions_data():
    transactions_url = base_url + "/txns"
    payload = {'ps': per_page, 'p': 1}
    payload['to'] = wallet_address
    json_page = requests.get(transactions_url, headers=headers, params=payload).json()
    check_api_valid(json_page)  # check the API key is working
    num_pages = json_page["lastPage"]
    page_data = json_page["items"]
    # Random strings passed as the address lead to the entire chain's index being returned
    # (currently ~900,000 pages), so don't save it all.
    if num_pages == 0 or num_pages > max_pages:
        raise Exception("address not found, double check on voyager")
    # loop through and append the items from the remaining pages
    for i in range(2, num_pages + 1):
        payload['p'] = i  # update the page number
        json_page = requests.get(transactions_url, headers=headers, params=payload).json()
        page_data = page_data + json_page["items"]
    return page_data
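# Illustrative request sequence: GET {base_url}/txns?ps=100&p=1&to=<wallet>
# returns {"items": [...], "lastPage": N}; pages 2..N are then fetched with the
# same params and only 'p' changing.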
def get_transfers_data(direction, token_type):
    # direction is the query key to filter on: "to_address" or "from_address".
    transfers_url = base_url + "/event-activity"
    payload = {'page_size': per_page, 'page': page}
    payload[direction] = wallet_address
    payload["sort"] = "asc"  # ascending sort by default, also used for ordering the next page
    payload["token_type"] = token_type
    payload["from_timestamp"] = from_beginning_timestamp
    json_page = requests.get(transfers_url, headers=headers, params=payload).json()
    check_api_valid(json_page)  # check the API key is working, die if not
    has_more = json_page["hasMore"]
    page_data = json_page["items"]
    # This endpoint pages with a cursor: pass the id of the last item seen to fetch the next page.
    while has_more:
        payload['last_id'] = page_data[-1]["id"]
        json_page = requests.get(transfers_url, headers=headers, params=payload).json()
        page_data = page_data + json_page["items"]
        has_more = json_page["hasMore"]
    return page_data
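# Illustrative cursor walk: if page 1 ends with an item whose id is 417, the next
# request repeats the same params plus last_id=417 and the API resumes after that
# item; this loops until hasMore comes back false.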
def process_fields(page_data, fields):
    # Sanitize / tidy up: derive fields, set blank entries for missing values, replace newlines.
    for row in page_data:
        for field in fields:
            if field == "utcTime":
                # utcfromtimestamp is deprecated since Python 3.12, so build an aware datetime and strip the tzinfo
                row[field] = datetime.fromtimestamp(row["timestamp"], timezone.utc).replace(tzinfo=None)  # set a readable UTC date
            if field == "in_or_out":  # explicitly flag whether the txn is incoming or outgoing for this wallet
                if row["transferFrom"] == wallet_address:
                    row[field] = "OUT"
                elif row["transferTo"] == wallet_address:
                    row[field] = "IN"
            if convert_wei and field == "actualFee":
                row[field] = convert_wei_to_eth(row[field])
            if field == "transferValues":  # transferValues[0] is the quantity transferred
                if len(row[field]) > 1:
                    raise Exception("transfer_amounts has more than one value")
                row[field] = row[field][0]
                if convert_wei:
                    if row["tokenAddress"] == eth_contract:  # only convert for the eth contract
                        row[field] = convert_wei_to_eth(row[field])
            if field not in row:
                row[field] = ""
            if isinstance(row[field], str):  # sanitise line breaks in some inputs
                row[field] = row[field].replace("\n", "\\n")
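# Illustrative effect on one transfer row: transferFrom == wallet_address sets
# in_or_out to "OUT", transferValues collapses from a one-element list to its
# value (wei-converted for the ETH contract), and utcTime is derived from timestamp.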
def koinly_format(page_data, koinly_fields):
    koinly_array = []
    for row in page_data:
        koinly_datarow = {}
        koinly_datarow["Date"] = datetime.fromtimestamp(row["timestamp"], timezone.utc).strftime("%Y-%m-%d %H:%M UTC")  # needs to be this format: 2018-01-03 14:25 UTC
        koinly_datarow["Description"] = ""  # default; overwritten below for LP and NFT rows
        token_symbol = row["tokenSymbol"]  # may be invalid for koinly if not supported
        tokeninfo = row["tokenAddress"]
        if "tokenName" in row:
            tokeninfo = (row["tokenName"] or "") + " - " + tokeninfo
        if "fromAlias" in row and "toAlias" in row:
            tokeninfo = (row["fromAlias"] or "") + (row["toAlias"] or "") + " - " + tokeninfo
        # split up LPs and NFTs
        if token_symbol in starknet_unsupported_lp_currencies:  # check the global list of LP tokens
            koinly_datarow["Description"] = row["callName"] + " " + tokeninfo + " (starknet LP #" + row["transferIds"][0] + ")"
            if row["transferIds"][0] not in lp_list:
                lp_list.append(row["transferIds"][0])
            token_symbol = "LP" + str(initial_lp_counter + lp_list.index(row["transferIds"][0]))
        elif token_symbol in starknet_unsupported_nft_currencies:  # it's an NFT
            if row["transferIds"][0] not in nft_list:
                nft_list.append(row["transferIds"][0])
            token_symbol = "NFT" + str(initial_nft_counter + nft_list.index(row["transferIds"][0]))
            koinly_datarow["Description"] = row["callName"] + " " + tokeninfo + " - " + row["transferIds"][0] + " (starknet nft)"
        # if download_type == "ERC721": row["tokenSymbol"] = 'NFT' + row['transferIds'][0] + " " + row["tokenSymbol"]  # experimenting with this for Koinly integration, but it doesn't like it
        if row["in_or_out"] == "OUT":
            koinly_datarow["Sent Amount"] = row["transferValues"]
            koinly_datarow["Sent Currency"] = token_symbol
            koinly_datarow["Received Amount"] = ""
            koinly_datarow["Received Currency"] = ""
        elif row["in_or_out"] == "IN":
            koinly_datarow["Received Amount"] = row["transferValues"]
            koinly_datarow["Received Currency"] = token_symbol
            koinly_datarow["Sent Amount"] = ""
            koinly_datarow["Sent Currency"] = ""
        fee = 0
        if "actualFee" in row:
            fee = row["actualFee"]
        koinly_datarow["Fee Amount"] = fee
        koinly_datarow["Fee Currency"] = "eth"
        koinly_datarow["Net Worth Amount"] = ""
        koinly_datarow["Net Worth Currency"] = ""
        koinly_datarow["Label"] = ""
        koinly_datarow["TxHash"] = row["txHash"]
        koinly_array.append(koinly_datarow)
    # persist the counters so LP/NFT ids stay unique across runs
    overwrite_count_from_filename(lp_id_tracker_fname, initial_lp_counter + len(lp_list))
    overwrite_count_from_filename(nft_id_tracker_fname, initial_nft_counter + len(nft_list))
    return koinly_array
def get_count_from_filename(fname):
    if not os.path.exists(fname):  # first run: no counter file yet
        return 0
    with open(fname, "r") as f:
        return int(f.read())
def overwrite_count_from_filename(fname, new_count):
    with open(fname, "w") as f:
        f.write(str(new_count))
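# Counter round-trip (illustrative): if last_used_lp_id.txt holds "3", the first
# new LP this run is labelled LP3, the next LP4, and the file is rewritten to "5"
# so the following run starts numbering at LP5.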
# "Date", "Sent Amount", "Sent Currency", "Received Amount", "Received Currency", "Fee Amount", "Fee Currency", "Net Worth Amount", "Net Worth Currency", "Label", "Description", "TxHash"
# init, globals
page = 1
per_page = 100
max_pages = 1000
base_url = "https://api.voyager.online/beta"
from_beginning_timestamp = 1633309200 # Starknet mainnet launch date, 4 Oct 2021
convert_wei = True
lp_id_tracker_fname = "last_used_lp_id.txt"
nft_id_tracker_fname = "last_used_nft_id.txt"
eth_contract = "0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7"
starknet_unsupported_lp_currencies = {"EkuPo", "JEDI-V2-POS"}  # LP token symbols; matching transfers get a rewritten description and an LPn currency code
starknet_unsupported_nft_currencies = {"PIXEL BANNERS", "INFAST", "INFCRW", "ID", "BRVSTTNARMR", "STRKJOURSET"}  # NFT symbols; matching transfers get a rewritten description and an NFTn currency code
initial_lp_counter = get_count_from_filename(lp_id_tracker_fname)
initial_nft_counter = get_count_from_filename(nft_id_tracker_fname)
lp_list = []
nft_list = []
# CLI args input
parser = argparse.ArgumentParser(description="Export Starknet transactions or token transfers from Voyager to CSV")
parser.add_argument('-w', '--wallet', type=str, required=True)
parser.add_argument('-t', '--type', type=str, choices=['ERC20', 'ERC721', 'ERC1155', 'transactions'], required=True)
parser.add_argument('-a', '--api_key', type=str, required=True)
parser.add_argument('-f', '--format', type=str, default="verbose", choices=['verbose', 'standard', 'koinly'], required=False)
# parser.add_argument('-s', '--since', type=str, required=False)  ## TO DO
args = parser.parse_args()
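# Example invocations (wallet address and API key are placeholders):
#   python export-starknet.py -w mydomain.stark -t ERC20 -a YOUR_API_KEY -f koinly
#   python export-starknet.py -w 0x04a1... -t transactions -a YOUR_API_KEY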
fname_address = args.wallet
wallet_address = args.wallet
if wallet_address.endswith(".stark"):  # a starknet.id domain rather than a hex address
    wallet_address = get_stark_domain(wallet_address)
download_type = args.type
output_format = args.format
# sanity check:
if output_format == 'koinly' and download_type == 'transactions':
    raise Exception("Koinly needs to import ERC20 / ERC721 / ERC1155 transfers instead of transactions")
# Build the output filename from the current date, replacing characters Windows disallows in filenames.
file_created_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat(sep=" ", timespec="seconds")
time_for_file_name = re.sub(r'[:+,.]', ".", file_created_time)
os.makedirs("output", exist_ok=True)  # make sure the output directory exists
f_name = 'output' + os.sep + download_type + "_" + fname_address + "_" + output_format + "_" + time_for_file_name + ".csv"
api_key = args.api_key
headers = {
    'Accept': 'application/json',
    'X-Api-Key': api_key
}
if download_type == 'transactions':
    page_data = get_transactions_data()
    fields = ["utcTime", "status", "type", "blockNumber", "hash", "index", "l1VerificationHash", "classHash", "contractAddress", "timestamp", "actualFee", "actions", "contractAlias", "classAlias"]
else:
    # token transfers: fetch both directions and merge
    to_page_data = get_transfers_data("to_address", download_type)
    from_page_data = get_transfers_data("from_address", download_type)
    page_data = to_page_data + from_page_data
    # sort by timestamp
    page_data.sort(key=lambda x: x['timestamp'])
    fields = ["utcTime", "blockNumber", "callName", "tokenSymbol", "tokenName", "tokenAddress", "txHash", "timestamp", "invocationType", "fromAlias", "toAlias", "transferFrom", "transferTo", "transferValues", "in_or_out"]
    # available: 'blockHash', 'blockNumber', 'timestamp', 'tokenAddress', 'tokenName', 'tokenSymbol', 'tokenDecimals', 'txHash', 'callName', 'invocationType', 'eventId', 'data', 'keys', 'id', 'transferFrom', 'transferTo', 'transferDataLen', 'transferValues', 'transferIds', 'selector', 'name', 'nestedName', 'nestedEventNames', 'dataDecoded', 'keyDecoded', 'fromAlias', 'toAlias', 'abiVerified'
process_fields(page_data, fields)
if len(page_data) == 0:
raise Exception("no results returned")
# output
if output_format == 'koinly':
    koinly_fields = ["Date", "Sent Amount", "Sent Currency", "Received Amount", "Received Currency", "Fee Amount", "Fee Currency", "Net Worth Amount", "Net Worth Currency", "Label", "Description", "TxHash"]
    koinly_dataset = koinly_format(page_data, koinly_fields)
    with open(f_name, 'w', newline='') as csv_file:  # newline='' avoids blank rows on Windows
        writer = csv.writer(csv_file)
        writer.writerow(koinly_fields)  # header
        # write each data row to the CSV file
        for row in koinly_dataset:
            writer.writerow([row[field] for field in koinly_fields])
else:
    with open(f_name, 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        if output_format == 'standard':
            writer.writerow(fields)  # header
            for row in page_data:
                writer.writerow([row[field] for field in fields])
        else:  # write out verbose
            verbose_fields = list(page_data[0].keys())  # use every field present on the first row
            writer.writerow(verbose_fields)  # header
            for row in page_data:
                writer.writerow([row.get(field, "") for field in verbose_fields])  # .get: later rows may lack some keys
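# Sample koinly-format row (illustrative values, an incoming ETH transfer):
#   Date=2024-01-02 03:04 UTC, Received Amount=0.5, Received Currency=ETH,
#   Fee Amount=0.000021, Fee Currency=eth, TxHash=0x..., remaining columns blank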