-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathloader.py
313 lines (269 loc) · 13.6 KB
/
loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
import math
import random
import multiprocessing
from time import time
import openslide
from utils import *
import xml.dom.minidom as xd
from array import array
from collections import OrderedDict
class BasicLoader:
    """Base class holding the configuration shared by all slide loaders.

    It validates the input/output folders, collects the slide paths found in
    ``slide_folder`` and offers small helpers to compute the tile grid of a
    slide and to read a single square patch from it.

    Args:
        slide_folder: Folder that contains the whole-slide images.
        save_folder: Folder where results are written.
        target_size: Edge length (in pixels) of every extracted patch.
        ds_rate: Slide pyramid level used for reading (passed to OpenSlide
            as the ``level`` argument and used to index ``level_dimensions``).
        n_procs: Number of worker processes; values outside ``1..9`` fall
            back to 3.
        overlap: When true, tiles are stepped by ``default_ol_sz`` instead
            of ``target_size``.
        default_ol_sz: Step between neighbouring tiles in overlap mode.
        rm_blank: Whether blank patches should be filtered out.
        blank_range: Range forwarded to the blank-patch check helpers.
        rm_black: Whether black patches should be filtered out.
        black_thresh: Threshold forwarded to the black-patch check helpers.

    Raises:
        FileNotFoundError: If ``slide_folder`` is not reported as valid.
    """

    def __init__(self, slide_folder, save_folder,
                 target_size=400, ds_rate=0, n_procs=4, overlap=False, default_ol_sz=300,
                 rm_blank=True, blank_range=(200, 225), rm_black=True, black_thresh=10):
        self.slide_folder = slide_folder
        self.save_folder = save_folder
        self.ds_rate = ds_rate
        self.patch_size = target_size
        # At least one worker is required: the tiling code divides the number
        # of rows by n_procs, so 0 must fall back to the default as well.
        self.n_procs = n_procs if 0 < n_procs < 10 else 3
        self.overlap = overlap
        self.default_ol_sz = default_ol_sz
        self.rm_blank = rm_blank
        self.blank_range = blank_range
        self.rm_black = rm_black
        self.black_thresh = black_thresh
        self.checks_io_folders()
        self.slide_names_list = get_slide_names(slide_folder)
        print(f' \nWSI Extractor')
        print(f'{"-" * 20} \nFind {len(self.slide_names_list)} slides. \n{"-" * 20}')

    def checks_io_folders(self):
        """Validate the slide folder and make sure the save folder is handled.

        Raises:
            FileNotFoundError: If ``check_path_valid`` rejects ``slide_folder``.
        """
        if not check_path_valid(self.slide_folder, create=False):
            raise FileNotFoundError('Folder does not exists.')
        # NOTE(review): presumably check_path_valid creates the folder when
        # create=True and returns a falsy value if it was missing - confirm.
        if not check_path_valid(self.save_folder, create=True):
            print(f' Can not find save folder, will create {self.save_folder}.')

    def get_rows_columns(self, width, height):
        """Return how many tiles fit along the width and the height.

        The step between tiles is ``default_ol_sz`` in overlap mode and
        ``patch_size`` otherwise, which is the same stride the tiling code
        uses to compute patch origins.

        Args:
            width: Slide width in pixels at the working level.
            height: Slide height in pixels at the working level.

        Returns:
            ``(rows, columns)`` where ``rows`` counts steps along the width
            and ``columns`` counts steps along the height.

        Raises:
            ValueError: If either dimension is not strictly positive.
        """
        if width <= 0 or height <= 0:
            raise ValueError('Got wrong width and height value of slide.')
        stride = self.default_ol_sz if self.overlap else self.patch_size
        return width // stride, height // stride

    def slide_pointer_generator(self):
        """Yield ``(OpenSlide handle, file name)`` for every known slide.

        The handle is opened lazily, right before it is yielded; closing it
        is left to the consumer.
        """
        for slide_path in self.slide_names_list:
            yield openslide.OpenSlide(slide_path), os.path.basename(slide_path)

    def get_patch(self, pointer, start_rc):
        """Read one ``patch_size`` x ``patch_size`` region from a slide.

        Args:
            pointer: Open slide handle exposing ``read_region``.
            start_rc: Origin of the patch, forwarded as the ``location``
                argument of ``read_region``.

        Returns:
            Whatever ``pointer.read_region`` returns for that region.
        """
        return pointer.read_region(start_rc, self.ds_rate, (self.patch_size, self.patch_size))
