Skip to content

Commit

Permalink
convert: Allow both multi-threaded and single-threaded export.
Browse files Browse the repository at this point in the history
  • Loading branch information
heinezen committed Jan 25, 2024
1 parent 4775570 commit 4dbab82
Showing 1 changed file with 192 additions and 68 deletions.
260 changes: 192 additions & 68 deletions openage/convert/processor/export/media_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging
import os
import multiprocessing
import queue

from openage.convert.entity_object.export.texture import Texture
from openage.convert.service import debug_info
Expand Down Expand Up @@ -103,75 +104,30 @@ def export(
itargs = (args.palettes, args.compression_level, args.game_version)
info("-- Exporting terrain files...")

# Create a manager for sharing data between the workers and main process
with multiprocessing.Manager() as manager:
# Workers write the image metadata to this queue
# so that it can be forwarded to the export requests
#
# we cannot do this in a worker process directly
# because the export requests cannot be pickled
outqueue = manager.Queue()

expected_size = len(cur_export_requests)

worker_count = args.jobs
if worker_count is None:
worker_count = min(multiprocessing.cpu_count(), expected_size)

# Create a pool of workers
with multiprocessing.Pool(worker_count) as pool:
for idx, request in enumerate(cur_export_requests):
# Feed the worker with the source file data (bytes) from the
# main process
#
# This is necessary because some image files are inside an
# archive and cannot be accessed asynchronously
source_data = read_data_func(request, sourcedir, **kwargs)
if source_data is None:
expected_size -= 1
continue

# The target path must be native
target_path = exportdir[request.targetdir, request.target_filename]

# Start an export call in a worker process
# The call is asynchronous, so the next worker can be
# started immediately
pool.apply_async(
export_func,
args=(
idx,
source_data,
outqueue,
request.source_filename,
target_path,
*itargs
),
kwds=kwargs
)

# Log file information
if get_loglevel() <= logging.DEBUG:
MediaExporter.log_fileinfo(
sourcedir[request.get_type().value, request.source_filename],
exportdir[request.targetdir, request.target_filename]
)

# Show progress
MediaExporter._show_progress(outqueue.qsize(), expected_size)

# Close the pool since all workers have been started
pool.close()

# Show progress for remaining workers
while outqueue.qsize() < expected_size:
MediaExporter._show_progress(outqueue.qsize(), expected_size)

# Wait for all workers to finish
pool.join()
if args.jobs == 1:
MediaExporter._export_singlethreaded(
cur_export_requests,
sourcedir,
exportdir,
read_data_func,
export_func,
handle_outqueue_func,
itargs,
kwargs
)

if handle_outqueue_func:
handle_outqueue_func(outqueue, cur_export_requests)
else:
MediaExporter._export_multithreaded(
cur_export_requests,
sourcedir,
exportdir,
read_data_func,
export_func,
handle_outqueue_func,
itargs,
kwargs,
args.jobs
)

if args.debug_info > 5:
cachedata = {}
Expand All @@ -197,6 +153,174 @@ def export(
args.game_version
)

@staticmethod
def _export_singlethreaded(
    requests: list[MediaExportRequest],
    sourcedir: Path,
    exportdir: Path,
    read_data_func: typing.Callable,
    export_func: typing.Callable,
    handle_outqueue_func: typing.Callable | None,
    itargs: tuple,
    kwargs: dict
):
    """
    Process all export requests sequentially in the calling process.

    For every request the source data is read with ``read_data_func`` and
    handed to ``export_func`` together with the request index, a local
    ``queue.Queue``, the source filename, the target path, ``itargs`` and
    ``kwargs``. Requests for which ``read_data_func`` returns ``None`` are
    skipped (no export call, no progress update). After the loop the queue
    is passed to ``handle_outqueue_func`` if one was supplied.

    :param requests: Export requests for media files.
    :param sourcedir: Directory where all media assets are mounted. Source subfolder and
                      source filename should be stored in the export request.
    :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder
                      and target filename should be stored in the export request.
    :param read_data_func: Function for reading the source file data.
    :param export_func: Function for exporting media files.
    :param handle_outqueue_func: Optional function for handling data in the outqueue.
    :param itargs: Arguments for the export function.
    :param kwargs: Keyword arguments for the read and export functions.
    :type requests: list[MediaExportRequest]
    :type sourcedir: Path
    :type exportdir: Path
    :type read_data_func: typing.Callable
    :type export_func: typing.Callable
    :type handle_outqueue_func: typing.Callable | None
    :type itargs: tuple
    :type kwargs: dict
    """
    # Queue handed to every export call; forwarded to the handler at the end
    result_queue = queue.Queue()

    for request_id, request in enumerate(requests):
        data = read_data_func(request, sourcedir, **kwargs)
        if data is None:
            # Nothing to export for this request
            continue

        destination = exportdir[request.targetdir, request.target_filename]

        # Positional arguments in the order the export functions expect them
        positional = (
            request_id,
            data,
            result_queue,
            request.source_filename,
            destination,
            *itargs,
        )
        export_func(*positional, **kwargs)

        # Only resolve the paths for logging when debug output is enabled
        if get_loglevel() <= logging.DEBUG:
            MediaExporter.log_fileinfo(
                sourcedir[request.get_type().value, request.source_filename],
                exportdir[request.targetdir, request.target_filename]
            )

        MediaExporter._show_progress(request_id + 1, len(requests))

    if not handle_outqueue_func:
        return

    handle_outqueue_func(result_queue, requests)

@staticmethod
def _export_multithreaded(
    requests: list[MediaExportRequest],
    sourcedir: Path,
    exportdir: Path,
    read_data_func: typing.Callable,
    export_func: typing.Callable,
    handle_outqueue_func: typing.Callable | None,
    itargs: tuple,
    kwargs: dict,
    job_count: int | None = None
):
    """
    Export media files in parallel using a pool of worker processes.

    The source data of every request is read in the main process and then
    handed to ``export_func`` in a worker via ``Pool.apply_async``. Requests
    for which ``read_data_func`` returns ``None`` are skipped. The main
    process waits for all submitted jobs, updating the progress display
    while it waits, and finally passes the shared queue to
    ``handle_outqueue_func`` if one was supplied.

    :param requests: Export requests for media files.
    :param sourcedir: Directory where all media assets are mounted. Source subfolder and
                      source filename should be stored in the export request.
    :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder
                      and target filename should be stored in the export request.
    :param read_data_func: Function for reading the source file data.
    :param export_func: Function for exporting media files.
    :param handle_outqueue_func: Optional function for handling data in the outqueue.
    :param itargs: Arguments for the export function.
    :param kwargs: Keyword arguments for the read and export functions.
    :param job_count: Number of worker processes to use. If ``None``, the CPU count is
                      used, capped by the number of requests (but at least 1).
    :type requests: list[MediaExportRequest]
    :type sourcedir: Path
    :type exportdir: Path
    :type read_data_func: typing.Callable
    :type export_func: typing.Callable
    :type handle_outqueue_func: typing.Callable | None
    :type itargs: tuple
    :type kwargs: dict
    :type job_count: int | None
    :raises Exception: Any exception raised by ``export_func`` inside a worker is
                       re-raised in the main process.
    """
    worker_count = job_count
    if worker_count is None:
        # Small optimization that saves some time for small exports.
        # Clamp to 1: Pool() rejects a process count of 0, which would
        # otherwise be requested for an empty request list.
        worker_count = max(1, min(multiprocessing.cpu_count(), len(requests)))

    # Seconds to block between two progress refreshes while waiting for
    # workers; avoids spinning on the manager queue.
    poll_interval = 0.1

    # Create a manager for sharing data between the workers and main process
    with multiprocessing.Manager() as manager:
        # Workers write the image metadata to this queue
        # so that it can be forwarded to the export requests
        #
        # we cannot do this in a worker process directly
        # because the export requests cannot be pickled
        outqueue = manager.Queue()

        expected_size = len(requests)

        # Handles of all submitted jobs, used to wait for completion
        # and to surface worker exceptions
        pending = []

        # Create a pool of workers
        with multiprocessing.Pool(worker_count) as pool:
            for idx, request in enumerate(requests):
                # Feed the worker with the source file data (bytes) from the
                # main process
                #
                # This is necessary because some image files are inside an
                # archive and cannot be accessed asynchronously
                source_data = read_data_func(request, sourcedir, **kwargs)
                if source_data is None:
                    expected_size -= 1
                    continue

                target_path = exportdir[request.targetdir, request.target_filename]

                # Start an export call in a worker process
                # The call is asynchronous, so the next worker can be
                # started immediately
                pending.append(
                    pool.apply_async(
                        export_func,
                        args=(
                            idx,
                            source_data,
                            outqueue,
                            request.source_filename,
                            target_path,
                            *itargs
                        ),
                        kwds=kwargs
                    )
                )

                # Log file information
                if get_loglevel() <= logging.DEBUG:
                    MediaExporter.log_fileinfo(
                        sourcedir[request.get_type().value, request.source_filename],
                        exportdir[request.targetdir, request.target_filename]
                    )

                # Show progress
                MediaExporter._show_progress(outqueue.qsize(), expected_size)

            # Close the pool since all workers have been started
            pool.close()

            # Show progress for remaining workers.
            # Waiting on the job handles (instead of on the queue size)
            # guarantees termination even if a worker fails before it
            # writes to the queue.
            for result in pending:
                while not result.ready():
                    result.wait(poll_interval)
                    MediaExporter._show_progress(outqueue.qsize(), expected_size)

                # Re-raise an exception from the worker instead of
                # silently dropping it
                result.get()

            # Wait for all workers to finish
            pool.join()

        # Must run while the manager is alive because the queue is a proxy
        if handle_outqueue_func:
            handle_outqueue_func(outqueue, requests)

@staticmethod
def _get_blend_data(
request: MediaExportRequest,
Expand Down

0 comments on commit 4dbab82

Please sign in to comment.