-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspeedtest.py
72 lines (63 loc) · 2.58 KB
/
speedtest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import threading, argparse, os
from urllib.parse import urlparse
import requests
from tqdm import tqdm
def _positive_int(value):
    """Convert a command-line string to an integer that is at least 1.

    Used as an argparse ``type=`` callable.

    Args:
        value: Raw argument string supplied on the command line.

    Returns:
        The parsed integer.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not an integer or is
            smaller than 1.
    """
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            "invalid int value: {!r}".format(value)
        ) from None
    if number < 1:
        raise argparse.ArgumentTypeError(
            "must be at least 1, got {}".format(number)
        )
    return number


# Command-line interface; the parsed result is exposed as the module-level
# ``args`` namespace that the rest of the script reads.
parser = argparse.ArgumentParser(
    description="A Naive Speed Tester",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("-H", "--host", help="override hostname in SNI (e.g., a.speedtest.com)", type=str, default=None)
parser.add_argument("-i", "--ip", help="override ip address (e.g., 222.28.152.253)", type=str, default=None)
# The value becomes the initial count of the semaphore that gates the
# workers: 0 would leave every worker blocked forever and a negative value
# makes threading.Semaphore raise ValueError, so reject both up front.
parser.add_argument("-n", "--num-threads", help="number of threads to use", type=_positive_int, default=1)
parser.add_argument("-u", "--url", help="url to test", type=str, default="https://changeme/client_app/download/pc_zip/20230916101725_v2XCKuuvzCVh3BdI/GenshinImpact_4.1.0.zip.001")
args = parser.parse_args()
# Host -> address overrides consulted by the patched connection factory.
dns_map = {}


def patch_dns():
    """Make urllib3 connect to the addresses listed in ``dns_map``.

    Replaces ``urllib3.util.connection.create_connection`` with a wrapper
    that swaps the host part of the ``(host, port)`` target for its
    ``dns_map`` entry, when one exists, and then delegates to the original
    function with all other arguments untouched. Hosts without an entry are
    passed through unchanged.

    Also disables urllib3's ``InsecureRequestWarning``.
    """
    import urllib3
    from urllib3.util import connection

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Keep a handle on the real implementation so the wrapper can delegate.
    original_connect = connection.create_connection

    def connect_with_override(address, *extra, **options):
        hostname, port = address
        target = dns_map.get(hostname, hostname)
        return original_connect((target, port), *extra, **options)

    connection.create_connection = connect_with_override
# Probe the target once to learn the payload size and the address of the
# peer the connection actually reached. The context manager guarantees the
# streamed response is closed even if a header or socket lookup fails.
url = args.url
with requests.get(url, stream=True) as resp:
    # Content-Length can be absent; in that case run the progress bar
    # without a total instead of crashing with KeyError.
    content_length = resp.headers.get("Content-Length")
    size = int(content_length) if content_length is not None else None
    # NOTE: reaches into a private urllib3 attribute to get the live socket.
    remote_addr = resp.raw._connection.sock.getpeername()
if args.ip:
    # Keep the probed port but aim at the user-supplied address.
    remote_addr = (args.ip, remote_addr[1])
print("Testing {}".format(remote_addr))

# Number of worker threads started later; each one downloads the whole file,
# hence the progress total of ``size * factor``.
factor = 8
# Limits how many workers download at the same time.
sem = threading.Semaphore(value=args.num_threads)
bar = tqdm(
    total=size * factor if size is not None else None,
    unit="B",
    unit_scale=True,
    unit_divisor=1024,
    smoothing=0,
)

parsed = urlparse(url)
# The patched connection factory looks hosts up by the bare host of the
# (host, port) address, so the key must be the hostname; ``netloc`` would
# also carry any ":port" suffix and never match.
dns_map[parsed.hostname] = remote_addr[0]

custom_domain = args.host
if custom_domain:
    # Rewrite the URL to use the override host and pin that host to the same
    # address as well.
    replaced = parsed._replace(netloc=parsed.netloc.replace(parsed.hostname, custom_domain))
    dns_map[custom_domain] = remote_addr[0]
    url = replaced.geturl()
def worker():
    """Download ``url`` once and report every received chunk to ``bar``.

    Waits on ``sem`` before starting, so the number of simultaneous
    downloads is bounded by the semaphore's initial value. The request sends
    the original URL's hostname in the ``host`` header and is made with
    certificate verification turned off (presumably because the URL host
    and target address may have been overridden -- confirm).

    On an SSL error the function prints a hint and terminates the whole
    process with exit status 1 via ``os._exit``.
    """
    with sem:
        try:
            resp = requests.get(
                url,
                headers={"host": parsed.hostname},
                stream=True,
                verify=False,
            )
        except requests.exceptions.SSLError:
            print("\nSSL Error, please also try overriding the ip address")
            # os._exit ends the entire process from inside this thread.
            os._exit(1)
        # Close the streamed response even if reading fails part-way, so the
        # underlying connection is always released.
        with resp:
            for chunk in resp.iter_content(chunk_size=8192):
                bar.update(len(chunk))
# Install the host overrides before any worker opens a connection.
patch_dns()

# One daemon thread per download; daemon threads do not keep the process
# alive once the main thread finishes.
T = []
for _ in range(factor):
    T.append(threading.Thread(target=worker, daemon=True))

for thread in T:
    thread.start()

try:
    # Join with a short timeout in a loop rather than blocking indefinitely,
    # so Ctrl-C is noticed promptly by the main thread.
    for thread in T:
        while thread.is_alive():
            thread.join(0.1)
except KeyboardInterrupt:
    # Interrupt ends the wait quietly; nothing else to clean up here.
    pass