class CamelyonXmlLoader(BasicLoader):
    """Loader that parses XML annotation files accompanying the slides.

    On construction every XML file found in ``xmls_folder`` is parsed into
    ``self.annotations``, a list of ``(xml_file_name, regions)`` tuples where
    each region is an ``OrderedDict`` with the keys ``Name``, ``Type``,
    ``Color``, ``X`` and ``Y`` (the last two being ``array('f')`` of the
    coordinate attributes).

    Args:
        xmls_folder: Folder that contains the annotation ``.xml`` files.
        **kwargs: Forwarded unchanged to ``BasicLoader.__init__``.

    Raises:
        AssertionError: If ``check_xml_slide_align`` reports that the XML
            files and the slides do not match.
    """

    def __init__(self, xmls_folder, **kwargs):
        super(CamelyonXmlLoader, self).__init__(**kwargs)
        self.xml_folder = xmls_folder
        self.xml_list = get_xml_list(xmls_folder)
        assert check_xml_slide_align(xml_list=self.xml_list, slide_list=self.slide_names_list), \
            f'could not find enough annotation .xml file(s) for .tif(f) slide(s)'
        self.annotations = []
        self.parse()
        self.cate_xml_point_annotations()

    @staticmethod
    def get_elements_by_tag_name(node, name):
        """Return the descendants of ``node`` named ``name`` (``None`` for a falsy node)."""
        return node.getElementsByTagName(name) if node else None

    @staticmethod
    def get_attribute(node, name):
        """Return attribute ``name`` of ``node`` (empty string for a falsy node)."""
        return node.getAttribute(name) if node else ''

    def parse(self):
        """Parse every file in ``self.xml_list`` and append to ``self.annotations``.

        For each ``Annotation`` element the ``Name``, ``Type`` and ``Color``
        attributes are stored together with the ``X``/``Y`` attributes of all
        ``Coordinate`` elements found below it.
        """
        print(f' \nCamelyon XML Parser')
        print(f'{"-" * 20} \nFind {len(self.xml_list)} xmls. \n{"-" * 20}')
        total = len(self.xml_list)
        for i, xml in enumerate(self.xml_list):
            xml_name = os.path.basename(xml)
            print(f'\nLoaded: {xml_name} ({i + 1}/{total})... ')
            xml_regions = []
            start_time = time()
            obj = xd.parse(xml)
            for annotation_obj in obj.getElementsByTagName('Annotation'):
                x = array('f')
                y = array('f')
                # Search the whole annotation subtree instead of relying on
                # childNodes[1]: that index only points at the coordinate
                # container when the file is pretty-printed with a leading
                # whitespace text node, and fails on compact XML.
                for coordinate in self.get_elements_by_tag_name(annotation_obj, 'Coordinate'):
                    x.append(float(self.get_attribute(coordinate, 'X')))
                    y.append(float(self.get_attribute(coordinate, 'Y')))
                xml_regions.append(
                    OrderedDict({
                        'Name': self.get_attribute(annotation_obj, 'Name'),
                        'Type': self.get_attribute(annotation_obj, 'Type'),
                        'Color': self.get_attribute(annotation_obj, 'Color'),
                        'X': x,
                        'Y': y,
                    })
                )
            self.annotations.append((xml_name, xml_regions))
            print(f' Finished {xml_name}, used time: {time() - start_time: .2f} s')
        print('\nXML Parse Done.\n')

    def get_slide_size(self, slide_name):
        """Return ``(width, height)`` of a slide at level ``self.ds_rate``.

        Args:
            slide_name: Slide file name without extension.

        Returns:
            The ``level_dimensions`` entry of the slide for ``self.ds_rate``.
        """
        slide_path = os.path.join(self.slide_folder, slide_name + '.tif')
        if not os.path.isfile(slide_path):
            # Fall back to a discovered slide with the same stem so that
            # other extensions (e.g. ".tiff") are found as well.
            for candidate in self.slide_names_list:
                if os.path.splitext(os.path.basename(candidate))[0] == slide_name:
                    slide_path = candidate
                    break
        slide_pointer = openslide.OpenSlide(slide_path)
        try:
            width, height = slide_pointer.level_dimensions[self.ds_rate]
        finally:
            # The handle is only needed for the size lookup; release it.
            slide_pointer.close()
        return width, height

    def cate_xml_point_annotations(self):
        """Convert annotation coordinates to fractional patch-grid indices.

        The computed values are not stored or returned yet; the method is a
        placeholder for the drawing step described by the TODO below.
        """
        # TODO: draw annotation lines by the coordinates parsed from the xml
        for xml_name, regions in self.annotations:
            width, height = self.get_slide_size(os.path.splitext(xml_name)[0])
            # Called for its validation of the slide size; the grid size
            # itself is not consumed until the TODO above is implemented.
            self.get_rows_columns(width, height)
            for region in regions:
                # Coordinates expressed in units of patch_size; currently unused.
                x_idx_bias = array('f', (e_x / self.patch_size for e_x in region['X']))
                y_idx_bias = array('f', (e_y / self.patch_size for e_y in region['Y']))
