-
Notifications
You must be signed in to change notification settings - Fork 0
/
extract_new_scans.py
executable file
·195 lines (162 loc) · 5.84 KB
/
extract_new_scans.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/usr/bin/env python
"""
Searches the given directory for zipped scans and extracts the scans to
a folder that matches the PatientName field of the dicom headers
(if such a directory exists).
If a given scan has already been extracted that scan will be skipped, so
this program is safe to run on a <scans_dir> that continuously updates with new
scans.
Usage:
extract_new_scans.py [options] <scans_dir> <extract_dir>
Arguments:
<scans_dir> Path to the folder containing all zipped
scans to extract (will skip scans that have already
been extracted to <extract_dir>)
<extract_dir> Location of parent directory for all the extracted scans.
Each scan to be extracted is expected to have a folder in
<extract_dir> matching the PatientName field of its
dicom headers (unless the --make-folders option is set).
Options:
--make-folders Create folders, if needed, that match the desired
path for the extracted scans. Folder permissions will be
set to 775
--add-path STR Appends STR to the path once a folder is found. The
resulting path for extraction will be of the form
<extract_dir>/<PatientName>/<STR>
--ignore-list FILE Path to a text file containing a list of paths to zip files
in scans_dir that should be ignored.
-v, --verbose Print messages to the terminal about which scans are
being extracted and to where
"""
from docopt import docopt
import os
import sys
import glob
import dicom as dcm
import zipfile
import tempfile
import shutil
import contextlib
# Module-wide verbosity flag: main() overwrites it with the value of the
# --verbose option and verbose_message() checks it before printing.
VERBOSE = False
def error_message(msg, continue_exec=True):
    """
    Report an error on standard output and optionally abort the program.

    msg is joined to the "ERROR: " prefix by string concatenation, so it
    must be a string. When continue_exec is false the process exits with
    status 1 once the message has been written and flushed.
    """
    text = "ERROR: " + msg
    print(text)
    # Push the message out immediately rather than leaving it buffered.
    sys.stdout.flush()
    if continue_exec:
        return
    sys.exit(1)
def verbose_message(msg):
    """
    Print msg, but only when the module-level VERBOSE flag is set.
    """
    if not VERBOSE:
        return
    print(msg)
def main():
    """
    Command-line entry point.

    Parses the docopt usage text of this module, then walks every *.zip
    directly inside <scans_dir>. A zip is skipped when it appears in the
    ignore list, when get_scan_id() returns None for it, or when
    already_extracted() reports that its output folder exists; everything
    else is handed to extract_scan().
    """
    global VERBOSE

    opts = docopt(__doc__)
    VERBOSE = opts['--verbose']
    create_missing = opts['--make-folders']

    # --add-path is optional; an empty string leaves os.path.join unaffected.
    sub_path = opts['--add-path']
    if sub_path is None:
        sub_path = ""

    ignore_arg = opts['--ignore-list']
    if ignore_arg is None:
        skipped = []
    else:
        skipped = get_ignored_files(sanitize_path(ignore_arg))

    source_root = sanitize_path(opts['<scans_dir>'])
    target_root = sanitize_path(opts['<extract_dir>'])

    for archive in glob.glob("{}/*.zip".format(source_root)):
        if archive in skipped:
            continue

        patient = get_scan_id(archive)
        if patient is None:
            # No dicom could be read from this zip, so there is no folder
            # name to extract it under.
            continue

        destination = os.path.join(target_root, patient, sub_path)
        if already_extracted(archive, destination):
            continue

        verbose_message("Found new scan: {}".format(archive))
        extract_scan(archive, destination, create_missing)
def sanitize_path(user_path):
    """
    Ensures an absolute and normalized path is always used so path dependent
    functions don't mysteriously fail.

    os.path.abspath is not used, because symbolic links may cause a broken
    path to be generated. The base directory is taken from the PWD
    environment variable instead; if PWD is unset or empty (it is not
    guaranteed to exist outside an interactive shell) os.getcwd() is used
    as a fallback rather than failing with a KeyError.

    user_path may be relative or absolute; an absolute user_path replaces
    the base directory entirely. Returns the normalized path as a string.
    """
    curr_path = os.environ.get('PWD') or os.getcwd()
    abs_path = os.path.join(curr_path, user_path)
    return os.path.normpath(abs_path)
