# partially adapted from Matt Lewis's scraper
import requests
import sys
import collections.abc
import dataset
from datetime import datetime
from urllib.parse import quote


class Scraper:
    def __init__(self, username):
        self.USER = username
        self.API_KEY = '2bf228a2ce0167b5b857dd53ea6f39c1'
        self.ROOT_URL = 'https://ws.audioscrobbler.com/2.0/'
        self.PER_PAGE = 200  # max page size allowed by last.fm
        self.RECENT_URL = self.ROOT_URL + '?method=user.getrecenttracks&user=' + username
        self.RECENT_URL += '&api_key=' + self.API_KEY + '&format=json&page=%s&limit=%s'
        self.ARTIST_URL = self.ROOT_URL + '?method=artist.getinfo&api_key=' + self.API_KEY + '&artist=%s&format=json'
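
    # For illustration, RECENT_URL % (1, 200) expands to:
    #   https://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks
    #       &user=<username>&api_key=<key>&format=json&page=1&limit=200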

    # flattens the JSON into a uniform depth (i.e. a series of keys and non-nested values)
    # modified from http://stackoverflow.com/a/6027615/254187 to strip pound symbols
    def flatten(self, d, parent_key=''):
        items = []
        for k, v in d.items():
            new_key = parent_key + '_' + k if parent_key else k
            if isinstance(v, collections.abc.MutableMapping):
                items.extend(self.flatten(v, new_key).items())
            else:
                new_key = new_key.replace('#', '')
                items.append((new_key, v))
        return dict(items)
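
    # e.g. flatten({'artist': {'#text': 'Nirvana', 'mbid': ''}})
    #      returns {'artist_text': 'Nirvana', 'artist_mbid': ''}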

    # removes all matching properties in the `props` list from the JSON object
    def clean(self, json, props):
        for prop in props:
            if prop in json:
                del json[prop]
        return json
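
    # e.g. clean({'name': 'In Bloom', 'url': 'https://...', 'mbid': ''}, ['url', 'mbid'])
    #      returns {'name': 'In Bloom'}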

    # processes a scrobble for SQL insertion
    def process_scrobble(self, scrobble):
        # removes unnecessary attributes
        props = ['image', 'streamable', 'url', '@attr', 'mbid']
        scrobble = self.clean(scrobble, props)
        # flattens the track JSON
        flattened = self.flatten(scrobble)
        for key, val in flattened.items():
            if val == '':
                flattened[key] = None
        del flattened['artist_mbid']
        del flattened['album_mbid']
        flattened['artist'] = flattened.pop('artist_text')
        flattened['track'] = flattened.pop('name')
        flattened['album'] = flattened.pop('album_text')
        if 'date_uts' in flattened:
            # clamps bogus near-zero timestamps to one day past the epoch
            flattened['timestamp'] = max(int(flattened.pop('date_uts')), 86400)
            flattened['timestamp_text'] = flattened.pop('date_text')
            dt = datetime.fromtimestamp(flattened['timestamp'])
            flattened['play_year'] = dt.year
            flattened['play_month'] = dt.month
            flattened['play_date'] = dt.day
            flattened['play_hour'] = dt.hour
        else:
            # a track without a date is the "now playing" entry; skip it
            flattened = None
        return flattened
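
    # A processed scrobble comes out with keys like artist, track, album,
    # timestamp, timestamp_text, play_year, play_month, play_date, and play_hour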

    # processes an artist for SQL insertion
    def process_artist(self, artist):
        # removes unnecessary attributes
        props = ['ontour', 'image', 'streamable', 'url', 'bio', 'mbid']
        artist = self.clean(artist, props)
        # compresses related artists to a single value
        related = None
        if 'similar' in artist:
            related = []
            for relatedArtist in artist['similar']['artist']:
                relatedArtist = self.process_artist(relatedArtist)
                related.append(relatedArtist['name'].replace(',', ''))
            del artist['similar']
        # compresses artist tags to a single value
        tags = None
        if 'tags' in artist:
            tags = []
            for tag in artist['tags']['tag']:
                tags.append(tag['name'])
            del artist['tags']
        # converts all artist stats to ints
        if 'stats' in artist:
            for stat in artist['stats']:
                artist['stats'][stat] = int(artist['stats'][stat])
        # flattens the artist JSON
        flattened = self.flatten(artist)
        for key, val in flattened.items():
            if val == '':
                flattened[key] = None
        if related is not None:
            flattened['related'] = ", ".join(related)
        if tags is not None:
            flattened['tags'] = ", ".join(tags)
        return flattened
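
    # For a typical artist.getinfo response, the processed dict has keys like
    # name, stats_listeners, and stats_playcount, plus the compressed related
    # and tags strings built above (exact keys track the API response)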

    def get_all_scrobbles(self):
        # gets the total number of pages in the user's last.fm history
        resp = requests.get(self.RECENT_URL % (1, self.PER_PAGE)).json()
        try:
            TOTAL_PAGES = int(resp['recenttracks']['@attr']['totalPages'])
        except KeyError:
            print("[ERROR] Invalid last.fm username.")
            sys.exit(1)
        # walks the pages last-to-first so scrobbles are inserted oldest-first
        for page in reversed(range(1, TOTAL_PAGES + 1)):
            scrobbles = requests.get(self.RECENT_URL % (page, self.PER_PAGE)).json()['recenttracks']['track']
            sys.stdout.write("\r" + (' ' * 60))
            sys.stdout.write("\rRetrieving scrobble history...\t%d of %d\tfor %s." % (TOTAL_PAGES - page + 1, TOTAL_PAGES, self.USER))
            sys.stdout.flush()
            self.insert_scrobbles(reversed(scrobbles))
        print("\rRetrieved scrobble history.")

    def update_scrobbles(self):
        with dataset.connect('sqlite:///last-fm.db') as db:
            rts = next(db.query('SELECT MAX(timestamp) AS recent FROM %s' % self.USER))['recent']
        # gets the total number of pages of scrobbles newer than the stored timestamp
        UPDATE_URL = self.RECENT_URL + ('&from=%d' % (rts + 1))
        resp = requests.get(UPDATE_URL % (1, self.PER_PAGE)).json()
        try:
            TOTAL_PAGES = int(resp['recenttracks']['@attr']['totalPages'])
        except KeyError:
            print("[ERROR] Unexpected response from last.fm.")
            sys.exit(1)
        inserted = 0
        for page in reversed(range(1, TOTAL_PAGES + 1)):
            scrobbles = requests.get(UPDATE_URL % (page, self.PER_PAGE)).json()['recenttracks']['track']
            inserted += self.insert_scrobbles(reversed(scrobbles))
        sys.stdout.write("\r" + (' ' * 60))
        sys.stdout.write("\rUpdated scrobble history with %d new scrobble(s) for %s." % (inserted, self.USER))
        sys.stdout.flush()
        if inserted != 0:
            print()

    # iterates through, processes, and inserts each scrobble
    def insert_scrobbles(self, scrobbles):
        inserted = 0
        with dataset.connect('sqlite:///last-fm.db') as db:
            for scrobble in scrobbles:
                processed = self.process_scrobble(scrobble)
                if processed is not None:
                    db[self.USER].insert(processed)
                    inserted += 1
        return inserted
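
    # note: dataset creates the per-user table and any missing columns
    # automatically on insert, so no schema needs to be declared up front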

    # grabs and inserts info for all distinct scrobbled artists
    def artists(self):
        with dataset.connect('sqlite:///last-fm.db') as db:
            errors = []
            result = db['scrobbles'].distinct('artist')
            sql = 'SELECT COUNT(DISTINCT artist) AS count FROM scrobbles'
            totalArtists = int(next(db.query(sql))['count'])
            for index, row in enumerate(result):
                # URL-encodes the artist name so spaces and ampersands survive the query string
                artist = requests.get(self.ARTIST_URL % quote(row['artist'], safe='')).json()
                sys.stdout.write("\rRetrieving artist info...\t%s of %s" % (str(index + 1), str(totalArtists)))
                sys.stdout.flush()
                try:
                    processed = self.process_artist(artist['artist'])
                    db['artists'].insert(processed)
                except KeyError:
                    errors.append(row['artist'])
            sys.stdout.write("\rRetrieving artist info...\t{0} of {0}".format(str(totalArtists)))
            sys.stdout.flush()
            print("\rRetrieved artist info.")
            if errors:
                print("\nThe following artists could not be located in the last.fm database:\n  " + "\n  ".join(errors))


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print("[ERROR] No last.fm username specified.")
        sys.exit(1)
    # --all re-scrapes every user that already has a table in the database
    if sys.argv[1] == '--all':
        with dataset.connect('sqlite:///last-fm.db') as db:
            sql = "SELECT name AS username FROM sqlite_master WHERE type='table'"
            usernames = [row['username'] for row in db.query(sql)]
    else:
        usernames = sys.argv[1:]
    for username in usernames:
        scr = Scraper(username)
        # users with an existing table get an incremental update; new users get a full scrape
        with dataset.connect('sqlite:///last-fm.db') as db:
            sql = "SELECT COUNT(name) AS count FROM sqlite_master WHERE type='table' AND name='%s'" % username
            exists = int(next(db.query(sql))['count'])
        if exists:
            scr.update_scrobbles()
        else:
            scr.get_all_scrobbles()
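
# Usage:
#   python scraper.py <username> [<username> ...]  scrapes or updates the given users
#   python scraper.py --all                        updates every user already in last-fm.db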