class TileSaving(BasicLoader):
    """Cut every slide into patches and save them to disk with worker processes.

    All constructor arguments are forwarded unchanged to ``BasicLoader``.
    Per-slide state (``sp``, ``name``, ``rows``, ``columns``,
    ``slide_save_path``, ``ds_scale``, ``time_flag``) lives on the instance
    while a slide is processed and is reset to ``None`` afterwards.
    """

    def __init__(self, slide_folder, save_folder,
                 target_size=400, ds_rate=0, n_procs=4, overlap=False, default_ol_sz=300,
                 rm_blank=True, blank_range=(200, 225), rm_black=True, black_thresh=10):
        super(TileSaving, self).__init__(
            slide_folder=slide_folder,
            save_folder=save_folder,
            target_size=target_size,
            ds_rate=ds_rate,
            n_procs=n_procs,
            overlap=overlap,
            default_ol_sz=default_ol_sz,
            rm_blank=rm_blank,
            rm_black=rm_black,
            blank_range=blank_range,
            black_thresh=black_thresh
        )
        # The following variables will change with loop
        self.restore_self_vars()

    def restore_self_vars(self):
        """Reset all per-slide attributes to ``None``."""
        for var in ('sp', 'name', 'rows', 'columns',
                    'slide_save_path', 'ds_scale', 'time_flag'):
            setattr(self, var, None)

    def _slide_info(self):
        """Print and return the tiling parameters of the current slide.

        Reads ``self.sp`` and ``self.name``. The per-slide save folder is
        only created when the instance is exactly a ``TileSaving`` (not a
        subclass).

        Returns:
            ``(rows, columns, slide_save_path, ds_scale, time_flag)`` where
            ``time_flag`` is the timestamp taken when the method started.
        """
        time_flag = time()
        print(f'\nLoaded slide: {self.name} ... ')
        slide_width, slide_height = self.sp.level_dimensions[self.ds_rate]
        rows, columns = self.get_rows_columns(slide_width, slide_height)
        ds_scale = round(self.sp.level_downsamples[self.ds_rate])
        print(f' width: {slide_width}, height: {slide_height}, \n'
              f' rows: {rows}, columns: {columns}, \n'
              f' ds scale: {ds_scale}, patch size: {self.patch_size}')
        print(f' Creating save folder ... ', end=' ')
        slide_save_path = os.path.join(
            self.save_folder, f'{os.path.splitext(self.name)[0]}_{rows}_{columns}_x{ds_scale}_sz{self.patch_size}')
        if type(self) is TileSaving:
            check_path_valid(slide_save_path, create=True)
        print('done')
        return rows, columns, slide_save_path, ds_scale, time_flag

    def _process_target(self, start_n, cols_per_process):
        """Worker body: save the patches of rows ``start_n`` .. ``start_n + cols_per_process``.

        Every row gets its own ``batch<row>`` sub-folder below
        ``self.slide_save_path``.

        Args:
            start_n: First row index handled by this worker.
            cols_per_process: Maximum number of rows handled by this worker.
        """
        end_ncol = min(start_n + int(cols_per_process), self.rows)
        # Distance between two patch origins, scaled by ds_scale.
        stride = (self.default_ol_sz if self.overlap else self.patch_size) * self.ds_scale
        slide_stem = os.path.splitext(self.name)[0]
        # NOTE(review): filtering is only applied when BOTH flags are set;
        # with a single flag every patch is saved unfiltered. Confirm whether
        # check_valid_save_patch can apply one filter on its own.
        filter_patches = self.rm_black and self.rm_blank
        for r in range(start_n, end_ncol):
            # The batch index equals the row index, so the folder is
            # prepared once per row rather than once per patch.
            batch_save_path = os.path.join(self.slide_save_path, f'batch{r}')
            check_path_valid(batch_save_path, create=True)
            for c in range(self.columns):
                patch = self.get_patch(self.sp, [stride * r, stride * c])
                patch_path = os.path.join(batch_save_path, f'{slide_stem}_{r}_{c}.jpg')
                if filter_patches:
                    check_valid_save_patch(patch, patch_path,
                                           self.black_thresh, self.blank_range)
                else:
                    save_patch(patch, patch_path)
        print(f' Finished {start_n} - {end_ncol} columns, used time: {time() - self.time_flag :.2f} s.')

    def tiling(self):
        """Tile every slide, distributing its rows over worker processes.

        The number of workers actually started is derived per slide and never
        written back to ``self.n_procs``, so a small slide does not reduce
        the parallelism used for the slides that follow it.
        """
        for sp, name in self.slide_pointer_generator():
            self.sp, self.name = sp, name
            try:
                (self.rows, self.columns, self.slide_save_path,
                 self.ds_scale, self.time_flag) = self._slide_info()
                cols_per_process = math.ceil(self.rows / self.n_procs)
                # A slide narrower than one tile has no rows and needs no workers.
                if cols_per_process:
                    process_start_ncol_list = list(range(0, self.rows, cols_per_process))
                else:
                    process_start_ncol_list = []
                n_workers = len(process_start_ncol_list)
                if n_workers != self.n_procs:
                    print(f' self-Adaptive modification of n_procs, '
                          f'default: {self.n_procs} process, used: {n_workers} process, '
                          f'max {cols_per_process} columns per process.')
                else:
                    print(' Using default settings, used %d process, %d columns per process.'
                          % (self.n_procs, cols_per_process))
                workers = [
                    multiprocessing.Process(
                        target=self._process_target, args=(start_ncol, cols_per_process,))
                    for start_ncol in process_start_ncol_list
                ]
                for worker in workers:
                    worker.start()
                for worker in workers:
                    worker.join()
            finally:
                # All workers are done with this slide: release the handle
                # and clear the per-slide state.
                sp.close()
                self.restore_self_vars()
