This repository has been archived by the owner on Jul 17, 2021. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathtwitter-dl.py
224 lines (165 loc) · 7.39 KB
/
twitter-dl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
#!/usr/bin/env python
import argparse
import requests
import json
import urllib.parse
import m3u8
from pathlib import Path
import re
import ffmpeg
import shutil
import copy
class TwitterDownloader:
    """
    tw-dl offers the ability to download videos from Twitter feeds.

    **Disclaimer** I wrote this to recover a video for which the original was lost. Consider copyright before downloading
    content you do not own.

    Instance attributes set by ``__init__``:
        tweet_url: The URL exactly as passed in by the caller.
        output_dir: Base output directory as passed in by the caller.
        target_width: Desired video width in pixels (``0`` means "download every resolution").
        debug: Debug verbosity, capped at 2.
        tweet_data: Dict with the keys ``tweet_url`` (query string removed), ``user`` and ``id``.
        storage: ``<output_dir>/<user>/<id>`` as a string; the directory is created on construction.
        requests: A ``requests.Session`` used for the authenticated calls.
    """

    video_player_prefix = 'https://twitter.com/i/videos/tweet/'
    video_api = 'https://api.twitter.com/1.1/videos/tweet/config/'

    def __init__(self, tweet_url, output_dir='./output', target_width=0, debug=0):
        """
        Parse the tweet URL, create the storage directory and open an HTTP session.

        Args:
            tweet_url: Tweet URL of the form ``https://twitter.com/<user>/status/<id>``.
            output_dir: Base directory; files go to ``<output_dir>/<user>/<id>``.
            target_width: Preferred width in pixels; anything accepted by ``int()``.
            debug: Verbosity level; values above 2 are treated as 2.

        Raises:
            ValueError: If ``tweet_url`` does not contain a user and an ID segment.
        """
        self.tweet_url = tweet_url
        self.output_dir = output_dir
        self.target_width = int(target_width)
        # Only levels 0, 1 and 2 are handled by __debug.
        self.debug = min(debug, 2)

        # Built per instance: a class-level dict would be shared (and overwritten)
        # by every downloader created in the same process.
        self.tweet_data = self._parse_tweet_url(tweet_url)

        storage_dir = Path(output_dir) / self.tweet_data['user'] / self.tweet_data['id']
        storage_dir.mkdir(parents=True, exist_ok=True)
        self.storage = str(storage_dir)

        self.requests = requests.Session()

    @staticmethod
    def _parse_tweet_url(tweet_url):
        """
        Split a tweet URL into its cleaned URL, user and ID.

        We split on ? to clean up the URL. Sharing tweets, for example,
        will add ? with data about which device shared it.
        The rest is just getting the user and ID to work with.

        Returns:
            A new dict with the keys ``tweet_url``, ``user`` and ``id``.

        Raises:
            ValueError: If the URL has no user (4th) or ID (6th) slash-separated segment.
        """
        clean_url = tweet_url.split('?', 1)[0]
        parts = clean_url.split('/')

        # 'https://twitter.com/<user>/status/<id>' splits into
        # ['https:', '', 'twitter.com', '<user>', 'status', '<id>'].
        if len(parts) < 6 or not parts[3] or not parts[5]:
            raise ValueError(
                'Expected a tweet URL like https://twitter.com/<user>/status/<id>, got: ' + str(tweet_url)
            )

        return {'tweet_url': clean_url, 'user': parts[3], 'id': parts[5]}

    def download(self):
        """
        Download the tweet's video into ``self.storage``.

        For a variant playlist, every resolution is downloaded unless
        ``target_width`` is non-zero, in which case only the closest width is
        kept. A non-variant playlist is not supported and only prints a notice.
        """
        self.__debug('Tweet URL', self.tweet_data['tweet_url'])

        # Get the bearer token
        token = self.__get_bearer_token()

        # Get the M3u8 file - this is where rate limiting has been happening
        video_host, playlist = self.__get_playlist(token)

        if not playlist.is_variant:
            print('[-] Sorry, single resolution video download is not yet implemented. Please submit a bug report with the link to the tweet.')
            return

        if self.target_width == 0:
            print('[+] Multiple resolutions found. Slurping all resolutions.')
        else:
            print('[+] Multiple resolutions found. Selecting the one closest to target width of ' + str(self.target_width))
            playlist = self.__filter_playlist(playlist)

        for plist in playlist.playlists:
            self.__download_resolution(video_host, plist)

    def __download_resolution(self, video_host, plist):
        """
        Fetch every segment of one resolution and convert it to ``<width>x<height>.mp4``.

        The segments are written to ``self.storage``, concatenated into a single
        ``.ts`` file and handed to ffmpeg. The intermediate files are removed
        afterwards, including when a step fails.

        Raises:
            requests.HTTPError: If a segment request returns an error status.
        """
        resolution = str(plist.stream_info.resolution[0]) + 'x' + str(plist.stream_info.resolution[1])
        resolution_file = Path(self.storage) / (resolution + '.mp4')

        print('[+] Downloading ' + resolution)

        playlist_url = video_host + plist.uri

        # NOTE(review): a None header value presumably removes the session's
        # Authorization header for this one request - confirm against requests docs.
        ts_m3u8_response = self.requests.get(playlist_url, headers={'Authorization': None})
        ts_m3u8_parse = m3u8.loads(ts_m3u8_response.text)

        ts_list = []
        ts_full_file = Path(self.storage) / (resolution + '.ts')

        try:
            for ts_uri in ts_m3u8_parse.segments.uri:
                ts_file = requests.get(video_host + ts_uri)
                # Fail loudly rather than splice an HTTP error body into the video.
                ts_file.raise_for_status()

                ts_path = Path(self.storage) / ts_uri.split('/')[-1]
                ts_list.append(ts_path)
                ts_path.write_bytes(ts_file.content)

            # Shamelessly taken from https://stackoverflow.com/questions/13613336/python-concatenate-text-files/27077437#27077437
            with open(str(ts_full_file), 'wb') as wfd:
                for segment in ts_list:
                    with open(str(segment), 'rb') as fd:
                        shutil.copyfileobj(fd, wfd, 1024 * 1024 * 10)

            print('\t[*] Doing the magic ...')

            # Audio is copied as-is; video is re-encoded with libx264 into an mp4 container.
            ffmpeg\
                .input(str(ts_full_file))\
                .output(str(resolution_file), acodec='copy', vcodec='libx264', format='mp4', loglevel='error')\
                .overwrite_output()\
                .run()
        finally:
            print('\t[+] Doing cleanup')

            # A failed run may not have created every file, hence the exists() check.
            for leftover in ts_list + [ts_full_file]:
                if leftover.exists():
                    leftover.unlink()

    def __get_bearer_token(self):
        """
        Scrape the bearer token from the video player's JavaScript file.

        The token is installed as the session's ``Authorization`` header and a
        guest token is requested right after.

        Returns:
            The matched ``'Bearer ...'`` string.

        Raises:
            RuntimeError: If the JavaScript URL or the token cannot be found.
        """
        video_player_url = self.video_player_prefix + self.tweet_data['id']
        video_player_response = self.requests.get(video_player_url).text
        self.__debug('Video Player Body', '', video_player_response)

        js_file_urls = re.findall(r'src="(.*js)', video_player_response)
        if not js_file_urls:
            raise RuntimeError('Could not find the JavaScript file URL in the video player page.')

        js_file_response = self.requests.get(js_file_urls[0]).text
        self.__debug('JS File Body', '', js_file_response)

        bearer_token_match = re.search(r'Bearer ([a-zA-Z0-9%-])+', js_file_response)
        if bearer_token_match is None:
            raise RuntimeError('Could not find a bearer token in the JavaScript file.')

        # group(0) is the whole match, i.e. it includes the 'Bearer ' prefix.
        bearer_token = bearer_token_match.group(0)

        self.requests.headers.update({'Authorization': bearer_token})
        self.__debug('Bearer Token', bearer_token)

        self.__get_guest_token()

        return bearer_token

    def __get_playlist(self, token):
        """
        Fetch the player config for the tweet and load its M3U8 playlist.

        Args:
            token: Unused here; the session headers already carry the token.

        Returns:
            A two-item list: ``[video_host, parsed_playlist]`` where ``video_host``
            is ``<scheme>://<hostname>`` of the playback URL.

        Raises:
            SystemExit: With code 1 if the player config contains ``errors``.
        """
        player_config_req = self.requests.get(self.video_api + self.tweet_data['id'] + '.json')
        player_config = json.loads(player_config_req.text)

        if 'errors' in player_config:
            self.__debug('Player Config JSON - Error', json.dumps(player_config['errors']))
            print('[-] Rate limit exceeded. Could not recover. Try again later.')
            # Same effect as sys.exit(1), without needing `sys` in module scope
            # (it is only imported when the file runs as a script).
            raise SystemExit(1)

        self.__debug('Player Config JSON', '', json.dumps(player_config))
        m3u8_url = player_config['track']['playbackUrl']

        # Get m3u8
        m3u8_response = self.requests.get(m3u8_url)
        self.__debug('M3U8 Response', '', m3u8_response.text)

        m3u8_url_parse = urllib.parse.urlparse(m3u8_url)
        video_host = m3u8_url_parse.scheme + '://' + m3u8_url_parse.hostname

        m3u8_parse = m3u8.loads(m3u8_response.text)

        return [video_host, m3u8_parse]

    def __get_guest_token(self):
        """
        Activate a guest token and store it in the session's ``x-guest-token`` header.

        Thanks to @devkarim for this fix: https://github.com/h4ckninja/twitter-video-downloader/issues/2#issuecomment-538773026
        """
        res = self.requests.post("https://api.twitter.com/1.1/guest/activate.json")
        res_json = json.loads(res.text)

        self.requests.headers.update({'x-guest-token': res_json.get('guest_token')})

    def __filter_playlist(self, playlist):
        """
        Return a copy of ``playlist`` that keeps only the entry closest to ``target_width``.

        Closeness is the absolute difference between the entry's width
        (``stream_info.resolution[0]``) and ``self.target_width``; the first
        entry wins a tie. An empty input yields an empty ``playlists`` list.
        The passed-in playlist is not modified.
        """
        # Make a copy of the playlist object and reset 'playlists' member
        new_playlist = copy.deepcopy(playlist)

        closest = min(
            playlist.playlists,
            key=lambda variant: abs(variant.stream_info.resolution[0] - self.target_width),
            default=None,
        )
        new_playlist.playlists = [] if closest is None else [closest]

        return new_playlist

    def __debug(self, msg_prefix, msg_body, msg_body_full=''):
        """
        Print a debug line according to ``self.debug``.

        Level 0 prints nothing, level 1 prints the prefix and short body,
        level 2 additionally appends ``msg_body_full``.
        """
        if self.debug == 0:
            return

        if self.debug == 1:
            print('[Debug] ' + '[' + msg_prefix + ']' + ' ' + msg_body)

        if self.debug == 2:
            print('[Debug+] ' + '[' + msg_prefix + ']' + ' ' + msg_body + ' - ' + msg_body_full)
if __name__ == '__main__':
    # Command-line entry point: parse the arguments, build a downloader and run it.

    # Imported at this level (not inside a function) so that `sys` is bound as a
    # module-level name when the file runs as a script.
    import sys

    if sys.version_info[0] < 3:
        print('Python3 is required.')
        sys.exit(1)

    parser = argparse.ArgumentParser()
    parser.add_argument('tweet_url', help='The video URL on Twitter (https://twitter.com/<user>/status/<id>).')
    parser.add_argument('-o', '--output', dest='output', default='./output', help='The directory to output to. The structure will be: <output>/<user>/<id>.')
    parser.add_argument('-d', '--debug', default=0, action='count', dest='debug', help='Debug. Add more to print out response bodies (maximum 2).')
    # type=int lets argparse reject a non-numeric width with a usage error
    # instead of a traceback from the int() conversion in the constructor.
    parser.add_argument('-w', '--target_width', dest='target_width', type=int, default=0, help='In pixels. Download only the video resolution closest to this value')

    args = parser.parse_args()

    twitter_dl = TwitterDownloader(args.tweet_url, output_dir=args.output, target_width=args.target_width, debug=args.debug)
    twitter_dl.download()