Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open output files after forking #238

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions wikiextractor/WikiExtractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import os.path
import re # TODO use regex when it will be standard
import sys
import io
from io import StringIO
from multiprocessing import Queue, Process, cpu_count
from timeit import default_timer
Expand Down Expand Up @@ -158,6 +159,9 @@ def __init__(self, nextFile, max_file_size=0, compress=True):
self.nextFile = nextFile
self.compress = compress
self.max_file_size = max_file_size
self.file = None

# Open the next output file on demand rather than at construction time.
# Asks self.nextFile for the next file name via its next() method, opens it
# with self.open(), and stores the resulting handle in self.file.
# NOTE(review): presumably called once in the process that does the writing
# (see reduce_process) so the handle is created after forking -- confirm.
def open_file(self):
self.file = self.open(self.nextFile.next())

def reserve(self, size):
Expand All @@ -176,7 +180,7 @@ def open(self, filename):
if self.compress:
return bz2.BZ2File(filename + '.bz2', 'w')
else:
return open(filename, 'w')
return io.open(filename, "w", encoding="utf-8")


# ----------------------------------------------------------------------
Expand Down Expand Up @@ -333,7 +337,8 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
logging.info("Loaded %d templates in %.1fs", templates, template_load_elapsed)

if out_file == '-':
output = sys.stdout
# Writing to sys.stdout: leave output as None here; reduce_process binds sys.stdout itself after forking
output = None
if file_compress:
logging.warn("writing to stdout, so no output compression (use an external tool)")
else:
Expand Down Expand Up @@ -435,8 +440,6 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
# wait for it to finish
reduce.join()

if output != sys.stdout:
output.close()
extract_duration = default_timer() - extract_start
extract_rate = ordinal / extract_duration
logging.info("Finished %d-process extraction of %d articles in %.1fs (%.1f art/s)",
Expand Down Expand Up @@ -469,7 +472,10 @@ def reduce_process(output_queue, output):
:param output_queue: text to be output.
:param output: OutputSplitter to write to; any other value (e.g. None) selects sys.stdout.
"""

if isinstance(output, OutputSplitter):
output.open_file()
else:
output = sys.stdout
interval_start = default_timer()
period = 100000
# FIXME: use a heap
Expand All @@ -492,6 +498,8 @@ def reduce_process(output_queue, output):
break
ordinal, text = pair
ordering_buffer[ordinal] = text
if isinstance(output, OutputSplitter):
output.file.close()


# ----------------------------------------------------------------------
Expand Down