-
Notifications
You must be signed in to change notification settings - Fork 2
/
utils.py
190 lines (143 loc) · 4.7 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import numpy as np
import cv2 as cv
import threading
def RoundUpPowerOf2(num):
    """Round a non-negative integer up to the nearest power of two.

    Uses the bit-smearing trick described at
    https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
    but keeps doubling the shift until nothing is left to smear, so it
    works for Python integers of any width.

    Values that already are a power of two are returned unchanged.  As in
    the referenced bit hack, ``0`` maps to ``0`` (which is not a power of
    two).

    Raises:
        ValueError: if ``num`` is negative.  An arithmetic right shift of a
            negative integer never becomes zero, so the smearing loop
            would never terminate.
    """
    if num < 0:
        raise ValueError(
            f"RoundUpPowerOf2() needs a non-negative number, got {num!r}")
    if num == 0:
        # num - 1 would be -1, which has the same non-terminating problem
        return 0

    num -= 1
    k = 1
    while True:
        # copy the highest set bit into every lower position
        mask = num >> k
        if not mask:
            break
        num |= mask
        k *= 2
    # all bits below the top one are set now; +1 carries into the next power
    return num + 1
def linear2gray(num):
    """Return the Gray code of ``num``: each bit XORed with the bit above it."""
    shifted = num >> 1
    return num ^ shifted
def gray2linear(num, nbits=64):
    """Invert ``linear2gray``: turn a Gray code back into the plain number.

    ``nbits`` is the width of the code.  It is rounded up to a power of two
    so that the successively halved shifts (nbits/2, ..., 2, 1) fold every
    higher bit into each lower one.
    """
    shift = RoundUpPowerOf2(nbits)
    while shift > 1:
        shift //= 2
        num ^= num >> shift
    return num
def num2bits(num, nbits=64):
    """Split a non-negative integer into a list of ``nbits`` bits (0 or 1).

    The list is little endian: the list index equals the bit position.
    Asserts that ``num`` is non-negative and fits into ``nbits`` bits.
    """
    assert num >= 0
    assert num.bit_length() <= nbits

    bits = []
    for position in range(nbits):
        bits.append((num >> position) & 1)
    return bits
def bits2num(bits):
    """Assemble an integer from an iterable of bits, least significant first.

    Every element is judged by its truthiness; its index is the bit position.
    """
    total = 0
    for position, digit in enumerate(bits):
        if digit:
            total += 1 << position
    return total
def upscale(factor, im):
    """Enlarge ``im`` by ``factor`` along both axes, without smoothing.

    Nearest-neighbour interpolation is used, so the result only contains
    pixel values that already occur in ``im``.
    """
    height, width = im.shape[:2]
    # cv.resize takes the target size as (width, height)
    target_size = (width * factor, height * factor)
    return cv.resize(im, target_size, interpolation=cv.INTER_NEAREST)
def build_marker(shape, value):
    """Build the boolean image of a marker that encodes ``value``.

    Args:
        shape: ``(nrows, ncols)`` of the data area.
        value: non-negative integer to encode; its Gray code has to fit
            into ``nrows * ncols`` bits (checked by ``num2bits``).

    Returns:
        Boolean array of shape ``(nrows + 4, ncols + 4)``, ``True`` = white.
        From the outside in: a one-cell black quiet zone, a one-cell white
        border, then the Gray-coded bits of ``value`` in row-major order
        with bit 0 in the top-left data cell.
    """
    nrows, ncols = shape
    nbits = nrows * ncols

    code = linear2gray(value)
    code_bits = num2bits(code, nbits=nbits)
    # builtin bool instead of the np.bool alias, which NumPy 1.24 removed
    code_bits = np.array(code_bits, dtype=bool).reshape(shape)

    # everything starts out black, which leaves the quiet zone in place
    im = np.zeros((4 + nrows, 4 + ncols), dtype=bool)
    im[1:-1, 1:-1] = True  # white border
    im[2:-2, 2:-2] = code_bits  # content
    return im
def decode_marker(samples):
    """Read the value back from a sampled marker grid.

    ``samples`` is a 2-D array whose outermost ring is the marker's white
    border and whose interior holds the Gray-coded data bits, row-major,
    least significant bit first.  Asserts that every border cell is truthy.
    """
    # the border ring has to be white all the way around
    assert samples[0,:].all() and samples[-1,:].all() and samples[:,0].all() and samples[:,-1].all()

    height, width = samples.shape
    # one border cell on each side does not carry data
    nbits = (height - 2) * (width - 2)

    interior = samples[1:-1, 1:-1]
    gray_code = bits2num(interior.flatten())
    return gray2linear(gray_code, nbits=nbits)
