Skip to content

Commit

Permalink
various bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
geo-martino committed Jun 1, 2024
1 parent eba8fe1 commit 4ba3e0b
Show file tree
Hide file tree
Showing 16 changed files with 121 additions and 76 deletions.
24 changes: 15 additions & 9 deletions docs/_howto/scripts/remote.new-music/p4.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@ async def get_albums(library: RemoteLibrary, start: date, end: date) -> list[Rem
albums = [album for artist in library.artists for album in artist.albums if match_date(album, start, end)]
albums_need_extend = [album for album in albums if len(album.tracks) < album.track_total]

if albums_need_extend:
kind = RemoteObjectType.ALBUM
key = api.collection_item_map[kind]

bar = library.logger.get_synchronous_iterator(albums_need_extend, desc="Getting album tracks", unit="albums")
async with library:
for album in bar:
await api.extend_items(album.response, kind=kind, key=key)
album.refresh(skip_checks=False)
if not albums_need_extend:
return albums

kind = RemoteObjectType.ALBUM
key = api.collection_item_map[kind]

async with library:
await library.logger.get_asynchronous_iterator(
(api.extend_items(album.response, kind=kind, key=key) for album in albums),
desc="Getting album tracks",
unit="albums"
)

for album in albums:
album.refresh(skip_checks=False)

return albums
8 changes: 4 additions & 4 deletions musify/api/cache/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ async def get_responses(self, requests: Collection[RepositoryRequestType[K]]) ->
Get the responses relating to the given ``requests`` from this repository if they exist.
Returns results unordered.
"""
results = await self.logger.get_asynchronous_iterator(map(self.get_response, requests), disable=True)
return list(filter(lambda result: result is not None, results))
bar = self.logger.get_asynchronous_iterator(map(self.get_response, requests), disable=True)
return list(filter(lambda result: result is not None, await bar))

async def save_response(self, response: Collection[K, V] | ClientResponse) -> None:
"""Save the given ``response`` to this repository if a key can be extracted from it. Safely fail if not"""
Expand Down Expand Up @@ -194,8 +194,8 @@ async def delete_responses(self, requests: Collection[RepositoryRequestType[K]])
Delete the given ``requests`` from this repository if they exist.
Returns the number of the given ``requests`` deleted in the repository.
"""
results = await self.logger.get_asynchronous_iterator(map(self.delete_response, requests), disable=True)
return sum(results)
bar = self.logger.get_asynchronous_iterator(map(self.delete_response, requests), disable=True)
return sum(await bar)


class ResponseCache[ST: ResponseRepository](MutableMapping[str, ST], metaclass=ABCMeta):
Expand Down
41 changes: 26 additions & 15 deletions musify/api/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,16 +168,19 @@ async def request(self, method: str, url: str | URL, **kwargs) -> dict[str, Any]
break

await self._log_response(response=response, method=method, url=url)
await self._handle_bad_response(response=response)
handled = await self._handle_bad_response(response=response)
waited = await self._wait_for_rate_limit_timeout(response=response)

if not waited and (backoff > self.backoff_final or backoff == 0):
if handled or waited:
continue

if backoff > self.backoff_final or backoff == 0:
raise APIError("Max retries exceeded")

if not waited and backoff: # exponential backoff
self.logger.info_extra(f"Request failed: retrying in {backoff} seconds...")
sleep(backoff)
backoff *= self.backoff_factor
# exponential backoff
self.log(method=method, url=url, message=f"Request failed: retrying in {backoff} seconds...")
sleep(backoff)
backoff *= self.backoff_factor

return data

Expand Down Expand Up @@ -257,20 +260,28 @@ async def _log_response(self, response: ClientResponse, method: str, url: str |

async def _handle_bad_response(self, response: ClientResponse) -> bool:
"""Handle bad responses by extracting message and handling status codes that should raise an exception."""
message = (await self._get_json_response(response)).get("error", {}).get("message")
error_message_found = message is not None

if not error_message_found:
error_message = (await self._get_json_response(response)).get("error", {}).get("message")
if error_message is None:
status = HTTPStatus(response.status)
message = f"{status.phrase} | {status.description}"
error_message = f"{status.phrase} | {status.description}"

handled = False

def _log_bad_response(message: str) -> None:
self.logger.debug(f"Status code: {response.status} | {error_message} | {message}")

if 400 <= response.status < 408:
raise APIError(message, response=response)
if response.status == 401:
_log_bad_response("Re-authorising...")
await self.authorise()
handled = True
elif response.status == 429:
self.logger.debug(f"Rate limit hit. Increasing wait time between requests to {self.wait_time}")
self.wait_time += self.wait_increment
_log_bad_response(f"Rate limit hit. Increasing wait time between requests to {self.wait_time}")
handled = True
elif response.status == 400 <= response.status < 408:
raise APIError(error_message, response=response)

return error_message_found
return handled

async def _wait_for_rate_limit_timeout(self, response: ClientResponse) -> bool:
"""Handle rate limits when a 'retry-after' time is included in the response headers."""
Expand Down
9 changes: 6 additions & 3 deletions musify/libraries/local/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,13 @@ async def save_tracks(
"""
async def _save_track(track: T) -> tuple[T, SyncResultTrack]:
return track, await track.save(tags=tags, replace=replace, dry_run=dry_run)
results = await self.logger.get_asynchronous_iterator(
map(_save_track, self.tracks), desc="Updating tracks", unit="tracks"

# WARNING: making this run asynchronously will break tqdm; bar will get stuck after 1-2 ticks
bar = self.logger.get_synchronous_iterator(
self.tracks, desc="Updating tracks", unit="tracks"
)
return {track: result for track, result in results if result.saved or result.updated}
results = dict([await _save_track(track) for track in bar])
return {track: result for track, result in results.items() if result.saved or result.updated}

def log_save_tracks_result(self, results: Mapping[T, SyncResultTrack]) -> None:
"""Log stats from the results of a ``save_tracks`` operation"""
Expand Down
23 changes: 15 additions & 8 deletions musify/libraries/local/library/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,14 @@ async def load_tracks(self) -> None:
f"\33[1;95m >\33[1;97m Extracting metadata and properties for {len(self._track_paths)} tracks \33[0m"
)

self._tracks = await self.logger.get_asynchronous_iterator(
map(self.load_track, self._track_paths),
# WARNING: making this run asynchronously will break tqdm; bar will get stuck after 1-2 ticks
bar = self.logger.get_synchronous_iterator(
self._track_paths,
desc="Loading tracks",
unit="tracks",
total=len(self._track_paths)
)
self._tracks = [await self.load_track(path) for path in bar]

self._log_errors("Could not load the following tracks")
self.logger.debug(f"Load {self.name} tracks: DONE\n")
Expand Down Expand Up @@ -366,13 +368,16 @@ async def load_playlists(self) -> None:
f"\33[1;95m >\33[1;97m Loading playlist data for {len(self._playlist_paths)} playlists \33[0m"
)

playlists = await self.logger.get_asynchronous_iterator(
map(self.load_playlist, self._playlist_paths.values()),
# WARNING: making this run asynchronously will break tqdm; bar will get stuck after 1-2 ticks
bar = self.logger.get_synchronous_iterator(
self._playlist_paths.values(),
desc="Loading tracks",
unit="tracks",
total=len(self._playlist_paths)
)
self._playlists = {pl.name: pl for pl in sorted(playlists, key=lambda x: x.name.casefold())}
self._playlists = {
pl.name: pl for pl in sorted([await self.load_playlist(pl) for pl in bar], key=lambda x: x.name.casefold())
}

self._log_errors("Could not load the following playlists")
self.logger.debug(f"Load {self.name} playlists: DONE\n")
Expand All @@ -399,10 +404,12 @@ async def save_playlists(self, dry_run: bool = True) -> dict[LocalPlaylist, Resu
"""
async def _save_playlist(pl: LocalPlaylist) -> tuple[LocalPlaylist, Result]:
return pl, await pl.save(dry_run=dry_run)
results = await self.logger.get_asynchronous_iterator(
map(_save_playlist, self.playlists.values()), desc="Updating playlists", unit="tracks"

# WARNING: making this run asynchronously will break tqdm; bar will get stuck after 1-2 ticks
bar = self.logger.get_synchronous_iterator(
self.playlists.values(), desc="Updating playlists", unit="tracks"
)
return dict(results)
return dict([await _save_playlist(pl) for pl in bar])

###########################################################################
## Backup/restore
Expand Down
6 changes: 5 additions & 1 deletion musify/libraries/remote/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def print_item(
:param max_width: The maximum width to print names as. Any name lengths longer than this will be truncated.
"""
self.logger.print_message(
f"\t\33[92m{str(i).zfill(len(str(total)))} \33[0m- "
f"\33[92m{str(i).zfill(len(str(total)))} \33[0m- "
f"\33[97m{align_string(name, max_width=max_width)} \33[0m| "
f"\33[91m{str(int(length // 60)).zfill(2)}:{str(round(length % 60)).zfill(2)} \33[0m| "
f"\33[93m{uri} \33[0m- "
Expand Down Expand Up @@ -311,6 +311,7 @@ async def extend_items(
response: MutableMapping[str, Any] | RemoteResponse,
kind: RemoteObjectType | str | None = None,
key: RemoteObjectType | None = None,
leave_bar: bool | None = None,
) -> list[dict[str, Any]]:
"""
Extend the items for a given API ``response``.
Expand All @@ -325,6 +326,9 @@ async def extend_items(
:param response: A remote API JSON response for an items type endpoint.
:param kind: The type of response being extended. Optional, used only for logging.
:param key: The type of response of the child objects.
:param leave_bar: When a progress bar is displayed,
toggle whether this bar should continue to be displayed after the operation is finished.
When None, allow the logger to decide this setting.
:return: API JSON responses for each item
"""
raise NotImplementedError
Expand Down
47 changes: 28 additions & 19 deletions musify/libraries/remote/core/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Mapping[str, Iterable[str]] |
Mapping[str, Iterable[Mapping[str, Any]]]
)
type SyncPlaylistsType = Library | Mapping[str, Collection[MusifyItem]] | Collection[Playlist] | None


class RemoteLibrary[
Expand Down Expand Up @@ -391,8 +392,8 @@ async def _get_playlist(n: str) -> PL | None:

return pl

results: list[PL] = await self.logger.get_asynchronous_iterator(map(_get_playlist, playlists), disable=True)
playlists_remote = {pl.name: pl for pl in results if pl is not None}
bar = self.logger.get_asynchronous_iterator(map(_get_playlist, playlists), disable=True)
playlists_remote: dict[str, PL] = {pl.name: pl for pl in await bar if pl is not None}

for name, uri_list in playlists.items():
playlist = playlists_remote.get(name)
Expand Down Expand Up @@ -430,7 +431,7 @@ def _extract_playlists_from_backup(playlists: RestorePlaylistsType) -> Mapping[s

async def sync(
self,
playlists: Library | Mapping[str, Iterable[MusifyItem]] | Collection[Playlist] | None = None,
playlists: SyncPlaylistsType = None,
kind: Literal["new", "refresh", "sync"] = "new",
reload: bool = True,
dry_run: bool = True
Expand All @@ -456,14 +457,7 @@ async def sync(
"""
self.logger.debug(f"Sync {self.api.source} playlists: START")

if not playlists: # use the playlists as stored in this library object
playlists = self.playlists
elif isinstance(playlists, Library): # get map of playlists from the given library
playlists = playlists.playlists
elif isinstance(playlists, Collection) and all(isinstance(pl, Playlist) for pl in playlists):
# reformat list to map
playlists = {pl.name: pl for pl in playlists}
playlists: Mapping[str, Iterable[MusifyItem]]
playlists = self._extract_playlists_for_sync(playlists)

log_kind = "adding new items only"
if kind != "new":
Expand All @@ -474,26 +468,41 @@ async def sync(
f"{f" and reloading stored playlists" if reload else ""} \33[0m"
)

bar = self.logger.get_synchronous_iterator(
playlists.items(), desc=f"Synchronising {self.api.source}", unit="playlists"
)
results = {}
for name, pl in bar: # synchronise playlists
async def _sync_playlist(name: str, pl: Collection[MusifyItem]) -> tuple[str, SyncResultRemotePlaylist]:
if name not in self.playlists: # new playlist given, create it on remote first
if dry_run:
results[name] = SyncResultRemotePlaylist(
result = SyncResultRemotePlaylist(
start=0, added=len(pl), removed=0, unchanged=0, difference=len(pl), final=len(pl)
)
continue
return name, result

# noinspection PyArgumentList
self.playlists[name] = await self.factory.playlist.create(name=name)
results[name] = await self.playlists[name].sync(items=pl, kind=kind, reload=reload, dry_run=dry_run)

return name, await self.playlists[name].sync(items=pl, kind=kind, reload=reload, dry_run=dry_run)

bar = self.logger.get_asynchronous_iterator(
[_sync_playlist(name, pl) for name, pl in playlists.items()],
desc=f"Synchronising {self.api.source}",
unit="playlists"
)
results = dict(await bar)

self.logger.print()
self.logger.debug(f"Sync {self.api.source} playlists: DONE\n")
return results

def _extract_playlists_for_sync(self, playlists: SyncPlaylistsType) -> Mapping[str, Collection[MusifyItem]]:
if not playlists: # use the playlists as stored in this library object
playlists = self.playlists
elif isinstance(playlists, Library): # get map of playlists from the given library
playlists = playlists.playlists
elif isinstance(playlists, Collection) and all(isinstance(pl, Playlist) for pl in playlists):
# reformat list to map
playlists = {pl.name: pl for pl in playlists}

return playlists

def log_sync(self, results: SyncResultRemotePlaylist | Mapping[str, SyncResultRemotePlaylist]) -> None:
"""Log stats from the results of a ``sync`` operation"""
if not results:
Expand Down
1 change: 1 addition & 0 deletions musify/libraries/remote/core/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def _check_total(self) -> None:
"""
if self._total != len(self.items):
raise RemoteError(
f"{self.name} | "
"The total items available in the response does not equal the total item count for this collection. "
"Make sure the given collection response contains the right number of item responses: "
f"{self._total} != {len(self.items)}"
Expand Down
13 changes: 9 additions & 4 deletions musify/libraries/remote/core/processors/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _log_results(self, results: Mapping[str, ItemSearchResult]) -> None:
matched = len(result.matched)
unmatched = len(result.unmatched)
skipped = len(result.skipped)
total = total_matched + total_unmatched + total_skipped
total = matched + unmatched + skipped

total_matched += matched
total_unmatched += unmatched
Expand Down Expand Up @@ -218,8 +218,12 @@ async def search[T: MusifyItemSettable](
f"Searching for matches on {self.api.source} for {len(collections)} {kind}s\33[0m"
)

bar = self.logger.get_synchronous_iterator(collections, desc="Searching", unit=f"{kind}s")
search_results = {coll.name: await self._search_collection(coll) for coll in bar}
async def _get_result(coll: MusifyCollection[T]) -> tuple[str, ItemSearchResult]:
return coll.name, await self._search_collection(coll)

# WARNING: making this run asynchronously will break tqdm; bar will get stuck after 1-2 ticks
bar = self.logger.get_synchronous_iterator(collections, desc="Searching", unit=f"{kind}s")
search_results = dict([await _get_result(coll) for coll in bar])

self.logger.print()
self._log_results(search_results)
Expand Down Expand Up @@ -304,7 +308,8 @@ async def _search_collection_unit[T: MusifyItemSettable](self, collection: Musif
responses = await self._get_results(collection, kind=kind, settings=search_config)
key = self.api.collection_item_map[kind]
await self.logger.get_asynchronous_iterator(
(self.api.extend_items(response, kind=kind, key=key) for response in responses), disable=True
(self.api.extend_items(response, kind=kind, key=key, leave_bar=False) for response in responses),
disable=True
)

# noinspection PyProtectedMember,PyTypeChecker
Expand Down
4 changes: 2 additions & 2 deletions musify/libraries/remote/spotify/api/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,11 @@ async def _get_result(kind: str, url: str, key: str, batch: bool) -> dict[str, l
return {key: await self._get_items(url=url, id_list=id_list, kind=kind, key=_key, limit=_limit)}

results: list[dict[str, Any]] = []
results_map = await self.logger.get_asynchronous_iterator(
bar = self.logger.get_asynchronous_iterator(
(_get_result(kind=kind, url=url, key=key, batch=batch) for kind, (url, key, batch) in config.items()),
disable=True,
)
for result_map in results_map:
for result_map in await bar:
for key, responses in result_map.items():
responses.sort(key=lambda response: id_list.index(response[self.id_key]))
responses = [{self.id_key: response[self.id_key], key: response} for response in responses]
Expand Down
2 changes: 1 addition & 1 deletion musify/libraries/remote/spotify/api/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def print_collection(
if response.get("offset", 0) == 0: # first page, show header
url_ext = self.wrangler.convert(id_, kind=kind, type_in=RemoteIDType.ID, type_out=RemoteIDType.URL_EXT)
self.logger.print_message(
f"\n\t\33[96mShowing tracks for {kind.name.lower()}\33[0m: "
f"\n\33[96mShowing tracks for {kind.name.lower()}\33[0m: "
f"\33[94m{name} \33[97m- {url_ext} \33[0m\n"
)

Expand Down
2 changes: 1 addition & 1 deletion musify/libraries/remote/spotify/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ async def reload(self, extend_tracks: bool = False, extend_features: bool = Fals
self.__init__(response=response, api=self.api, skip_checks=skip_checks)

def _get_track_uris_from_api_response(self) -> list[str]:
return [track["track"]["uri"] for track in self.response["tracks"]["items"]]
return [track["track"]["uri"] for track in self.response["tracks"].get("items", [])]


class SpotifyAlbum(RemoteAlbum[SpotifyTrack], SpotifyCollectionLoader[SpotifyTrack]):
Expand Down
Loading

0 comments on commit 4ba3e0b

Please sign in to comment.