class TestDataGenerator(TileSaving):
    """Yield patches lazily instead of writing them to disk.

    Uses the same configuration as ``TileSaving``; ``slide_folder`` is also
    passed as ``save_folder`` because the base class requires a valid path.
    """

    def __init__(self, slide_folder, target_size=400, ds_rate=0, n_procs=4, overlap=False, default_ol_sz=300,
                 rm_blank=True, blank_range=(200, 225), rm_black=True, black_thresh=10):
        super(TestDataGenerator, self).__init__(
            slide_folder=slide_folder,
            save_folder=slide_folder,  # save_folder must be a valid path though we do not use it here
            target_size=target_size,
            ds_rate=ds_rate,
            n_procs=n_procs,
            overlap=overlap,
            default_ol_sz=default_ol_sz,
            rm_blank=rm_blank,
            rm_black=rm_black,
            blank_range=blank_range,
            black_thresh=black_thresh
        )

    def _process_target(self, cols_list, cols_per_process=0):
        """Generate ``(patch, r, c)`` for every tile of the given rows.

        Args:
            cols_list: Row indices to visit, in the order they are visited.
            cols_per_process: Unused; kept for signature compatibility with
                the parent class.

        Yields:
            ``(patch, r, c)`` for a kept patch, ``(-2, r, c)`` when
            ``rm_blank`` is set and ``check_patch_blank`` returns a falsy
            value, and ``(-1, r, c)`` when ``rm_black`` is set and
            ``check_patch_black`` returns a falsy value.
        """
        # Distance between two patch origins, scaled by ds_scale.
        stride = (self.default_ol_sz if self.overlap else self.patch_size) * self.ds_scale
        for r in cols_list:
            for c in range(self.columns):
                patch = self.get_patch(self.sp, [stride * r, stride * c])
                # Each filter is honoured on its own, so enabling only one
                # of rm_blank / rm_black still filters.
                if self.rm_blank and not check_patch_blank(patch, self.blank_range):
                    yield -2, r, c
                elif self.rm_black and not check_patch_black(patch, self.black_thresh):
                    yield -1, r, c
                else:
                    yield patch, r, c

    def get_patch_generator(self, mode, seed=None):
        """Yield one lazy patch generator per slide.

        Each yielded patch generator reads the per-slide state stored on the
        instance, so it has to be consumed before this generator is advanced
        to the next slide.

        Args:
            mode: ``'sequence'`` visits the rows in order, ``'random'``
                shuffles them first.
            seed: Optional seed applied to the ``random`` module before each
                shuffle; ``None`` leaves the generator state untouched.

        Yields:
            ``(patch_generator, slide_index)``.

        Raises:
            TypeError: If ``mode`` is neither ``'random'`` nor ``'sequence'``
                (raised when iteration starts, since this is a generator).
        """
        if mode not in ('random', 'sequence'):
            raise TypeError('mode: [random|sequence]')
        for idx, (sp, name) in enumerate(self.slide_pointer_generator()):
            self.sp, self.name = sp, name
            (self.rows, self.columns, self.slide_save_path,
             self.ds_scale, self.time_flag) = self._slide_info()
            perm = list(range(self.rows))
            if mode == 'random':
                if seed is not None:
                    # Call the seeding function; assigning to ``random.seed``
                    # would replace it and leave the shuffle unseeded.
                    random.seed(seed)
                random.shuffle(perm)
            yield self._process_target(perm), idx