def fixn(value, shift):
    """Convert coordinates to fixed-point integers for OpenCV drawing calls.

    OpenCV drawing functions only take integers, but they have a ``shift``
    parameter that interprets them as fixed-point numbers with that many
    fractional bits.  This scales ``value`` by ``2**shift`` and rounds.

    Args:
        value: a number (Python or NumPy scalar), a tuple/list of numbers,
            or a NumPy array.
        shift: number of fractional bits.

    Returns:
        An ``int`` for a scalar, a tuple of ints for a tuple/list, a tuple
        for a 1-D array, and an integer array for any other array shape
        (e.g. point lists handed to polylines).

    Raises:
        AssertionError: if ``value`` is of an unsupported type.
    """
    factor = 1 << shift

    if isinstance(value, (int, float, np.integer, np.floating)):
        return int(round(value * factor))

    if isinstance(value, (tuple, list)):
        return tuple(int(round(v * factor)) for v in value)

    if isinstance(value, np.ndarray):
        # builtin int instead of the np.int alias, which NumPy 1.24 removed
        result = np.round(value * factor).astype(int)
        if result.ndim == 1:
            return tuple(result)
        # otherwise: see polylines
        return result

    # raised explicitly so the check survives `python -O`
    raise AssertionError(
        f"fixn() doesn't know how to deal with type {type(value)}: {value!r}")
def contour_sense(contour):
    """Return the signed sum of all turning angles along a closed contour.

    The sum is +-2*pi for a contour that makes a single turn; its sign tells
    the winding direction.  Positive means clockwise in image coordinates
    (y axis pointing down).

    Args:
        contour: array of 2-D points, in any shape that reshapes to (N, 2).

    Returns:
        The angle sum in radians as a float32 scalar.
    """
    points = contour.reshape((-1, 2)).astype(np.float32)

    # edge from every point to its successor, wrapping around at the end
    edges = np.roll(points, -1, axis=0) - points
    # repeated points yield zero-length edges, which have no direction
    edges = edges[np.any(edges != 0, axis=1)]
    following = np.roll(edges, -1, axis=0)

    # cross ~ sin and dot ~ cos of the turn, both scaled by the same positive
    # factor, so arctan2 gives the full (-pi, pi] angle without normalizing.
    # arcsin(cross) alone would fold every turn beyond 90 degrees back.
    cross = edges[:, 0] * following[:, 1] - edges[:, 1] * following[:, 0]
    dot = (edges * following).sum(axis=1)
    turns = np.arctan2(cross, dot)
    return turns.sum()
def rotate_topleft(contour):
    """Rotate the point order so the point nearest the origin comes first.

    The points keep their cyclic order; only the starting point changes.
    The norm is taken over axes 1 and 2, so ``contour`` has to be 3-D.
    """
    origin_distance = np.linalg.norm(contour, axis=(1,2))
    first = np.argmin(origin_distance)
    rotated = np.roll(contour, -first, axis=0)
    return rotated
# also acts (partly) like a cv.VideoCapture
class FreshestFrame(threading.Thread):
    """Background reader that always holds the newest frame of a capture.

    A worker thread keeps calling ``capture.read()`` and publishes each
    frame together with a running sequence number, so consumers get the
    most recent frame instead of a stale buffered one.

    ``capture`` needs ``isOpened()``, ``read()`` returning ``(ok, image)``
    and ``release()``.  The thread is started by the constructor.

    The worker stops when ``release()`` is called, when ``capture.read()``
    reports failure, or when reading or the callback raises.  In every case
    ``running`` becomes ``False`` and blocked ``read()`` calls are woken up,
    so consumers are never left waiting on a dead thread.
    """

    def __init__(self, capture, name='FreshestFrame'):
        super().__init__(name=name)

        self.capture = capture
        assert self.capture.isOpened()

        # this lets the read() method block until there's a new frame
        self.cond = threading.Condition()

        # this allows us to stop the thread gracefully
        self.running = False

        # keeping the newest frame around
        self.frame = None

        # passing a sequence number allows read() to NOT block
        # if the currently available one is exactly the one you ask for
        self.latestnum = 0

        # this is just for demo purposes
        self.callback = None

        self.start()

    def start(self):
        """Mark the reader as running and start the worker thread."""
        self.running = True
        super().start()

    def release(self, timeout=None):
        """Stop the worker, wait for it up to ``timeout``, release the capture."""
        with self.cond:
            self.running = False
            # nothing new will arrive, don't keep readers waiting
            self.cond.notify_all()
        self.join(timeout=timeout)
        self.capture.release()

    def run(self):
        """Worker loop: read frames and publish each under the condition."""
        counter = 0
        try:
            while self.running:
                # block for fresh frame
                (rv, img) = self.capture.read()
                if not rv:
                    # end of stream or device failure: stop producing
                    break
                counter += 1

                # publish the frame
                with self.cond:  # lock the condition for this operation
                    self.frame = img
                    self.latestnum = counter
                    self.cond.notify_all()

                if self.callback:
                    self.callback(img)
        finally:
            # whatever ended the loop, tell waiting readers it is over
            with self.cond:
                self.running = False
                self.cond.notify_all()

    def read(self, wait=True, seqnumber=None, timeout=None):
        """Return ``(sequence number, frame)`` of the newest frame.

        - with no arguments (wait=True), it blocks for a fresh frame
        - with wait=False it returns the current frame immediately (polling)
        - with a seqnumber, it blocks until that frame is available
          (if it even needs to wait)
        - with a timeout, or once the worker has stopped, it may return an
          earlier frame; may even be (0, None) if nothing was received yet
        """
        with self.cond:
            if wait:
                if seqnumber is None:
                    seqnumber = self.latestnum + 1
                seqnumber = max(seqnumber, 1)
                # a stopped worker publishes nothing anymore, so stop waiting
                self.cond.wait_for(
                    lambda: self.latestnum >= seqnumber or not self.running,
                    timeout=timeout)
            return (self.latestnum, self.frame)