-
Notifications
You must be signed in to change notification settings - Fork 0
/
crop.py
310 lines (227 loc) · 9.49 KB
/
crop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
import torchvision.transforms as transforms
import numpy as np
import torch
import random
import subprocess as sp
import os
import cv2
import glob
import numpy as np
import itertools
import tqdm
# Crop edge lengths, in pixels, that the script iterates over.
# Currently disabled sizes: 256, 704, 960.
CROP_SIZE = [416, 512, 832]

# Fraction of the crop edge shared by two neighbouring crops.
OVERLAP = 0.5
def get_env_params():
    """Collect the dataset locations and output directory from the environment.

    Every variable listed below is looked up, in this order, even though only
    the ``UNSEEN_*`` directories and ``CROPS_OUTPUT_DIR`` end up in the result
    (the ``'rpi'`` and ``'tx2'`` sources are currently disabled).

    Returns:
        dict with two keys:
            ``'sources'``: maps ``'rpi_unseen'`` and ``'tx2_unseen'`` to an
            ``(image_dir, seg_dir)`` tuple.
            ``'OUTPUT_BASE_DIR'``: the value of ``CROPS_OUTPUT_DIR``.

    Raises:
        KeyError: if any of the variables is not set.
    """
    required = (
        'RPI_IMAGE_DIR', 'RPI_SEG_DIR', 'RPI_ANNOT_FN',
        'TX2_IMAGE_DIR', 'TX2_SEG_DIR', 'TX2_ANNOT_FN',
        'UNSEEN_RPI_IMAGE_DIR', 'UNSEEN_RPI_SEG_DIR',
        'UNSEEN_TX2_IMAGE_DIR', 'UNSEEN_TX2_SEG_DIR',
        'CROPS_OUTPUT_DIR',
    )
    # Look all of them up front so a missing variable fails immediately.
    env = {name: os.environ[name] for name in required}

    return {
        'sources': {
            'rpi_unseen': (env['UNSEEN_RPI_IMAGE_DIR'], env['UNSEEN_RPI_SEG_DIR']),
            'tx2_unseen': (env['UNSEEN_TX2_IMAGE_DIR'], env['UNSEEN_TX2_SEG_DIR']),
        },
        'OUTPUT_BASE_DIR': env['CROPS_OUTPUT_DIR'],
    }
def main(filter_empty: bool):
    """Crop every configured dataset once per configured crop size.

    The results of one (crop size, dataset) combination are written below
    ``<OUTPUT_BASE_DIR>/<crop size>/<dataset>``.

    Args:
        filter_empty: forwarded to ``crop_dataset``.
    """
    config = get_env_params()
    datasets = config['sources']
    output_root = config['OUTPUT_BASE_DIR']

    # Crop size is the outer loop, dataset the inner one.
    for crop_size in CROP_SIZE:
        for dataset in datasets:
            print(crop_size, dataset)
            im_dir, seg_dir = datasets[dataset]
            crop_dataset(
                im_dir=im_dir,
                seg_dir=seg_dir,
                out_dir=os.path.join(output_root, str(crop_size), dataset),
                crop_size=crop_size,
                overlap=OVERLAP,
                filter_empty=filter_empty,
            )