def _collect_patch_positions(patch_generator):
    """Sort the ``(patch, row, column)`` items of a patch generator by kind.

    Returns:
        ``(black, blank, valid)`` lists of ``[row, column]`` pairs, where a
        patch value of ``-1`` marks a black patch and ``-2`` a blank one.
    """
    black, blank, valid = [], [], []
    # The generator yields the row index before the column index.
    for patch, row, col in patch_generator:
        if patch == -1:
            # patch is black
            black.append([row, col])
        elif patch == -2:
            # patch is blank
            blank.append([row, col])
        else:
            valid.append([row, col])
    return black, blank, valid


if __name__ == '__main__':
    slide_folder = os.path.join(os.curdir, 'data', 'slide')
    mask_folder = os.path.join(os.curdir, 'data', 'annotation')
    save_folder = os.path.join(os.curdir, 'data', 'patch')

    # Get Camelyon mask patches from xml annotation
    CamelyonXmlLoader(xmls_folder=mask_folder, slide_folder=slide_folder, save_folder=save_folder)

    # Testing tiling
    TileSaving(slide_folder, save_folder, n_procs=4, target_size=512, ds_rate=2, black_thresh=50).tiling()

    # Testing generator
    # NOTE(review): only the first slide of each loader is exercised; the
    # loop is left right after that slide has been consumed - confirm this
    # matches the intended placement of the original ``break``.
    loader_sequence = TestDataGenerator(slide_folder).get_patch_generator(mode='sequence')
    for patch_gen_seq, idx_seq in loader_sequence:
        _collect_patch_positions(patch_gen_seq)
        break

    loader_randomly = TestDataGenerator(slide_folder).get_patch_generator(mode='random')
    for patch_gen_rad, idx in loader_randomly:
        _collect_patch_positions(patch_gen_rad)
        break

    print('ALL OK!')