Skip to content

Commit

Permalink
Merge pull request #3003 from digitalfabrik/refactor_function_replace…
Browse files Browse the repository at this point in the history
…_links

Refactor `replace_links` by using `get_region_links`
  • Loading branch information
MizukiTemma authored Sep 4, 2024
2 parents 4b84b44 + fcbdec7 commit 835cdd9
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 131 deletions.
39 changes: 39 additions & 0 deletions integreat_cms/cms/models/abstract_content_translation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from copy import deepcopy
from html import escape
from typing import TYPE_CHECKING

Expand All @@ -11,6 +12,7 @@
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from linkcheck.listeners import disable_listeners
from lxml.html import rewrite_links

from ..utils.tinymce_icon_utils import get_icon_html, make_icon

Expand All @@ -23,8 +25,10 @@

from .abstract_content_model import AbstractContentModel
from .regions.region import Region
from .users.user import User

from ..constants import status, translation_status
from ..utils.link_utils import fix_content_link_encoding
from ..utils.round_hix_score import round_hix_score
from ..utils.translation_utils import gettext_many_lazy as __
from .abstract_base_model import AbstractBaseModel
Expand Down Expand Up @@ -586,6 +590,41 @@ def get_all_used_slugs(self) -> Iterable[str]:
"""
return self.all_versions.values_list("slug", flat=True)

def create_new_version_copy(
    self, user: User | None = None
) -> AbstractContentTranslation:
    """
    Build the next version of this translation as an unsaved copy

    The translation is deep-copied, the primary key of the copy is cleared, its version number
    is increased by one and it is flagged as minor edit. The copy is returned without being saved.

    :param user: The user who is set as creator of the copy
    :return: The copied translation
    """
    version_copy = deepcopy(self)
    version_copy.creator = user
    version_copy.minor_edit = True
    version_copy.version = version_copy.version + 1
    # NOTE(review): presumably the primary key is cleared so that saving the copy
    # inserts a new row instead of updating the copied one - confirm against the model setup
    version_copy.pk = None
    logger.debug("Created new translation version %r", version_copy)
    return version_copy

def replace_urls(
    self,
    urls_to_replace: dict[str, str],
    user: User | None = None,
    commit: bool = True,
) -> None:
    """
    Replace URLs in the content of this translation and store the result as new version

    The replacement is applied to a new version copy of this translation. Every link of the
    content which is a key of ``urls_to_replace`` is exchanged with the corresponding value,
    all other links are kept. Afterwards, the link encoding of the content is fixed.
    The new version is only saved if the content changed and ``commit`` is true.
    In this case, the link objects of this translation are deleted before saving.

    :param urls_to_replace: A mapping of the URLs to replace to their replacements
    :param user: The user who is set as creator of the new version
    :param commit: Whether changes should be written to the database
    """

    def swap_url(content_url: str) -> str:
        # Links without an entry in the mapping are passed through unchanged
        return urls_to_replace.get(content_url, content_url)

    updated_version = self.create_new_version_copy(user)
    logger.debug("Replacing links of %r: %r", updated_version, urls_to_replace)
    rewritten_content = rewrite_links(updated_version.content, swap_url)
    updated_version.content = fix_content_link_encoding(rewritten_content)

    if not commit or updated_version.content == self.content:
        return
    self.links.all().delete()
    updated_version.save()

def __str__(self) -> str:
"""
This overwrites the default Django :meth:`~django.db.models.Model.__str__` method.
Expand Down
9 changes: 5 additions & 4 deletions integreat_cms/cms/utils/content_translation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import logging
import operator
from copy import deepcopy
from functools import reduce
from typing import TYPE_CHECKING

Expand All @@ -15,7 +14,6 @@

from .content_utils import clean_content
from .internal_link_utils import get_public_translation_for_link
from .linkcheck_utils import save_new_version