def crop_dataset(im_dir, seg_dir, out_dir, crop_size, overlap, filter_empty: bool = True):
    """Cut all image/mask pairs of a dataset into crops and save them as PNG.

    Each file in ``seg_dir`` is matched to the file in ``im_dir`` whose name
    has the same text before the first ``.``. Crops are written to
    ``<out_dir>/images`` and ``<out_dir>/seg`` as ``<id>_<crop index>.png``.
    A summary line with the number of written and removed crops is printed
    at the end.

    Args:
        im_dir: directory with the original images.
        seg_dir: directory with the segmentation masks.
        out_dir: base output directory; created if missing.
        crop_size: forwarded to ``crop_image``.
        overlap: forwarded to ``crop_image``.
        filter_empty: when true, masks that sum to zero are skipped entirely
            and crops whose mask sums to zero are not written (the latter
            are counted as removed).

    Raises:
        KeyError: if a mask has no image with the same id.
    """
    def index_by_id(directory):
        # id = file name up to the first '.'
        return {os.path.basename(path).split('.')[0]: path
                for path in glob.glob(os.path.join(directory, "*"))}

    seg_out_dir = os.path.join(out_dir, "seg")
    im_out_dir = os.path.join(out_dir, "images")
    for directory in (seg_out_dir, im_out_dir):
        os.makedirs(directory, exist_ok=True)

    masks_by_id = index_by_id(seg_dir)
    images_by_id = index_by_id(im_dir)

    written = 0
    removed = 0
    for sample_id, mask_path in tqdm.tqdm(masks_by_id.items()):
        image = cv2.imread(images_by_id[sample_id])
        mask = cv2.imread(mask_path)

        # An all-zero mask means there is nothing of interest in this image.
        if filter_empty and mask.sum() == 0:
            continue

        image_crops = crop_image(image, crop_size=crop_size, overlap=overlap)
        mask_crops = crop_image(mask, crop_size=crop_size, overlap=overlap)

        for idx, (mask_crop, image_crop) in enumerate(zip(mask_crops, image_crops)):
            # A crop without any annotated pixel is dropped.
            if filter_empty and mask_crop.sum() == 0:
                removed += 1
                continue

            crop_name = f"{sample_id}_{idx}.png"
            cv2.imwrite(os.path.join(seg_out_dir, crop_name), mask_crop)
            cv2.imwrite(os.path.join(im_out_dir, crop_name), image_crop)
            written += 1

    print(f'total crops: {written}\tremoved: {removed}')
def crop_dataset_core(image_mask_pairs_fns, out_dir, crop_size, overlap, filter_empty: bool = True):
    """Crop an explicit list of image/mask pairs and report the files written.

    Crops are written to ``<out_dir>/images`` and ``<out_dir>/seg`` as
    ``<id>_<crop index>.png``, where ``<id>`` is the image file name up to its
    first ``.``. A summary line with the number of written and removed crops
    is printed at the end.

    Args:
        image_mask_pairs_fns: iterable of ``(image_path, mask_path)`` pairs.
        out_dir: base output directory; created if missing.
        crop_size: forwarded to ``crop_image``.
        overlap: forwarded to ``crop_image``.
        filter_empty: when true, masks that sum to zero are skipped entirely
            and crops whose mask sums to zero are not written (the latter
            are counted as removed).

    Returns:
        list of ``(image_crop_path, mask_crop_path)`` tuples, one per crop
        written, in the order they were written.
    """
    # set and create output directories
    crop_seg_dir = os.path.join(out_dir, "seg")
    crop_im_dir = os.path.join(out_dir, "images")
    os.makedirs(crop_seg_dir, exist_ok=True)
    os.makedirs(crop_im_dir, exist_ok=True)

    n_deleted = 0
    n_crops = 0
    crop_fns = []  # return list

    # iterate over all (image, mask) pairs
    for im_fn, seg_fn in tqdm.tqdm(image_mask_pairs_fns):
        crop_id = os.path.basename(im_fn).split('.')[0]

        # load original image and segmentation
        im_im = cv2.imread(im_fn)
        seg_im = cv2.imread(seg_fn)

        # an all-zero segmentation has no area of interest: skip the image
        if filter_empty and seg_im.sum() == 0:
            continue

        im_crops = crop_image(im_im, crop_size=crop_size, overlap=overlap)
        seg_crops = crop_image(seg_im, crop_size=crop_size, overlap=overlap)

        for i, (seg_crop, im_crop) in enumerate(zip(seg_crops, im_crops)):
            # a crop without any annotated pixel is dropped
            if filter_empty and seg_crop.sum() == 0:
                n_deleted += 1
                continue

            out_seg_crop_fn = os.path.join(crop_seg_dir, f"{crop_id}_{i}.png")
            out_im_crop_fn = os.path.join(crop_im_dir, f"{crop_id}_{i}.png")
            cv2.imwrite(out_seg_crop_fn, seg_crop)
            cv2.imwrite(out_im_crop_fn, im_crop)

            # list.append takes exactly one argument, so store the pair as a tuple
            crop_fns.append((out_im_crop_fn, out_seg_crop_fn))
            n_crops += 1

    print(f'total crops: {n_crops}\tremoved: {n_deleted}')
    return crop_fns
