-
Notifications
You must be signed in to change notification settings - Fork 0
/
unzip_documents.py
63 lines (53 loc) · 2.34 KB
/
unzip_documents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# This is the first stage of the pipeline: unzip the Olive Document.zip files in place.
import argparse
from glob import glob
import os
import shutil
from typing import Any
import zipfile
from os.path import exists
from tqdm import tqdm
import logging
from utilities import setup_parser, setup_logging
# Bit mask of file-open flags: create-if-missing and read/write, OR-ed with two raw numeric bits.
# NOTE(review): the meaning of 0x8000 and 0x400000 is platform-specific and not documented here,
# and `flags` is not referenced anywhere in the visible part of this file - confirm it is still needed.
flags = os.O_CREAT | os.O_RDWR | 0x8000 | 0x400000
# Unzip the files in `zip`.
# We do not call ZipFile.extractall() on the whole archive because the zip files are a bit messy: directories are
# also stored as 0-length file entries, which causes the standard Python unzipping process to fail (it tries to
# create a file with the name of an existing directory).
#
# The solution is to find those messy zip file entries and not extract them in the first place.
#
# The easiest way to find these extra entries is by looking at their external attribute: 0x10 means a directory.
def unzip(zip, dest):
    """Extract the archive `zip` into the directory `dest`, skipping directory entries.

    Some archives store directories as zero-length file entries. Extracting
    such an entry creates a regular file where a directory is needed, so the
    extraction of the files below it fails. Entries whose external attributes
    carry the directory flag (0x10) are therefore left out; the directories
    are still created implicitly when the files inside them are extracted.

    Args:
        zip: Path of the ZIP archive to read.
        dest: Directory to extract the archive members into.
    """
    directory_flag = 0x10
    with zipfile.ZipFile(zip, 'r') as zf:
        # Test the flag as a bit mask rather than by equality: a directory
        # entry may carry further attribute bits (e.g. 0x30, or Unix mode bits
        # in the upper 16 bits) and must still be skipped.
        members = [
            info.filename
            for info in zf.infolist()
            if not info.external_attr & directory_flag
        ]
        zf.extractall(dest, members)
def main():
    """Unzip every `Document.zip` found below the base directory.

    Each archive is extracted into a `Document` folder located next to it.
    If that folder already exists it is skipped, unless `args.overwrite` is
    set, in which case the folder is deleted and extracted again. With
    `--remove-zip` the archive is deleted after it has been extracted.

    NOTE(review): `args.base` and `args.overwrite` are not defined here;
    presumably `setup_parser()` declares them - confirm in `utilities`.
    """
    parser = setup_parser()
    parser.add_argument('--remove-zip', action='store_true', default=False, help='Delete ZIP file after unzipping')
    args = parser.parse_args()
    setup_logging(args)

    logging.info(f'Unzipping all Document.zip files from {args.base}')
    print(f"Unzipping all Document.zip files from {args.base}")

    # Search the whole tree below the base directory.
    document_zips = glob(os.path.join(args.base, '**', 'Document.zip'), recursive=True)

    skipped = unzipped = 0
    for zip_path in tqdm(document_zips):
        folder_path = os.path.join(os.path.dirname(zip_path), 'Document')

        if os.path.isdir(folder_path):
            if not args.overwrite:
                logging.info(f"Skipping {folder_path}, it already exists")
                skipped += 1
                continue
            # Start from a clean folder so no files of a previous run survive.
            logging.debug(f"Removing existing folder {folder_path}")
            shutil.rmtree(folder_path)

        logging.info(f"Unzipping from {zip_path} to {folder_path}")
        os.makedirs(folder_path, exist_ok=True)
        unzip(zip_path, folder_path)

        if args.remove_zip:
            logging.debug("Removing ZIP file " + zip_path)
            os.remove(zip_path)
        unzipped += 1

    print(f'Unziped {unzipped} files, skipped {skipped} folders')
# Run the unzip stage only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()