diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5fb16d1..4380fad 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,15 +7,15 @@ repos: - id: trailing-whitespace - id: end-of-file-fixer - repo: https://github.com/psf/black - rev: "23.7.0" + rev: "24.2.0" hooks: - id: black - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.0.282 + rev: "0.3.0" hooks: - id: ruff - repo: https://github.com/RobertCraigie/pyright-python - rev: v1.1.320 + rev: v1.1.352 hooks: - id: pyright name: pyright (system) diff --git a/.vscode/settings.json b/.vscode/settings.json index 7276606..9d9b8bd 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,8 +3,9 @@ "editor.defaultFormatter": "ms-python.black-formatter", "editor.formatOnSave": true, "editor.codeActionsOnSave": { - "source.organizeImports": true + "source.organizeImports": "explicit" }, }, - "python.analysis.typeCheckingMode": "strict", + "python.analysis.typeCheckingMode": "basic", + "editor.rulers": [88], } \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 2e8389a..bcbbe6b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ classifiers = [ ] dependencies = [ "requests>=2.27.0,<2.28", - "zimscraperlib>=1.6.0,<1.7", + "zimscraperlib==3.3.1", "kiwixstorage>=0.8.2,<0.9", "Jinja2>=3.1.2,<3.2", "backoff>=2.0.1,<2.1", @@ -33,11 +33,11 @@ scripts = [ "invoke==2.2.0", ] lint = [ - "black==23.7.0", - "ruff==0.0.282", + "black==24.2.0", + "ruff==0.3.0", ] check = [ - "pyright==1.1.320", + "pyright==1.1.352", ] test = [ "pytest==7.4.0", @@ -111,6 +111,8 @@ target-version = ['py38'] target-version = "py38" line-length = 88 src = ["src"] + +[tool.ruff.lint] select = [ "A", # flake8-builtins # "ANN", # flake8-annotations @@ -187,17 +189,17 @@ unfixable = [ "F401", ] -[tool.ruff.isort] +[tool.ruff.lint.isort] known-first-party = ["ifixit2zim"] -[tool.ruff.flake8-bugbear] +[tool.ruff.lint.flake8-bugbear] # add exceptions to B008 for fastapi. extend-immutable-calls = ["fastapi.Depends", "fastapi.Query"] -[tool.ruff.flake8-tidy-imports] +[tool.ruff.lint.flake8-tidy-imports] ban-relative-imports = "all" -[tool.ruff.per-file-ignores] +[tool.ruff.lint.per-file-ignores] # Tests can use magic values, assertions, and relative imports "tests/**/*" = ["PLR2004", "S101", "TID252"] diff --git a/src/ifixit2zim/__main__.py b/src/ifixit2zim/__main__.py index cab22e9..e38e096 100644 --- a/src/ifixit2zim/__main__.py +++ b/src/ifixit2zim/__main__.py @@ -1,15 +1,4 @@ -import pathlib -import sys - - -def main(): - # allows running it from source using python ifixit2zim - sys.path = [str(pathlib.Path(__file__).parent.parent.resolve()), *sys.path] - - from ifixit2zim.entrypoint import main as entry - - entry() - +from ifixit2zim.entrypoint import main if __name__ == "__main__": main() diff --git a/src/ifixit2zim/constants.py b/src/ifixit2zim/constants.py index 0fcc6ee..56ef485 100644 --- a/src/ifixit2zim/constants.py +++ b/src/ifixit2zim/constants.py @@ -1,7 +1,7 @@ import pathlib import tempfile import urllib.parse -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import List, Optional, Set from zimscraperlib.i18n import get_language_details @@ -252,7 +252,7 @@ "disassembly_guides": "분해 안내서", "tools": "도구", "parts": "부품", - "tools_introduction": ("해당 기기를 고치는데 사용하는 일반 도구들 입니다. 매 단계에 모든 도구를 사용하지는 않습니다."), + "tools_introduction": "해당 기기를 고치는데 사용하는 일반 도구들 입니다. 
매 단계에 모든 도구를 사용하지는 않습니다.", # noqa E501 }, "zh": { "author": "作者: ", @@ -268,7 +268,7 @@ "disassembly_guides": "拆卸指南", "tools": "工具", "parts": "配件", - "tools_introduction": ("这是用于在这个设备上工作的一些常用工具。你可能不需要在每个过程中使用到每个工具。"), + "tools_introduction": "这是用于在这个设备上工作的一些常用工具。你可能不需要在每个过程中使用到每个工具。", # noqa E501 }, "ru": { "author": "Автор: ", @@ -324,7 +324,7 @@ "disassembly_guides": "分解ガイド", "tools": "ツール", "parts": "パーツ", - "tools_introduction": ("以前、このデバイスの修理に使われていた一般的な工具です。修理過程において全部の工具が必要とは限りません。"), + "tools_introduction": "以前、このデバイスの修理に使われていた一般的な工具です。修理過程において全部の工具が必要とは限りません。", # noqa E501 }, "tr": { "author": "Yazar: ", @@ -803,61 +803,63 @@ @dataclass -class Conf: - required = [ +class Configuration: + fpath: pathlib.Path + + # zim params + name: str + title: str + description: str + long_description: Optional[str] + author: str + publisher: str + fname: str + tag: List[str] + + # filesystem + _output_dir: str # TODO: rename output_name + _tmp_dir: str # IDEM + output_dir: pathlib.Path # TODO: rename output_path + tmp_dir: pathlib.Path # IDEM + + required = ( "lang_code", "output_dir", - ] + ) - lang_code: str = "" - language: dict = field(default_factory=dict) - main_url: str = "" - - # zim params - name: str = "" - title: Optional[str] = "" - description: Optional[str] = "" - author: Optional[str] = "" - publisher: Optional[str] = "" - fname: Optional[str] = "" - tag: List[str] = field(default_factory=list) + lang_code: str + language: dict + main_url: urllib.parse.ParseResult # customization - icon: Optional[str] = "" - categories: Set[str] = field(default_factory=set) - no_category: Optional[bool] = False - guides: Set[str] = field(default_factory=set) - no_guide: Optional[bool] = False - infos: Set[str] = field(default_factory=set) - no_info: Optional[bool] = False - users: Set[str] = field(default_factory=set) - no_user: Optional[bool] = False - no_cleanup: Optional[bool] = False - - # filesystem - _output_dir: Optional[str] = "." - _tmp_dir: Optional[str] = "." 
- output_dir: Optional[pathlib.Path] = None - tmp_dir: Optional[pathlib.Path] = None + icon: str + categories: Set[str] + no_category: bool + guides: Set[str] + no_guide: bool + infos: Set[str] + no_info: bool + users: Set[str] + no_user: bool + no_cleanup: bool # performances - nb_threads: Optional[int] = -1 - s3_url_with_credentials: Optional[str] = "" + s3_url_with_credentials: Optional[str] # error handling - max_missing_items_percent: Optional[int] = 0 - max_error_items_percent: Optional[int] = 0 + max_missing_items_percent: int + max_error_items_percent: int # debug/devel - build_dir_is_tmp_dir: Optional[bool] = False - keep_build_dir: Optional[bool] = False - scrape_only_first_items: Optional[bool] = False - debug: Optional[bool] = False - delay: Optional[float] = 0 - api_delay: Optional[float] = 0 - cdn_delay: Optional[float] = 0 - stats_filename: Optional[str] = None - skip_checks: Optional[bool] = False + build_dir_is_tmp_dir: bool + keep_build_dir: bool + scrape_only_first_items: bool + debug: bool + delay: float + api_delay: float + cdn_delay: float + stats_filename: Optional[str] + skip_checks: bool @staticmethod def get_url(lang_code: str) -> urllib.parse.ParseResult: @@ -869,14 +871,14 @@ def domain(self) -> str: @property def api_url(self) -> str: - return self.main_url + API_PREFIX + return self.main_url.geturl() + API_PREFIX @property - def s3_url(self) -> str: + def s3_url(self) -> Optional[str]: return self.s3_url_with_credentials def __post_init__(self): - self.main_url = Conf.get_url(self.lang_code) + self.main_url = Configuration.get_url(self.lang_code) self.language = get_language_details(self.lang_code) self.output_dir = pathlib.Path(self._output_dir).expanduser().resolve() self.output_dir.mkdir(parents=True, exist_ok=True) @@ -890,9 +892,10 @@ def __post_init__(self): tempfile.mkdtemp(prefix=f"ifixit_{self.lang_code}_", dir=self.tmp_dir) ) + self.stats_path = None if self.stats_filename: - self.stats_filename = pathlib.Path(self.stats_filename).expanduser() - self.stats_filename.parent.mkdir(parents=True, exist_ok=True) + self.stats_path = pathlib.Path(self.stats_filename).expanduser() + self.stats_path.parent.mkdir(parents=True, exist_ok=True) # support semi-colon separated tags as well if self.tag: diff --git a/src/ifixit2zim/entrypoint.py b/src/ifixit2zim/entrypoint.py index 6389c3b..60b617a 100755 --- a/src/ifixit2zim/entrypoint.py +++ b/src/ifixit2zim/entrypoint.py @@ -5,7 +5,7 @@ import sys from ifixit2zim.constants import NAME, SCRAPER, URLS -from ifixit2zim.shared import Global, logger +from ifixit2zim.shared import logger, set_debug def main(): @@ -37,13 +37,19 @@ def main(): parser.add_argument( "--title", - help="Custom title for your ZIM. iFixit homepage title otherwise", + help="Custom title for your ZIM (30 chars max).", ) parser.add_argument( "--description", - help="Custom description for your ZIM. " - "iFixit homepage description (meta) otherwise", + help="Custom description for your ZIM (80 chars max). " + "Based on iFixit homepage description (meta) otherwise", + ) + + parser.add_argument( + "--long-description", + help="Custom long description for your ZIM (4000 chars max). " + "Based on iFixit homepage description (meta) otherwise", ) parser.add_argument( @@ -55,11 +61,13 @@ def main(): "--creator", help="Name of content creator. “iFixit” otherwise", dest="author", + default="iFixit", ) parser.add_argument( "--publisher", help="Custom publisher name (ZIM metadata). 
“openZIM” otherwise", + default="openZIM", ) parser.add_argument( @@ -87,6 +95,7 @@ def main(): "--debug", help="Enable verbose output", action="store_true", + dest="debug", default=False, ) @@ -257,18 +266,18 @@ def main(): ) args = parser.parse_args() - Global.set_debug(args.debug) + set_debug(args.debug) - from ifixit2zim.scraper import ifixit2zim + from ifixit2zim.scraper import IFixit2Zim try: - scraper = ifixit2zim(**dict(args._get_kwargs())) + scraper = IFixit2Zim(**dict(args._get_kwargs())) sys.exit(scraper.run()) except Exception as exc: logger.error(f"FAILED. An error occurred: {exc}") if args.debug: logger.exception(exc) - raise SystemExit(1) + raise SystemExit(1) from None if __name__ == "__main__": diff --git a/src/ifixit2zim/exceptions.py b/src/ifixit2zim/exceptions.py index 136b084..ed0c562 100644 --- a/src/ifixit2zim/exceptions.py +++ b/src/ifixit2zim/exceptions.py @@ -1,10 +1,14 @@ -class FinalScrapingFailure(Exception): +class FinalScrapingFailureError(Exception): pass -class UnexpectedDataKindException(Exception): +class UnexpectedDataKindExceptionError(Exception): pass class CategoryHomePageContentError(Exception): pass + + +class ImageUrlNotFoundError(Exception): + pass diff --git a/src/ifixit2zim/executor.py b/src/ifixit2zim/executor.py index a0a7566..0763e8a 100644 --- a/src/ifixit2zim/executor.py +++ b/src/ifixit2zim/executor.py @@ -140,7 +140,7 @@ def release_halt(self): """release the `no_more` flag preventing workers from taking up tasks""" self.no_more = False - def shutdown(self, wait=True): + def shutdown(self, *, wait=True): """stop the executor, either somewhat immediately or awaiting completion""" logger.debug(f"shutting down executor {self.prefix} with {wait=}") with self._shutdown_lock: diff --git a/src/ifixit2zim/imager.py b/src/ifixit2zim/imager.py index 5671840..25e92ca 100644 --- a/src/ifixit2zim/imager.py +++ b/src/ifixit2zim/imager.py @@ -6,6 +6,7 @@ import pathlib import re import urllib.parse +from typing import Optional from kiwixstorage import KiwixStorage, NotFoundError from PIL import Image @@ -13,20 +14,19 @@ from zimscraperlib.image.optimization import optimize_webp from ifixit2zim.constants import IMAGES_ENCODER_VERSION -from ifixit2zim.shared import Global -from ifixit2zim.utils import get_version_ident_for, to_url - -logger = Global.logger +from ifixit2zim.scraper import IFixit2Zim +from ifixit2zim.shared import logger class Imager: - def __init__(self): + def __init__(self, scraper: IFixit2Zim): self.aborted = False # list of source URLs that we've processed and added to ZIM self.handled = set() self.dedup_items = {} + self.scraper = scraper - Global.img_executor.start() + self.scraper.img_executor.start() def abort(self): """request imager to cancel processing of futures""" @@ -52,7 +52,7 @@ def get_image_data(self, url: str) -> io.BytesIO: lossless=False, quality=60, method=6, - ) + ) # pyright: ignore[reportReturnType] def get_path_for(self, url: urllib.parse.ParseResult) -> str: url_with_only_path = urllib.parse.ParseResult( @@ -66,21 +66,23 @@ def get_path_for(self, url: urllib.parse.ParseResult) -> str: unquoted_url = urllib.parse.unquote(url_with_only_path.geturl()) return "images/{}".format(re.sub(r"^(https?)://", r"\1/", unquoted_url)) - def defer(self, url: str) -> str: + def defer(self, url: str) -> Optional[str]: """request full processing of url, returning in-zim path immediately""" # find actual URL should it be from a provider try: - url = urllib.parse.urlparse(to_url(url)) + parsed_url = 
urllib.parse.urlparse(self.scraper.utils.to_url(url)) except Exception: logger.warning(f"Can't parse image URL `{url}`. Skipping") return - if url.scheme not in ("http", "https"): - logger.warning(f"Not supporting image URL `{url.geturl()}`. Skipping") + if parsed_url.scheme not in ("http", "https"): + logger.warning( + f"Not supporting image URL `{parsed_url.geturl()}`. Skipping" + ) return - path = self.get_path_for(url) + path = self.get_path_for(parsed_url) if path in self.handled: return path @@ -88,9 +90,9 @@ def defer(self, url: str) -> str: # record that we are processing this one self.handled.add(path) - Global.img_executor.submit( + self.scraper.img_executor.submit( self.process_image, - url=url, + url=parsed_url, path=path, mimetype="image/svg+xml" if path.endswith(".svg") else "image/webp", dont_release=True, @@ -107,34 +109,36 @@ def check_for_duplicate(self, path, content): def add_image_to_zim(self, path, content, mimetype): duplicate_path = self.check_for_duplicate(path, content) - with Global.lock: + with self.scraper.lock: if duplicate_path: - Global.creator.add_redirect( + self.scraper.creator.add_redirect( path=path, target_path=duplicate_path, ) else: - Global.creator.add_item_for( + self.scraper.creator.add_item_for( path=path, content=content, mimetype=mimetype, ) def add_missing_image_to_zim(self, path): - with Global.lock: - Global.creator.add_redirect( + with self.scraper.lock: + self.scraper.creator.add_redirect( path=path, target_path="assets/NoImage_300x225.jpg", ) - def process_image(self, url: str, path: str, mimetype: str) -> str: + def process_image( + self, url: urllib.parse.ParseResult, path: str, mimetype: str + ) -> Optional[str]: """download image from url or S3 and add to Zim at path. Upload if req.""" if self.aborted: return # just download, optimize and add to ZIM if not using S3 - if not Global.conf.s3_url: + if not self.scraper.configuration.s3_url: try: fileobj = self.get_image_data(url.geturl()) except Exception as exc: @@ -156,7 +160,7 @@ def process_image(self, url: str, path: str, mimetype: str) -> str: return path # we are using S3 cache - ident = get_version_ident_for(url.geturl()) + ident = self.scraper.utils.get_version_ident_for(url.geturl()) if ident is None: logger.error(f"Unable to query {url.geturl()}. 
Skipping") self.add_missing_image_to_zim( @@ -165,7 +169,7 @@ def process_image(self, url: str, path: str, mimetype: str) -> str: return path # key = self.get_s3_key_for(url.geturl()) - s3_storage = KiwixStorage(Global.conf.s3_url) + s3_storage = KiwixStorage(self.scraper.configuration.s3_url) meta = {"ident": ident, "encoder_version": str(IMAGES_ENCODER_VERSION)} download_failed = False # useful to trigger reupload or not diff --git a/src/ifixit2zim/processor.py b/src/ifixit2zim/processor.py new file mode 100644 index 0000000..d7536af --- /dev/null +++ b/src/ifixit2zim/processor.py @@ -0,0 +1,365 @@ +import datetime +import re +import urllib.parse + +import requests + +from ifixit2zim.constants import ( + DEFAULT_DEVICE_IMAGE_URL, + DEFAULT_GUIDE_IMAGE_URL, + DEFAULT_USER_IMAGE_URLS, + DEFAULT_WIKI_IMAGE_URL, + NOT_YET_AVAILABLE, + UNAVAILABLE_OFFLINE, +) +from ifixit2zim.exceptions import ImageUrlNotFoundError +from ifixit2zim.scraper import IFixit2Zim +from ifixit2zim.shared import logger, setlocale + + +class Processor: + def __init__(self, scraper: IFixit2Zim) -> None: + self.scraper = scraper + self.null_categories = set() + self.ifixit_external_content = set() + self.final_hrefs = {} + + def guides_in_progress(self, guides, *, in_progress=True): + if in_progress: + return [guide for guide in guides if "GUIDE_IN_PROGRESS" in guide["flags"]] + return [guide for guide in guides if "GUIDE_IN_PROGRESS" not in guide["flags"]] + + def category_count_parts(self, category): + if "parts" not in category: + return 0 + if "total" not in category["parts"]: + return 0 + return category["parts"]["total"] + + def category_count_tools(self, category): + if "tools" not in category: + return 0 + return len(category["tools"]) + + def get_image_path(self, image_url): + return self.scraper.imager.defer(url=image_url) + + def _get_image_url_search( + self, obj, *, for_guide: bool, for_device: bool, for_wiki: bool, for_user: bool + ) -> str: + if "standard" in obj: + return obj["standard"] + if "medium" in obj: + return obj["medium"] + if "large" in obj: + return obj["large"] + if "original" in obj: + return obj["original"] + if for_guide: + return DEFAULT_GUIDE_IMAGE_URL + if for_device: + return DEFAULT_DEVICE_IMAGE_URL + if for_wiki: + return DEFAULT_WIKI_IMAGE_URL + if for_user and "userid" in obj: + idx = obj["userid"] % len(DEFAULT_USER_IMAGE_URLS) + return DEFAULT_USER_IMAGE_URLS[idx] + raise ImageUrlNotFoundError(f"Unable to find image URL in object {obj}") + + def get_image_url( + self, obj, *, for_guide=False, for_device=False, for_wiki=False, for_user=False + ) -> str: + if obj.get("image"): + return self._get_image_url_search( + obj["image"], + for_guide=for_guide, + for_device=for_device, + for_wiki=for_wiki, + for_user=for_user, + ) + return self._get_image_url_search( + obj, + for_guide=for_guide, + for_device=for_device, + for_wiki=for_wiki, + for_user=for_user, + ) + + guide_regex_full = re.compile( + r"href=\"https://\w*\.ifixit\.\w*/Guide/.*/(?P\d*)\"" + ) + guide_regex_rel = re.compile(r"href=\"/Guide/.*/(?P\d*).*?\"") + + gbl_image_regex = r".*?)src\s*=\s*\"(?P.*?)\"" + gbl_href_regex = r"href\s*=\s*\"(?P.*?)\"" + gbl_youtube_regex = ( + r"(?!.*.+?)src=[\\\"']+(?P.+?)\"(?P.+?)" + ) + gbl_bgd_image_regex = ( + r"background-image:url\((?P"|\"|')" + r"(?P.*?)(?P"|\"|')\)" + ) + gbl_video_regex = r".*)" + gbl_iframe_regex = r".*?)\".*?" 
+ gbl_regex = re.compile( + f"{gbl_image_regex}|{gbl_href_regex}|{gbl_youtube_regex}|{gbl_bgd_image_regex}" + f"|{gbl_video_regex}|{gbl_iframe_regex}" + ) + + href_anchor_regex = r"^(?P#.*)$" + href_object_kind_regex = ( + r"^(?:https*://[\w\.]*(?:ifixit)[\w\.]*)*/" + r"((?:(?P" + + "|".join(NOT_YET_AVAILABLE + UNAVAILABLE_OFFLINE) + + r")(?:/.+)?)" + r"|(?:(?PGuide|Anleitung|Guía|Guida|Tutoriel|Teardown)/" + r"(?P.+)/(?P\d+)(?P#.*)?.*)" + r"|(?:(?PDevice|Topic)/(?P[\w%_\.-]+)" + r"(?P#.*)?.*)" + r"|(?PUser)/(?P\d*)/(?P[\w%_\.+'-]+)" + r"(?P#.*)?.*" + r"|(?:(?PInfo)/(?P[\w%_\.-]+)(?P#.*)?.*))$" + ) + href_regex = re.compile( + f"{href_anchor_regex}|{href_object_kind_regex}", flags=re.IGNORECASE + ) + + def _process_external_url(self, url, rel_prefix): + if "ifixit" in url: + self.ifixit_external_content.add(url) + return f"{rel_prefix}home/external_content?url={urllib.parse.quote(url)}" + + def _process_unrecognized_href(self, url, rel_prefix): + return self._process_external_url(url, rel_prefix) + + def _process_href_regex_dynamics(self, href, rel_prefix): + if "Guide/login/register" in href or "Guide/new" in href: + return ( + f"{rel_prefix}home/unavailable_offline" + f"?url={urllib.parse.quote(href)}" + ) + return None + + def _process_href_regex_nomatch(self, href, rel_prefix, match): + if match: + return None + return self._process_unrecognized_href(href, rel_prefix) + + def _process_href_regex_anchor(self, match): + if not match.group("anchor"): + return None + return f"{match.group('anchor')}" + + def _process_href_regex_guide(self, rel_prefix, match): + if not match.group("guide"): + return None + link = self.scraper.get_guide_link_from_props( + guideid=match.group("guideid"), + guidetitle=urllib.parse.unquote_plus(match.group("guidetitle")), + ) + return f"{rel_prefix}{link}{match.group('guideafter') or ''}" + + def _process_href_regex_device(self, rel_prefix, match): + if not match.group("device"): + return None + link = self.scraper.get_category_link_from_props( + category_title=urllib.parse.unquote_plus(match.group("devicetitle")) + ) + return f"{rel_prefix}{link}{match.group('deviceafter') or ''}" + + def _process_href_regex_info(self, rel_prefix, match): + if not match.group("info"): + return None + link = self.scraper.get_info_link_from_props( + info_title=urllib.parse.unquote_plus(match.group("infotitle")) + ) + return f"{rel_prefix}{link}{match.group('infoafter') or ''}" + + def _process_href_regex_user(self, rel_prefix, match): + if not match.group("user"): + return None + link = self.scraper.get_user_link_from_props( + userid=match.group("userid"), + usertitle=urllib.parse.unquote_plus(match.group("usertitle")), + ) + return f"{rel_prefix}{link}{match.group('userafter') or ''}" + + def _process_href_regex_kind(self, href, rel_prefix, match): + if not match.group("kind"): + return None + if match.group("kind").lower() in NOT_YET_AVAILABLE: + return f"{rel_prefix}home/not_yet_available?url={urllib.parse.quote(href)}" + if match.group("kind").lower() in UNAVAILABLE_OFFLINE: + return ( + f"{rel_prefix}home/unavailable_offline" + f"?url={urllib.parse.quote(href)}" + ) + raise Exception( + f"Unsupported kind '{match.group('kind')}' in _process_href_regex" + ) + + def normalize_href(self, href): + if href in self.final_hrefs: + return self.final_hrefs[href] + try: + logger.debug(f"Normalizing href {href}") + # final_href = requests.head(href).headers.get("Location") + # if final_href is None: + # logger.debug(f"Failed to HEAD {href}, falling back to GET") + final_href = 
requests.get(href, stream=True, timeout=10).url + # parse final href and remove scheme + netloc + slash + parsed_final_href = urllib.parse.urlparse(final_href) + parsed_href = urllib.parse.urlparse(href) + chars_to_remove = len(parsed_final_href.scheme + "://") + + # remove domain if redirect is on same domain (almost always) + if parsed_final_href.netloc == parsed_href.netloc: + chars_to_remove += len(parsed_final_href.netloc) + + final_href = final_href[chars_to_remove:] + final_href = urllib.parse.unquote(final_href) + except Exception: + # this is quite expected for some missing items ; this will be taken care + # of at retrieval, no way to do something better + final_href = href + self.final_hrefs[href] = final_href + logger.debug(f"Result is {final_href}") + return final_href + + def _process_href_regex(self, href, rel_prefix): + if href.startswith("/"): + href = self.scraper.configuration.main_url.geturl() + href + if href.startswith("http") and "ifixit.com/" in href: + href = self.normalize_href(href) + href = urllib.parse.quote(href) + match = self.href_regex.search(href) + res = ( + self._process_href_regex_dynamics(href=href, rel_prefix=rel_prefix) + or self._process_href_regex_nomatch( + href=href, rel_prefix=rel_prefix, match=match + ) + or self._process_href_regex_anchor(match=match) + or self._process_href_regex_guide(rel_prefix=rel_prefix, match=match) + or self._process_href_regex_device(rel_prefix=rel_prefix, match=match) + or self._process_href_regex_info(rel_prefix=rel_prefix, match=match) + or self._process_href_regex_user(rel_prefix=rel_prefix, match=match) + or self._process_href_regex_kind( + href=href, rel_prefix=rel_prefix, match=match + ) + ) + if res is None: + raise Exception("Unsupported match in _process_href_regex") + return res + + def _process_youtube(self, match, rel_prefix): + return ( + f'" + f"" + ) + + def _process_bgdimgurl(self, match, rel_prefix): + return ( + f"background-image:url({match.group('quote1')}{rel_prefix}" + f"{self.get_image_path(match.group('bgdimgurl'))}" + f"{match.group('quote2')})" + ) + + def _process_video(self): + return "
<p>Video not scrapped</p>
" + + def _process_iframe(self, match, rel_prefix): + return ( + f'External content' + ) + + def _process_gbl_regex(self, match, rel_prefix): + if match.group("image_url"): + return ( + f" 0: + return user["username"] + if user["unique_username"] and len(user["unique_username"]) > 0: + return f"@{user['unique_username']}" + return "Anonymous" diff --git a/src/ifixit2zim/scraper.py b/src/ifixit2zim/scraper.py index 33775b0..3c05b1b 100644 --- a/src/ifixit2zim/scraper.py +++ b/src/ifixit2zim/scraper.py @@ -1,33 +1,46 @@ +import datetime import json import pathlib import shutil -from datetime import datetime +import threading +from jinja2 import Environment, FileSystemLoader, select_autoescape from schedule import every from zimscraperlib.image.transformation import resize_image - -from ifixit2zim.constants import ROOT_DIR, Conf +from zimscraperlib.inputs import compute_descriptions +from zimscraperlib.zim.creator import Creator + +from ifixit2zim.constants import ( + DEFAULT_HOMEPAGE, + ROOT_DIR, + Configuration, +) +from ifixit2zim.executor import Executor +from ifixit2zim.imager import Imager +from ifixit2zim.processor import Processor from ifixit2zim.scraper_category import ScraperCategory from ifixit2zim.scraper_guide import ScraperGuide from ifixit2zim.scraper_homepage import ScraperHomepage from ifixit2zim.scraper_info import ScraperInfo from ifixit2zim.scraper_user import ScraperUser -from ifixit2zim.shared import Global, GlobalMixin, logger -from ifixit2zim.utils import setup_s3_and_check_credentials +from ifixit2zim.shared import logger +from ifixit2zim.utils import Utils + +LOCALE_LOCK = threading.Lock() -class ifixit2zim(GlobalMixin): +class IFixit2Zim: def __init__(self, **kwargs): - Global.conf = Conf(**kwargs) - for option in Global.conf.required: - if getattr(Global.conf, option) is None: + self.configuration = Configuration(**kwargs) + for option in self.configuration.required: + if getattr(self.configuration, option) is None: raise ValueError(f"Missing parameter `{option}`") - self.scraper_homepage = ScraperHomepage() - self.scraper_guide = ScraperGuide() - self.scraper_category = ScraperCategory() - self.scraper_info = ScraperInfo() - self.scraper_user = ScraperUser() + self.scraper_homepage = ScraperHomepage(scraper=self) + self.scraper_guide = ScraperGuide(scraper=self) + self.scraper_category = ScraperCategory(scraper=self) + self.scraper_info = ScraperInfo(scraper=self) + self.scraper_user = ScraperUser(scraper=self) self.scrapers = [ self.scraper_homepage, self.scraper_category, @@ -35,14 +48,17 @@ def __init__(self, **kwargs): self.scraper_info, self.scraper_user, ] + self.lock = threading.Lock() + self.processor = Processor(scraper=self) + self.utils = Utils(configuration=self.configuration) @property def build_dir(self): - return self.conf.build_dir + return self.configuration.build_dir def cleanup(self): """Remove temp files and release resources before exiting""" - if not self.conf.keep_build_dir: + if not self.configuration.keep_build_dir: logger.debug(f"Removing {self.build_dir}") shutil.rmtree(self.build_dir, ignore_errors=True) @@ -50,48 +66,56 @@ def sanitize_inputs(self): """input & metadata sanitation""" logger.debug("Checking user-provided metadata") - if not self.conf.name: + if not self.configuration.name: is_selection = ( - self.conf.categories - or self.conf.guides - or self.conf.infos - or self.conf.no_category - or self.conf.no_guide - or self.conf.no_info + self.configuration.categories + or self.configuration.guides + or self.configuration.infos + 
or self.configuration.no_category + or self.configuration.no_guide + or self.configuration.no_info ) - self.conf.name = "ifixit_{lang}_{selection}".format( - lang=self.conf.language["iso-639-1"], + self.configuration.name = "ifixit_{lang}_{selection}".format( + lang=self.configuration.language["iso-639-1"], selection="selection" if is_selection else "all", ) - period = datetime.now().strftime("%Y-%m") - if self.conf.fname: + period = datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m") + if self.configuration.fname: # make sure we were given a filename and not a path - self.conf.fname = pathlib.Path(self.conf.fname.format(period=period)) - if pathlib.Path(self.conf.fname.name) != self.conf.fname: - raise ValueError(f"filename is not a filename: {self.conf.fname}") + self.configuration.fpath = pathlib.Path( + self.configuration.fname.format(period=period) + ) + if pathlib.Path(self.configuration.fpath.name) != self.configuration.fpath: + raise ValueError( + f"filename is not a filename: {self.configuration.fname}" + ) else: - self.conf.fname = f"{self.conf.name}_{period}.zim" - - if not self.conf.title: - self.conf.title = self.metadata["title"] - self.conf.title = self.conf.title.strip() + self.configuration.fpath = pathlib.Path( + f"{self.configuration.name}_{period}.zim" + ) - if not self.conf.description: - self.conf.description = self.metadata["description"] - self.conf.description = self.conf.description.strip() + # TODO: fixed title based on defined convention (30 chars only) + if not self.configuration.title: + self.configuration.title = self.metadata["title"] + self.configuration.title = self.configuration.title.strip() + + ( + self.configuration.description, + self.configuration.long_description, + ) = compute_descriptions( + self.metadata["description"], + self.configuration.description, + self.configuration.long_description, + ) - if not self.conf.author: - self.conf.author = "iFixit" - self.conf.author = self.conf.author.strip() + self.configuration.author = self.configuration.author.strip() - if not self.conf.publisher: - self.conf.publisher = "openZIM" - self.conf.publisher = self.conf.publisher.strip() + self.configuration.publisher = self.configuration.publisher.strip() - self.conf.tags = list( + self.configuration.tag = list( { - *self.conf.tag, + *self.configuration.tag, "_category:iFixit", "iFixit", "_videos:yes", @@ -101,10 +125,10 @@ def sanitize_inputs(self): logger.debug( "Configuration after sanitization:\n" - f"name: {self.conf.name}\n" - f"fname: {self.conf.fname}\n" - f"name: {self.conf.author}\n" - f"fname: {self.conf.publisher}" + f"name: {self.configuration.name}\n" + f"fname: {self.configuration.fname}\n" + f"author: {self.configuration.author}\n" + f"publisher: {self.configuration.publisher}" ) def add_assets(self): @@ -137,13 +161,77 @@ def add_illustrations(self): with self.lock: self.creator.add_illustration(size, fh.read()) + def setup(self): + # order matters are there are references between them + + # images handled on a different queue. + # mostly network I/O to retrieve and/or upload image. 
+ # if not in S3 bucket, convert/optimize webp image + # svg images, stored but not optimized + + self.img_executor = Executor( + queue_size=100, + nb_workers=50, + prefix="IMG-T-", + ) + + self.imager = Imager(scraper=self) + + self.creator = Creator( + filename=self.configuration.output_dir / self.configuration.fpath, + main_path=DEFAULT_HOMEPAGE, + workaround_nocancel=False, + ).config_metadata( + Illustration_48x48_at_1=b"illustration", + Language=self.configuration.language["iso-639-3"], + Title=self.configuration.title, + Description=self.configuration.description, + Creator=self.configuration.author, + Publisher=self.configuration.publisher, + Name=self.configuration.name, + Tags=";".join(self.configuration.tag), + Date=datetime.datetime.now(tz=datetime.timezone.utc).date(), + ) + + # jinja2 environment setup + self.env = Environment( + loader=FileSystemLoader(ROOT_DIR.joinpath("templates")), + autoescape=select_autoescape(), + ) + + def _raise_helper(msg): + raise Exception(msg) + + self.env.globals["raise"] = _raise_helper + self.env.globals["str"] = lambda x: str(x) + self.env.filters["guides_in_progress"] = self.processor.guides_in_progress + self.env.filters["category_count_parts"] = self.processor.category_count_parts + self.env.filters["category_count_tools"] = self.processor.category_count_tools + self.env.filters["get_image_path"] = self.processor.get_image_path + self.env.filters["get_image_url"] = self.processor.get_image_url + self.env.filters["cleanup_rendered_content"] = ( + self.processor.cleanup_rendered_content + ) + self.env.filters["get_timestamp_day_rendered"] = ( + self.processor.get_timestamp_day_rendered + ) + self.env.filters["get_item_comments_count"] = ( + self.processor.get_item_comments_count + ) + self.env.filters["get_guide_total_comments_count"] = ( + self.processor.get_guide_total_comments_count + ) + self.env.filters["get_user_display_name"] = self.processor.get_user_display_name + def run(self): # first report => creates a file with appropriate structure self.report_progress() s3_storage = ( - setup_s3_and_check_credentials(self.conf.s3_url_with_credentials) - if self.conf.s3_url_with_credentials + self.utils.setup_s3_and_check_credentials( + self.configuration.s3_url_with_credentials + ) + if self.configuration.s3_url_with_credentials else None ) s3_msg = ( @@ -157,14 +245,14 @@ def run(self): logger.info( f"Starting scraper with:\n" - f" language: {self.conf.language['english']}" - f" ({self.conf.domain})\n" - f" output_dir: {self.conf.output_dir}\n" + f" language: {self.configuration.language['english']}" + f" ({self.configuration.domain})\n" + f" output_dir: {self.configuration.output_dir}\n" f" build_dir: {self.build_dir}\n" f"{s3_msg}" ) - Global.metadata = self.scraper_homepage.get_online_metadata() + self.metadata = self.scraper_homepage.get_online_metadata() logger.debug( f"Additional metadata scrapped online:\n" f"title: {self.metadata['title']}\n" @@ -174,37 +262,37 @@ def run(self): self.sanitize_inputs() logger.debug("Starting Zim creation") - Global.setup() - Global.env.filters[ - "get_category_link_from_obj" - ] = self.scraper_category.get_category_link_from_obj - Global.env.filters[ - "get_category_link_from_props" - ] = self.scraper_category.get_category_link_from_props - Global.env.filters[ - "get_guide_link_from_obj" - ] = self.scraper_guide.get_guide_link_from_obj - Global.env.filters[ - "get_guide_link_from_props" - ] = self.scraper_guide.get_guide_link_from_props - Global.env.filters[ - "get_info_link_from_obj" - ] = 
self.scraper_info.get_info_link_from_obj - Global.env.filters[ - "get_info_link_from_props" - ] = self.scraper_info.get_info_link_from_props - Global.env.filters[ - "get_user_link_from_obj" - ] = self.scraper_user.get_user_link_from_obj - Global.env.filters[ - "get_user_link_from_props" - ] = self.scraper_user.get_user_link_from_props - Global.get_category_link_from_props = ( + self.setup() + self.env.filters["get_category_link_from_obj"] = ( + self.scraper_category.get_category_link_from_obj + ) + self.env.filters["get_category_link_from_props"] = ( + self.scraper_category.get_category_link_from_props + ) + self.env.filters["get_guide_link_from_obj"] = ( + self.scraper_guide.get_guide_link_from_obj + ) + self.env.filters["get_guide_link_from_props"] = ( + self.scraper_guide.get_guide_link_from_props + ) + self.env.filters["get_info_link_from_obj"] = ( + self.scraper_info.get_info_link_from_obj + ) + self.env.filters["get_info_link_from_props"] = ( + self.scraper_info.get_info_link_from_props + ) + self.env.filters["get_user_link_from_obj"] = ( + self.scraper_user.get_user_link_from_obj + ) + self.env.filters["get_user_link_from_props"] = ( + self.scraper_user.get_user_link_from_props + ) + self.get_category_link_from_props = ( self.scraper_category.get_category_link_from_props ) - Global.get_guide_link_from_props = self.scraper_guide.get_guide_link_from_props - Global.get_info_link_from_props = self.scraper_info.get_info_link_from_props - Global.get_user_link_from_props = self.scraper_user.get_user_link_from_props + self.get_guide_link_from_props = self.scraper_guide.get_guide_link_from_props + self.get_info_link_from_props = self.scraper_info.get_info_link_from_props + self.get_user_link_from_props = self.scraper_user.get_user_link_from_props for scraper in self.scrapers: scraper.setup() self.creator.start() @@ -225,7 +313,7 @@ def run(self): for scraper in self.scrapers: scraper.scrape_items() needs_rerun = False - if not Global.conf.scrape_only_first_items: + if not self.configuration.scrape_only_first_items: for scraper in self.scrapers: if not scraper.items_queue.empty(): needs_rerun = True @@ -233,7 +321,7 @@ def run(self): break logger.info("Awaiting images") - Global.img_executor.shutdown() + self.img_executor.shutdown() self.report_progress() @@ -257,11 +345,11 @@ def run(self): logger.info(stats) logger.info("Null categories:") - for key in Global.null_categories: + for key in self.processor.null_categories: logger.info(f"\t{key}") logger.info("IFIXIT_EXTERNAL URLS:") - for exturl in sorted(Global.ifixit_external_content): + for exturl in sorted(self.processor.ifixit_external_content): logger.info(f"\t{exturl}") except Exception as exc: @@ -273,7 +361,7 @@ def run(self): logger.error(f"Interrupting process due to error: {exc}") logger.exception(exc) self.imager.abort() - Global.img_executor.shutdown(wait=False) + self.img_executor.shutdown(wait=False) return 1 else: if self.creator.can_finish: @@ -292,7 +380,7 @@ def run(self): logger.info("Scraper has finished normally") def report_progress(self): - if not Global.conf.stats_filename: + if not self.configuration.stats_path: return done = 0 total = 0 @@ -308,5 +396,5 @@ def report_progress(self): "done": done, "total": total, } - with open(Global.conf.stats_filename, "w") as outfile: + with open(self.configuration.stats_path, "w") as outfile: json.dump(progress, outfile, indent=2) diff --git a/src/ifixit2zim/scraper_category.py b/src/ifixit2zim/scraper_category.py index 9ae4040..8b0e59f 100644 --- 
a/src/ifixit2zim/scraper_category.py +++ b/src/ifixit2zim/scraper_category.py @@ -1,18 +1,18 @@ -import urllib +import urllib.parse from ifixit2zim.constants import CATEGORY_LABELS, URLS -from ifixit2zim.exceptions import UnexpectedDataKindException +from ifixit2zim.exceptions import UnexpectedDataKindExceptionError +from ifixit2zim.scraper import IFixit2Zim from ifixit2zim.scraper_generic import ScraperGeneric -from ifixit2zim.shared import Global, logger -from ifixit2zim.utils import get_api_content +from ifixit2zim.shared import logger class ScraperCategory(ScraperGeneric): - def __init__(self): - super().__init__() + def __init__(self, scraper: IFixit2Zim): + super().__init__(scraper) def setup(self): - self.category_template = Global.env.get_template("category.html") + self.category_template = self.env.get_template("category.html") def get_items_name(self): return "category" @@ -27,19 +27,19 @@ def _add_category_to_scrape(self, category_key, category_title, is_expected): ) def _get_category_key_from_title(self, category_title): - return Global.convert_title_to_filename(category_title.lower()) + return self.processor.convert_title_to_filename(category_title.lower()) def _build_category_path(self, category_title): href = ( - Global.conf.main_url.geturl() + self.configuration.main_url.geturl() + f"/Device/{category_title.replace('/', ' ')}" ) - final_href = Global.normalize_href(href) + final_href = self.processor.normalize_href(href) return final_href[1:] def get_category_link_from_obj(self, category): if "title" not in category or not category["title"]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( f"Impossible to extract category title from {category}" ) category_title = category["title"] @@ -47,12 +47,12 @@ def get_category_link_from_obj(self, category): def get_category_link_from_props(self, category_title): category_path = urllib.parse.quote(self._build_category_path(category_title)) - if Global.conf.no_category: + if self.configuration.no_category: return f"home/not_scrapped?url={category_path}" category_key = self._get_category_key_from_title(category_title) - if Global.conf.categories: + if self.configuration.categories: is_not_included = True - for other_category in Global.conf.categories: + for other_category in self.configuration.categories: other_category_key = self._get_category_key_from_title(other_category) if other_category_key == category_key: is_not_included = False @@ -69,39 +69,43 @@ def _process_categories(self, categories): self._process_categories(categories[category]) def build_expected_items(self): - if Global.conf.no_category: + if self.configuration.no_category: logger.info("No category required") return - if Global.conf.categories: + if self.configuration.categories: logger.info("Adding required categories as expected") - for category in Global.conf.categories: + for category in self.configuration.categories: category_key = self._get_category_key_from_title(category) self._add_category_to_scrape(category_key, category, True) return logger.info("Downloading list of categories") - categories = get_api_content("/categories", includeStubs=True) + categories = self.scraper.utils.get_api_content( + "/categories", includeStubs=True + ) self._process_categories(categories) logger.info(f"{len(self.expected_items_keys)} categories found") - def get_one_item_content(self, item_key, item_data): + def get_one_item_content(self, item_key, item_data): # noqa ARG002 categoryid = item_key - category_content = get_api_content( - 
f"/wikis/CATEGORY/{categoryid}", langid=Global.conf.lang_code + category_content = self.scraper.utils.get_api_content( + f"/wikis/CATEGORY/{categoryid}", langid=self.configuration.lang_code ) if category_content and category_content["revisionid"] > 0: return category_content logger.warning("Falling back to category in EN") - category_content = get_api_content(f"/wikis/CATEGORY/{categoryid}", langid="en") + category_content = self.scraper.utils.get_api_content( + f"/wikis/CATEGORY/{categoryid}", langid="en" + ) if category_content and category_content["revisionid"] > 0: return category_content for lang in URLS.keys(): logger.warning(f"Falling back to category in {lang}") - category_content = get_api_content( + category_content = self.scraper.utils.get_api_content( f"/wikis/CATEGORY/{categoryid}", langid=lang ) @@ -109,28 +113,28 @@ def get_one_item_content(self, item_key, item_data): return category_content logger.warning(f"Impossible to get category content: {item_key}") - Global.null_categories.add(item_key) + self.processor.null_categories.add(item_key) return None - def add_item_redirect(self, item_key, item_data, redirect_kind): + def add_item_redirect(self, item_key, item_data, redirect_kind): # noqa ARG002 path = self._build_category_path(item_data["category_title"]) - Global.add_redirect( + self.processor.add_redirect( path=path, target_path=f"home/{redirect_kind}?{urllib.parse.urlencode({'url':path})}", ) - def process_one_item(self, item_key, item_data, item_content): + def process_one_item(self, item_key, item_data, item_content): # noqa ARG002 category_content = item_content category_rendered = self.category_template.render( category=category_content, - label=CATEGORY_LABELS[Global.conf.lang_code], - metadata=Global.metadata, - lang=Global.conf.lang_code, + label=CATEGORY_LABELS[self.configuration.lang_code], + metadata=self.metadata, + lang=self.configuration.lang_code, ) - Global.add_html_item( + self.processor.add_html_item( path=self._build_category_path(category_title=category_content["title"]), title=category_content["display_title"], content=category_rendered, diff --git a/src/ifixit2zim/scraper_generic.py b/src/ifixit2zim/scraper_generic.py index 7bf1a91..8f73045 100644 --- a/src/ifixit2zim/scraper_generic.py +++ b/src/ifixit2zim/scraper_generic.py @@ -4,18 +4,50 @@ from schedule import run_pending -from ifixit2zim.exceptions import FinalScrapingFailure -from ifixit2zim.shared import Global, logger +from ifixit2zim.exceptions import FinalScrapingFailureError +from ifixit2zim.scraper import IFixit2Zim +from ifixit2zim.shared import logger + +FIRST_ITEMS_COUNT = 5 class ScraperGeneric(ABC): - def __init__(self): + def __init__(self, scraper: IFixit2Zim): + self.scraper = scraper self.expected_items_keys = {} self.unexpected_items_keys = {} self.items_queue = Queue() self.missing_items_keys = set() self.error_items_keys = set() + @property + def configuration(self): + return self.scraper.configuration + + @property + def utils(self): + return self.scraper.utils + + @property + def metadata(self): + return self.scraper.metadata + + @property + def env(self): + return self.scraper.env + + @property + def lock(self): + return self.scraper.lock + + @property + def creator(self): + return self.scraper.creator + + @property + def processor(self): + return self.scraper.processor + @abstractmethod def setup(self): pass @@ -41,7 +73,7 @@ def process_one_item(self, item_key, item_data, item_content): pass def add_item_to_scrape( - self, item_key, item_data, is_expected, 
warn_unexpected=True + self, item_key, item_data, is_expected, *, warn_unexpected=True ): item_key = str(item_key) # just in case it's an int if ( @@ -101,7 +133,10 @@ def scrape_items(self): num_items = 1 while not self.items_queue.empty(): run_pending() - if Global.conf.scrape_only_first_items and num_items > 5: + if ( + self.configuration.scrape_only_first_items + and num_items > FIRST_ITEMS_COUNT + ): break item = self.items_queue.get(block=False) item_key = item["key"] @@ -124,9 +159,9 @@ def scrape_items(self): len(self.missing_items_keys) * 100 / (len(self.expected_items_keys) + len(self.unexpected_items_keys)) - > Global.conf.max_missing_items_percent + > self.configuration.max_missing_items_percent ): - raise FinalScrapingFailure( + raise FinalScrapingFailureError( f"Too many {self.get_items_name()}s found missing: " f"{len(self.missing_items_keys)}" ) @@ -134,9 +169,9 @@ def scrape_items(self): len(self.error_items_keys) * 100 / (len(self.expected_items_keys) + len(self.unexpected_items_keys)) - > Global.conf.max_error_items_percent + > self.configuration.max_error_items_percent ): - raise FinalScrapingFailure( + raise FinalScrapingFailureError( f"Too many {self.get_items_name()}s failed to be processed: " f"{len(self.error_items_keys)}" ) diff --git a/src/ifixit2zim/scraper_guide.py b/src/ifixit2zim/scraper_guide.py index ed3a29b..21b3747 100644 --- a/src/ifixit2zim/scraper_guide.py +++ b/src/ifixit2zim/scraper_guide.py @@ -1,4 +1,4 @@ -import urllib +import urllib.parse from ifixit2zim.constants import ( DIFFICULTY_EASY, @@ -10,18 +10,18 @@ UNKNOWN_LOCALE, UNKNOWN_TITLE, ) -from ifixit2zim.exceptions import UnexpectedDataKindException +from ifixit2zim.exceptions import UnexpectedDataKindExceptionError +from ifixit2zim.scraper import IFixit2Zim from ifixit2zim.scraper_generic import ScraperGeneric -from ifixit2zim.shared import Global, logger -from ifixit2zim.utils import get_api_content +from ifixit2zim.shared import logger class ScraperGuide(ScraperGeneric): - def __init__(self): - super().__init__() + def __init__(self, scraper: IFixit2Zim): + super().__init__(scraper) def setup(self): - self.guide_template = Global.env.get_template("guide.html") + self.guide_template = self.env.get_template("guide.html") def get_items_name(self): return "guide" @@ -37,22 +37,22 @@ def _add_guide_to_scrape(self, guideid, guidetitle, locale, is_expected): is_expected, ) - def _build_guide_path(self, guideid, guidetitle): - href = Global.conf.main_url.geturl() + f"/Guide/-/{guideid}" - final_href = Global.normalize_href(href) + def _build_guide_path(self, guideid, guidetitle): # noqa ARG002 + href = self.configuration.main_url.geturl() + f"/Guide/-/{guideid}" + final_href = self.processor.normalize_href(href) return final_href[1:] def get_guide_link_from_obj(self, guide): if "guideid" not in guide or not guide["guideid"]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( f"Impossible to extract guide id from {guide}" ) if "locale" not in guide or not guide["locale"]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( f"Impossible to extract guide locale from {guide}" ) if "title" not in guide or not guide["title"]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( f"Impossible to extract guide title from {guide}" ) guideid = guide["guideid"] @@ -80,28 +80,30 @@ def get_guide_link_from_props( guide_path = urllib.parse.quote( self._build_guide_path(guideid=guideid, guidetitle=guidetitle) ) - if 
Global.conf.no_guide: + if self.configuration.no_guide: return f"home/not_scrapped?url={guide_path}" - if Global.conf.guides and str(guideid) not in Global.conf.guides: + if self.configuration.guides and str(guideid) not in self.configuration.guides: return f"home/not_scrapped?url={guide_path}" self._add_guide_to_scrape(guideid, guidetitle, guidelocale, False) return guide_path def build_expected_items(self): - if Global.conf.no_guide: + if self.configuration.no_guide: logger.info("No guide required") return - if Global.conf.guides: + if self.configuration.guides: logger.info("Adding required guides as expected") - for guide in Global.conf.guides: + for guide in self.configuration.guides: self._add_guide_to_scrape(guide, UNKNOWN_TITLE, UNKNOWN_LOCALE, True) return logger.info("Downloading list of guides") limit = 200 offset = 0 while True: - guides = get_api_content("/guides", limit=limit, offset=offset) - if len(guides) == 0: + guides = self.scraper.utils.get_api_content( + "/guides", limit=limit, offset=offset + ) + if not guides or len(guides) == 0: break for guide in guides: # we ignore archived guides since they are not accessible anywayß @@ -114,7 +116,7 @@ def build_expected_items(self): # on this endpoint, so we consider it as unknown for now self._add_guide_to_scrape(guideid, UNKNOWN_TITLE, UNKNOWN_LOCALE, True) offset += limit - if Global.conf.scrape_only_first_items: + if self.configuration.scrape_only_first_items: logger.warning( "Aborting the retrieval of all guides since only first items" " will be scraped anyway" @@ -127,14 +129,18 @@ def get_one_item_content(self, item_key, item_data): guide = item_data locale = guide["locale"] if locale == UNKNOWN_LOCALE: - locale = Global.conf.lang_code # fallback value + locale = self.configuration.lang_code # fallback value if locale == "ja": locale = "jp" # Unusual iFixit convention - guide_content = get_api_content(f"/guides/{guideid}", langid=locale) + guide_content = self.scraper.utils.get_api_content( + f"/guides/{guideid}", langid=locale + ) if guide_content is None and locale != "en": # guide is most probably available in English anyway - guide_content = get_api_content(f"/guides/{guideid}", langid="en") + guide_content = self.scraper.utils.get_api_content( + f"/guides/{guideid}", langid="en" + ) return guide_content @@ -146,12 +152,12 @@ def add_item_redirect(self, item_key, item_data, redirect_kind): logger.warning(f"Cannot add redirect for guide {guideid} in error") return path = self._build_guide_path(guideid, guidetitle) - Global.add_redirect( + self.processor.add_redirect( path=path, target_path=f"home/{redirect_kind}?{urllib.parse.urlencode({'url':path})}", ) - def process_one_item(self, item_key, item_data, item_content): + def process_one_item(self, item_key, item_data, item_content): # noqa ARG002 guide_content = item_content if guide_content["type"] != "teardown": @@ -166,7 +172,7 @@ def process_one_item(self, item_key, item_data, item_content): elif guide_content["difficulty"] in DIFFICULTY_VERY_HARD: guide_content["difficulty_class"] = "difficulty-5" else: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( "Unknown guide difficulty: '{}' in guide {}".format( guide_content["difficulty"], guide_content["guideid"], @@ -175,7 +181,7 @@ def process_one_item(self, item_key, item_data, item_content): for step in guide_content["steps"]: if not step["media"]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( "Missing media attribute in step {} of guide {}".format( 
step["stepid"], guide_content["guideid"] ) @@ -185,14 +191,14 @@ def process_one_item(self, item_key, item_data, item_content): "video", "embed", ]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( "Unrecognized media type in step {} of guide {}".format( step["stepid"], guide_content["guideid"] ) ) if step["media"]["type"] == "video": if "data" not in step["media"] or not step["media"]["data"]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( "Missing 'data' in step {} of guide {}".format( step["stepid"], guide_content["guideid"] ) @@ -201,7 +207,7 @@ def process_one_item(self, item_key, item_data, item_content): "image" not in step["media"]["data"] or not step["media"]["data"]["image"] ): - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( "Missing outer 'image' in step {} of guide {}".format( step["stepid"], guide_content["guideid"] ) @@ -210,14 +216,14 @@ def process_one_item(self, item_key, item_data, item_content): "image" not in step["media"]["data"]["image"] or not step["media"]["data"]["image"]["image"] ): - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( "Missing inner 'image' in step {} of guide {}".format( step["stepid"], guide_content["guideid"] ) ) if step["media"]["type"] == "embed": if "data" not in step["media"] or not step["media"]["data"]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( "Missing 'data' in step {} of guide {}".format( step["stepid"], guide_content["guideid"] ) @@ -226,7 +232,7 @@ def process_one_item(self, item_key, item_data, item_content): "html" not in step["media"]["data"] or not step["media"]["data"]["html"] ): - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( "Missing 'html' in step {} of guide {}".format( step["stepid"], guide_content["guideid"] ) @@ -246,7 +252,7 @@ def process_one_item(self, item_key, item_data, item_content): "icon_caution", "icon_reminder", ]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( "Unrecognized bullet '{}' in step {} of guide {}".format( line["bullet"], step["stepid"], @@ -255,11 +261,11 @@ def process_one_item(self, item_key, item_data, item_content): ) guide_rendered = self.guide_template.render( guide=guide_content, - label=GUIDE_LABELS[Global.conf.lang_code], - metadata=Global.metadata, + label=GUIDE_LABELS[self.configuration.lang_code], + metadata=self.metadata, ) - Global.add_html_item( + self.processor.add_html_item( path=self._build_guide_path( guideid=guide_content["guideid"], guidetitle=guide_content["title"] ), diff --git a/src/ifixit2zim/scraper_homepage.py b/src/ifixit2zim/scraper_homepage.py index c3342f8..e6faf7b 100644 --- a/src/ifixit2zim/scraper_homepage.py +++ b/src/ifixit2zim/scraper_homepage.py @@ -4,18 +4,18 @@ from ifixit2zim.constants import DEFAULT_HOMEPAGE, HOME_LABELS from ifixit2zim.exceptions import CategoryHomePageContentError +from ifixit2zim.scraper import IFixit2Zim from ifixit2zim.scraper_generic import ScraperGeneric -from ifixit2zim.shared import Global, logger -from ifixit2zim.utils import get_soup +from ifixit2zim.shared import logger class ScraperHomepage(ScraperGeneric): - def __init__(self): - super().__init__() + def __init__(self, scraper: IFixit2Zim): + super().__init__(scraper) def setup(self): - self.homepage_template = Global.env.get_template("home.html") - self.not_here_template = Global.env.get_template("not_here.html") + self.homepage_template = 
self.env.get_template("home.html") + self.not_here_template = self.env.get_template("not_here.html") def get_items_name(self): return "home" @@ -23,14 +23,14 @@ def get_items_name(self): def build_expected_items(self): self.add_item_to_scrape(1, 1, True) - def get_one_item_content(self, item_key, item_data): - soup, _ = get_soup("/Guide") + def get_one_item_content(self, item_key, item_data): # noqa ARG002 + soup, _ = self.scraper.utils.get_soup("/Guide") return soup - def add_item_redirect(self, item_key, item_data, redirect_kind): + def add_item_redirect(self, item_key, item_data, redirect_kind): # noqa ARG002 logger.warning("Not supposed to add a redirect for a home item") - def process_one_item(self, item_key, item_data, item_content): + def process_one_item(self, item_key, item_data, item_content): # noqa ARG002 soup = item_content # extract and clean main content @@ -45,99 +45,102 @@ def process_one_item(self, item_key, item_data, item_content): } logger.debug( - "Content extracted from /Guide:\n" f"{json.dumps(home_content,indent=2)}" + f"Content extracted from /Guide:\n {json.dumps(home_content,indent=2)}" ) homepage = self.homepage_template.render( home_content=home_content, - metadata=Global.metadata, - label=HOME_LABELS[Global.conf.lang_code], + metadata=self.metadata, + label=HOME_LABELS[self.configuration.lang_code], ) not_scrapped = self.not_here_template.render( - metadata=Global.metadata, + metadata=self.metadata, kind="not_scrapped", ) external_content = self.not_here_template.render( - metadata=Global.metadata, + metadata=self.metadata, kind="external_content", ) unavailable_offline = self.not_here_template.render( - metadata=Global.metadata, + metadata=self.metadata, kind="unavailable_offline", ) not_yet_available = self.not_here_template.render( - metadata=Global.metadata, + metadata=self.metadata, kind="not_yet_available", ) missing = self.not_here_template.render( - metadata=Global.metadata, + metadata=self.metadata, kind="missing", ) error_content = self.not_here_template.render( - metadata=Global.metadata, + metadata=self.metadata, kind="error", ) - with Global.lock: - Global.creator.add_item_for( + with self.lock: + if not self.creator: + raise Exception("Please set creator first") + + self.creator.add_item_for( path="home/home", - title=Global.conf.title, + title=self.configuration.title, content=homepage, mimetype="text/html", is_front=True, ) - Global.creator.add_redirect(path=DEFAULT_HOMEPAGE, target_path="home/home") + self.creator.add_redirect(path=DEFAULT_HOMEPAGE, target_path="home/home") - Global.creator.add_item_for( + self.creator.add_item_for( path="home/not_scrapped", - title=Global.conf.title, + title=self.configuration.title, content=not_scrapped, mimetype="text/html", is_front=False, ) - Global.creator.add_item_for( + self.creator.add_item_for( path="home/external_content", - title=Global.conf.title, + title=self.configuration.title, content=external_content, mimetype="text/html", is_front=False, ) - Global.creator.add_item_for( + self.creator.add_item_for( path="home/unavailable_offline", - title=Global.conf.title, + title=self.configuration.title, content=unavailable_offline, mimetype="text/html", is_front=False, ) - Global.creator.add_item_for( + self.creator.add_item_for( path="home/not_yet_available", - title=Global.conf.title, + title=self.configuration.title, content=not_yet_available, mimetype="text/html", is_front=False, ) - Global.creator.add_item_for( + self.creator.add_item_for( path="home/missing", - title=Global.conf.title, + 
title=self.configuration.title, content=missing, mimetype="text/html", is_front=False, ) - Global.creator.add_item_for( + self.creator.add_item_for( path="home/error", - title=Global.conf.title, + title=self.configuration.title, content=error_content, mimetype="text/html", is_front=False, @@ -150,16 +153,16 @@ def _extract_page_title_from_page(self, soup): p = soup.select(page_title_selector) if len(p) == 0: raise CategoryHomePageContentError( - "No text found in page with selector " f"'{page_title_selector}'" + f"No text found in page with selector '{page_title_selector}'" ) if len(p) > 1: raise CategoryHomePageContentError( - "Too many text found in page with selector " f"'{page_title_selector}'" + f"Too many text found in page with selector '{page_title_selector}'" ) text = p[0].text if len(text) == 0: raise CategoryHomePageContentError( - "Empty text found in page with selector " f"'{page_title_selector}'" + f"Empty text found in page with selector '{page_title_selector}'" ) return text @@ -168,7 +171,7 @@ def _extract_primary_title_from_page(self, soup): p = soup.select(primary_title_selector) if len(p) == 0: raise CategoryHomePageContentError( - "No text found in page with selector " f"'{primary_title_selector}'" + f"No text found in page with selector '{primary_title_selector}'" ) if len(p) > 1: raise CategoryHomePageContentError( @@ -178,7 +181,7 @@ def _extract_primary_title_from_page(self, soup): text = p[0].text if len(text) == 0: raise CategoryHomePageContentError( - "Empty text found in page with selector " f"'{primary_title_selector}'" + f"Empty text found in page with selector '{primary_title_selector}'" ) return text @@ -187,7 +190,7 @@ def _extract_secondary_title_from_page(self, soup): p = soup.select(secondary_title_selector) if len(p) == 0: raise CategoryHomePageContentError( - "No text found in page with selector " f"'{secondary_title_selector}'" + f"No text found in page with selector '{secondary_title_selector}'" ) if len(p) > 1: raise CategoryHomePageContentError( @@ -395,8 +398,8 @@ def _extract_count_from_sub_category(self, sc): return int(text) except ValueError: raise CategoryHomePageContentError( - f"Failed to convert span text '{text}' to integer for " "sub-category" - ) + f"Failed to convert span text '{text}' to integer for sub-category" + ) from None def _extract_title_from_sub_category(self, sc): sub_category_img_css_selector = "span.overflow-slide-in" @@ -433,8 +436,8 @@ def _extract_stats_from_page(self, soup): kpi_d = json.loads(kpi) except json.decoder.JSONDecodeError as e: raise CategoryHomePageContentError( - f"Failed to decode stats from '{kpi}' to integer for stat {e}" - ) + f"Failed to decode stats from '{kpi}' to integer" + ) from e if "stats" not in kpi_d: raise CategoryHomePageContentError(f"Stats not found in KPIs '{kpi}'") @@ -446,11 +449,11 @@ def _extract_stats_from_page(self, soup): for stat in stats: if "value" not in stat: raise CategoryHomePageContentError( - f"No value found in stat '{json.dump(stat)}'" + f"No value found in stat '{json.dumps(stat)}'" ) if "label" not in stat: raise CategoryHomePageContentError( - f"No label found in stat '{json.dump(stat)}'" + f"No label found in stat '{json.dumps(stat)}'" ) return stats @@ -460,7 +463,7 @@ def _extract_details_from_single_stat(self, fs): p = fs.select(stat_text_css_selector) if len(p) == 0: raise CategoryHomePageContentError( - "No text found in stat with selector " f"'{stat_text_css_selector}'" + f"No text found in stat with selector '{stat_text_css_selector}'" ) if len(p) > 1: 
raise CategoryHomePageContentError( @@ -470,14 +473,14 @@ def _extract_details_from_single_stat(self, fs): stat_text = p[0].text if len(stat_text) == 0: raise CategoryHomePageContentError( - "Empty text found in stat with selector " f"'{stat_text_css_selector}'" + f"Empty text found in stat with selector '{stat_text_css_selector}'" ) stat_number_css_selector = "chakra-stat__number" p = fs.select(stat_number_css_selector) if len(p) == 0: raise CategoryHomePageContentError( - "No number found in stat with selector " f"'{stat_number_css_selector}'" + f"No number found in stat with selector '{stat_number_css_selector}'" ) if len(p) > 1: raise CategoryHomePageContentError( @@ -503,19 +506,23 @@ def _extract_details_from_single_stat(self, fs): except ValueError: raise CategoryHomePageContentError( f"Failed to convert text '{stat_number}' to integer for stat" - ) + ) from None def get_online_metadata(self): """metadata from online website, looking at homepage source code""" logger.info("Fetching website metadata") - soup, _ = get_soup("/") + soup, _ = self.scraper.utils.get_soup("/") return { - "title": soup.find("title").string, - "description": soup.find("meta", attrs={"name": "description"}).attrs.get( + "title": soup.find( + "title" + ).string, # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] + "description": soup.find( + "meta", attrs={"name": "description"} + ).attrs.get( # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] "content" ), "stats": self._extract_stats_from_page(soup), - "current_year": datetime.date.today().year, + "current_year": datetime.datetime.now(tz=datetime.timezone.utc).year, } diff --git a/src/ifixit2zim/scraper_info.py b/src/ifixit2zim/scraper_info.py index 5918497..5254d7f 100644 --- a/src/ifixit2zim/scraper_info.py +++ b/src/ifixit2zim/scraper_info.py @@ -1,18 +1,18 @@ -import urllib +import urllib.parse from ifixit2zim.constants import UNAVAILABLE_OFFLINE_INFOS -from ifixit2zim.exceptions import UnexpectedDataKindException +from ifixit2zim.exceptions import UnexpectedDataKindExceptionError +from ifixit2zim.scraper import IFixit2Zim from ifixit2zim.scraper_generic import ScraperGeneric -from ifixit2zim.shared import Global, logger -from ifixit2zim.utils import get_api_content +from ifixit2zim.shared import logger class ScraperInfo(ScraperGeneric): - def __init__(self): - super().__init__() + def __init__(self, scraper: IFixit2Zim): + super().__init__(scraper) def setup(self): - self.info_template = Global.env.get_template("info.html") + self.info_template = self.env.get_template("info.html") def get_items_name(self): return "info" @@ -27,16 +27,19 @@ def _add_info_to_scrape(self, info_key, info_title, is_expected): ) def _get_info_key_from_title(self, info_title): - return Global.convert_title_to_filename(info_title.lower()) + return self.processor.convert_title_to_filename(info_title.lower()) def _build_info_path(self, info_title): - href = Global.conf.main_url.geturl() + f"/Info/{info_title.replace('/', ' ')}" - final_href = Global.normalize_href(href) + href = ( + self.configuration.main_url.geturl() + + f"/Info/{info_title.replace('/', ' ')}" + ) + final_href = self.processor.normalize_href(href) return final_href[1:] def get_info_link_from_obj(self, info): if "title" not in info or not info["title"]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( f"Impossible to extract info title from {info}" ) info_title = info["title"] @@ -44,14 +47,14 @@ def get_info_link_from_obj(self, info): 
def get_info_link_from_props(self, info_title): info_path = urllib.parse.quote(self._build_info_path(info_title)) - if Global.conf.no_info: + if self.configuration.no_info: return f"home/not_scrapped?url={info_path}" if info_title in UNAVAILABLE_OFFLINE_INFOS: return f"home/unavailable_offline?url={info_path}" info_key = self._get_info_key_from_title(info_title) - if Global.conf.infos: + if self.configuration.infos: is_not_included = True - for other_info in Global.conf.infos: + for other_info in self.configuration.infos: other_info_key = self._get_info_key_from_title(other_info) if other_info_key == info_key: is_not_included = False @@ -61,12 +64,12 @@ def get_info_link_from_props(self, info_title): return info_path def build_expected_items(self): - if Global.conf.no_info: + if self.configuration.no_info: logger.info("No info required") return - if Global.conf.infos: + if self.configuration.infos: logger.info("Adding required infos as expected") - for info_title in Global.conf.infos: + for info_title in self.configuration.infos: info_key = self._get_info_key_from_title(info_title) self._add_info_to_scrape(info_key, info_title, True) return @@ -74,15 +77,17 @@ def build_expected_items(self): limit = 200 offset = 0 while True: - info_wikis = get_api_content("/wikis/INFO", limit=limit, offset=offset) - if len(info_wikis) == 0: + info_wikis = self.scraper.utils.get_api_content( + "/wikis/INFO", limit=limit, offset=offset + ) + if not info_wikis or len(info_wikis) == 0: break for info_wiki in info_wikis: info_title = info_wiki["title"] info_key = self._get_info_key_from_title(info_title) self._add_info_to_scrape(info_key, info_title, True) offset += limit - if Global.conf.scrape_only_first_items: + if self.configuration.scrape_only_first_items: logger.warning( "Aborting the retrieval of all infos since only first items" " will be scraped anyway" @@ -90,29 +95,31 @@ def build_expected_items(self): break logger.info(f"{len(self.expected_items_keys)} info found") - def get_one_item_content(self, item_key, item_data): + def get_one_item_content(self, item_key, item_data): # noqa ARG002 info_wiki_title = item_key - info_wiki_content = get_api_content(f"/wikis/INFO/{info_wiki_title}") + info_wiki_content = self.scraper.utils.get_api_content( + f"/wikis/INFO/{info_wiki_title}" + ) return info_wiki_content - def add_item_redirect(self, item_key, item_data, redirect_kind): + def add_item_redirect(self, item_key, item_data, redirect_kind): # noqa ARG002 path = self._build_info_path(item_data["info_title"]) - Global.add_redirect( + self.processor.add_redirect( path=path, target_path=f"home/{redirect_kind}?{urllib.parse.urlencode({'url':path})}", ) - def process_one_item(self, item_key, item_data, item_content): + def process_one_item(self, item_key, item_data, item_content): # noqa ARG002 info_wiki_content = item_content info_wiki_rendered = self.info_template.render( info_wiki=info_wiki_content, # label=INFO_WIKI_LABELS[self.conf.lang_code], - metadata=Global.metadata, - lang=Global.conf.lang_code, + metadata=self.metadata, + lang=self.configuration.lang_code, ) - Global.add_html_item( + self.processor.add_html_item( path=self._build_info_path(info_wiki_content["title"]), title=info_wiki_content["display_title"], content=info_wiki_rendered, diff --git a/src/ifixit2zim/scraper_user.py b/src/ifixit2zim/scraper_user.py index bab7b28..faff008 100644 --- a/src/ifixit2zim/scraper_user.py +++ b/src/ifixit2zim/scraper_user.py @@ -1,19 +1,19 @@ -import urllib +import urllib.parse from ifixit2zim.constants import 
UNKNOWN_TITLE, USER_LABELS -from ifixit2zim.exceptions import UnexpectedDataKindException +from ifixit2zim.exceptions import UnexpectedDataKindExceptionError +from ifixit2zim.scraper import IFixit2Zim from ifixit2zim.scraper_generic import ScraperGeneric -from ifixit2zim.shared import Global, logger -from ifixit2zim.utils import get_api_content +from ifixit2zim.shared import logger class ScraperUser(ScraperGeneric): - def __init__(self): - super().__init__() + def __init__(self, scraper: IFixit2Zim): + super().__init__(scraper) self.user_id_to_titles = {} def setup(self): - self.user_template = Global.env.get_template("user.html") + self.user_template = self.env.get_template("user.html") def get_items_name(self): return "user" @@ -26,7 +26,7 @@ def _add_user_to_scrape(self, userid, usertitle, is_expected): "usertitle": usertitle, }, is_expected, - False, + warn_unexpected=False, ) if userid in self.user_id_to_titles: self.user_id_to_titles[userid].append(usertitle) @@ -35,15 +35,15 @@ def _add_user_to_scrape(self, userid, usertitle, is_expected): def _build_user_path(self, userid, usertitle): href = ( - Global.conf.main_url.geturl() + self.configuration.main_url.geturl() + f"/User/{userid}/{usertitle.replace('/', ' ')}" ) - final_href = Global.normalize_href(href) + final_href = self.processor.normalize_href(href) return final_href[1:] def get_user_link_from_obj(self, user): if "userid" not in user or not user["userid"]: - raise UnexpectedDataKindException( + raise UnexpectedDataKindExceptionError( f"Impossible to extract user id from {user}" ) userid = user["userid"] @@ -62,20 +62,20 @@ def get_user_link_from_props(self, userid, usertitle): user_path = urllib.parse.quote( self._build_user_path(userid=userid, usertitle=usertitle) ) - if Global.conf.no_user: + if self.configuration.no_user: return f"home/not_scrapped?url={user_path}" - if Global.conf.users and str(userid) not in Global.conf.users: + if self.configuration.users and str(userid) not in self.configuration.users: return f"home/not_scrapped?url={user_path}" self._add_user_to_scrape(userid, usertitle, False) return user_path def build_expected_items(self): - if Global.conf.no_user: + if self.configuration.no_user: logger.info("No user required") return - if Global.conf.users: + if self.configuration.users: logger.info("Adding required users as expected") - for userid in Global.conf.users: + for userid in self.configuration.users: self._add_user_to_scrape(userid, UNKNOWN_TITLE, True) return # WE DO NOT BUILD A LIST OF EXPECTED USERS, THE LIST IS WAY TOO BIG WITH LOTS @@ -93,41 +93,41 @@ def build_expected_items(self): # offset += limit # logger.info("{} user found".format(len(self.expected_items_keys))) - def get_one_item_content(self, item_key, item_data): + def get_one_item_content(self, item_key, _): # ARG002 userid = item_key - user_content = get_api_content(f"/users/{userid}") + user_content = self.utils.get_api_content(f"/users/{userid}") # other content is available in other endpoints, but not retrieved for now # (badges: not easy to process ; guides: does not seems to work properly) return user_content - def add_item_redirect(self, item_key, item_data, redirect_kind): + def add_item_redirect(self, _, item_data, redirect_kind): userid = item_data["userid"] usertitle = item_data["usertitle"] if usertitle == UNKNOWN_TITLE: logger.warning(f"Cannot add redirect for user {userid} in error") return path = self._build_user_path(userid, usertitle) - Global.add_redirect( + self.processor.add_redirect( path=path, 
target_path=f"home/{redirect_kind}?{urllib.parse.urlencode({'url':path})}", ) - def process_one_item(self, item_key, item_data, item_content): + def process_one_item(self, _, item_data, item_content): userid = item_data["userid"] usertitle = item_data["usertitle"] user_content = item_content user_rendered = self.user_template.render( user=user_content, - label=USER_LABELS[Global.conf.lang_code], - metadata=Global.metadata, + label=USER_LABELS[self.configuration.lang_code], + metadata=self.metadata, ) normal_path = self._build_user_path( userid=user_content["userid"], usertitle=user_content["username"], ) - Global.add_html_item( + self.processor.add_html_item( path=normal_path, title=user_content["username"], content=user_rendered, @@ -146,7 +146,7 @@ def process_one_item(self, item_key, item_data, item_content): "Adding user redirect for alternate user path from " f"{alternate_path} to {normal_path}" ) - Global.add_redirect( + self.processor.add_redirect( path=alternate_path, target_path=normal_path, ) diff --git a/src/ifixit2zim/shared.py b/src/ifixit2zim/shared.py index c5f25df..2752b8f 100644 --- a/src/ifixit2zim/shared.py +++ b/src/ifixit2zim/shared.py @@ -1,37 +1,27 @@ -#!/usr/bin/env python -# vim: ai ts=4 sts=4 et sw=4 nu -# pylint: disable=cyclic-import - import locale import logging -import re import threading -import urllib from contextlib import contextmanager -from datetime import date, datetime -import requests -from jinja2 import Environment, FileSystemLoader, select_autoescape from zimscraperlib.logging import getLogger as lib_getLogger -from zimscraperlib.zim.creator import Creator -from ifixit2zim.constants import ( - DEFAULT_DEVICE_IMAGE_URL, - DEFAULT_GUIDE_IMAGE_URL, - DEFAULT_HOMEPAGE, - DEFAULT_USER_IMAGE_URLS, - DEFAULT_WIKI_IMAGE_URL, +from ifixit2zim.constants import NAME + +logger = lib_getLogger( NAME, - NOT_YET_AVAILABLE, - ROOT_DIR, - UNAVAILABLE_OFFLINE, + level=logging.INFO, + log_format="[%(threadName)s::%(asctime)s] %(levelname)s:%(message)s", ) -LOCALE_LOCK = threading.Lock() +def set_debug(value): + level = logging.DEBUG if value else logging.INFO + logger.setLevel(level) + for handler in logger.handlers: + handler.setLevel(level) -class ImageUrlNotFound(Exception): - pass + +LOCALE_LOCK = threading.Lock() @contextmanager @@ -42,480 +32,3 @@ def setlocale(name): yield locale.setlocale(locale.LC_ALL, name) finally: locale.setlocale(locale.LC_ALL, saved) - - -class Global: - """Shared context accross all scraper components""" - - debug = False - logger = lib_getLogger( - NAME, - level=logging.INFO, - log_format="[%(threadName)s::%(asctime)s] %(levelname)s:%(message)s", - ) - conf = None - - metadata = {} - - creator = None - imager = None - env = None - lock = threading.Lock() - - null_categories = set() - ifixit_external_content = set() - final_hrefs = {} - - @staticmethod - def set_debug(value): - Global.debug = value - level = logging.DEBUG if value else logging.INFO - Global.logger.setLevel(level) - for handler in Global.logger.handlers: - handler.setLevel(level) - - @staticmethod - def setup(): - # order matters are there are references between them - - # images handled on a different queue. - # mostly network I/O to retrieve and/or upload image. 
- # if not in S3 bucket, convert/optimize webp image - # svg images, stored but not optimized - from ifixit2zim.executor import Executor - - Global.img_executor = Executor( - queue_size=100, - nb_workers=50, - prefix="IMG-T-", - ) - - from ifixit2zim.imager import Imager - - Global.imager = Imager() - - Global.creator = Creator( - filename=Global.conf.output_dir.joinpath(Global.conf.fname), - main_path=DEFAULT_HOMEPAGE, - favicon_path="illustration", - language=Global.conf.language["iso-639-3"], - workaround_nocancel=False, - title=Global.conf.title, - description=Global.conf.description, - creator=Global.conf.author, - publisher=Global.conf.publisher, - name=Global.conf.name, - tags=";".join(Global.conf.tags), - date=date.today(), - ).config_verbose(True) - - # jinja2 environment setup - Global.env = Environment( - loader=FileSystemLoader(ROOT_DIR.joinpath("templates")), - autoescape=select_autoescape(), - ) - Global.env.globals["raise"] = Global._raise_helper - Global.env.globals["str"] = lambda x: str(x) - Global.env.filters["guides_in_progress"] = Global.guides_in_progress - Global.env.filters["category_count_parts"] = Global.category_count_parts - Global.env.filters["category_count_tools"] = Global.category_count_tools - Global.env.filters["get_image_path"] = Global.get_image_path - Global.env.filters["get_image_url"] = Global.get_image_url - Global.env.filters["cleanup_rendered_content"] = Global.cleanup_rendered_content - Global.env.filters[ - "get_timestamp_day_rendered" - ] = Global.get_timestamp_day_rendered - Global.env.filters["get_item_comments_count"] = Global.get_item_comments_count - Global.env.filters[ - "get_guide_total_comments_count" - ] = Global.get_guide_total_comments_count - Global.env.filters["get_user_display_name"] = Global.get_user_display_name - - @staticmethod - def _raise_helper(msg): - raise Exception(msg) - - @staticmethod - def guides_in_progress(guides, in_progress=True): - if in_progress: - return [guide for guide in guides if "GUIDE_IN_PROGRESS" in guide["flags"]] - return [guide for guide in guides if "GUIDE_IN_PROGRESS" not in guide["flags"]] - - @staticmethod - def category_count_parts(category): - if "parts" not in category: - return 0 - if "total" not in category["parts"]: - return 0 - return category["parts"]["total"] - - @staticmethod - def category_count_tools(category): - if "tools" not in category: - return 0 - return len(category["tools"]) - - @staticmethod - def get_image_path(image_url): - return Global.imager.defer(url=image_url) - - @staticmethod - def _get_image_url_search(obj, for_guide, for_device, for_wiki, for_user): - if "standard" in obj: - return obj["standard"] - if "medium" in obj: - return obj["medium"] - if "large" in obj: - return obj["large"] - if "original" in obj: - return obj["original"] - if for_guide: - return DEFAULT_GUIDE_IMAGE_URL - if for_device: - return DEFAULT_DEVICE_IMAGE_URL - if for_wiki: - return DEFAULT_WIKI_IMAGE_URL - if for_user and "userid" in obj: - idx = obj["userid"] % len(DEFAULT_USER_IMAGE_URLS) - return DEFAULT_USER_IMAGE_URLS[idx] - raise ImageUrlNotFound(f"Unable to find image URL in object {obj}") - - @staticmethod - def get_image_url( - obj, for_guide=False, for_device=False, for_wiki=False, for_user=False - ): - if "image" in obj and obj["image"]: - return Global._get_image_url_search( - obj["image"], for_guide, for_device, for_wiki, for_user - ) - return Global._get_image_url_search( - obj, for_guide, for_device, for_wiki, for_user - ) - - guide_regex_full = re.compile( - 
r"href=\"https://\w*\.ifixit\.\w*/Guide/.*/(?P\d*)\"" - ) - guide_regex_rel = re.compile(r"href=\"/Guide/.*/(?P\d*).*?\"") - - gbl_image_regex = r".*?)src\s*=\s*\"(?P.*?)\"" - gbl_href_regex = r"href\s*=\s*\"(?P.*?)\"" - gbl_youtube_regex = ( - r"(?!.*.+?)src=[\\\"']+(?P.+?)\"(?P.+?)" - ) - gbl_bgd_image_regex = ( - r"background-image:url\((?P"|\"|')" - r"(?P.*?)(?P"|\"|')\)" - ) - gbl_video_regex = r".*)" - gbl_iframe_regex = r".*?)\".*?" - gbl_regex = re.compile( - f"{gbl_image_regex}|{gbl_href_regex}|{gbl_youtube_regex}|{gbl_bgd_image_regex}" - f"|{gbl_video_regex}|{gbl_iframe_regex}" - ) - - href_anchor_regex = r"^(?P#.*)$" - href_object_kind_regex = ( - r"^(?:https*://[\w\.]*(?:ifixit)[\w\.]*)*/" - r"((?:(?P" - + "|".join(NOT_YET_AVAILABLE + UNAVAILABLE_OFFLINE) - + r")(?:/.+)?)" - r"|(?:(?PGuide|Anleitung|Guía|Guida|Tutoriel|Teardown)/" - r"(?P.+)/(?P\d+)(?P#.*)?.*)" - r"|(?:(?PDevice|Topic)/(?P[\w%_\.-]+)" - r"(?P#.*)?.*)" - r"|(?PUser)/(?P\d*)/(?P[\w%_\.+'-]+)" - r"(?P#.*)?.*" - r"|(?:(?PInfo)/(?P[\w%_\.-]+)(?P#.*)?.*))$" - ) - href_regex = re.compile( - f"{href_anchor_regex}|{href_object_kind_regex}", flags=re.IGNORECASE - ) - - @staticmethod - def _process_external_url(url, rel_prefix): - if "ifixit" in url: - Global.ifixit_external_content.add(url) - return f"{rel_prefix}home/external_content?url={urllib.parse.quote(url)}" - - @staticmethod - def _process_unrecognized_href(url, rel_prefix): - return Global._process_external_url(url, rel_prefix) - - def _process_href_regex_dynamics(href, rel_prefix): - if "Guide/login/register" in href or "Guide/new" in href: - return ( - f"{rel_prefix}home/unavailable_offline" - f"?url={urllib.parse.quote(href)}" - ) - return None - - def _process_href_regex_nomatch(href, rel_prefix, match): - if match: - return None - return Global._process_unrecognized_href(href, rel_prefix) - - def _process_href_regex_anchor(href, rel_prefix, match): - if not match.group("anchor"): - return None - return f"{match.group('anchor')}" - - def _process_href_regex_guide(href, rel_prefix, match): - if not match.group("guide"): - return None - link = Global.get_guide_link_from_props( - guideid=match.group("guideid"), - guidetitle=urllib.parse.unquote_plus(match.group("guidetitle")), - ) - return f"{rel_prefix}{link}{match.group('guideafter') or ''}" - - def _process_href_regex_device(href, rel_prefix, match): - if not match.group("device"): - return None - link = Global.get_category_link_from_props( - category_title=urllib.parse.unquote_plus(match.group("devicetitle")) - ) - return f"{rel_prefix}{link}{match.group('deviceafter') or ''}" - - def _process_href_regex_info(href, rel_prefix, match): - if not match.group("info"): - return None - link = Global.get_info_link_from_props( - info_title=urllib.parse.unquote_plus(match.group("infotitle")) - ) - return f"{rel_prefix}{link}{match.group('infoafter') or ''}" - - def _process_href_regex_user(href, rel_prefix, match): - if not match.group("user"): - return None - link = Global.get_user_link_from_props( - userid=match.group("userid"), - usertitle=urllib.parse.unquote_plus(match.group("usertitle")), - ) - return f"{rel_prefix}{link}{match.group('userafter') or ''}" - - def _process_href_regex_kind(href, rel_prefix, match): - if not match.group("kind"): - return None - if match.group("kind").lower() in NOT_YET_AVAILABLE: - return f"{rel_prefix}home/not_yet_available?url={urllib.parse.quote(href)}" - if match.group("kind").lower() in UNAVAILABLE_OFFLINE: - return ( - f"{rel_prefix}home/unavailable_offline" - 
f"?url={urllib.parse.quote(href)}" - ) - raise Exception( - f"Unsupported kind '{match.group('kind')}' in _process_href_regex" - ) - - @staticmethod - def normalize_href(href): - if href in Global.final_hrefs: - return Global.final_hrefs[href] - try: - logger.debug(f"Normalizing href {href}") - # final_href = requests.head(href).headers.get("Location") - # if final_href is None: - # logger.debug(f"Failed to HEAD {href}, falling back to GET") - final_href = requests.get(href, stream=True).url - # parse final href and remove scheme + netloc + slash - parsed_final_href = urllib.parse.urlparse(final_href) - parsed_href = urllib.parse.urlparse(href) - chars_to_remove = len(parsed_final_href.scheme + "://") - - # remove domain if redirect is on same domain (almost always) - if parsed_final_href.netloc == parsed_href.netloc: - chars_to_remove += len(parsed_final_href.netloc) - - final_href = final_href[chars_to_remove:] - final_href = urllib.parse.unquote(final_href) - except Exception: - # this is quite expected for some missing items ; this will be taken care - # of at retrieval, no way to do something better - final_href = href - Global.final_hrefs[href] = final_href - logger.debug(f"Result is {final_href}") - return final_href - - @staticmethod - def _process_href_regex(href, rel_prefix): - if href.startswith("/"): - href = Global.conf.main_url.geturl() + href - if href.startswith("http") and "ifixit.com/" in href: - href = Global.normalize_href(href) - href = urllib.parse.quote(href) - match = Global.href_regex.search(href) - res = ( - Global._process_href_regex_dynamics(href, rel_prefix) - or Global._process_href_regex_nomatch(href, rel_prefix, match) - or Global._process_href_regex_anchor(href, rel_prefix, match) - or Global._process_href_regex_guide(href, rel_prefix, match) - or Global._process_href_regex_device(href, rel_prefix, match) - or Global._process_href_regex_info(href, rel_prefix, match) - or Global._process_href_regex_user(href, rel_prefix, match) - or Global._process_href_regex_kind(href, rel_prefix, match) - ) - if res is None: - raise Exception("Unsupported match in _process_href_regex") - return res - - @staticmethod - def _process_youtube(match, rel_prefix): - return ( - f'" - f"" - ) - - @staticmethod - def _process_bgdimgurl(match, rel_prefix): - return ( - f"background-image:url({match.group('quote1')}{rel_prefix}" - f"{Global.get_image_path(match.group('bgdimgurl'))}" - f"{match.group('quote2')})" - ) - - @staticmethod - def _process_video(match, rel_prefix): - return "
Video not scrapped
" - - @staticmethod - def _process_iframe(match, rel_prefix): - return ( - f'External content' - ) - - @staticmethod - def _process_gbl_regex(match, rel_prefix): - if match.group("image_url"): - return ( - f" 0: - return user["username"] - if user["unique_username"] and len(user["unique_username"]) > 0: - return f"@{user['unique_username']}" - return "Anonymous" - - -class GlobalMixin: - @property - def conf(self): - return Global.conf - - @property - def metadata(self): - return Global.metadata - - @property - def creator(self): - return Global.creator - - @property - def lock(self): - return Global.lock - - @property - def imager(self): - return Global.imager - - @property - def executor(self): - return Global.executor - - @property - def env(self): - return Global.env - - @property - def info_wiki_template(self): - return Global.info_wiki_template - - @property - def ifixit_external_content(self): - return Global.ifixit_external_content - - -logger = Global.logger diff --git a/src/ifixit2zim/utils.py b/src/ifixit2zim/utils.py index 3eeb23a..905ea5d 100644 --- a/src/ifixit2zim/utils.py +++ b/src/ifixit2zim/utils.py @@ -1,11 +1,9 @@ -#!/usr/bin/env python -# vim: ai ts=4 sts=4 et sw=4 nu - import io import re import urllib.parse import zlib -from typing import Union +from http import HTTPStatus +from typing import List, Optional, Tuple, Union import backoff import bs4 @@ -14,143 +12,11 @@ from pif import get_public_ip from zimscraperlib.download import _get_retry_adapter, stream_file -from ifixit2zim.constants import API_PREFIX -from ifixit2zim.shared import Global, logger - - -def to_path(url: str) -> str: - """Path-part of an URL, without leading slash""" - return re.sub(r"^/", "", urllib.parse.urlparse(url).path) - - -def get_url(path: str, **params) -> str: - """url-encoded in-source website url for a path""" - params_str = f"?{urllib.parse.urlencode(params)}" if params else "" - return f"{Global.conf.main_url.geturl()}{urllib.parse.quote(path)}{params_str}" - - -def get_url_raw(path: str): - """in-source website url for a path, untainted""" - return f"{Global.conf.main_url.geturl()}{path}" - - -def to_url(value: str) -> str: - """resolved potentially relative url from in-source link""" - return value if value.startswith("http") else get_url_raw(value) - - -def to_rel(url: str) -> Union[None, str]: - """path from URL if on our main domain, else None""" - uri = urllib.parse.urlparse(url) - if uri.netloc != Global.conf.domain: - return None - return uri.path - - -def no_leading_slash(text: str) -> str: - """text with leading slash removed if present""" - return re.sub(r"^/", "", text) - - -def no_trailing_slash(text: str) -> str: - """text with trailing slash removed if present""" - return re.sub(r"/$", "", text) - +from ifixit2zim.constants import API_PREFIX, Configuration +from ifixit2zim.shared import logger -def only_path_of(url: str): - """normalized path part of an url""" - return normalize_ident(urllib.parse.urlparse(url).path) - -def fetch(path: str, **params) -> str: - """(source text, actual_paths) of a path from source website - - actual_paths is amn ordered list of paths that were traversed to get to content. 
- Without redirection, it should be a single path, equal to request - Final, target path is always last""" - session = requests.Session() - session.mount("http", _get_retry_adapter(10)) # tied to http and https - resp = session.get(get_url(path, **params), params=params) - resp.raise_for_status() - - # we have params meaning we requested a page (?pg=xxx) - # assumption: this must be a category page (so on same domain) - # we thus need to use redirection target (which lost param) with params - if params and resp.history: - return fetch(only_path_of(resp.url), **params) - return resp.text, [ - no_leading_slash(only_path_of(r.url)) for r in [*resp.history, resp] - ] - - -def get_soup_of(text: str, unwrap: bool = False): - """an lxml soup of an HTML string""" - soup = bs4.BeautifulSoup(text, "lxml") - if unwrap: - for elem in ("body", "html"): - getattr(soup, elem).unwrap() - return soup - - -def get_soup(path: str, **params) -> bs4.BeautifulSoup: - """an lxml soup of a path on source website""" - content, paths = fetch(path, **params) - return get_soup_of(content), paths - - -def get_digest(url: str) -> str: - """simple digest of an url for mapping purpose""" - return str(zlib.adler32(url.encode("UTF-8"))) - - -def normalize_ident(ident: str) -> str: - """URL-decoded category identifier""" - return urllib.parse.unquote(ident) - - -def get_version_ident_for(url: str) -> str: - """~version~ of the URL data to use for comparisons. Built from headers""" - try: - resp = requests.head(url) - headers = resp.headers - except Exception as exc: - logger.warning(f"Unable to HEAD {url}") - logger.exception(exc) - try: - _, headers = stream_file( - url=url, - byte_stream=io.BytesIO(), - block_size=1, - only_first_block=True, - ) - except Exception as exc2: - logger.warning(f"Unable to query image at {url}") - logger.exception(exc2) - return - - for header in ("ETag", "Last-Modified", "Content-Length"): - if headers.get(header): - return headers.get(header) - - return "-1" - - -def setup_s3_and_check_credentials(s3_url_with_credentials): - logger.info("testing S3 Optimization Cache credentials") - s3_storage = KiwixStorage(s3_url_with_credentials) - if not s3_storage.check_credentials( - list_buckets=True, bucket=True, write=True, read=True, failsafe=True - ): - logger.error("S3 cache connection error testing permissions.") - logger.error(f" Server: {s3_storage.url.netloc}") - logger.error(f" Bucket: {s3_storage.bucket_name}") - logger.error(f" Key ID: {s3_storage.params.get('keyid')}") - logger.error(f" Public IP: {get_public_ip()}") - raise ValueError("Unable to connect to Optimization Cache. 
Check its URL.") - return s3_storage - - -def backoff_hdlr(details): +def __backoff_hdlr(details): logger.warning( "Backing off {wait:0.1f} seconds after {tries} tries " "calling function {target} with args {args} and kwargs " @@ -158,15 +24,145 @@ def backoff_hdlr(details): ) -@backoff.on_exception( - backoff.expo, - requests.exceptions.RequestException, - max_time=16, - on_backoff=backoff_hdlr, -) -def get_api_content(path, **params): - full_path = get_url(API_PREFIX + path, **params) - logger.debug(f"Retrieving {full_path}") - response = requests.get(full_path) - json_data = response.json() if response and response.status_code == 200 else None - return json_data +class Utils: + def __init__(self, configuration: Configuration) -> None: + self.configuration = configuration + + def to_path(self, url: str) -> str: + """Path-part of an URL, without leading slash""" + return re.sub(r"^/", "", urllib.parse.urlparse(url).path) + + def get_url(self, path: str, **params) -> str: + """url-encoded in-source website url for a path""" + params_str = f"?{urllib.parse.urlencode(params)}" if params else "" + return ( + f"{self.configuration.main_url.geturl()}" + f"{urllib.parse.quote(path)}" + f"{params_str}" + ) + + def get_url_raw(self, path: str): + """in-source website url for a path, untainted""" + return f"{self.configuration.main_url.geturl()}{path}" + + def to_url(self, value: str) -> str: + """resolved potentially relative url from in-source link""" + return value if value.startswith("http") else self.get_url_raw(value) + + def to_rel(self, url: str) -> Union[None, str]: + """path from URL if on our main domain, else None""" + uri = urllib.parse.urlparse(url) + if uri.netloc != self.configuration.domain: + return None + return uri.path + + def no_leading_slash(self, text: str) -> str: + """text with leading slash removed if present""" + return re.sub(r"^/", "", text) + + def no_trailing_slash(self, text: str) -> str: + """text with trailing slash removed if present""" + return re.sub(r"/$", "", text) + + def only_path_of(self, url: str): + """normalized path part of an url""" + return self.normalize_ident(urllib.parse.urlparse(url).path) + + def fetch(self, path: str, **params) -> Tuple[str, List[str]]: + """(source text, actual_paths) of a path from source website + + actual_paths is amn ordered list of paths that were traversed to get to content. 
+ Without redirection, it should be a single path, equal to request + Final, target path is always last""" + session = requests.Session() + session.mount("http", _get_retry_adapter(10)) # tied to http and https + resp = session.get(self.get_url(path, **params), params=params) + resp.raise_for_status() + + # we have params meaning we requested a page (?pg=xxx) + # assumption: this must be a category page (so on same domain) + # we thus need to use redirection target (which lost param) with params + if params and resp.history: + return self.fetch(self.only_path_of(resp.url), **params) + return resp.text, [ + self.no_leading_slash(self.only_path_of(r.url)) + for r in [*resp.history, resp] + ] + + def get_soup_of(self, text: str, *, unwrap: bool = False): + """an lxml soup of an HTML string""" + soup = bs4.BeautifulSoup(text, "lxml") + if unwrap: + for elem in ("body", "html"): + getattr(soup, elem).unwrap() + return soup + + def get_soup(self, path: str, **params) -> Tuple[bs4.BeautifulSoup, List[str]]: + """an lxml soup of a path on source website""" + content, paths = self.fetch(path, **params) + return self.get_soup_of(content), paths + + def get_digest(self, url: str) -> str: + """simple digest of an url for mapping purpose""" + return str(zlib.adler32(url.encode("UTF-8"))) + + def normalize_ident(self, ident: str) -> str: + """URL-decoded category identifier""" + return urllib.parse.unquote(ident) + + def get_version_ident_for(self, url: str) -> Optional[str]: + """~version~ of the URL data to use for comparisons. Built from headers""" + try: + resp = requests.head(url, timeout=10) + headers = resp.headers + except Exception as exc: + logger.warning(f"Unable to HEAD {url}") + logger.exception(exc) + try: + _, headers = stream_file( + url=url, + byte_stream=io.BytesIO(), + block_size=1, + only_first_block=True, + ) + except Exception as exc2: + logger.warning(f"Unable to query image at {url}") + logger.exception(exc2) + return + + for header in ("ETag", "Last-Modified", "Content-Length"): + if headers.get(header): + return headers.get(header) + + return "-1" + + def setup_s3_and_check_credentials(self, s3_url_with_credentials): + logger.info("testing S3 Optimization Cache credentials") + s3_storage = KiwixStorage(s3_url_with_credentials) + if not s3_storage.check_credentials( + list_buckets=True, bucket=True, write=True, read=True, failsafe=True + ): + logger.error("S3 cache connection error testing permissions.") + logger.error(f" Server: {s3_storage.url.netloc}") + logger.error(f" Bucket: {s3_storage.bucket_name}") + logger.error(f" Key ID: {s3_storage.params.get('keyid')}") + logger.error(f" Public IP: {get_public_ip()}") + raise ValueError("Unable to connect to Optimization Cache. Check its URL.") + return s3_storage + + @backoff.on_exception( + backoff.expo, + requests.exceptions.RequestException, + max_time=16, + on_backoff=__backoff_hdlr, + ) + def get_api_content(self, path, **params): + full_path = self.get_url(API_PREFIX + path, **params) + logger.debug(f"Retrieving {full_path}") + response = requests.get(full_path, timeout=10) + json_data = ( + response.json() + if response and response.status_code == HTTPStatus.OK + else None + ) + return json_data diff --git a/tasks.py b/tasks.py index 90854e8..a95c71a 100644 --- a/tasks.py +++ b/tasks.py @@ -92,7 +92,7 @@ def fix_black(ctx: Context, args: str = "."): def fix_ruff(ctx: Context, args: str = "."): """fix all ruff rules""" args = args or "." 
# needed for hatch script - ctx.run(f"ruff --fix {args}", pty=use_pty) + ctx.run(f"ruff check --fix {args}", pty=use_pty) @task( diff --git a/tests/test_basic.py b/tests/test_basic.py index b35cc95..0ede956 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -1,16 +1,5 @@ -# pyright: strict, reportUnusedExpression=false - -import pytest -from great_project import compute, entrypoint -from great_project.__about__ import __version__ +from ifixit2zim.__about__ import __version__ def test_version(): - assert "dev" in __version__ - - -def test_compute(): - assert compute(1, 2) == 3 - with pytest.raises(TypeError): - compute(1.0, 2) # pyright: ignore [reportGeneralTypeIssues] - assert entrypoint() is None + assert __version__