def crop_image(im: np.ndarray, crop_size: "int | tuple[int, int]" = (512, 512), overlap: float = 0.1):
    """Split an image into a regular grid of overlapping crops.

    Crop origins start at the top-left corner and advance by
    ``crop - int(overlap * crop)`` pixels per axis. Every origin at which a
    full crop still fits is used, including the one that ends exactly on the
    right/bottom border. Any remaining strip narrower than one step is not
    covered. If the image is smaller than the crop along one axis only, that
    axis gets a single origin at 0 and the crops are as large as the image
    allows along it.

    Crops are ordered column by column (left to right), and top to bottom
    within a column. They are NumPy slices of ``im``, not copies.

    Args:
        im: image array of shape ``(height, width)`` or
            ``(height, width, channels)``.
        crop_size: crop edge length as an int (square crop) or a
            ``(width, height)`` tuple.
        overlap: fraction of the crop edge shared by neighbouring crops.

    Returns:
        list of crops; ``[im]`` if the crop is at least as large as the image
        in both dimensions.

    Raises:
        TypeError: if ``crop_size`` is neither a tuple nor an int.
        ValueError: if ``overlap`` leaves a step of less than one pixel.
    """
    if isinstance(crop_size, tuple):
        cw, ch = crop_size
    elif isinstance(crop_size, int):
        cw = ch = crop_size
    else:
        raise TypeError('type of crop size must be tuple (width, height) or int')

    # only the first two axes matter, so grayscale (2-D) input works as well
    h, w = im.shape[:2]

    # if crop size bigger than image, return image
    if cw >= w and ch >= h:
        return [im]

    step_w = cw - int(overlap * cw)
    step_h = ch - int(overlap * ch)
    if step_w < 1 or step_h < 1:
        raise ValueError(f'overlap {overlap} leaves no positive step for crop size {crop_size}')

    # "+ 1" keeps the last origin at which a full crop still fits; max(..., 0)
    # keeps one origin when the image is smaller than the crop on this axis.
    lr_start_w = range(0, max(w - cw, 0) + 1, step_w)  # left to right
    td_start_h = range(0, max(h - ch, 0) + 1, step_h)  # top to down

    return [im[y:y + ch, x:x + cw] for x, y in itertools.product(lr_start_w, td_start_h)]
def torchvision_transform():
    """Seed Python's ``random`` module and PyTorch with one fixed value.

    A random integer is also drawn from NumPy. Its value is not used, but the
    draw still advances NumPy's global random state.
    NOTE(review): the drawn value looks like it was meant to be the seed --
    confirm whether the hard-coded constant is intentional.
    """
    fixed_seed = 67280421310721
    np.random.randint(2147483647)  # result intentionally discarded
    for apply_seed in (random.seed, torch.manual_seed):
        apply_seed(fixed_seed)
