Skip to content

Commit

Permalink
reduce cyclomatic complexity to 10 and enforce max complexity on flak…
Browse files Browse the repository at this point in the history
…e8 (#53)
  • Loading branch information
geo-martino authored Mar 31, 2024
1 parent 7d4d8a0 commit 72cfcaa
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 251 deletions.
3 changes: 1 addition & 2 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ per-file-ignores =
docs/_howto/scripts/*:E402
tests/**/test_*.py:F811
max-line-length = 120
# TODO: reduce this to 10
max-complexity = 17
max-complexity = 10
74 changes: 51 additions & 23 deletions musify/core/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,30 +106,21 @@ def _to_str(cls, attributes: Mapping[str, Any], indent: int = 2, increment: int
elif isinstance(attr_val, (datetime, date)):
attr_val_str = str(attr_val)
elif isinstance(attr_val, (list, tuple, set)) and len(attr_val) > 0:
pp_repr = "[{}]"
if isinstance(attr_val, set):
pp_repr = "{{}}"
elif isinstance(attr_val, tuple):
pp_repr = "({})"

attr_val_str = str(attr_val)

if len(attr_val_str) > max_val_width:
pp_repr = pp_repr.format("\n" + " " * indent + "{}\n" + " " * indent_prev)
attr_val_pp = []
for val in attr_val:
if isinstance(val, PrettyPrinter):
attr_val_pp.append(val.__str__(indent=indent + increment, increment=increment))
else:
attr_val_pp.append(str(val))
attr_val_str = pp_repr.format((",\n" + " " * indent).join(attr_val_pp))
attr_val_str = cls._get_attribute_value_from_collection(
attr_val=attr_val,
max_val_width=max_val_width,
indent=indent,
indent_prev=indent_prev,
increment=increment
)
elif isinstance(attr_val, Mapping) and len(attr_val) > 0:
attr_val_pp = cls._to_str(attr_val, indent=indent, increment=increment)
attr_val_str = "{" + ", ".join(attr_val_pp) + "}"

if len(attr_val_str) > max_val_width:
pp_repr = "\n" + " " * indent + "{}\n" + " " * indent_prev
attr_val_str = "{" + pp_repr.format((",\n" + " " * indent).join(attr_val_pp)) + "}"
attr_val_str = cls._get_attribute_value_from_mapping(
attr_val=attr_val,
max_val_width=max_val_width,
indent=indent,
indent_prev=indent_prev,
increment=increment
)
else:
attr_val_str = repr(attr_val)

Expand All @@ -141,6 +132,43 @@ def _to_str(cls, attributes: Mapping[str, Any], indent: int = 2, increment: int

return attributes_repr

@staticmethod
def _get_attribute_value_from_collection(
attr_val: list | tuple | set, max_val_width: int, indent: int, indent_prev: int, increment: int
) -> str:
pp_repr = "[{}]"
if isinstance(attr_val, set):
pp_repr = "{{}}"
elif isinstance(attr_val, tuple):
pp_repr = "({})"

attr_val_str = str(attr_val)

if len(attr_val_str) > max_val_width:
pp_repr = pp_repr.format("\n" + " " * indent + "{}\n" + " " * indent_prev)
attr_val_pp = []
for val in attr_val:
if isinstance(val, PrettyPrinter):
attr_val_pp.append(val.__str__(indent=indent + increment, increment=increment))
else:
attr_val_pp.append(str(val))
attr_val_str = pp_repr.format((",\n" + " " * indent).join(attr_val_pp))

return attr_val_str

@classmethod
def _get_attribute_value_from_mapping(
cls, attr_val: Mapping, max_val_width: int, indent: int, indent_prev: int, increment: int
) -> str:
attr_val_pp = cls._to_str(attr_val, indent=indent, increment=increment)
attr_val_str = "{" + ", ".join(attr_val_pp) + "}"

if len(attr_val_str) > max_val_width:
pp_repr = "\n" + " " * indent + "{}\n" + " " * indent_prev
attr_val_str = "{" + pp_repr.format((",\n" + " " * indent).join(attr_val_pp)) + "}"

return attr_val_str

def __repr__(self):
return f"{self.__class__.__name__}({repr(self.as_dict())})"

Expand Down
30 changes: 17 additions & 13 deletions musify/libraries/core/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,22 @@ def __getitem__(
if isinstance(__key, int) or isinstance(__key, slice): # simply index the list or items
return self.items[__key]

getters = self.__get_item_getters(__key)
if not getters:
raise MusifyKeyError(f"Unrecognised key type | {__key=} | type={type(__key).__name__}")

caught_exceptions = []
for getter in getters:
try:
return getter.get_item(self)
except (MusifyAttributeError, MusifyKeyError) as ex:
caught_exceptions.append(ex)

raise MusifyKeyError(
f"Key is invalid. The following errors were thrown: {[str(ex) for ex in caught_exceptions]}"
)

def __get_item_getters(self, __key: str) -> list[ItemGetterStrategy]:
getters = []
if isinstance(__key, File):
getters.append(PathGetter(__key.path))
Expand All @@ -316,19 +332,7 @@ def __getitem__(
NameGetter(__key),
])

if not getters:
raise MusifyKeyError(f"Unrecognised key type | {__key=} | type={type(__key).__name__}")

caught_exceptions = []
for getter in getters:
try:
return getter.get_item(self)
except (MusifyAttributeError, MusifyKeyError) as ex:
caught_exceptions.append(ex)

raise MusifyKeyError(
f"Key is invalid. The following errors were thrown: {[str(ex) for ex in caught_exceptions]}"
)
return getters

def __setitem__(self, __key: str | int | T, __value: T):
"""Replace the item at a given ``__key`` with the given ``__value``."""
Expand Down
83 changes: 44 additions & 39 deletions musify/libraries/remote/core/processors/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,53 +408,58 @@ def _match_to_input(self, name: str) -> None:

print("\n" + help_text)
for item in self._remaining.copy():
while item in self._remaining: # while item not matched or skipped
while current_input is not None and item in self._remaining: # while item not matched or skipped
self._log_padded([name, f"{len(self._remaining):>6} remaining items"])
if 'a' not in current_input:
current_input = self._get_user_input(align_string(item.name, max_width=max_width))

if current_input.casefold() == 'h': # print help
print("\n" + help_text)
else:
current_input = self._match_item_to_input(name=name, item=item, current_input=current_input)

elif current_input.casefold() == 's' or current_input.casefold() == 'q': # quit/skip
self._log_padded([name, "Skipping all loops"], pad="<")
self._quit = current_input.casefold() == 'q' or self._quit
self._skip = current_input.casefold() == 's' or self._skip
self._remaining.clear()
return

elif current_input.casefold().replace('a', '') == 'u': # mark item as unavailable
self._log_padded([name, "Marking as unavailable"], pad="<")
item.uri = self.api.wrangler.unavailable_uri_dummy
self._remaining.remove(item)

elif current_input.casefold().replace('a', '') == 'n': # leave item without URI and unprocessed
self._log_padded([name, "Skipping"], pad="<")
item.uri = None
self._remaining.remove(item)

elif current_input.casefold() == 'r': # return to former 'while' loop
self._log_padded([name, "Refreshing playlist metadata and restarting loop"])
return

elif current_input.casefold() == 'p' and hasattr(item, "path"): # print item path
print(f"\33[96m{item.path}\33[0m")

elif self.api.wrangler.validate_id_type(current_input): # update URI and add item to switched list
uri = self.api.wrangler.convert(
current_input, kind=RemoteObjectType.TRACK, type_out=RemoteIDType.URI
)
if current_input is None or not self._remaining:
break

def _match_item_to_input(self, name: str, item: MusifyItemSettable, current_input: str) -> str | None:
if current_input.casefold() == 's' or current_input.casefold() == 'q': # quit/skip
self._log_padded([name, "Skipping all loops"], pad="<")
self._quit = current_input.casefold() == 'q' or self._quit
self._skip = current_input.casefold() == 's' or self._skip
self._remaining.clear()
return

self._log_padded([name, f"Updating URI: {item.uri} -> {uri}"], pad="<")
item.uri = uri
elif current_input.casefold().replace('a', '') == 'u': # mark item as unavailable
self._log_padded([name, "Marking as unavailable"], pad="<")
item.uri = self.api.wrangler.unavailable_uri_dummy
self._remaining.remove(item)

self._switched.append(item)
self._remaining.remove(item)
current_input = ""
elif current_input.casefold().replace('a', '') == 'n': # leave item without URI and unprocessed
self._log_padded([name, "Skipping"], pad="<")
item.uri = None
self._remaining.remove(item)

elif current_input: # invalid input
self.logger.warning("Input not recognised.")
current_input = ""
elif current_input.casefold() == 'r': # return to former 'while' loop
self._log_padded([name, "Refreshing playlist metadata and restarting loop"])
return

if not self._remaining:
break
elif current_input.casefold() == 'p' and hasattr(item, "path"): # print item path
print(f"\33[96m{item.path}\33[0m")

elif self.api.wrangler.validate_id_type(current_input): # update URI and add item to switched list
uri = self.api.wrangler.convert(
current_input, kind=RemoteObjectType.TRACK, type_out=RemoteIDType.URI
)

self._log_padded([name, f"Updating URI: {item.uri} -> {uri}"], pad="<")
item.uri = uri

self._switched.append(item)
self._remaining.remove(item)
current_input = ""

elif current_input: # invalid input
self.logger.warning("Input not recognised.")
current_input = ""

return current_input
84 changes: 50 additions & 34 deletions musify/libraries/remote/spotify/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,9 @@ def _get_item_type(
cls, value: str | Mapping[str, Any] | RemoteResponse, kind: RemoteObjectType | None = None
) -> RemoteObjectType | None:
if isinstance(value, RemoteResponse):
response_kind = cls._get_item_type(value.response)
if value.kind != response_kind:
raise RemoteObjectTypeError(
f"RemoteResponse kind != actual response kind: {value.kind} != {response_kind}"
)
return value.kind
return cls._get_item_type_from_response(value)
if isinstance(value, Mapping):
if value.get("is_local", False):
raise RemoteObjectTypeError("Cannot process local items")
if "type" not in value:
raise RemoteObjectTypeError(f"Given map does not contain a 'type' key: {value}")
return RemoteObjectType.from_name(value["type"].casefold().rstrip('s'))[0]
return cls._get_item_type_from_mapping(value)

value = value.strip()
uri_check = value.split(':')
Expand All @@ -99,6 +90,23 @@ def _get_item_type(
return kind
raise RemoteObjectTypeError(f"Could not determine item type of given value: {value}")

@classmethod
def _get_item_type_from_response(cls, value: RemoteResponse) -> RemoteObjectType:
response_kind = cls._get_item_type_from_mapping(value.response)
if value.kind != response_kind:
raise RemoteObjectTypeError(
f"RemoteResponse kind != actual response kind: {value.kind} != {response_kind}"
)
return value.kind

@classmethod
def _get_item_type_from_mapping(cls, value: Mapping[str, Any]) -> RemoteObjectType:
if value.get("is_local", False):
raise RemoteObjectTypeError("Cannot process local items")
if "type" not in value:
raise RemoteObjectTypeError(f"Given map does not contain a 'type' key: {value}")
return RemoteObjectType.from_name(value["type"].casefold().rstrip('s'))[0]

@classmethod
def convert(
cls,
Expand All @@ -114,34 +122,14 @@ def convert(

value = value.strip()

if type_in == RemoteIDType.URL_EXT or type_in == RemoteIDType.URL: # open/API URL
url_path = urlparse(value).path.split("/")
for chunk in url_path:
try:
kind = RemoteObjectType.from_name(chunk.rstrip('s'))[0]
break
except MusifyEnumError:
continue

if kind == RemoteObjectType.USER:
name = kind.name.lower()
try:
id_ = url_path[url_path.index(name) + 1]
except ValueError:
id_ = url_path[url_path.index(name + "s") + 1]
else:
id_ = next(p for p in url_path if len(p) == RemoteIDType.ID.value)

if type_in == RemoteIDType.URL_EXT or type_in == RemoteIDType.URL:
kind, id_ = cls._get_id_from_url(value=value, kind=kind)
elif type_in == RemoteIDType.URI:
uri_split = value.split(':')
kind = RemoteObjectType.from_name(uri_split[1])[0]
id_ = uri_split[2]

kind, id_ = cls._get_id_from_uri(value=value)
elif type_in == RemoteIDType.ID:
if kind is None:
raise RemoteIDTypeError("Input value is an ID and no defined 'kind' has been given.", RemoteIDType.ID)
id_ = value

else:
raise RemoteIDTypeError(f"Could not determine item type: {value}")

Expand All @@ -156,6 +144,34 @@ def convert(
else:
return id_

@classmethod
def _get_id_from_url(cls, value: str, kind: RemoteObjectType | None = None) -> tuple[RemoteObjectType, str]:
url_path = urlparse(value).path.split("/")
for chunk in url_path:
try:
kind = RemoteObjectType.from_name(chunk.rstrip('s'))[0]
break
except MusifyEnumError:
continue

if kind == RemoteObjectType.USER:
name = kind.name.lower()
try:
id_ = url_path[url_path.index(name) + 1]
except ValueError:
id_ = url_path[url_path.index(name + "s") + 1]
else:
id_ = next(p for p in url_path if len(p) == RemoteIDType.ID.value)

return kind, id_

@classmethod
def _get_id_from_uri(cls, value: str) -> tuple[RemoteObjectType, str]:
uri_split = value.split(':')
kind = RemoteObjectType.from_name(uri_split[1])[0]
id_ = uri_split[2]
return kind, id_

@classmethod
def extract_ids(cls, values: APIInputValue, kind: RemoteObjectType | None = None) -> list[str]:
def extract_id(value: str | Mapping[str, Any] | RemoteResponse) -> str:
Expand Down
Loading

0 comments on commit 72cfcaa

Please sign in to comment.