def get_ignored_files(list_path):
    """
    Reads the ignore list at list_path and returns its entries as a list of
    strings, one per non-empty line.

    Trailing line terminators (both "\\n" and "\\r\\n") are removed and blank
    lines are dropped. If the file cannot be opened or read, an error is
    reported through error_message() and whatever entries were collected up
    to that point (possibly none) are returned, so the caller carries on
    without an ignore list instead of aborting.
    """
    scan_list = []
    try:
        with open(list_path, 'r') as files:
            for line in files:
                entry = line.rstrip('\r\n')
                if entry:
                    scan_list.append(entry)
    except (IOError, OSError, UnicodeError):
        # Only I/O and decoding failures are treated as "cannot read";
        # KeyboardInterrupt and programming errors are allowed to propagate.
        error_message("Cannot read ignore list. Check the path and ensure each"
                      " file in the list is on a separate line")
    return scan_list
def get_scan_id(scan):
    """
    Checks the contents of the zipped scan and returns the value of the
    PatientName field for the first dicom found. Returns None if no dicoms
    are found.

    A member counts as a dicom when its name contains ".dcm". Any ordinary
    error raised while opening the zip or reading the dicom is reported via
    verbose_message() and results in None as well; KeyboardInterrupt and
    SystemExit are not intercepted, so the run can still be interrupted.
    """
    scan_id = None
    try:
        with read_zip(scan) as zip_scan:
            for item in zip_scan.namelist():
                if ".dcm" in item:
                    scan_id = read_patient_name(zip_scan, item)
                    break
    except Exception:
        # The message blames the zip, but a failure while reading the dicom
        # header ends up here too.
        verbose_message("{} is not a readable zipfile".format(
            os.path.basename(scan)))
    return scan_id
@contextlib.contextmanager
def read_zip(zip_file):
    """
    Context manager that opens zip_file read-only and yields the ZipFile.

    The archive is closed when the with-block ends, whether or not the body
    raised.
    """
    with contextlib.closing(zipfile.ZipFile(zip_file, 'r')) as archive:
        yield archive
def read_patient_name(zip_scan, dicom):
    """
    Returns the PatientName field of the given dicom.

    zip_scan is an open ZipFile and dicom is the name of a member inside it.
    The member is extracted into a temporary directory, read with
    dcm.read_file, and the temporary directory is removed before returning.
    """
    with make_temp_dir() as tmp:
        # Use the path ZipFile.extract reports instead of rebuilding it with
        # os.path.join: extract() normalizes member names, and joining an
        # absolute member name would discard tmp altogether.
        image = zip_scan.extract(dicom, tmp)
        header = dcm.read_file(image)
        return header.PatientName
@contextlib.contextmanager
def make_temp_dir():
    """
    Context manager that creates a fresh temporary directory, yields its
    path, and deletes the directory tree when the with-block ends.
    """
    scratch = tempfile.mkdtemp()
    try:
        yield scratch
    finally:
        # Runs on normal exit and when the body raises.
        shutil.rmtree(scratch)
def already_extracted(scan, target_dir):
    """
    Return True when target_dir already holds a directory named after the
    zip file (its base name with the final extension removed), else False.
    """
    stem, _ext = os.path.splitext(os.path.basename(scan))
    return os.path.isdir(os.path.join(target_dir, stem))
def extract_scan(scan, output_path, make_dirs):
    """
    Extracts the zip file scan into output_path.

    When make_dirs is true and output_path does not exist yet, it is created
    (including missing parents) and the final directory is chmod-ed to 0o770.
    A folder that already exists is left as it is, so repeated runs with
    --make-folders no longer report a spurious "Cannot make" error.

    If output_path is not a directory after that step, nothing is extracted
    and an error is reported through error_message(); execution continues
    with the next scan either way.

    NOTE(review): the usage text says folders are set to 775 while the mode
    applied here is 770 - confirm which one is intended.
    """
    if make_dirs and not os.path.isdir(output_path):
        try:
            os.makedirs(output_path)
            # 0o770 is the octal spelling accepted by Python 2.6+ and 3.
            os.chmod(output_path, 0o770)
        except OSError:
            error_message("Cannot make {}".format(output_path))

    if os.path.isdir(output_path):
        verbose_message("extracting {} to {}".format(os.path.basename(scan),
                                                     output_path))
        with read_zip(scan) as zip_scan:
            zip_scan.extractall(output_path)
    else:
        error_message("{} doesn't exist. Cannot extract {}".format(
            output_path, os.path.basename(scan)))
# Run the extraction only when executed as a script, not when imported.
if __name__ == "__main__":
    main()