if TYPE_CHECKING:
from ..models import User
Expand Down Expand Up @@ -49,9 +47,12 @@ def update_links_to(
if new_content == outdated_content_translation.content:
continue

fixed_content_translation = deepcopy(outdated_content_translation)
fixed_content_translation = (
outdated_content_translation.create_new_version_copy(user)
)
fixed_content_translation.content = new_content
save_new_version(outdated_content_translation, fixed_content_translation, user)
outdated_content_translation.links.all().delete()
fixed_content_translation.save()

logger.debug(
"Updated links to %s in %r",
Expand Down
2 changes: 1 addition & 1 deletion integreat_cms/cms/utils/content_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from ..models import MediaFile
from ..utils import internal_link_utils
from ..utils.linkcheck_utils import fix_content_link_encoding
from ..utils.link_utils import fix_content_link_encoding

logger = logging.getLogger(__name__)

Expand Down
26 changes: 26 additions & 0 deletions integreat_cms/cms/utils/link_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

import re
from urllib.parse import ParseResult, unquote, urlparse


def fix_domain_encoding(url: re.Match[str]) -> str:
    """
    Fix the encoding of punycode domains

    Only the network location of the URL is percent-decoded. URLs which cannot be parsed
    are returned unchanged, so that a single malformed link does not abort the processing
    of the whole content.

    :param url: The input url match (the URL is read from the first capturing group)
    :return: The fixed url
    """
    raw_url = url.group(1)
    try:
        parsed_url: ParseResult = urlparse(raw_url)
    except ValueError:
        # urlparse rejects malformed hosts, e.g. an unbalanced "[" is reported as invalid IPv6 URL.
        # There is nothing to fix in such a link, so keep it as it is.
        return raw_url
    parsed_url = parsed_url._replace(netloc=unquote(parsed_url.netloc))
    return parsed_url.geturl()


def fix_content_link_encoding(content: str) -> str:
    """
    Fix the encoding of punycode domains in an html content string

    Every ``http://`` or ``https://`` URL which directly follows a single or double quote and
    extends up to the next quote is passed to :func:`fix_domain_encoding` and replaced by its result.

    :param content: The input content
    :return: The fixed content
    """
    quoted_url = re.compile(r"(?<=[\"'])(https?://.+?)(?=[\"'])")
    return quoted_url.sub(fix_domain_encoding, content)
153 changes: 61 additions & 92 deletions integreat_cms/cms/utils/linkcheck_utils.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
from __future__ import annotations

import logging
import re
import time
from copy import deepcopy
from functools import partial
from typing import TYPE_CHECKING
from urllib.parse import ParseResult, unquote, urlparse
from collections import defaultdict
from typing import DefaultDict, TYPE_CHECKING

from django.conf import settings
from django.db.models import Prefetch, Q, QuerySet, Subquery
from linkcheck import update_lock
from linkcheck.listeners import tasks_queue
from linkcheck.models import Link, Url
from lxml.html import rewrite_links

from integreat_cms.cms.models import (
EventTranslation,
Expand All @@ -23,12 +19,11 @@
Region,
)

from ..models.abstract_content_translation import AbstractContentTranslation

if TYPE_CHECKING:
from typing import Any

from ..models import Region, User
from ..models import User
from ..models.abstract_content_translation import AbstractContentTranslation

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -190,40 +185,6 @@ def filter_urls(
return urls, count_dict


def replace_link_helper(old_url: str, new_url: str, link: str) -> str:
    """
    Callback for :meth:`lxml.html.HtmlMixin.rewrite_links` which exchanges exactly one url

    :param old_url: The url which should be replaced
    :param new_url: The url which should be inserted instead of the old url
    :param link: The current link
    :return: ``new_url`` if ``link`` equals ``old_url``, otherwise the unchanged ``link``
    """
    if link != old_url:
        return link
    return new_url


def save_new_version(
    translation: AbstractContentTranslation,
    new_translation: AbstractContentTranslation,
    user: Any | None,
) -> None:
    """
    Store ``new_translation`` as the version which follows ``translation``

    The link objects of the old translation are deleted. The new translation gets its primary key
    cleared, its version number increased by one, is flagged as minor edit and is saved
    with the given user as creator.

    :param translation: The old translation
    :param new_translation: The new translation
    :param user: The creator of the new version
    """
    # The link objects of the old version are removed before the new version is saved
    translation.links.all().delete()

    new_translation.creator = user
    new_translation.minor_edit = True
    new_translation.version = new_translation.version + 1
    new_translation.pk = None
    new_translation.save()
    logger.debug("Created new translation version %r", new_translation)


# pylint: disable=too-many-locals
def replace_links(
search: str,
replace: str,
Expand All @@ -243,69 +204,77 @@ def replace_links(
:param commit: Whether changes should be written to the database
:param link_types: Which kind of links should be replaced
"""
region_msg = f' of "{region!r}"' if region else ""
user_msg = f' by "{user!r}"' if user else ""
logger.info(
"Replacing %r with %r in content links%s%s",
search,
replace,
region_msg,
user_msg,
log_replacement_is_starting(search, replace, region, user)
content_objects = find_target_url_per_translation(
search, replace, region, link_types
)
models = [PageTranslation, EventTranslation, POITranslation, ImprintPageTranslation]
with update_lock:
for model in models:
filters = {}
if region:
filters[f"{model.foreign_field()}__region"] = region

for translation in model.objects.filter(**filters).distinct(
model.foreign_field(), "language"
):
new_translation = deepcopy(translation)
for link in translation.links.select_related("url"):
url = link.url.url
should_replace = search in url and (
not link_types or link.url.type in link_types
)
if should_replace:
fixed_url = url.replace(search, replace)
new_translation.content = rewrite_links(
new_translation.content,
partial(replace_link_helper, url, fixed_url),
)
logger.debug(
"Replacing %r with %r in %r", url, fixed_url, translation
)
new_translation.content = fix_content_link_encoding(
new_translation.content
)
if new_translation.content != translation.content and commit:
save_new_version(translation, new_translation, user)
for translation, urls_to_replace in content_objects.items():
translation.replace_urls(urls_to_replace, user, commit)

# Wait until all post-save signals have been processed
logger.debug("Waiting for linkcheck listeners to update link database...")
time.sleep(0.1)
tasks_queue.join()
logger.info("Finished replacing %r with %r in content links", search, replace)


def fix_domain_encoding(url: re.Match[str]) -> str:
def find_target_url_per_translation(
search: str, replace: str, region: Region | None, link_types: list[str] | None
) -> dict[AbstractContentTranslation, dict[str, str]]:
"""
Fix the encoding of punycode domains
Determine which URLs must be replaced in which translation
:param url: The input url match
:return: The fixed url
:param search: The (partial) URL to search
:param replace: The (partial) URL to replace
:param region: Optionally limit the replacement to one region (``None`` means a global replacement)
:param link_types: Which kind of links should be replaced
:return: A dictionary mapping each translation to a dictionary of its URLs before and after the replacement
"""
parsed_url: ParseResult = urlparse(url.group(1))
parsed_url = parsed_url._replace(netloc=unquote(parsed_url.netloc))
return parsed_url.geturl()
# This function is used in replace_links, which is used in the management command, where region can be None, too.
# However get_region_links currently requires a valid region.
# Collect all the link objects in case no region is given.
links = (
(get_region_links(region) if region else Link.objects.all())
.filter(url__url__contains=search)
.select_related("url")
)

links_to_replace = (
[link for link in links if link.url.type in link_types] if link_types else links
)

content_objects: DefaultDict[AbstractContentTranslation, dict[str, str]] = (
defaultdict(dict)
)
for link in links_to_replace:
content_objects[link.content_object][link.url.url] = link.url.url.replace(
search, replace
)
return content_objects


def fix_content_link_encoding(content: str) -> str:
def log_replacement_is_starting(
search: str,
replace: str,
region: Region | None,
user: User | None,
) -> None:
"""
Fix the encoding of punycode domains in an html content string
function to log the current link replacement
:param content: The input content
:return: The fixed content
:param search: The (partial) URL to search
:param replace: The (partial) URL to replace
:param region: Optionally limit the replacement to one region (``None`` means a global replacement)
:param user: The creator of the replaced translations
"""
return re.sub(r"(?<=[\"'])(https?://.+?)(?=[\"'])", fix_domain_encoding, content)
region_msg = f' of "{region!r}"' if region else ""
user_msg = f' by "{user!r}"' if user else ""
logger.info(
"Replacing %r with %r in content links%s%s",
search,
replace,
region_msg,
user_msg,
)
Loading

0 comments on commit 835cdd9

Please sign in to comment.