diff --git a/src/command/administration.py b/src/command/administration.py
index 7439bea6d4..98b4b92f8f 100644
--- a/src/command/administration.py
+++ b/src/command/administration.py
@@ -15,13 +15,12 @@
# along with this program. If not, see .
from __future__ import annotations
-from typing import Union, Optional
+from typing import Optional
from typing_extensions import Final
import asyncio
import re
-from telethon import events, Button
-from telethon.tl.patched import Message
+from telethon import Button
from telethon.tl import types
from telethon.utils import get_peer_id
@@ -30,6 +29,7 @@
from ..parsing.post import get_post_from_entry
from .utils import command_gatekeeper, parse_command, logger, parse_customization_callback_data
from . import inner
+from .types import *
SELECTED_EMOJI: Final = '🔘'
UNSELECTED_EMOJI: Final = '⚪️'
@@ -38,17 +38,19 @@
@command_gatekeeper(only_manager=True)
-async def cmd_set_option(event: Union[events.NewMessage.Event, Message], *_, lang: Optional[str] = None, **__):
+async def cmd_set_option(event: TypeEventMsgHint, *_, lang: Optional[str] = None, **__):
kv = parseKeyValuePair.match(event.raw_text)
if not kv: # return options info
options = db.EffectiveOptions.options
- msg = (
- f'{i18n[lang]["current_options"]}\n\n'
- + '\n'.join(f'{key}
= {value}
'
- f'({i18n[lang]["option_value_type"]}: {type(value).__name__}
)'
- for key, value in options.items())
- + '\n\n' + i18n[lang]['cmd_set_option_usage_prompt_html']
- )
+ msg = '\n\n'.join((
+ f'{i18n[lang]["current_options"]}',
+ '\n'.join(
+ f'{key}
= {value}
'
+ f'({i18n[lang]["option_value_type"]}: {type(value).__name__}
)'
+ for key, value in options.items()
+ ),
+ i18n[lang]['cmd_set_option_usage_prompt_html'],
+ ))
await event.respond(msg, parse_mode='html')
return
key, value = kv.groups()
@@ -72,18 +74,21 @@ async def cmd_set_option(event: Union[events.NewMessage.Event, Message], *_, lan
env.loop.create_task(inner.utils.update_interval(feed))
logger.info("Flushed the interval of all feeds")
- await event.respond(f'{i18n[lang]["option_updated"]}\n'
- f'{key}
= {value}
',
- parse_mode='html')
+ await event.respond(
+ f'{i18n[lang]["option_updated"]}\n'
+ f'{key}
= {value}
',
+ parse_mode='html',
+ )
@command_gatekeeper(only_manager=True, only_in_private_chat=False, timeout=None if env.DEBUG else 300)
async def cmd_test(
- event: Union[events.NewMessage.Event, Message],
+ event: TypeEventMsgHint,
*_,
lang: Optional[str] = None,
chat_id: Optional[int] = None,
- **__):
+ **__,
+):
chat_id = chat_id or event.chat_id
args = parse_command(event.raw_text)
@@ -123,7 +128,10 @@ async def cmd_test(
return
await asyncio.gather(
- *(__send(chat_id, entry, rss_d.feed.title, wf.url) for entry in entries_to_send)
+ *(
+ __send(chat_id, entry, rss_d.feed.title, wf.url)
+ for entry in entries_to_send
+ )
)
except Exception as e:
@@ -139,26 +147,38 @@ async def __send(chat_id, entry, feed_title, link):
@command_gatekeeper(only_manager=True)
-async def cmd_user_info_or_callback_set_user(event: Union[events.NewMessage.Event, Message, events.CallbackQuery.Event],
- *_,
- lang: Optional[str] = None,
- user_id: Optional[int] = None,
- **__):
+async def cmd_user_info_or_callback_set_user(
+ event: TypeEventCollectionMsgOrCb,
+ *_,
+ lang: Optional[str] = None,
+ user_id: Optional[int] = None,
+ **__,
+):
"""
command = `/user_info user_id` or `/user_info @username` or `/user_info`
callback data = set_user={user_id},{state}
"""
- is_callback = isinstance(event, events.CallbackQuery.Event)
+ is_callback = isinstance(event, TypeEventCb)
if user_id:
state = None
user_entity_like = user_id
elif is_callback:
user_entity_like, state, _, _ = parse_customization_callback_data(event.data)
+ assert user_entity_like is not None
state = int(state)
else:
state = None
args = parse_command(event.raw_text, strip_target_chat=False)
- if len(args) < 2 or (not args[1].lstrip('-').isdecimal() and not args[1].startswith('@')):
+ if (
+ len(args) < 2
+ or
+ not (
+ args[1].lstrip('-').isdecimal()
+ or
+ args[1].startswith('@')
+ )
+
+ ):
await event.respond(i18n[lang]['cmd_user_info_usage_prompt_html'], parse_mode='html')
return
user_entity_like = int(args[1]) if args[1].lstrip('-').isdecimal() else args[1].lstrip('@')
@@ -192,9 +212,11 @@ async def cmd_user_info_or_callback_set_user(event: Union[events.NewMessage.Even
user.state = state
await user.save()
state = None if user_id in env.MANAGER else user.state
- default_sub_limit = (db.EffectiveOptions.user_sub_limit
- if user_id > 0
- else db.EffectiveOptions.channel_or_group_sub_limit)
+ default_sub_limit = (
+ db.EffectiveOptions.user_sub_limit
+ if user_id > 0
+ else db.EffectiveOptions.channel_or_group_sub_limit
+ )
if user_created:
sub_count = 0
sub_limit = default_sub_limit
@@ -202,39 +224,63 @@ async def cmd_user_info_or_callback_set_user(event: Union[events.NewMessage.Even
else:
_, sub_count, sub_limit, is_default_limit = await inner.utils.check_sub_limit(user_id, force_count_current=True)
- msg_text = (
- f"{i18n[lang]['user_info']}\n\n"
- + (f"{name}\n" if name else '')
- + (f"{user_type} " if user_type else '') + f"{user_id}
\n"
- + (f"@{username}\n" if username else '')
- + f"\n{i18n[lang]['sub_count']}: {sub_count}"
- + f"\n{i18n[lang]['sub_limit']}: {sub_limit if sub_limit > 0 else i18n[lang]['sub_limit_unlimited']}"
- + (f" ({i18n[lang]['sub_limit_default']})" if is_default_limit else '')
- + (f"\n{i18n[lang]['participant_count']}: {participant_count}" if participant_count else '')
- + (f"\n\n{i18n[lang]['user_state']}: {i18n[lang][f'user_state_{state}']} "
- f"({i18n[lang][f'user_state_description_{state}']})" if state is not None else '')
- )
- buttons = None if user_id in env.MANAGER else tuple(filter(None, (
- *(
- (Button.inline(
- (SELECTED_EMOJI if user.state == btn_state else UNSELECTED_EMOJI)
- + '{prompt} "{state}"'.format(prompt=i18n[lang]['set_user_state_as'],
- state=i18n[lang][f'user_state_{btn_state}']),
- data='null' if user.state == btn_state else f"set_user={user_id},{btn_state}"
- ),)
- for btn_state in range(-1, 2)
- ),
- (Button.inline(f"{i18n[lang]['reset_sub_limit_to_default']} "
- f"({default_sub_limit if default_sub_limit > 0 else i18n[lang]['sub_limit_unlimited']})",
- data=f"reset_sub_limit={user_id}"),) if not is_default_limit else None,
- (Button.switch_inline(i18n[lang]['set_sub_limit_to'], query=f'/set_sub_limit {user_id} ', same_peer=True),),
+ msg_text = '\n\n'.join(filter(None, (
+ f"{i18n[lang]['user_info']}",
+ '\n'.join(filter(None, (
+ name,
+ (f'{user_type} ' if user_type else '') + f'{user_id}
',
+ f'@{username}' if username else '',
+ ))),
+ '\n'.join(filter(None, (
+ f"{i18n[lang]['sub_count']}: {sub_count}",
+ f"{i18n[lang]['sub_limit']}: {sub_limit if sub_limit > 0 else i18n[lang]['sub_limit_unlimited']}" + (
+ f" ({i18n[lang]['sub_limit_default']})" if is_default_limit else ''
+ ),
+ f"{i18n[lang]['participant_count']}: {participant_count}" if participant_count else '',
+ ))),
+ ''
+ if state is None
+ else f"{i18n[lang]['user_state']}: {i18n[lang][f'user_state_{state}']} "
+ f"({i18n[lang][f'user_state_description_{state}']})",
)))
- await event.edit(msg_text, parse_mode='html', buttons=buttons) if is_callback \
- else await event.respond(msg_text, parse_mode='html', buttons=buttons)
+ buttons = (
+ None
+ if user_id in env.MANAGER
+ else tuple(filter(None, (
+ *(
+ (Button.inline(
+ '{emoji}{prompt} "{state}"'.format(
+ emoji=SELECTED_EMOJI if user.state == btn_state else UNSELECTED_EMOJI,
+ prompt=i18n[lang]['set_user_state_as'],
+ state=i18n[lang][f'user_state_{btn_state}'],
+ ),
+ data='null' if user.state == btn_state else f"set_user={user_id},{btn_state}"
+ ),)
+ for btn_state in range(-1, 2)
+ ),
+ None
+ if is_default_limit
+ else (Button.inline(
+ f"{i18n[lang]['reset_sub_limit_to_default']} "
+ f"({default_sub_limit if default_sub_limit > 0 else i18n[lang]['sub_limit_unlimited']})",
+ data=f"reset_sub_limit={user_id}",
+ ),),
+ (Button.switch_inline(
+ i18n[lang]['set_sub_limit_to'],
+ query=f'/set_sub_limit {user_id} ',
+ same_peer=True,
+ ),),
+ )))
+ )
+ await (
+ event.edit(msg_text, parse_mode='html', buttons=buttons)
+ if is_callback
+ else event.respond(msg_text, parse_mode='html', buttons=buttons)
+ )
@command_gatekeeper(only_manager=True)
-async def callback_reset_sub_limit(event: events.CallbackQuery.Event, *_, lang: Optional[str] = None, **__):
+async def callback_reset_sub_limit(event: TypeEventCb, *_, lang: Optional[str] = None, **__):
"""
callback data = reset_sub_limit={user_id}
"""
@@ -248,7 +294,7 @@ async def callback_reset_sub_limit(event: events.CallbackQuery.Event, *_, lang:
@command_gatekeeper(only_manager=True)
-async def cmd_set_sub_limit(event: Union[events.NewMessage.Event, Message], *_, lang: Optional[str] = None, **__):
+async def cmd_set_sub_limit(event: TypeEventMsgHint, *_, lang: Optional[str] = None, **__):
"""
command = `/set_sub_limit user_id sub_limit`
"""
diff --git a/src/command/customization.py b/src/command/customization.py
index 150441d03c..0de19024da 100644
--- a/src/command/customization.py
+++ b/src/command/customization.py
@@ -17,10 +17,10 @@
from __future__ import annotations
from typing import Union, Optional
-from telethon import events, Button
-from telethon.tl.patched import Message
+from telethon import Button
from . import inner, misc
+from .types import *
from .utils import command_gatekeeper, parse_customization_callback_data, parse_callback_data_with_page, \
escape_html, parse_command_get_sub_or_user_and_param, get_callback_tail
from .. import db, env
@@ -28,15 +28,17 @@
@command_gatekeeper(only_manager=False)
-async def cmd_set_or_callback_get_set_page(event: Union[events.NewMessage.Event, Message, events.CallbackQuery.Event],
- *_,
- lang: Optional[str] = None,
- page: Optional[int] = None,
- chat_id: Optional[int] = None,
- **__): # command: /set ; callback data: get_set_page|{page_number}
+async def cmd_set_or_callback_get_set_page(
+ event: TypeEventCollectionMsgOrCb,
+ *_,
+ lang: Optional[str] = None,
+ page: Optional[int] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # command: /set ; callback data: get_set_page|{page_number}
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
- is_callback = isinstance(event, events.CallbackQuery.Event)
+ is_callback = isinstance(event, TypeEventCb)
if not page:
_, page = parse_callback_data_with_page(event.data) if is_callback else (None, 1)
@@ -46,22 +48,29 @@ async def cmd_set_or_callback_get_set_page(event: Union[events.NewMessage.Event,
buttons = None
else:
msg = i18n[lang]['set_choose_sub_prompt']
- buttons = await inner.utils.get_sub_choosing_buttons(chat_id, page_number=page, lang=lang,
- callback='set',
- get_page_callback='get_set_page',
- tail=callback_tail)
+ buttons = await inner.utils.get_sub_choosing_buttons(
+ chat_id, page_number=page, lang=lang,
+ callback='set',
+ get_page_callback='get_set_page',
+ tail=callback_tail,
+ )
- await event.edit(msg, buttons=buttons) if is_callback \
- else await event.respond(msg, buttons=buttons)
+ await (
+ event.edit(msg, buttons=buttons)
+ if is_callback
+ else event.respond(msg, buttons=buttons)
+ )
@command_gatekeeper(only_manager=False)
-async def callback_set(event: events.CallbackQuery.Event,
- set_user_default: bool,
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__):
+async def callback_set(
+ event: TypeEventCb,
+ set_user_default: bool,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+):
# callback data = set={sub_id}[,{action}[,{param}]][|{page_number}]
# or set_default[={action}[,{param}]]
"""
@@ -83,10 +92,10 @@ async def callback_set(event: events.CallbackQuery.Event,
await cmd_set_or_callback_get_set_page.__wrapped__(event, lang=lang, page=page, chat_id=chat_id)
return
- sub_or_user: Union[db.Sub, db.User] = (
- await db.User.get_or_none(id=chat_id)
+ sub_or_user: Union[db.Sub, db.User] = await (
+ db.User.get_or_none(id=chat_id)
if set_user_default
- else await db.Sub.get_or_none(id=sub_id, user=chat_id).prefetch_related('feed', 'user')
+ else db.Sub.get_or_none(id=sub_id, user=chat_id).prefetch_related('feed', 'user')
)
if sub_or_user is None:
await event.edit(i18n[lang]['subscription_not_exist'])
@@ -94,8 +103,12 @@ async def callback_set(event: events.CallbackQuery.Event,
if (
action is None
- or (action in {'interval', 'length_limit'} and (isinstance(param, int) or param == 'default'))
- or action == 'activate' and not set_user_default
+ or
+ (
+ action in {'interval', 'length_limit'}
+ and (isinstance(param, int) or param == 'default')
+ )
+ or (action == 'activate' and not set_user_default)
or action in inner.customization.SUB_OPTIONS_EXHAUSTIVE_VALUES
):
if action == 'interval' and (isinstance(param, int) or param == 'default'):
@@ -104,42 +117,57 @@ async def callback_set(event: events.CallbackQuery.Event,
await inner.customization.set_length_limit(sub_or_user, param if param != 'default' else -100)
elif action == 'activate' and not set_user_default:
await inner.customization.set_sub_activate(sub_or_user)
- elif action == 'display_media' and not set_user_default and \
- (sub_or_user.send_mode if sub_or_user.send_mode != -100 else sub_or_user.user.send_mode) in {1, -1}:
+ elif (
+ action == 'display_media'
+ and not set_user_default
+ and
+ (
+ sub_or_user.send_mode
+ if sub_or_user.send_mode != -100
+ else sub_or_user.user.send_mode
+ ) in {1, -1}
+ ):
await event.answer(i18n[lang]['display_media_only_effective_if_send_mode_auto_and_telegram'],
alert=True)
return
elif action is not None and action in inner.customization.SUB_OPTIONS_EXHAUSTIVE_VALUES:
await inner.customization.set_exhaustive_option(sub_or_user, action)
- info = (
- i18n[lang]['set_user_default_description']
- + '\n\n'
- + i18n[lang]['read_formatting_settings_guidebook_html']
+ info = '\n\n'.join((
+ i18n[lang]['set_user_default_description'],
+ i18n[lang]['read_formatting_settings_guidebook_html']
if set_user_default
- else await inner.customization.get_sub_info(
- sub_or_user, lang, additional_guide=True
- )
+ else await inner.customization.get_sub_info(sub_or_user, lang, additional_guide=True),
+ ))
+ buttons = await inner.customization.get_customization_buttons(
+ sub_or_user, lang=lang, page=page, tail=callback_tail,
)
- buttons = await inner.customization.get_customization_buttons(sub_or_user, lang=lang, page=page,
- tail=callback_tail)
await event.edit(info, buttons=buttons, parse_mode='html', link_preview=False)
return
if action == 'interval':
msg = i18n[lang]['set_interval_prompt']
- buttons = await inner.customization.get_set_interval_buttons(sub_or_user, lang=lang, page=page,
- tail=callback_tail)
+ buttons = await inner.customization.get_set_interval_buttons(
+ sub_or_user, lang=lang, page=page, tail=callback_tail,
+ )
await event.edit(msg, buttons=buttons)
return
if action == 'length_limit':
- if not set_user_default and \
- (sub_or_user.send_mode if sub_or_user.send_mode != -100 else sub_or_user.user.send_mode) != 0:
+ if (
+ not set_user_default
+ and
+ (
+ sub_or_user.send_mode
+ if sub_or_user.send_mode != -100
+ else sub_or_user.user.send_mode
+ ) != 0
+ ):
await event.answer(i18n[lang]['length_limit_only_effective_if_send_mode_auto'], alert=True)
return
msg = i18n[lang]['set_length_limit_prompt']
- buttons = await inner.customization.get_set_length_limit_buttons(sub_or_user, lang=lang, page=page,
- tail=callback_tail)
+ buttons = await inner.customization.get_set_length_limit_buttons(
+ sub_or_user, lang=lang, page=page, tail=callback_tail,
+ )
await event.edit(msg, buttons=buttons)
return
@@ -147,25 +175,29 @@ async def callback_set(event: events.CallbackQuery.Event,
@command_gatekeeper(only_manager=False)
-async def cmd_set_default(event: Union[events.NewMessage.Event, Message],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # cmd: set_default
+async def cmd_set_default(
+ event: TypeEventMsgHint,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # cmd: set_default
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
user = await db.User.get_or_none(id=chat_id)
- msg = i18n[lang]['set_user_default_description'] + '\n\n' + i18n[lang]['read_formatting_settings_guidebook_html']
+ msg = f"{i18n[lang]['set_user_default_description']}\n\n{i18n[lang]['read_formatting_settings_guidebook_html']}"
buttons = await inner.customization.get_customization_buttons(user, lang=lang, tail=callback_tail)
await event.respond(msg, buttons=buttons, parse_mode='html', link_preview=False)
@command_gatekeeper(only_manager=False)
-async def callback_reset(event: events.CallbackQuery.Event,
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # callback data = reset={sub_id}
+async def callback_reset(
+ event: TypeEventCb,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # callback data = reset={sub_id}
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
sub_id, _, _, page = parse_customization_callback_data(event.data)
@@ -189,11 +221,13 @@ async def callback_reset(event: events.CallbackQuery.Event,
@command_gatekeeper(only_manager=False)
-async def callback_reset_all_confirm(event: events.CallbackQuery.Event,
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # callback data = reset_all_confirm
+async def callback_reset_all_confirm(
+ event: TypeEventCb,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # callback data = reset_all_confirm
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
if await inner.utils.have_subs(chat_id):
@@ -201,19 +235,21 @@ async def callback_reset_all_confirm(event: events.CallbackQuery.Event,
i18n[lang]['reset_all_confirm_prompt'],
buttons=[
[Button.inline(i18n[lang]['reset_all_confirm'], data=f'reset_all{callback_tail}')],
- [Button.inline(i18n[lang]['reset_all_cancel'], data=f'set_default{callback_tail}')]
- ]
+ [Button.inline(i18n[lang]['reset_all_cancel'], data=f'set_default{callback_tail}')],
+ ],
)
return
await event.edit(i18n[lang]['no_subscription'])
@command_gatekeeper(only_manager=False)
-async def callback_reset_all(event: events.CallbackQuery.Event,
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # callback data = reset_all
+async def callback_reset_all(
+ event: TypeEventCb,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # callback data = reset_all
chat_id = chat_id or event.chat_id
subs = await db.Sub.filter(user=chat_id)
tasks = []
@@ -224,43 +260,53 @@ async def callback_reset_all(event: events.CallbackQuery.Event,
sub.interval = None
sub.length_limit = sub.notify = sub.send_mode = sub.link_preview = sub.display_author = sub.display_media = \
sub.display_title = sub.display_entry_tags = sub.display_via = sub.style = -100
- await db.Sub.bulk_update(subs, ('interval', 'length_limit', 'notify', 'send_mode', 'link_preview', 'display_author',
- 'display_media', 'display_title', 'display_entry_tags', 'display_via', 'style'))
+ await db.Sub.bulk_update(
+ subs,
+ (
+ 'interval', 'length_limit', 'notify', 'send_mode', 'link_preview', 'display_author', 'display_media',
+ 'display_title', 'display_entry_tags', 'display_via', 'style',
+ )
+ )
for task in tasks:
env.loop.create_task(task)
await event.edit(i18n[lang]['reset_all_successful'])
@command_gatekeeper(only_manager=False)
-async def cmd_activate_or_deactivate_subs(event: Union[events.NewMessage.Event, Message],
- activate: bool,
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # cmd: activate_subs | deactivate_subs
+async def cmd_activate_or_deactivate_subs(
+ event: TypeEventMsgHint,
+ activate: bool,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # cmd: activate_subs | deactivate_subs
await callback_get_activate_or_deactivate_page.__wrapped__(event, activate, lang=lang, chat_id=chat_id, page=1)
@command_gatekeeper(only_manager=False)
-async def callback_get_activate_or_deactivate_page(event: Union[events.CallbackQuery.Event,
- events.NewMessage.Event,
- Message],
- activate: bool,
- *_,
- lang: Optional[str] = None,
- page: Optional[int] = None,
- chat_id: Optional[int] = None,
- **__): # callback data: get_(activate|deactivate)_page|{page}
+async def callback_get_activate_or_deactivate_page(
+ event: TypeEventCollectionMsgOrCb,
+ activate: bool,
+ *_,
+ lang: Optional[str] = None,
+ page: Optional[int] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # callback data: get_(activate|deactivate)_page|{page}
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
- event_is_msg = not isinstance(event, events.CallbackQuery.Event)
+ event_is_msg = not isinstance(event, TypeEventCb)
if page is None:
page = 1 if event_is_msg else int(parse_callback_data_with_page(event.data)[1])
have_subs = await inner.utils.have_subs(chat_id)
if not have_subs:
no_subscription_msg = i18n[lang]['no_subscription']
- await (event.respond(no_subscription_msg) if event_is_msg
- else event.edit(no_subscription_msg))
+ await (
+ event.respond(no_subscription_msg)
+ if event_is_msg
+ else event.edit(no_subscription_msg)
+ )
return
sub_buttons = await inner.utils.get_sub_choosing_buttons(
chat_id,
@@ -270,41 +316,51 @@ async def callback_get_activate_or_deactivate_page(event: Union[events.CallbackQ
lang=lang,
rows=11,
state=0 if activate else 1,
- tail=callback_tail
+ tail=callback_tail,
)
- msg = i18n[lang]['choose_sub_to_be_activated' if sub_buttons else 'all_subs_are_activated'] if activate \
+ msg = (
+ i18n[lang]['choose_sub_to_be_activated' if sub_buttons else 'all_subs_are_activated']
+ if activate
else i18n[lang]['choose_sub_to_be_deactivated' if sub_buttons else 'all_subs_are_deactivated']
+ )
activate_or_deactivate_all_subs_str = 'activate_all_subs' if activate else 'deactivate_all_subs'
buttons = (
- (
- (Button.inline(i18n[lang][activate_or_deactivate_all_subs_str],
- data=activate_or_deactivate_all_subs_str + callback_tail),),
- )
- + sub_buttons
+ (Button.inline(
+ i18n[lang][activate_or_deactivate_all_subs_str],
+ data=f'{activate_or_deactivate_all_subs_str}{callback_tail}',
+ ),),
+ *sub_buttons,
) if sub_buttons else None
- await (event.respond(msg, buttons=buttons) if event_is_msg
- else event.edit(msg, buttons=buttons))
+ await (
+ event.respond(msg, buttons=buttons)
+ if event_is_msg
+ else event.edit(msg, buttons=buttons)
+ )
@command_gatekeeper(only_manager=False)
-async def callback_activate_or_deactivate_all_subs(event: events.CallbackQuery.Event,
- activate: bool,
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # callback data: (activate|deactivate)_all_subs
+async def callback_activate_or_deactivate_all_subs(
+ event: TypeEventCb,
+ activate: bool,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # callback data: (activate|deactivate)_all_subs
chat_id = chat_id or event.chat_id
await inner.utils.activate_or_deactivate_all_subs(chat_id, activate=activate)
await callback_get_activate_or_deactivate_page.__wrapped__(event, activate, lang=lang, chat_id=chat_id, page=1)
@command_gatekeeper(only_manager=False)
-async def callback_activate_or_deactivate_sub(event: events.CallbackQuery.Event,
- activate: bool,
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # callback data: (activate|deactivate)_sub={id}|{page}
+async def callback_activate_or_deactivate_sub(
+ event: TypeEventCb,
+ activate: bool,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # callback data: (activate|deactivate)_sub={id}|{page}
chat_id = chat_id or event.chat_id
sub_id, page = parse_callback_data_with_page(event.data)
sub_id = int(sub_id)
@@ -316,10 +372,12 @@ async def callback_activate_or_deactivate_sub(event: events.CallbackQuery.Event,
@command_gatekeeper(only_manager=False)
-async def callback_del_subs_title(event: events.CallbackQuery.Event,
- *_,
- chat_id: Optional[int] = None,
- **__): # callback data: del_subs_title={id_start}-{id_end}|{id_start}-{id_end}|...
+async def callback_del_subs_title(
+ event: TypeEventCb,
+ *_,
+ chat_id: Optional[int] = None,
+ **__,
+): # callback data: del_subs_title={id_start}-{id_end}|{id_start}-{id_end}|...
chat_id = chat_id or event.chat_id
id_ranges_str = event.data.decode().strip().split('=')[-1].split('%')[0].split('|')
subs = []
@@ -333,11 +391,13 @@ async def callback_del_subs_title(event: events.CallbackQuery.Event,
@command_gatekeeper(only_manager=False)
-async def cmd_set_title(event: Union[events.NewMessage.Event, Message],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__):
+async def cmd_set_title(
+ event: TypeEventMsgHint,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+):
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
sub, title = await parse_command_get_sub_or_user_and_param(event.raw_text, chat_id, max_split=2)
@@ -350,27 +410,33 @@ async def cmd_set_title(event: Union[events.NewMessage.Event, Message],
return
await inner.customization.set_sub_title(sub, title)
await event.respond(
- (
- ((i18n[lang]['set_title_success'] + '\n' + f'{escape_html(title)}
')
- if title
- else i18n[lang]['set_title_success_cleared'])
- + '\n\n' +
- await inner.customization.get_sub_info(sub, lang=lang)
- ),
- buttons=(Button.inline(i18n[lang]['other_settings_button'], data=f'set={sub.id}' + callback_tail),),
- parse_mode='html', link_preview=False)
+ '\n\n'.join((
+ (
+ f"{i18n[lang]['set_title_success']}\n{escape_html(title)}
"
+ if title
+ else i18n[lang]['set_title_success_cleared']
+ ),
+ await inner.customization.get_sub_info(sub, lang=lang),
+ )),
+ buttons=(Button.inline(i18n[lang]['other_settings_button'], data=f'set={sub.id}{callback_tail}'),),
+ parse_mode='html',
+ link_preview=False,
+ )
@command_gatekeeper(only_manager=False)
-async def cmd_set_interval(event: Union[events.NewMessage.Event, Message],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__):
+async def cmd_set_interval(
+ event: TypeEventMsgHint,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+):
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
- sub_or_user, interval = await parse_command_get_sub_or_user_and_param(event.raw_text, chat_id,
- allow_setting_user_default=True)
+ sub_or_user, interval = await parse_command_get_sub_or_user_and_param(
+ event.raw_text, chat_id, allow_setting_user_default=True,
+ )
interval = int(interval) if interval and interval.isdigit() and int(interval) >= 1 else None
minimal_interval = db.EffectiveOptions.minimal_interval
if not sub_or_user:
@@ -380,32 +446,37 @@ async def cmd_set_interval(event: Union[events.NewMessage.Event, Message],
await event.respond(i18n[lang]['cmd_set_interval_usage_prompt_html'], parse_mode='html')
return
if interval < minimal_interval:
- await event.respond(i18n[lang]['set_interval_failure_too_small_html'] % minimal_interval,
- parse_mode='html')
+ await event.respond(
+ i18n[lang]['set_interval_failure_too_small_html'] % minimal_interval, parse_mode='html',
+ )
return
await inner.customization.set_interval(sub_or_user, interval)
await event.respond(
(
- ((i18n[lang]['set_interval_success_html'] % (interval,))
- + '\n\n' +
- await inner.customization.get_sub_info(sub_or_user, lang=lang))
+ '{msg}\n\n{sub_info}'.format(
+ msg=i18n[lang]['set_interval_success_html'] % interval,
+ sub_info=await inner.customization.get_sub_info(sub_or_user, lang=lang),
+ )
if isinstance(sub_or_user, db.Sub)
- else i18n[lang]['set_default_interval_success_html'] % (interval,)
+ else i18n[lang]['set_default_interval_success_html'] % interval
),
- buttons=(Button.inline(i18n[lang]['other_settings_button'],
- data=(
- (f'set={sub_or_user.id}' if isinstance(sub_or_user, db.Sub) else 'set_default')
- + callback_tail)
- ),),
- parse_mode='html', link_preview=False)
+ buttons=(Button.inline(
+ i18n[lang]['other_settings_button'],
+ data=(f'set={sub_or_user.id}' if isinstance(sub_or_user, db.Sub) else 'set_default') + callback_tail
+ ),),
+ parse_mode='html',
+ link_preview=False,
+ )
@command_gatekeeper(only_manager=False)
-async def cmd_set_hashtags(event: Union[events.NewMessage.Event, Message],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__):
+async def cmd_set_hashtags(
+ event: TypeEventMsgHint,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+):
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
sub, hashtags = await parse_command_get_sub_or_user_and_param(event.raw_text, chat_id, max_split=2)
@@ -422,13 +493,15 @@ async def cmd_set_hashtags(event: Union[events.NewMessage.Event, Message],
await event.respond(i18n[lang]['set_hashtags_failure_too_many'])
return
await event.respond(
- (
- ((i18n[lang]['set_hashtags_success_html'] + '\n'
- + f'{inner.utils.construct_hashtags(sub.tags)}')
- if sub.tags
- else i18n[lang]['set_hashtags_success_cleared'])
- + '\n\n' +
- await inner.customization.get_sub_info(sub, lang=lang)
- ),
+ '\n\n'.join((
+ (
+ f"{i18n[lang]['set_hashtags_success_html']}\n{inner.utils.construct_hashtags(sub.tags)}"
+ if sub.tags
+ else i18n[lang]['set_hashtags_success_cleared']
+ ),
+ await inner.customization.get_sub_info(sub, lang=lang),
+ )),
buttons=(Button.inline(i18n[lang]['other_settings_button'], data=f'set={sub.id}' + callback_tail),),
- parse_mode='html', link_preview=False)
+ parse_mode='html',
+ link_preview=False,
+ )
diff --git a/src/command/misc.py b/src/command/misc.py
index f642e31204..440cf94d96 100644
--- a/src/command/misc.py
+++ b/src/command/misc.py
@@ -15,11 +15,10 @@
# along with this program. If not, see .
from __future__ import annotations
-from typing import Optional, Union
+from typing import Optional
from contextlib import suppress
from telethon import events, types, Button
-from telethon.tl.patched import Message
from telethon.errors import RPCError
from .. import env, db
@@ -29,14 +28,12 @@
)
from ..i18n import i18n, get_commands_list
from . import inner
+from .types import *
@command_gatekeeper(only_manager=False, ignore_tg_lang=True)
async def cmd_start(
- event: Union[
- events.NewMessage.Event, Message,
- events.ChatAction.Event,
- ],
+ event: TypeEventCollectionMsgOrChatAction,
*_,
lang=None,
**__,
@@ -49,10 +46,7 @@ async def cmd_start(
@command_gatekeeper(only_manager=False)
async def cmd_lang(
- event: Union[
- events.NewMessage.Event, Message,
- events.ChatAction.Event,
- ],
+ event: TypeEventCollectionMsgOrChatAction,
*_,
chat_id: Optional[int] = None,
**__,
@@ -66,7 +60,7 @@ async def cmd_lang(
@command_gatekeeper(only_manager=False)
async def callback_set_lang(
- event: events.CallbackQuery.Event,
+ event: TypeEventCb,
*_,
chat_id: Optional[int] = None,
**__,
@@ -75,9 +69,11 @@ async def callback_set_lang(
lang, _ = parse_callback_data_with_page(event.data)
welcome_msg = i18n[lang]['welcome_prompt']
await db.User.update_or_create(defaults={'lang': lang}, id=chat_id)
- await set_bot_commands(scope=types.BotCommandScopePeer(await event.get_input_chat()),
- lang_code='',
- commands=get_commands_list(lang=lang, manager=chat_id in env.MANAGER))
+ await set_bot_commands(
+ scope=types.BotCommandScopePeer(await event.get_input_chat()),
+ lang_code='',
+ commands=get_commands_list(lang=lang, manager=chat_id in env.MANAGER),
+ )
logger.info(f'Changed language to {lang} for {chat_id}')
help_button = Button.inline(text=i18n[lang]['cmd_description_help'], data='help')
await event.edit(welcome_msg, buttons=help_button)
@@ -85,11 +81,7 @@ async def callback_set_lang(
@command_gatekeeper(only_manager=False)
async def cmd_or_callback_help(
- event: Union[
- events.NewMessage.Event, Message,
- events.ChatAction.Event,
- events.CallbackQuery.Event,
- ],
+ event: TypeEventCollectionMsgLike,
*_,
lang: Optional[str] = None,
**__,
@@ -99,19 +91,19 @@ async def cmd_or_callback_help(
msg += '\n\n' + i18n[lang]['usage_in_channel_or_group_prompt_html']
await (
event.respond(msg, parse_mode='html', link_preview=False)
- if isinstance(event, events.NewMessage.Event) or not hasattr(event, 'edit')
+ if isinstance(event, TypeEventMsg) or not hasattr(event, 'edit')
else event.edit(msg, parse_mode='html', link_preview=False)
)
@command_gatekeeper(only_manager=False)
-async def cmd_version(event: Union[events.NewMessage.Event, Message], *_, **__):
+async def cmd_version(event: TypeEventMsgHint, *_, **__):
await event.respond(env.VERSION)
@command_gatekeeper(only_manager=False)
async def callback_cancel(
- event: events.CallbackQuery.Event,
+ event: TypeEventCb,
*_,
lang: Optional[str] = None,
**__,
@@ -121,7 +113,7 @@ async def callback_cancel(
@command_gatekeeper(only_manager=False, allow_in_old_fashioned_groups=True)
async def callback_get_group_migration_help(
- event: events.CallbackQuery.Event,
+ event: TypeEventCb,
*_,
**__,
): # callback data: get_group_migration_help={lang_code}
@@ -136,14 +128,14 @@ async def callback_get_group_migration_help(
# bypassing command gatekeeper
-async def callback_null(event: events.CallbackQuery.Event): # callback data = null
+async def callback_null(event: TypeEventCb): # callback data = null
await event.answer(cache_time=3600)
raise events.StopPropagation
@command_gatekeeper(only_manager=False)
async def callback_del_buttons(
- event: events.CallbackQuery.Event,
+ event: TypeEventCb,
*_,
**__,
): # callback data = del_buttons
@@ -154,7 +146,7 @@ async def callback_del_buttons(
@command_gatekeeper(only_manager=False, allow_in_others_private_chat=False, quiet=True)
async def inline_command_constructor(
- event: events.InlineQuery.Event,
+ event: TypeEventInline,
*_,
lang: Optional[str] = None,
**__,
@@ -163,11 +155,15 @@ async def inline_command_constructor(
builder = event.builder
text = query.query.strip()
if not text:
- await event.answer(switch_pm=i18n[lang]['permission_denied_input_command'],
- switch_pm_param=str(event.id),
- cache_time=3600,
- private=False)
+ await event.answer(
+ switch_pm=i18n[lang]['permission_denied_input_command'],
+ switch_pm_param=str(event.id),
+ cache_time=3600,
+ private=False,
+ )
return
- await event.answer(results=[builder.article(title=text, text=text)],
- cache_time=3600,
- private=False)
+ await event.answer(
+ results=[builder.article(title=text, text=text)],
+ cache_time=3600,
+ private=False,
+ )
diff --git a/src/command/opml.py b/src/command/opml.py
index ff97aeb5eb..9d8f566816 100644
--- a/src/command/opml.py
+++ b/src/command/opml.py
@@ -15,12 +15,12 @@
# along with this program. If not, see .
from __future__ import annotations
-from typing import Union, Optional
+from typing import Optional
import listparser
-from datetime import datetime
+from datetime import datetime, timezone
from functools import partial
-from telethon import events, Button
+from telethon import Button
from telethon.tl import types
from telethon.tl.patched import Message
@@ -29,57 +29,69 @@
from ..aio_helper import run_async
from ..i18n import i18n
from . import inner
+from .types import *
from .utils import command_gatekeeper, logger, send_success_and_failure_msg, get_callback_tail, check_sub_limit
@command_gatekeeper(only_manager=False)
-async def cmd_import(event: Union[events.NewMessage.Event, Message],
- *_,
- chat_id: Optional[int] = None,
- lang: Optional[str] = None,
- **__):
+async def cmd_import(
+ event: TypeEventMsgHint,
+ *_,
+ chat_id: Optional[int] = None,
+ lang: Optional[str] = None,
+ **__,
+):
chat_id = chat_id or event.chat_id
await check_sub_limit(event, user_id=chat_id, lang=lang)
await event.respond(
- i18n[lang]['send_opml_prompt'] + (
- '\n\n'
- + i18n[lang]['import_for_channel_or_group_prompt'] if event.is_private else ''
- ),
+ '\n\n'.join(filter(None, (
+ i18n[lang]['send_opml_prompt'],
+ i18n[lang]['import_for_channel_or_group_prompt'] if event.is_private else '',
+ ))),
buttons=(
Button.force_reply(
single_use=True,
selective=True,
- placeholder=i18n[lang]['send_opml_reply_placeholder']
- ) if event.is_group and chat_id == event.chat_id else None
+ placeholder=i18n[lang]['send_opml_reply_placeholder'],
+ )
+ if event.is_group and chat_id == event.chat_id
+ else None
),
reply_to=event.id if event.is_group else None
)
@command_gatekeeper(only_manager=False)
-async def cmd_export(event: Union[events.NewMessage.Event, Message],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__):
+async def cmd_export(
+ event: TypeEventMsgHint,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+):
chat_id = chat_id or event.chat_id
opml_file = await inner.sub.export_opml(chat_id)
if opml_file is None:
await event.respond(i18n[lang]['no_subscription'])
return
- await event.respond(file=opml_file,
- attributes=(types.DocumentAttributeFilename(
- f"RSStT_export_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}.opml"),))
+ await event.respond(
+ file=opml_file,
+ attributes=(
+ types.DocumentAttributeFilename(f"RSStT_export_{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}.opml"),
+ ),
+ )
@command_gatekeeper(only_manager=False, timeout=300)
-async def opml_import(event: Union[events.NewMessage.Event, Message],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__):
+async def opml_import(
+ event: TypeEventMsgHint,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+):
chat_id = chat_id or event.chat_id
reply_message: Optional[Message] = await event.get_reply_message()
@@ -98,13 +110,20 @@ async def opml_import(event: Union[events.NewMessage.Event, Message],
logger.warning(f'Failed to get opml file from {chat_id}: ', exc_info=e)
return
- reply: Message = await event.reply(i18n[lang]['processing'] + '\n' + i18n[lang]['opml_import_processing'])
+ reply: Message = await event.reply(
+ '\n'.join((
+ i18n[lang]['processing'],
+ i18n[lang]['opml_import_processing'],
+ ))
+ )
logger.info(f'Got an opml file from {chat_id}')
opml_d = await run_async(
- partial(bozo_exception_removal_wrapper,
- listparser.parse, opml_file),
- prefer_pool='thread' if len(opml_file) < 64 * 1024 else None
+ partial(
+ bozo_exception_removal_wrapper,
+ listparser.parse, opml_file,
+ ),
+ prefer_pool='thread' if len(opml_file) < 64 * 1024 else None,
)
if not opml_d.feeds:
await reply.edit('ERROR: ' + i18n[lang]['opml_parse_error'])
@@ -114,8 +133,11 @@ async def opml_import(event: Union[events.NewMessage.Event, Message],
import_result = await inner.sub.subs(
chat_id,
tuple(
- (feed.url, feed.text) if feed.text and feed.text != feed.title_orig else feed.url
- for feed in opml_d.feeds
+ (
+ (feed.url, feed.text)
+ if feed.text and feed.text != feed.title_orig
+ else feed.url
+ ) for feed in opml_d.feeds
),
lang=lang
)
@@ -139,9 +161,11 @@ async def opml_import(event: Union[events.NewMessage.Event, Message],
continue
elif sum(sub.title is not None for sub in subs
if sub.id in range(curr_start, curr_id + 1)): # if any sub has custom title
- subs_between_w_title_count = await db.Sub.filter(user_id=chat_id,
- id__in=(curr_id + 1, next_id - 1),
- title__not_isnull=True).count()
+ subs_between_w_title_count = await db.Sub.filter(
+ user_id=chat_id,
+ id__in=(curr_id + 1, next_id - 1),
+ title__not_isnull=True,
+ ).count()
if not subs_between_w_title_count:
continue
sub_ranges.append((curr_start, curr_id))
@@ -152,8 +176,11 @@ async def opml_import(event: Union[events.NewMessage.Event, Message],
if not sub_ranges:
return # no subscription set custom title
- button_data = 'del_subs_title=' + '|'.join(f'{start}-{end}' for start, end in sub_ranges) \
- + get_callback_tail(event, chat_id)
+ button_data = ''.join((
+ 'del_subs_title=',
+ '|'.join(f'{start}-{end}' for start, end in sub_ranges),
+ get_callback_tail(event, chat_id),
+ ))
if len(button_data) <= 64: # Telegram API limit
button = [
[Button.inline(i18n[lang]['delete_subs_title_button'], button_data)],
diff --git a/src/command/sub.py b/src/command/sub.py
index 09c0603a5c..7f2012bd90 100644
--- a/src/command/sub.py
+++ b/src/command/sub.py
@@ -15,25 +15,30 @@
# along with this program. If not, see .
from __future__ import annotations
-from typing import Union, Optional
+from typing import Optional
-from telethon import events, Button
+from telethon import Button
from telethon.tl import types
from telethon.tl.patched import Message
from .. import env
from ..i18n import i18n
from . import inner
-from .utils import command_gatekeeper, parse_command, escape_html, parse_callback_data_with_page, \
- send_success_and_failure_msg, get_callback_tail, check_sub_limit
+from .types import *
+from .utils import (
+ command_gatekeeper, parse_command, escape_html, parse_callback_data_with_page,
+ send_success_and_failure_msg, get_callback_tail, check_sub_limit,
+)
@command_gatekeeper(only_manager=False)
-async def cmd_sub(event: Union[events.NewMessage.Event, Message],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__):
+async def cmd_sub(
+ event: TypeEventMsgHint,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+):
chat_id = chat_id or event.chat_id
await check_sub_limit(event, user_id=chat_id, lang=lang)
@@ -42,9 +47,11 @@ async def cmd_sub(event: Union[events.NewMessage.Event, Message],
filtered_urls = inner.utils.filter_urls(args)
allow_reply = (event.is_private or event.is_group) and chat_id == event.chat_id
- prompt = (i18n[lang]['sub_reply_feed_url_prompt_html']
- if allow_reply
- else i18n[lang]['sub_usage_in_channel_html'])
+ prompt = (
+ i18n[lang]['sub_reply_feed_url_prompt_html']
+ if allow_reply
+ else i18n[lang]['sub_usage_in_channel_html']
+ )
if not filtered_urls:
await event.respond(
@@ -57,7 +64,8 @@ async def cmd_sub(event: Union[events.NewMessage.Event, Message],
placeholder='url1 url2 url3 ...'
)
# do not force reply in private chat
- if event.is_group else None
+ if event.is_group
+ else None
),
reply_to=event.id if event.is_group else None
)
@@ -66,7 +74,8 @@ async def cmd_sub(event: Union[events.NewMessage.Event, Message],
# delete the force reply message
reply_message: Optional[Message] = await event.get_reply_message()
if (
- reply_message and reply_message.sender_id == env.bot_id
+ reply_message
+ and reply_message.sender_id == env.bot_id
and isinstance(reply_message.reply_markup, types.ReplyKeyboardForceReply)
):
env.loop.create_task(reply_message.delete())
@@ -83,11 +92,13 @@ async def cmd_sub(event: Union[events.NewMessage.Event, Message],
@command_gatekeeper(only_manager=False)
-async def cmd_unsub(event: Union[events.NewMessage.Event, Message],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__):
+async def cmd_unsub(
+ event: TypeEventMsgHint,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+):
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
args = parse_command(event.raw_text)
@@ -95,25 +106,31 @@ async def cmd_unsub(event: Union[events.NewMessage.Event, Message],
unsub_result = await inner.sub.unsubs(chat_id, args, lang=lang)
if unsub_result is None:
- buttons = await inner.utils.get_sub_choosing_buttons(chat_id, lang=lang, page_number=1, callback='unsub',
- get_page_callback='get_unsub_page', tail=callback_tail)
- await event.respond(i18n[lang]['unsub_choose_sub_prompt_html'] if buttons else i18n[lang]['no_subscription'],
- buttons=buttons,
- parse_mode='html')
+ buttons = await inner.utils.get_sub_choosing_buttons(
+ chat_id, lang=lang,
+ page_number=1, callback='unsub', get_page_callback='get_unsub_page', tail=callback_tail,
+ )
+ await event.respond(
+ i18n[lang]['unsub_choose_sub_prompt_html'] if buttons else i18n[lang]['no_subscription'],
+ buttons=buttons,
+ parse_mode='html',
+ )
return
await send_success_and_failure_msg(event, **unsub_result, lang=lang, edit=False)
@command_gatekeeper(only_manager=False)
-async def cmd_or_callback_unsub_all(event: Union[events.NewMessage.Event, Message, events.CallbackQuery.Event],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # command = /unsub_all, callback data = unsub_all
+async def cmd_or_callback_unsub_all(
+ event: TypeEventCollectionMsgOrCb,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # command = /unsub_all, callback data = unsub_all
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
- is_callback = isinstance(event, events.CallbackQuery.Event)
+ is_callback = isinstance(event, TypeEventCb)
if is_callback:
backup_file = await inner.sub.export_opml(chat_id)
if backup_file is None:
@@ -123,7 +140,7 @@ async def cmd_or_callback_unsub_all(event: Union[events.NewMessage.Event, Messag
file=backup_file,
attributes=(
types.DocumentAttributeFilename("RSStT_unsub_all_backup.opml"),
- )
+ ),
)
unsub_all_result = await inner.sub.unsub_all(chat_id, lang=lang)
@@ -135,58 +152,70 @@ async def cmd_or_callback_unsub_all(event: Union[events.NewMessage.Event, Messag
i18n[lang]['unsub_all_confirm_prompt'],
buttons=[
[Button.inline(i18n[lang]['unsub_all_confirm'], data=f'unsub_all{callback_tail}')],
- [Button.inline(i18n[lang]['unsub_all_cancel'], data='cancel')]
- ]
+ [Button.inline(i18n[lang]['unsub_all_cancel'], data='cancel')],
+ ],
)
return
await event.respond(i18n[lang]['no_subscription'])
@command_gatekeeper(only_manager=False)
-async def cmd_list_or_callback_get_list_page(event: Union[events.NewMessage.Event, Message, events.CallbackQuery.Event],
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # command = /list, callback data = get_list_page|{page_number}
+async def cmd_list_or_callback_get_list_page(
+ event: TypeEventCollectionMsgOrCb,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # command = /list, callback data = get_list_page|{page_number}
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
- is_callback = isinstance(event, events.CallbackQuery.Event)
+ is_callback = isinstance(event, TypeEventCb)
if is_callback:
_, page_number = parse_callback_data_with_page(event.data)
else:
page_number = 1
# Telegram only allow <= 100 parsing entities in a message
- page_number, page_count, page, sub_count = \
- await inner.utils.get_sub_list_by_page(user_id=chat_id, page_number=page_number, size=99)
+ page_number, page_count, page, sub_count = await inner.utils.get_sub_list_by_page(
+ user_id=chat_id, page_number=page_number, size=99,
+ )
if page_count == 0:
await event.respond(i18n[lang]['no_subscription'])
return
- list_result = (
- f'{i18n[lang]["subscription_list"]}' # it occupies a parsing entity
- + '\n'
- + '\n'.join(f'{escape_html(sub.title or sub.feed.title)}' for sub in page)
+ list_result = ''.join((
+ f'{i18n[lang]["subscription_list"]}\n', # it occupies a parsing entity
+ '\n'.join(
+ f'{escape_html(sub.title or sub.feed.title)}'
+ for sub in page
+ )
+ ))
+
+ page_buttons = inner.utils.get_page_buttons(
+ page_number=page_number,
+ page_count=page_count,
+ get_page_callback='get_list_page',
+ total_count=sub_count,
+ lang=lang,
+ tail=callback_tail,
)
- page_buttons = inner.utils.get_page_buttons(page_number=page_number,
- page_count=page_count,
- get_page_callback='get_list_page',
- total_count=sub_count,
- lang=lang,
- tail=callback_tail)
-
- await event.edit(list_result, parse_mode='html', buttons=page_buttons) if is_callback else \
- await event.respond(list_result, parse_mode='html', buttons=page_buttons)
+ await (
+ event.edit(list_result, parse_mode='html', buttons=page_buttons)
+ if is_callback
+ else event.respond(list_result, parse_mode='html', buttons=page_buttons)
+ )
@command_gatekeeper(only_manager=False)
-async def callback_unsub(event: events.CallbackQuery.Event,
- *_,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # callback data = unsub={sub_id}|{page}
+async def callback_unsub(
+ event: TypeEventCb,
+ *_,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # callback data = unsub={sub_id}|{page}
chat_id = chat_id or event.chat_id
sub_id, page = parse_callback_data_with_page(event.data)
sub_id = int(sub_id)
@@ -200,17 +229,20 @@ async def callback_unsub(event: events.CallbackQuery.Event,
@command_gatekeeper(only_manager=False)
-async def callback_get_unsub_page(event: events.CallbackQuery.Event,
- *_,
- page: Optional[int] = None,
- lang: Optional[str] = None,
- chat_id: Optional[int] = None,
- **__): # callback data = get_unsub_page|{page_number}
+async def callback_get_unsub_page(
+ event: TypeEventCb,
+ *_,
+ page: Optional[int] = None,
+ lang: Optional[str] = None,
+ chat_id: Optional[int] = None,
+ **__,
+): # callback data = get_unsub_page|{page_number}
chat_id = chat_id or event.chat_id
callback_tail = get_callback_tail(event, chat_id)
if not page:
_, page = parse_callback_data_with_page(event.data)
- buttons = await inner.utils.get_sub_choosing_buttons(chat_id, page, callback='unsub',
- get_page_callback='get_unsub_page', lang=lang,
- tail=callback_tail)
+ buttons = await inner.utils.get_sub_choosing_buttons(
+ chat_id, page, callback='unsub',
+ get_page_callback='get_unsub_page', lang=lang, tail=callback_tail,
+ )
await event.edit(None if buttons else i18n[lang]['no_subscription'], buttons=buttons)
diff --git a/src/command/types.py b/src/command/types.py
new file mode 100644
index 0000000000..6ff29c67cb
--- /dev/null
+++ b/src/command/types.py
@@ -0,0 +1,59 @@
+# RSS to Telegram Bot
+# Copyright (C) 2024 Rongrong
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from __future__ import annotations
+from typing import Union
+
+from telethon import events
+from telethon.tl.patched import Message
+
+__all__ = [
+ 'TypeEventMsg', 'TypeEventMsgHint', 'TypeEventCb', 'TypeEventInline', 'TypeEventChatAction',
+ 'TypeEventCollectionAll', 'TypeEventCollectionMsgLike', 'TypeEventCollectionMsgOrCb',
+ 'TypeEventCollectionMsgOrChatAction'
+]
+
+# Has: respond(), reply(), edit(), delete(), get_reply_message()
+TypeEventMsg = events.NewMessage.Event
+TypeEventMsgHint = Union[events.NewMessage.Event, Message]
+# Has: respond(), reply(), edit(), delete(), answer()
+TypeEventCb = events.CallbackQuery.Event
+# Has: answer()
+TypeEventInline = events.InlineQuery.Event
+# Has: respond(), reply(), delete()
+# Note: `events.ChatAction.Event` only have ChatGetter, do not have SenderGetter like others
+TypeEventChatAction = events.ChatAction.Event
+
+# All have: get_chat(), get_input_chat()
+TypeEventCollectionAll = Union[
+ events.NewMessage.Event, Message, # Has: respond(), reply(), edit(), delete(), get_reply_message()
+ events.CallbackQuery.Event, # Has: respond(), reply(), edit(), delete(), answer()
+ events.InlineQuery.Event, # Has: answer()
+ events.ChatAction.Event, # Has: respond(), reply(), delete()
+]
+TypeEventCollectionMsgLike = Union[
+ events.NewMessage.Event, Message,
+ events.CallbackQuery.Event,
+ events.ChatAction.Event,
+]
+TypeEventCollectionMsgOrCb = Union[
+ events.NewMessage.Event, Message,
+ events.CallbackQuery.Event,
+]
+TypeEventCollectionMsgOrChatAction = Union[
+ events.NewMessage.Event, Message,
+ events.ChatAction.Event,
+]
diff --git a/src/command/utils.py b/src/command/utils.py
index a6d09dd6c2..319df597af 100644
--- a/src/command/utils.py
+++ b/src/command/utils.py
@@ -37,6 +37,7 @@
from .. import env, log, db, locks, errors_collection
from ..i18n import i18n
from . import inner
+from .types import *
from ..errors_collection import UserBlockedErrors
from ..compat import cached_async
@@ -136,12 +137,7 @@ def parse_customization_callback_data(
async def respond_or_answer(
- event: Union[
- events.NewMessage.Event, Message,
- events.CallbackQuery.Event,
- events.InlineQuery.Event,
- events.ChatAction.Event,
- ],
+ event: TypeEventCollectionAll,
msg: str,
alert: bool = True,
cache_time: int = 120,
@@ -160,12 +156,12 @@ async def respond_or_answer(
"""
with suppress(*UserBlockedErrors): # silently ignore
# noinspection PyProtectedMember
- if isinstance(event, events.CallbackQuery.Event) and not event._answered:
+ if isinstance(event, TypeEventCb) and not event._answered:
# answering callback query is of a tolerant rate limit, no lock needed
with suppress(QueryIdInvalidError): # callback query expired, respond instead
await event.answer(msg, alert=alert, cache_time=cache_time)
return # return if answering successfully
- elif isinstance(event, events.InlineQuery.Event):
+ elif isinstance(event, TypeEventInline):
# noinspection PyProtectedMember
if event._answered:
return
@@ -181,7 +177,7 @@ async def respond_or_answer(
**kwargs,
reply_to=(
event.message
- if isinstance(event, events.NewMessage.Event) and event.is_group
+ if isinstance(event, TypeEventMsg) and event.is_group
else None
),
)
@@ -272,12 +268,7 @@ def command_gatekeeper(
@wraps(func)
async def wrapper(
# Note: `events.ChatAction.Event` only have ChatGetter, do not have SenderGetter like others
- event: Union[
- events.NewMessage.Event, Message,
- events.CallbackQuery.Event,
- events.InlineQuery.Event,
- events.ChatAction.Event
- ],
+ event: TypeEventCollectionAll,
*args,
**kwargs,
):
@@ -293,9 +284,9 @@ async def wrapper(
chat_id = event.chat_id
flood_lock = locks.user_flood_lock(chat_id)
pending_callbacks = locks.user_pending_callbacks(chat_id)
- is_callback = isinstance(event, events.CallbackQuery.Event)
- is_inline = isinstance(event, events.InlineQuery.Event)
- is_chat_action = isinstance(event, events.ChatAction.Event)
+ is_callback = isinstance(event, TypeEventCb)
+ is_inline = isinstance(event, TypeEventInline)
+ is_chat_action = isinstance(event, TypeEventChatAction)
def describe_user():
chat_info = None
@@ -713,7 +704,7 @@ def __init__(
pattern=pattern,
)
- def filter(self, event: Union[events.NewMessage.Event, Message]):
+ def filter(self, event: TypeEventMsgHint):
document: types.Document = event.message.document
if not document:
return
@@ -756,7 +747,7 @@ def __init__(
pattern=pattern,
)
- async def __reply_verify(self, event: Union[events.NewMessage.Event, Message]):
+ async def __reply_verify(self, event: TypeEventMsgHint):
if event.is_reply:
reply_to_msg: Optional[Message] = await event.get_reply_message()
if reply_to_msg is not None and self.reply_to_peer_id == reply_to_msg.sender_id:
@@ -788,7 +779,7 @@ def __init__(
)
@staticmethod
- def __in_private_chat(event: Union[events.NewMessage.Event, Message]):
+ def __in_private_chat(event: TypeEventMsgHint):
return event.is_private
@@ -799,7 +790,7 @@ def __init__(self, chats=None, *, blacklist_chats=False):
super().__init__(chats, blacklist_chats=blacklist_chats, func=self.__added_to_group)
@staticmethod
- def __added_to_group(event: events.ChatAction.Event):
+ def __added_to_group(event: TypeEventChatAction):
if not event.is_group:
return False
if event.created:
@@ -864,7 +855,7 @@ async def set_bot_commands(
async def send_success_and_failure_msg(
- message: Union[Message, events.NewMessage.Event, events.CallbackQuery.Event],
+ message: TypeEventCollectionMsgOrCb,
success_msg: str,
failure_msg: str,
success_count: int,
@@ -873,7 +864,7 @@ async def send_success_and_failure_msg(
lang: Optional[str] = None,
edit: bool = False,
**__,
-) -> Union[Message, events.NewMessage.Event, events.CallbackQuery.Event]:
+) -> TypeEventCollectionMsgOrCb:
success_msg_raw = success_msg
failure_msg_raw = failure_msg
success_msg_short = (
@@ -920,10 +911,7 @@ def get_group_migration_help_msg(
def get_callback_tail(
- event: Union[
- events.NewMessage.Event, Message,
- events.CallbackQuery.Event,
- ],
+ event: TypeEventCollectionMsgOrCb,
chat_id: int,
) -> str:
if not event.is_private or event.chat.id == chat_id:
@@ -934,7 +922,7 @@ def get_callback_tail(
return f'%{ori_chat_id}' if ori_chat_id < 0 else f'%+{ori_chat_id}'
-async def check_sub_limit(event: Union[events.NewMessage.Event, Message], user_id: int, lang: Optional[str] = None):
+async def check_sub_limit(event: TypeEventMsgHint, user_id: int, lang: Optional[str] = None):
limit_reached, curr_count, limit, _ = await inner.utils.check_sub_limit(user_id)
if limit_reached:
logger.warning(f'Refused user {user_id} to add new subscriptions due to limit reached ({curr_count}/{limit})')