def ext_script():
# Alternative pipeline: let an external script produce the crops (via
# external_crop), then delete every crop whose segmentation is all zeros.
# NOTE(review): this function is effectively disabled -- it raises
# NotImplementedError before any cropping happens (see below). The only call
# to it in the __main__ section is commented out.
# NOTE(review): indentation was lost in this copy of the file, so the exact
# nesting of the statements below should be confirmed against the original.
# get env parameters
env_params = get_env_params()
sources = env_params['sources']
OUTPUT_BASE_DIR = env_params['OUTPUT_BASE_DIR']
# create crops
for crop_size in CROP_SIZE:
# NOTE(review): the loop variable is overwritten with a fixed size right away
crop_size = 832
# Presumably everything after this raise is unreachable -- TODO confirm nesting.
raise NotImplementedError()
# NOTE(review): this unpacks 3-tuples (image dir, unused, annotation file), but
# get_env_params currently builds 2-tuples (image dir, seg dir) for each source.
for dataset, (imdir, _, annot_fn) in sources.items():
dataset_output_dir = os.path.join(OUTPUT_BASE_DIR, str(crop_size), dataset)
external_crop(imdir, annot_fn, dataset_output_dir, crop_size)
# remove crops that have no segmentation
seg_files = glob.glob(os.path.join(dataset_output_dir, "seg", "*"))
n_deleted = 0
for im_fn in seg_files:
im = cv2.imread(im_fn)
# if no annotation (all pixels are zero), then delete
if im.sum() == 0:
# remove segmentation crop
os.remove(im_fn)
# remove original crop (same file name under "images")
im_basename = os.path.basename(im_fn)
os.remove(os.path.join(dataset_output_dir, "images", im_basename))
n_deleted += 1
print(f"\tdeleted {n_deleted} crops")
def external_crop(input_dir, annot_json_fn, output_base_dir, crop_size, overlap=0.1):
    """Run the external cropping script on one dataset and wait for it to exit.

    The script named by the ``SCRIPT_PATH`` environment variable is started
    with ``python`` and receives, in this order: the input directory, the
    annotation file, the image output directory, the segmentation output
    directory, the crop size twice, and the overlap. The command is printed
    before it runs. The script's exit code is not checked.

    Args:
        input_dir: directory with the images to crop.
        annot_json_fn: annotation file passed through to the script.
        output_base_dir: crops go to ``<output_base_dir>/images`` and
            ``<output_base_dir>/seg``; both are created if missing.
        crop_size: passed to the script twice.
        overlap: passed through to the script.

    Raises:
        KeyError: if ``SCRIPT_PATH`` is not set.
    """
    script_path = os.environ['SCRIPT_PATH']

    # output dirs
    crop_out_im_dir = os.path.join(output_base_dir, "images")
    crop_out_seg_dir = os.path.join(output_base_dir, "seg")

    # create directories if necessary
    os.makedirs(crop_out_im_dir, exist_ok=True)
    os.makedirs(crop_out_seg_dir, exist_ok=True)

    # Build argv as a list instead of splitting a formatted string on spaces,
    # so a path that contains a space stays a single argument.
    args = [str(arg) for arg in (
        "python", script_path, input_dir, annot_json_fn,
        crop_out_im_dir, crop_out_seg_dir, crop_size, crop_size, overlap,
    )]
    print(" ".join(args))

    p = sp.Popen(args)
    p.wait()
def masks_for_unlisted_images(im_dir, seg_dir):
    """
    Create an empty mask for every image that has no mask yet.

    Images and masks are matched by the part of the file name before the
    first ``.``. For each image without a match, the image is loaded, all its
    pixels are set to zero and the result is saved as ``<id>.png`` in
    ``seg_dir``; a message is printed for every mask created.
    """
    def file_id(path):
        return os.path.basename(path).split('.')[0]

    # ids that already have a mask
    mask_ids = {file_id(path) for path in glob.glob(os.path.join(seg_dir, "*"))}
    images_by_id = {file_id(path): path for path in glob.glob(os.path.join(im_dir, "*"))}

    for image_id, image_path in images_by_id.items():
        if image_id in mask_ids:
            continue

        # reuse the image's own array so the mask has the same shape
        blank = cv2.imread(image_path)
        blank[:, :, :] = 0

        mask_path = os.path.join(seg_dir, f"{image_id}.png")
        cv2.imwrite(mask_path, blank)
        print(f"creating mask for {mask_path}")
if __name__ == '__main__':
    # ext_script()  # external-script pipeline, currently not used

    # Which task to run; change by hand to 'create_missing_masks' to
    # generate empty masks instead of cropping.
    action = 'crop'

    if action == 'create_missing_masks':
        env_params = get_env_params()
        sources = env_params['sources']
        OUTPUT_BASE_DIR = env_params['OUTPUT_BASE_DIR']
        for _, (im_dir, seg_dir) in sources.items():
            masks_for_unlisted_images(im_dir, seg_dir)
    elif action == 'crop':
        main(filter_empty=True)