Skip to content

Commit

Permalink
feat - sending always an album
Browse files Browse the repository at this point in the history
  • Loading branch information
jjpaulo2 committed Oct 4, 2024
1 parent 470ba2c commit 34fc9e5
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 75 deletions.
51 changes: 32 additions & 19 deletions function/telegram/message.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from urllib.parse import quote_plus
from functools import cached_property

from emoji import emojize
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto
from pyrogram.enums import ParseMode

from function.settings import (
TELEGRAM_MAX_ALBUM_QUANTITY,
Expand All @@ -17,38 +19,50 @@ class TelegramMessage:
def __init__(self, message: Message):
self.message = message

@property
@cached_property
def _album_images(self) -> list[str]:
return self.message.images[1 : TELEGRAM_MAX_ALBUM_QUANTITY + 1][::-1]
return self.message.images[1 : TELEGRAM_MAX_ALBUM_QUANTITY + 1]

@property
@cached_property
def _album_caption(self) -> str:
return emojize(f":framed_picture: **{self.message.title}**")

@cached_property
def _emoji(self) -> str:
if self.message.youtube:
return emojize(":play_button:")
if self.message.instagram:
return emojize(":camera_with_flash:")
return emojize(":page_facing_up:")

@property
@cached_property
def _domain(self) -> str:
return get_domain(self.message.link)

@property
@cached_property
def _link(self) -> str:
return f"__[{self._domain}]({self.message.link})__"

@property
@cached_property
def _title(self) -> str:
return f"{self._emoji} **{self.message.title}**"

@property
@cached_property
def _body(self) -> str:
text = " ".join(self.message.content)

if len(text) > TELEGRAM_MAX_CONTENT_SIZE:
return text[:TELEGRAM_MAX_CONTENT_SIZE] + " **[...]**"

elif not text.strip():
if self.message.videos:
return "Assista o vídeo no YouTube clicando no botão abaixo."
elif self.message.instagram:
return "Veja a publicação no Instagram clicando no botão abaixo."

return text

@property
@cached_property
def _whatsapp_link_text(self) -> str:
return join_lines(
self._title.replace("**", "*"),
Expand All @@ -65,25 +79,24 @@ def _whatsapp_link(self) -> str:
def _button(self, text: str, link: str) -> list[InlineKeyboardButton]:
return [InlineKeyboardButton(text=text, url=link)]

@property
@cached_property
def chat_id(self) -> str:
return self.message.destiny

@property
@cached_property
def album(self) -> list[InputMediaPhoto]:
return [InputMediaPhoto(img) for img in self._album_images]

@property
def image(self) -> str | None:
if self.message.images:
return self.message.images[0]
return None
return [
InputMediaPhoto(
img, caption=self._album_caption, parse_mode=ParseMode.MARKDOWN
)
for img in self._album_images
]

@property
@cached_property
def content(self) -> str:
return join_lines(self._link, self._title, self._body)

@property
@cached_property
def buttons(self) -> InlineKeyboardMarkup:
keyboard = [
self._button(
Expand Down
95 changes: 39 additions & 56 deletions function/telegram/sender.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
from typing import Any, Callable, Coroutine, TypeVar
from uuid import uuid4
from random import shuffle

from pyrogram import Client, utils
from pyrogram.enums import ParseMode
from pyrogram.errors import FloodWait, BadRequest
from pyrogram.errors import FloodWait
from aws_lambda_powertools import Logger

from function.settings import TELEGRAM_API_HASH, TELEGRAM_API_ID
from function.utils import get_peer_type_fixed
from function.telegram.message import TelegramMessage


T = TypeVar("T")

# Fixes PEER_ID_INVALID for channels
# https://github.com/pyrogram/pyrogram/issues/1314#issuecomment-2187830732
utils.get_peer_type = get_peer_type_fixed


class TelegramSender:
message_options = {
"parse_mode": ParseMode.MARKDOWN,
}

def client_options(self, token: str):
return {
"name": str(uuid4()),
Expand All @@ -35,73 +34,57 @@ def __init__(self, tokens: list[str], logger: Logger):
self.tokens = tokens
shuffle(self.tokens)

async def client(self, token: str) -> Client:
async def _get_client(self, token: str) -> Client:
client = Client(**self.client_options(token))
await client.start()
return client

async def _send_message(self, message: TelegramMessage):
for index, token in enumerate(self.tokens):
try:
client = await self.client(token)
return await client.send_message(
chat_id=message.chat_id,
text=message.content,
reply_markup=message.buttons,
disable_web_page_preview=True,
**self.message_options, # type: ignore
)
except FloodWait as exc:
self.logger.warning(str(exc))
if (index + 1) < len(self.tokens):
continue
raise exc

async def _send_image(self, message: TelegramMessage):
async def _token_rotation(
self, method: Callable[..., Coroutine[Any, Any, T]], *args, **kwargs
) -> T:
for index, token in enumerate(self.tokens):
client = await self.client(token)

try:
return await client.send_photo(
chat_id=message.chat_id,
photo=message.image, # type: ignore
caption=message.content,
reply_markup=message.buttons,
**self.message_options, # type: ignore
)
telegram = await self._get_client(token)
return await method(telegram, *args, **kwargs) # type: ignore

except FloodWait as exc:
self.logger.warning(str(exc))
self.logger.error(str(exc))
if (index + 1) < len(self.tokens):
continue
raise exc

except BadRequest:
return await self._send_message(message)

async def _send_album(self, message: TelegramMessage):
for index, token in enumerate(self.tokens):
try:
client = await self.client(token)
return await client.send_media_group(
chat_id=message.chat_id,
media=message.album, # type: ignore
disable_notification=True,
)
except FloodWait as exc:
self.logger.warning(str(exc))
if (index + 1) < len(self.tokens):
continue
raise exc
raise EnvironmentError("Was not possible to send Telegram message!")

async def _send_message(
self, client: Client, message: TelegramMessage, reply_to: int | None = None
):
await client.send_message(
chat_id=message.chat_id,
text=message.content,
parse_mode=ParseMode.MARKDOWN,
disable_web_page_preview=True,
reply_to_message_id=reply_to, # type: ignore
reply_markup=message.buttons,
)

async def _send_album(self, client: Client, message: TelegramMessage) -> int:
sent_messages = await client.send_media_group(
chat_id=message.chat_id,
media=message.album, # type: ignore
disable_notification=True,
)
return sent_messages[0].id

async def send(self, message: TelegramMessage):
reply_to = None

try:
if message.album:
await self._send_album(message)
if message.image:
return await self._send_image(message)
else:
return await self._send_message(message)
reply_to = await self._token_rotation(self._send_album, message=message)

await self._token_rotation(
self._send_message, message=message, reply_to=reply_to
)

except FloodWait as exc:
raise exc
Expand Down

0 comments on commit 34fc9e5

Please sign in to comment.