# main.py
# Entry point of the service: configures logging, creates the Telegram clients
# and the aiohttp web application, then runs the asyncio event loop forever.
import asyncio
from functools import partial
from telethon import TelegramClient
from aiohttp import web, ClientSession
import ujson as json
from typing import TypedDict, TYPE_CHECKING
from checkURL import check_url
from resolveUsername import endpoint
from api_keys import api_id, api_hash
from log import send_counter
import textRoutes
if TYPE_CHECKING:
from typing import Mapping
import logging
# Send log records to log.log, each prefixed with timestamp, logger name and
# level. No level is passed, so the root logger keeps its default threshold.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(filename="log.log", format=_LOG_FORMAT)
# Number of Telegram clients to initialise. A higher number helps to mitigate
# flood waits, because the code can switch to the next client.
CLIENTS = 1
# The clients used to make requests to Telegram's API, kept in a list. Each one
# gets its own session name ("session_0", "session_1", ...). Building the list
# from range() (instead of a hand-rolled counter compared with !=) means a zero
# or negative CLIENTS simply yields an empty list rather than looping forever,
# and no scratch counter is left behind at module level.
clients: list[TelegramClient] = [
    TelegramClient(f"session_{index}", api_id, api_hash) for index in range(CLIENTS)
]
class Username(TypedDict):
    """Type-hinted layout of one entry in the temporary storage (the cache).

    Declared so that mypy can type-check code working with cached entries.
    The meaning of each field is implied by its name only -- TODO confirm
    against the code that fills the cache.
    """

    bio: str
    chat_id: int
    chat_type: str
    first_name: str
    last_name: str
# The cache is just this JSON file. Together with scraping the Telegram website
# it lets us make fewer requests to the API: if the website and our temporary
# storage agree, the entry does not need to be renewed with an API call.
# The file is read inside a context manager so the handle is closed as soon as
# the content is parsed, instead of staying open until garbage collection.
with open("cache.json", "rb") as cache_file:
    cache: "Mapping[str, Username]" = json.load(cache_file)
async def session_creator() -> ClientSession:
    """Create and return a fresh aiohttp ``ClientSession``.

    Meant to be called only once: sharing the single returned session is what
    lets all outgoing requests benefit from connection pooling.
    """
    shared_session = ClientSession()
    return shared_session
async def save() -> None:
    """Persist the in-memory cache to ``cache.json`` once per hour, forever.

    The first dump happens immediately when the task starts; afterwards the
    coroutine sleeps for an hour between dumps. The dump itself is a plain
    synchronous write, so it briefly blocks the event loop.

    The data is written to a temporary file first and then moved over
    ``cache.json``, so a crash or error in the middle of a dump cannot leave a
    truncated ``cache.json`` behind. A failed dump is logged and retried at the
    next interval instead of silently ending the periodic saving.
    """
    # Local import: os is only needed for the atomic rename below.
    import os

    target_path = "cache.json"
    temp_path = target_path + ".tmp"
    # The endless loop makes sure the saving never stops.
    while True:
        try:
            with open(temp_path, "w") as outfile:
                # Indent of 4 and sorted keys keep the file pleasant to read.
                json.dump(cache, outfile, indent=4, sort_keys=True)
            # Swap the finished file in only after it was written completely.
            os.replace(temp_path, target_path)
        except Exception:
            # Boundary of a fire-and-forget task: nobody awaits it, so an
            # escaping exception would stop all future saves unnoticed.
            logging.exception("saving the cache to %s failed", target_path)
        # Sleep for an hour (60 minutes * 60 seconds).
        await asyncio.sleep(60 * 60)
# Get the event loop so that the start-up code below can register tasks on it
# and run coroutines to completion with run_until_complete().
# NOTE(review): calling asyncio.get_event_loop() while no loop is running is
# deprecated on recent Python versions -- confirm the targeted version before
# upgrading the interpreter.
loop = asyncio.get_event_loop()
# Create the HTTP session first so it can be passed on to the request handler
# later. Remember: only one session should exist for the whole process.
session = loop.run_until_complete(session_creator())
# The aiohttp web application that all routes are attached to.
app = web.Application()
routes = app.router

# Every request to /resolveUsername is handed to check_url first. According to
# the original author's note, check_url makes sure all expected parameters exist
# and that the api_key (if present) is allowed, and then forwards the request to
# route_to together with the clients, cache and session. partial() is used to
# bind these arguments because a direct import ran into circular imports; maybe
# someone can improve this later.
resolve_username_handler = partial(
    check_url,
    expected_parameters=["api_key", "username"],
    route_to=endpoint,
    clients=clients,
    cache=cache,
    session=session,
)
routes.add_get("/resolveUsername", resolve_username_handler)

# These two handlers are text only, so they do not need the checker.
routes.add_get("/", textRoutes.index)
routes.add_get("/api_doc", textRoutes.api_documentation)
# Serve the application through an AppRunner/TCPSite pair that is driven by the
# shared event loop, so the web server runs alongside the other tasks on it.
runner = web.AppRunner(app)
# The runner has to be set up before a site can be created on top of it.
loop.run_until_complete(runner.setup())
# The site that is supposed to run: it listens on localhost, port 1234.
site = web.TCPSite(runner, host="localhost", port=1234)
# Start the site. Requests are only served while the event loop is running.
loop.run_until_complete(site.start())
# Connect every client to Telegram.
# NOTE(review): start() is called without await; this relies on Telethon
# running it to completion by itself while the loop is idle -- confirm.
for c_client in clients:
    c_client.start()

# Schedule the background jobs and keep strong references to them. The event
# loop itself only holds weak references to tasks, so a task that nothing else
# references may be garbage-collected before it is done.
background_tasks = [
    # Sends a log of how many calls each API key made, every now and then
    # (once an hour right now, per the original note), using the first client.
    loop.create_task(send_counter(clients[0])),
    # Periodically writes the cache to cache.json.
    loop.create_task(save()),
]

# The final call: run the loop (web server plus background tasks) forever.
loop.run_forever()