diff --git a/function/telegram/message.py b/function/telegram/message.py index efed719..433253f 100644 --- a/function/telegram/message.py +++ b/function/telegram/message.py @@ -1,7 +1,9 @@ from urllib.parse import quote_plus +from functools import cached_property from emoji import emojize from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto +from pyrogram.enums import ParseMode from function.settings import ( TELEGRAM_MAX_ALBUM_QUANTITY, @@ -17,11 +19,15 @@ class TelegramMessage: def __init__(self, message: Message): self.message = message - @property + @cached_property def _album_images(self) -> list[str]: - return self.message.images[1 : TELEGRAM_MAX_ALBUM_QUANTITY + 1][::-1] + return self.message.images[1 : TELEGRAM_MAX_ALBUM_QUANTITY + 1] - @property + @cached_property + def _album_caption(self) -> str: + return emojize(f":framed_picture: **{self.message.title}**") + + @cached_property def _emoji(self) -> str: if self.message.youtube: return emojize(":play_button:") @@ -29,26 +35,34 @@ def _emoji(self) -> str: return emojize(":camera_with_flash:") return emojize(":page_facing_up:") - @property + @cached_property def _domain(self) -> str: return get_domain(self.message.link) - @property + @cached_property def _link(self) -> str: return f"__[{self._domain}]({self.message.link})__" - @property + @cached_property def _title(self) -> str: return f"{self._emoji} **{self.message.title}**" - @property + @cached_property def _body(self) -> str: text = " ".join(self.message.content) + if len(text) > TELEGRAM_MAX_CONTENT_SIZE: return text[:TELEGRAM_MAX_CONTENT_SIZE] + " **[...]**" + + elif not text.strip(): + if self.message.videos: + return "Assista o vídeo no YouTube clicando no botão abaixo." + elif self.message.instagram: + return "Veja a publicação no Instagram clicando no botão abaixo." + return text - @property + @cached_property def _whatsapp_link_text(self) -> str: return join_lines( self._title.replace("**", "*"), @@ -65,25 +79,24 @@ def _whatsapp_link(self) -> str: def _button(self, text: str, link: str) -> list[InlineKeyboardButton]: return [InlineKeyboardButton(text=text, url=link)] - @property + @cached_property def chat_id(self) -> str: return self.message.destiny - @property + @cached_property def album(self) -> list[InputMediaPhoto]: - return [InputMediaPhoto(img) for img in self._album_images] - - @property - def image(self) -> str | None: - if self.message.images: - return self.message.images[0] - return None + return [ + InputMediaPhoto( + img, caption=self._album_caption, parse_mode=ParseMode.MARKDOWN + ) + for img in self._album_images + ] - @property + @cached_property def content(self) -> str: return join_lines(self._link, self._title, self._body) - @property + @cached_property def buttons(self) -> InlineKeyboardMarkup: keyboard = [ self._button( diff --git a/function/telegram/sender.py b/function/telegram/sender.py index 1a0c85c..f92d353 100644 --- a/function/telegram/sender.py +++ b/function/telegram/sender.py @@ -1,9 +1,10 @@ +from typing import Any, Callable, Coroutine, TypeVar from uuid import uuid4 from random import shuffle from pyrogram import Client, utils from pyrogram.enums import ParseMode -from pyrogram.errors import FloodWait, BadRequest +from pyrogram.errors import FloodWait from aws_lambda_powertools import Logger from function.settings import TELEGRAM_API_HASH, TELEGRAM_API_ID @@ -11,16 +12,14 @@ from function.telegram.message import TelegramMessage +T = TypeVar("T") + # Fixes PEER_ID_INVALID for channels # https://github.com/pyrogram/pyrogram/issues/1314#issuecomment-2187830732 utils.get_peer_type = get_peer_type_fixed class TelegramSender: - message_options = { - "parse_mode": ParseMode.MARKDOWN, - } - def client_options(self, token: str): return { "name": str(uuid4()), @@ -35,73 +34,57 @@ def __init__(self, tokens: list[str], logger: Logger): self.tokens = tokens shuffle(self.tokens) - async def client(self, token: str) -> Client: + async def _get_client(self, token: str) -> Client: client = Client(**self.client_options(token)) await client.start() return client - async def _send_message(self, message: TelegramMessage): - for index, token in enumerate(self.tokens): - try: - client = await self.client(token) - return await client.send_message( - chat_id=message.chat_id, - text=message.content, - reply_markup=message.buttons, - disable_web_page_preview=True, - **self.message_options, # type: ignore - ) - except FloodWait as exc: - self.logger.warning(str(exc)) - if (index + 1) < len(self.tokens): - continue - raise exc - - async def _send_image(self, message: TelegramMessage): + async def _token_rotation( + self, method: Callable[..., Coroutine[Any, Any, T]], *args, **kwargs + ) -> T: for index, token in enumerate(self.tokens): - client = await self.client(token) - try: - return await client.send_photo( - chat_id=message.chat_id, - photo=message.image, # type: ignore - caption=message.content, - reply_markup=message.buttons, - **self.message_options, # type: ignore - ) + telegram = await self._get_client(token) + return await method(telegram, *args, **kwargs) # type: ignore except FloodWait as exc: - self.logger.warning(str(exc)) + self.logger.error(str(exc)) if (index + 1) < len(self.tokens): continue raise exc - except BadRequest: - return await self._send_message(message) - - async def _send_album(self, message: TelegramMessage): - for index, token in enumerate(self.tokens): - try: - client = await self.client(token) - return await client.send_media_group( - chat_id=message.chat_id, - media=message.album, # type: ignore - disable_notification=True, - ) - except FloodWait as exc: - self.logger.warning(str(exc)) - if (index + 1) < len(self.tokens): - continue - raise exc + raise EnvironmentError("Was not possible to send Telegram message!") + + async def _send_message( + self, client: Client, message: TelegramMessage, reply_to: int | None = None + ): + await client.send_message( + chat_id=message.chat_id, + text=message.content, + parse_mode=ParseMode.MARKDOWN, + disable_web_page_preview=True, + reply_to_message_id=reply_to, # type: ignore + reply_markup=message.buttons, + ) + + async def _send_album(self, client: Client, message: TelegramMessage) -> int: + sent_messages = await client.send_media_group( + chat_id=message.chat_id, + media=message.album, # type: ignore + disable_notification=True, + ) + return sent_messages[0].id async def send(self, message: TelegramMessage): + reply_to = None + try: if message.album: - await self._send_album(message) - if message.image: - return await self._send_image(message) - else: - return await self._send_message(message) + reply_to = await self._token_rotation(self._send_album, message=message) + + await self._token_rotation( + self._send_message, message=message, reply_to=reply_to + ) except FloodWait as exc: raise exc