-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimemachine.py
232 lines (186 loc) · 8.43 KB
/
timemachine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# coded in part with https://claude.ai/chat/2fba7438-0e73-41d5-bd98-313e5d0a57cc
import datetime
import requests
from thumbnail_api import Rectangle
import math
import numpy as np
from video_decoder import decode_video_frames
# Human-readable camera name -> camera id; the id is used as a URL path
# component when building the `.timemachine` dataset URL (see
# TimeMachine.download).
CAMERAS = {
    "Clairton Coke Works": "clairton4",
    "Shell Plastics West": "vanport3",
    "Edgar Thomson South": "westmifflin2",
    "Metalico": "accan2",
    "Revolution ETC/Harmon Creek Gas Processing Plants": "cryotm",
    "Riverside Concrete": "cementtm",
    "Shell Plastics East": "center1",
    "Irvin": "irvin1",
    "North Shore": "heinz",
    "Mon. Valley": "walnuttowers1",
    "Downtown": "trimont1",
    "Oakland": "oakland"
}
class TimeMachine:
    """Client for a tiled TimeMachine time-lapse video dataset.

    On construction this fetches the dataset's ``tm.json`` and ``r.json``
    metadata over HTTP; the instance then exposes helpers for mapping
    subsample factors to pyramid levels, building tile URLs, and
    downloading/assembling tiled video into numpy arrays.
    """

    def __init__(self, root_url: str):
        """Fetch metadata for the TimeMachine rooted at `root_url`.

        Performs two HTTP GETs (tm.json and r.json). Only single-dataset
        TimeMachines are supported.
        """
        self.root_url = root_url
        self.tm_url = f"{root_url}/tm.json"
        print(f"Fetching {self.tm_url}")
        self.tm = requests.get(self.tm_url).json()
        datasets = self.tm['datasets']
        # Multi-dataset TimeMachines are out of scope for this client.
        assert len(datasets) == 1
        dataset_id = datasets[0]['id']  # renamed from `id` to avoid shadowing the builtin
        self.tile_root_url = f"{root_url}/{dataset_id}"
        self.r_url = f"{self.tile_root_url}/r.json"
        print(f"Fetching {self.r_url}")
        self.r = requests.get(self.r_url).json()
        print(f'TimeMachine has {self.r["nlevels"]} levels and {len(self.capture_times())} frames')

    @staticmethod
    def download(
            location: str,
            date: datetime.date,
            time: datetime.time,
            frames: int,
            rect: Rectangle,
            subsample: int) -> np.ndarray:  # BUG FIX: original was missing `->` (syntax error)
        """
        Downloads a video for the given camera location, date and time.

        Parameters:
        ---
        * location - Location of the camera, refer to `CAMERAS` for valid locations.
        * date - The day of the video
        * time - The time to start the capture
        * frames - The number of frames to capture
        * rect - The view to capture
        * subsample - The subsample of the produced video

        Returns:
        ---
        If frames is 1, a numpy array of dimensions width*height*4. If frames
        is greater than 1, a numpy array of dimensions frames*width*height*4.
        Values are floats in [0, 1] with a fully-opaque alpha channel appended.
        """
        if date is None:
            raise ValueError("Date not set.")
        if time is None:
            raise ValueError("Time not set.")
        date_str = date.strftime("%Y-%m-%d")
        # NOTE(review): `get_time` and `BASE_URL` are not defined anywhere in
        # this module -- confirm they are provided by the importing context
        # before calling this method.
        time_str = get_time(time)
        start_time = f"{date_str} {time_str.strftime('%H:%M:%S')}"
        url = f"{BASE_URL}/{CAMERAS[location]}/{date_str}.timemachine"
        tm = TimeMachine(url)
        # NOTE(review): `frame_from_date` is not defined on TimeMachine in
        # this file -- presumably added elsewhere; verify.
        start_frame = tm.frame_from_date(start_time)
        if start_frame < 0:
            # (The original followed this raise with an unreachable `return None`,
            # now removed.)
            raise ValueError("First frame invalid.")
        # Clamp the request to the frames actually available after start_frame.
        remaining_frames = len(tm.capture_times()) - start_frame
        if remaining_frames < frames:
            frames = remaining_frames
        # BUG FIX: original passed the undefined name `view`; the intended
        # argument is `rect`.
        video = tm.download_video(start_frame, frames, rect, subsample)
        # Append a fully-opaque alpha channel and normalize to [0, 1].
        opacity = np.full((video.shape[0], video.shape[1], video.shape[2], 1), 255, dtype=video.dtype)
        video = np.concatenate((video, opacity), axis=3) / 255.0
        if frames == 1:
            return video[0]
        else:
            return video

    def download_video(self, start_frame_no: int, nframes: int, rect: Rectangle, subsample: int = 1):
        """
        Download and assemble video tiles into a single numpy array.

        Args:
            start_frame_no: Starting frame number
            nframes: Number of frames to download
            rect: Rectangle coordinates after subsampling
            subsample: Subsample factor

        Returns:
            numpy.ndarray: Array of shape (nframes, height, width, 3) containing
            the video data. Missing or failed tiles leave their region zeroed.
        """
        level = self.level_from_subsample(subsample)
        # (Removed unused locals `level_width`/`level_height` from the original.)

        # Output buffer; regions not covered by a successful tile stay black.
        result = np.zeros((nframes, rect.height, rect.width, 3), dtype=np.uint8)

        # Compute the half-open range of tiles that intersect the rectangle.
        min_tile_y = rect.y1 // self.tile_height()
        max_tile_y = 1 + (rect.y2 - 1) // self.tile_height()
        min_tile_x = rect.x1 // self.tile_width()
        max_tile_x = 1 + (rect.x2 - 1) // self.tile_width()

        for tile_y in range(min_tile_y, max_tile_y):
            for tile_x in range(min_tile_x, max_tile_x):
                tile_url = self.tile_url(level, tile_x, tile_y)

                # Skip tiles that don't exist on the server.
                response = requests.head(tile_url)
                if response.status_code == 404:
                    print(f"Warning: tile {tile_x},{tile_y} does not exist, skipping")
                    continue

                # Tile extent in level pixel coordinates.
                tile_rectangle = Rectangle(
                    tile_x * self.tile_width(),
                    tile_y * self.tile_height(),
                    (tile_x + 1) * self.tile_width(),
                    (tile_y + 1) * self.tile_height()
                )
                intersection = rect.intersection(tile_rectangle)
                assert intersection is not None, f"Tile {tile_x},{tile_y} does not intersect rectangle {rect}"

                # Source rect is tile-relative; destination rect is output-relative.
                src_rect = intersection.translate(-tile_rectangle.x1, -tile_rectangle.y1)
                dest_rect = intersection.translate(-rect.x1, -rect.y1)

                print(f"Fetching {tile_url}")
                print(f"From tile {tile_url}, copying {src_rect} to destination {dest_rect}")

                try:
                    # Download the tile video.
                    frames, metadata = decode_video_frames(
                        video_url=tile_url,
                        start_frame=start_frame_no,
                        n_frames=nframes
                    )
                    # Copy the intersection region into the result array.
                    result[:,
                           dest_rect.y1:dest_rect.y2,
                           dest_rect.x1:dest_rect.x2,
                           :] = frames[:,
                                       src_rect.y1:src_rect.y2,
                                       src_rect.x1:src_rect.x2,
                                       :]
                except Exception as e:
                    # Best-effort assembly: a failed tile leaves its region black.
                    print(f"Error processing tile {tile_url}: {str(e)}")
                    continue

        return result

    def tile_url(self, level: int, tile_x: int, tile_y: int):
        """URL of one tile video; tile_x and tile_y are in tile coordinates / 4."""
        return f"{self.tile_root_url}/{level}/{tile_y*4}/{tile_x*4}.mp4"

    def level_from_subsample(self, subsample: int) -> int:
        """Map a power-of-two subsample factor to a pyramid level number.

        The highest level is full resolution (subsample 1); each level below
        it doubles the subsample.
        """
        log2_subsample = math.log2(subsample)
        assert log2_subsample.is_integer()
        level_number = round(len(self.level_info()) - 1 - log2_subsample)
        print(f"Subsample {subsample} corresponds to level {level_number}")
        assert level_number >= 0, f"Subsample {subsample} too high for timemachine of {len(self.level_info())} levels (max subsample {self.max_subsample()})"
        return level_number

    def subsample_from_level(self, level: int) -> int:
        """Inverse of level_from_subsample."""
        return 2 ** (len(self.level_info()) - 1 - level)

    def max_subsample(self) -> int:
        """Largest supported subsample factor (that of level 0)."""
        return self.subsample_from_level(0)

    # Convenience accessors for tm and r

    def capture_times(self):
        """Per-frame capture-time strings from tm.json."""
        return self.tm["capture-times"]

    def level_info(self):
        """Per-level metadata list from r.json."""
        return self.r["level_info"]

    def fps(self):
        """Frames per second from r.json."""
        return self.r["fps"]

    def width(self, subsample: int = 1):
        """Dataset width in pixels at the given subsample (rounded up)."""
        return int(math.ceil(self.r["width"] / subsample))

    def height(self, subsample: int = 1):
        """Dataset height in pixels at the given subsample (rounded up)."""
        return int(math.ceil(self.r["height"] / subsample))

    def tile_width(self):
        """Width in pixels of one tile video."""
        return self.r["video_width"]

    def tile_height(self):
        """Height in pixels of one tile video."""
        return self.r["video_height"]

    def info(self):
        """Print a human-readable summary of this TimeMachine's metadata."""
        print(f"TimeMachine root: {self.root_url}")
        print(f"Tile root: {self.tile_root_url}")
        print(f"Capture times: {self.capture_times()}")
        print(f"Level info: {self.level_info()}")
        print(f"FPS: {self.fps()}")
        print(f"Width: {self.width()}")
        print(f"Height: {self.height()}")
        print(f"Tile width: {self.tile_width()}")
        print(f"Tile height: {self.tile_height()}")
        print(f"r: {self.r}")
        print(f"tm: {self.tm}")