From c937e30d3955244452506489925e4969ac8ce59a Mon Sep 17 00:00:00 2001 From: heinezen Date: Wed, 17 Jan 2024 01:46:56 +0100 Subject: [PATCH 01/18] convert: Log processor stages execution times. --- openage/convert/tool/driver.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/openage/convert/tool/driver.py b/openage/convert/tool/driver.py index d7d664d16e..ba974f42e8 100644 --- a/openage/convert/tool/driver.py +++ b/openage/convert/tool/driver.py @@ -1,4 +1,4 @@ -# Copyright 2015-2023 the openage authors. See copying.md for legal info. +# Copyright 2015-2024 the openage authors. See copying.md for legal info. # # pylint: disable=too-many-return-statements @@ -65,7 +65,9 @@ def convert_metadata(args: Namespace) -> None: gamedata_path = args.targetdir.joinpath('gamedata') if gamedata_path.exists(): gamedata_path.removerecursive() + read_start = timeit.default_timer() + # Read .dat debug_gamedata_format(args.debugdir, args.debug_info, args.game_version) gamespec = get_gamespec(args.srcdir, args.game_version, not args.flag("no_pickle_cache")) @@ -86,6 +88,7 @@ def convert_metadata(args: Namespace) -> None: debug_registered_graphics(args.debugdir, args.debug_info, existing_graphics) read_end = timeit.default_timer() + info("Finished metadata read (%.2f seconds)", read_end - read_start) conversion_start = timeit.default_timer() # Convert @@ -95,6 +98,7 @@ def convert_metadata(args: Namespace) -> None: existing_graphics) conversion_end = timeit.default_timer() + info("Finished data conversion (%.2f seconds)", conversion_end - conversion_start) export_start = timeit.default_timer() for modpack in modpacks: @@ -102,6 +106,7 @@ def convert_metadata(args: Namespace) -> None: debug_modpack(args.debugdir, args.debug_info, modpack) export_end = timeit.default_timer() + info("Finished modpack export (%.2f seconds)", export_end - export_start) stages_time = { "read": read_end - read_start, From 11b5d71975edf25775f2c59d4cc3e8af2f328756 Mon Sep 17 00:00:00 
2001 From: heinezen Date: Wed, 17 Jan 2024 01:47:30 +0100 Subject: [PATCH 02/18] convert: Remove parent relationship for 'Texture'. --- .../convert/entity_object/export/texture.py | 34 ++++--------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/openage/convert/entity_object/export/texture.py b/openage/convert/entity_object/export/texture.py index 5039abfeb3..885f5dbfaa 100644 --- a/openage/convert/entity_object/export/texture.py +++ b/openage/convert/entity_object/export/texture.py @@ -1,4 +1,4 @@ -# Copyright 2014-2023 the openage authors. See copying.md for legal info. +# Copyright 2014-2024 the openage authors. See copying.md for legal info. """ Routines for texture generation etc """ @@ -14,7 +14,6 @@ from ....log import spam from ...value_object.read.media.blendomatic import BlendingMode from ...value_object.read.media.hardcoded.terrain_tile_size import TILE_HALFSIZE -from ...value_object.read.genie_structure import GenieStructure if typing.TYPE_CHECKING: from openage.convert.value_object.read.media.colortable import ColorTable @@ -65,17 +64,13 @@ def get_data(self) -> numpy.ndarray: return self.data -class Texture(GenieStructure): - image_format = "png" +class Texture: + """ + one sprite, as part of a texture atlas. - name_struct = "subtexture" - name_struct_file = "texture" - struct_description = ( - "one sprite, as part of a texture atlas.\n" - "\n" - "this struct stores information about positions and sizes\n" - "of sprites included in the 'big texture'." - ) + stores information about positions and sizes + of sprites included in the 'big texture'. + """ def __init__( self, @@ -180,18 +175,3 @@ def get_cache_params(self) -> tuple[tuple, tuple]: - PNG compression parameters (compression level + deflate params) """ return self.best_packer_hints, self.best_compr - - @classmethod - def get_data_format_members(cls, game_version) -> tuple: - """ - Return the members in this struct. 
- """ - data_format = ( - (True, "x", None, "int32_t"), - (True, "y", None, "int32_t"), - (True, "w", None, "int32_t"), - (True, "h", None, "int32_t"), - (True, "cx", None, "int32_t"), - (True, "cy", None, "int32_t"), - ) - return data_format From fecdddccfd623257a96a8d28bbe385caaa1e0189 Mon Sep 17 00:00:00 2001 From: heinezen Date: Thu, 18 Jan 2024 03:44:01 +0100 Subject: [PATCH 03/18] convert: Write to PNG with thread pool. --- .../processor/export/media_exporter.py | 109 ++++++++++++++++-- 1 file changed, 99 insertions(+), 10 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 7005e262c9..36475e9e80 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -1,4 +1,4 @@ -# Copyright 2021-2023 the openage authors. See copying.md for legal info. +# Copyright 2021-2024 the openage authors. See copying.md for legal info. # # pylint: disable=too-many-arguments,too-many-locals """ @@ -9,7 +9,7 @@ import logging import os - +from multiprocessing import Pool from openage.convert.entity_object.export.texture import Texture from openage.convert.service import debug_info @@ -59,6 +59,8 @@ def export( if args.game_version.edition.media_cache: cache_info = load_media_cache(args.game_version.edition.media_cache) + textures = [] + for media_type in export_requests.keys(): cur_export_requests = export_requests[media_type] @@ -79,6 +81,28 @@ def export( export_func = MediaExporter._export_graphics info("-- Exporting graphics files...") + for count, request in enumerate(cur_export_requests, start = 1): + texture = MediaExporter._export_graphics( + request, + sourcedir, + exportdir, + args.palettes, + args.compression_level, + cache_info + ) + textures.append(( + texture, + # exportdir[request.targetdir], + exportdir[request.targetdir][request.target_filename].resolve_native_path(), + args.compression_level, + cache_info + )) + + with 
Pool() as pool: + pool.starmap(save_png, textures) + + continue + elif media_type is MediaType.SOUNDS: kwargs["loglevel"] = args.debug_info kwargs["debugdir"] = args.debugdir @@ -172,7 +196,7 @@ def _export_graphics( palettes: dict[int, ColorTable], compression_level: int, cache_info: dict = None - ) -> None: + ) -> Texture: """ Convert and export a graphics file. @@ -245,13 +269,13 @@ def _export_graphics( texture = Texture(image, palettes) merge_frames(texture, cache=packer_cache) - MediaExporter.save_png( - texture, - exportdir[export_request.targetdir], - export_request.target_filename, - compression_level=compression_level, - cache=compr_cache - ) + # MediaExporter.save_png( + # texture, + # exportdir[export_request.targetdir], + # export_request.target_filename, + # compression_level=compression_level, + # cache=compr_cache + # ) metadata = {export_request.target_filename: texture.get_metadata()} export_request.set_changed() export_request.notify_observers(metadata) @@ -263,6 +287,8 @@ def _export_graphics( exportdir[export_request.targetdir, export_request.target_filename] ) + return texture + @staticmethod def _export_interface( export_request: MediaExportRequest, @@ -582,3 +608,66 @@ def log_fileinfo( f"{(target_size / source_size * 100) - 100:+.1f}%)") dbg(log) + + +def save_png( + texture: Texture, + # targetdir: Path, + filename: str, + compression_level: int = 1, + cache: dict = None, + dry_run: bool = False +) -> None: + """ + Store the image data into the target directory path, + with given filename="dir/out.png". + + :param texture: Texture with an image atlas. + :param targetdir: Directory where the image file is created. + :param filename: Name of the resulting image file. + :param compression_level: PNG compression level used for the resulting image file. + :param dry_run: If True, create the PNG but don't save it as a file. 
+ :type texture: Texture + :type targetdir: Directory + :type filename: str + :type compression_level: int + :type dry_run: bool + """ + from ...service.export.png import png_create + + compression_levels = { + 0: png_create.CompressionMethod.COMPR_NONE, + 1: png_create.CompressionMethod.COMPR_DEFAULT, + 2: png_create.CompressionMethod.COMPR_OPTI, + 3: png_create.CompressionMethod.COMPR_GREEDY, + 4: png_create.CompressionMethod.COMPR_AGGRESSIVE, + } + + if not dry_run: + _, ext = os.path.splitext(filename) + + # only allow png + if ext != b".png": + raise ValueError("Filename invalid, a texture must be saved" + f" as '*.png', not '*.{ext}'") + + compression_method = compression_levels.get( + compression_level, + png_create.CompressionMethod.COMPR_DEFAULT + ) + png_data, compr_params = png_create.save( + texture.image_data.data, + compression_method, + cache + ) + + if not dry_run: + with open(filename, "wb") as imagefile: + imagefile.write(png_data) + + # if not dry_run: + # with targetdir[filename].open("wb") as imagefile: + # imagefile.write(png_data) + + if compr_params: + texture.best_compr = (compression_level, *compr_params) From e1762eccca94800f73eac9a9131361bce5db950f Mon Sep 17 00:00:00 2001 From: heinezen Date: Fri, 19 Jan 2024 23:13:59 +0100 Subject: [PATCH 04/18] convert: Graphics conversion inside thread pool. 
--- .../processor/export/media_exporter.py | 179 +++++++++++++----- 1 file changed, 134 insertions(+), 45 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 36475e9e80..7fc296ed33 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -9,7 +9,7 @@ import logging import os -from multiprocessing import Pool +import multiprocessing from openage.convert.entity_object.export.texture import Texture from openage.convert.service import debug_info @@ -59,8 +59,6 @@ def export( if args.game_version.edition.media_cache: cache_info = load_media_cache(args.game_version.edition.media_cache) - textures = [] - for media_type in export_requests.keys(): cur_export_requests = export_requests[media_type] @@ -81,25 +79,46 @@ def export( export_func = MediaExporter._export_graphics info("-- Exporting graphics files...") - for count, request in enumerate(cur_export_requests, start = 1): - texture = MediaExporter._export_graphics( - request, - sourcedir, - exportdir, - args.palettes, - args.compression_level, - cache_info - ) - textures.append(( - texture, - # exportdir[request.targetdir], - exportdir[request.targetdir][request.target_filename].resolve_native_path(), - args.compression_level, - cache_info - )) - - with Pool() as pool: - pool.starmap(save_png, textures) + with multiprocessing.Manager() as manager: + outqueue = manager.Queue() + with multiprocessing.Pool() as pool: + for idx, request in enumerate(cur_export_requests): + source_file = sourcedir[request.get_type().value][request.source_filename] + if not source_file.exists(): + if source_file.suffix.lower() in (".smx", ".sld"): + # Rename extension to SMP and try again + other_filename = request.source_filename[:-3] + "smp" + source_file = sourcedir[ + request.get_type().value, + other_filename + ] + request.set_source_filename(other_filename) + + target_path = 
exportdir[request.targetdir][request.target_filename].resolve_native_path() + func_args = ( + idx, + source_file.open("rb").read(), + outqueue, + request.source_filename, + target_path, + kwargs["palettes"], + kwargs["compression_level"], + cache_info + ) + pool.apply_async( + _export_texture, + func_args + ) + + pool.close() + pool.join() + + while not outqueue.empty(): + idx, metadata = outqueue.get() + update_data = {cur_export_requests[idx].target_filename: metadata} + cur_export_requests[idx].set_changed() + cur_export_requests[idx].notify_observers(update_data) + cur_export_requests[idx].clear_changed() continue @@ -269,13 +288,13 @@ def _export_graphics( texture = Texture(image, palettes) merge_frames(texture, cache=packer_cache) - # MediaExporter.save_png( - # texture, - # exportdir[export_request.targetdir], - # export_request.target_filename, - # compression_level=compression_level, - # cache=compr_cache - # ) + MediaExporter.save_png( + texture, + exportdir[export_request.targetdir], + export_request.target_filename, + compression_level=compression_level, + cache=compr_cache + ) metadata = {export_request.target_filename: texture.get_metadata()} export_request.set_changed() export_request.notify_observers(metadata) @@ -287,8 +306,6 @@ def _export_graphics( exportdir[export_request.targetdir, export_request.target_filename] ) - return texture - @staticmethod def _export_interface( export_request: MediaExportRequest, @@ -610,26 +627,102 @@ def log_fileinfo( dbg(log) -def save_png( +def _export_texture( + export_request_idx: int, + graphics_data: bytes, + outqueue: multiprocessing.Queue, + source_filename: str, + target_path: str, + palettes: dict[int, ColorTable], + compression_level: int, + cache_info: dict = None +) -> None: + """ + Convert and export a graphics file to a PNG texture. + + :param export_request: Export request for a graphics file. + :param sourcedir: Directory where all media assets are mounted. 
Source subfolder and + source filename should be stored in the export request. + :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder + and target filename should be stored in the export request. + :param palettes: Palettes used by the game. + :param compression_level: PNG compression level for the resulting image file. + :param cache_info: Media cache information with compression parameters from a previous run. + :type export_request: MediaExportRequest + :type sourcedir: Path + :type exportdir: Path + :type palettes: dict + :type compression_level: int + :type cache_info: tuple + """ + file_ext = source_filename.split('.')[-1].lower() + if file_ext == "slp": + from ...value_object.read.media.slp import SLP + image = SLP(graphics_data) + + elif file_ext == "smp": + from ...value_object.read.media.smp import SMP + image = SMP(graphics_data) + + elif file_ext == "smx": + from ...value_object.read.media.smx import SMX + image = SMX(graphics_data) + + elif file_ext == "sld": + from ...value_object.read.media.sld import SLD + image = SLD(graphics_data) + + else: + raise SyntaxError(f"Source file {source_filename} has an unrecognized extension: " + f"{file_ext}") + + packer_cache = None + compr_cache = None + if cache_info: + cache_params = cache_info.get(source_filename, None) + + if cache_params: + packer_cache = cache_params["packer_settings"] + compression_level = cache_params["compr_settings"][0] + compr_cache = cache_params["compr_settings"][1:] + + from .texture_merge import merge_frames + + texture = Texture(image, palettes) + merge_frames(texture, cache=packer_cache) + _save_png( + texture, + target_path, + compression_level=compression_level, + cache=compr_cache + ) + metadata = (export_request_idx, texture.get_metadata().copy()) + outqueue.put(metadata) + + # if get_loglevel() <= logging.DEBUG: + # MediaExporter.log_fileinfo( + # source_file, + # exportdir[export_request.targetdir, export_request.target_filename] + # ) + + 
+def _save_png( texture: Texture, - # targetdir: Path, - filename: str, + target_path: str, compression_level: int = 1, cache: dict = None, dry_run: bool = False ) -> None: """ Store the image data into the target directory path, - with given filename="dir/out.png". + with given target_path="dir/out.png". :param texture: Texture with an image atlas. - :param targetdir: Directory where the image file is created. - :param filename: Name of the resulting image file. + :param target_path: Path to the resulting image file. :param compression_level: PNG compression level used for the resulting image file. :param dry_run: If True, create the PNG but don't save it as a file. :type texture: Texture - :type targetdir: Directory - :type filename: str + :type target_path: str :type compression_level: int :type dry_run: bool """ @@ -644,7 +737,7 @@ def save_png( } if not dry_run: - _, ext = os.path.splitext(filename) + _, ext = os.path.splitext(target_path) # only allow png if ext != b".png": @@ -662,12 +755,8 @@ def save_png( ) if not dry_run: - with open(filename, "wb") as imagefile: + with open(target_path, "wb") as imagefile: imagefile.write(png_data) - # if not dry_run: - # with targetdir[filename].open("wb") as imagefile: - # imagefile.write(png_data) - if compr_params: texture.best_compr = (compression_level, *compr_params) From ff59db03d1be4cc58b138524b6b735323d4c384c Mon Sep 17 00:00:00 2001 From: heinezen Date: Fri, 19 Jan 2024 23:48:52 +0100 Subject: [PATCH 05/18] convert: Switch graphics export function to multi-threading. 
--- .../processor/export/media_exporter.py | 216 ++++++++---------- 1 file changed, 94 insertions(+), 122 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 7fc296ed33..7b859762a3 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -73,53 +73,15 @@ def export( info("-- Exporting terrain files...") elif media_type is MediaType.GRAPHICS: - kwargs["palettes"] = args.palettes - kwargs["compression_level"] = args.compression_level - kwargs["cache_info"] = cache_info - export_func = MediaExporter._export_graphics info("-- Exporting graphics files...") - - with multiprocessing.Manager() as manager: - outqueue = manager.Queue() - with multiprocessing.Pool() as pool: - for idx, request in enumerate(cur_export_requests): - source_file = sourcedir[request.get_type().value][request.source_filename] - if not source_file.exists(): - if source_file.suffix.lower() in (".smx", ".sld"): - # Rename extension to SMP and try again - other_filename = request.source_filename[:-3] + "smp" - source_file = sourcedir[ - request.get_type().value, - other_filename - ] - request.set_source_filename(other_filename) - - target_path = exportdir[request.targetdir][request.target_filename].resolve_native_path() - func_args = ( - idx, - source_file.open("rb").read(), - outqueue, - request.source_filename, - target_path, - kwargs["palettes"], - kwargs["compression_level"], - cache_info - ) - pool.apply_async( - _export_texture, - func_args - ) - - pool.close() - pool.join() - - while not outqueue.empty(): - idx, metadata = outqueue.get() - update_data = {cur_export_requests[idx].target_filename: metadata} - cur_export_requests[idx].set_changed() - cur_export_requests[idx].notify_observers(update_data) - cur_export_requests[idx].clear_changed() - + MediaExporter._export_graphics( + cur_export_requests, + sourcedir, + exportdir, + args.palettes, + 
args.compression_level, + cache_info + ) continue elif media_type is MediaType.SOUNDS: @@ -209,17 +171,17 @@ def _export_blend( @staticmethod def _export_graphics( - export_request: MediaExportRequest, + requests: list[MediaExportRequest], sourcedir: Path, exportdir: Path, palettes: dict[int, ColorTable], compression_level: int, cache_info: dict = None - ) -> Texture: + ) -> None: """ - Convert and export a graphics file. + Convert and export graphics file requests (multi-threaded). - :param export_request: Export request for a graphics file. + :param requests: Export requests for graphics files. :param sourcedir: Directory where all media assets are mounted. Source subfolder and source filename should be stored in the export request. :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder @@ -227,84 +189,94 @@ def _export_graphics( :param palettes: Palettes used by the game. :param compression_level: PNG compression level for the resulting image file. :param cache_info: Media cache information with compression parameters from a previous run. 
- :type export_request: MediaExportRequest + :type requests: list[MediaExportRequest] :type sourcedir: Path :type exportdir: Path :type palettes: dict :type compression_level: int :type cache_info: tuple """ - source_file = sourcedir[ - export_request.get_type().value, - export_request.source_filename - ] - - try: - media_file = source_file.open("rb") - - except FileNotFoundError: - if source_file.suffix.lower() in (".smx", ".sld"): - # Rename extension to SMP and try again - other_filename = export_request.source_filename[:-3] + "smp" - source_file = sourcedir[ - export_request.get_type().value, - other_filename - ] - export_request.set_source_filename(other_filename) - - media_file = source_file.open("rb") - - if source_file.suffix.lower() == ".slp": - from ...value_object.read.media.slp import SLP - image = SLP(media_file.read()) - - elif source_file.suffix.lower() == ".smp": - from ...value_object.read.media.smp import SMP - image = SMP(media_file.read()) - - elif source_file.suffix.lower() == ".smx": - from ...value_object.read.media.smx import SMX - image = SMX(media_file.read()) - - elif source_file.suffix.lower() == ".sld": - from ...value_object.read.media.sld import SLD - image = SLD(media_file.read()) - - else: - raise SyntaxError(f"Source file {source_file.name} has an unrecognized extension: " - f"{source_file.suffix.lower()}") - - packer_cache = None - compr_cache = None - if cache_info: - cache_params = cache_info.get(export_request.source_filename, None) - - if cache_params: - packer_cache = cache_params["packer_settings"] - compression_level = cache_params["compr_settings"][0] - compr_cache = cache_params["compr_settings"][1:] - - from .texture_merge import merge_frames - - texture = Texture(image, palettes) - merge_frames(texture, cache=packer_cache) - MediaExporter.save_png( - texture, - exportdir[export_request.targetdir], - export_request.target_filename, - compression_level=compression_level, - cache=compr_cache - ) - metadata = 
{export_request.target_filename: texture.get_metadata()} - export_request.set_changed() - export_request.notify_observers(metadata) - export_request.clear_changed() - - if get_loglevel() <= logging.DEBUG: - MediaExporter.log_fileinfo( - source_file, - exportdir[export_request.targetdir, export_request.target_filename] - ) + # Create a manager for sharing data between the workers and main process + with multiprocessing.Manager() as manager: + # Workers write the image metadata to this queue + # so that it can be forwarded to the export requests + # + # we cannot do this in a worker process directly + # because the export requests cannot be pickled + outqueue = manager.Queue() + + # Create a pool of workers + with multiprocessing.Pool() as pool: + for idx, request in enumerate(requests): + # Feed the worker with the source file data (bytes) from the + # main process + # + # This is necessary because some image files are inside an + # archive and cannot be accessed asynchronously + source_file = sourcedir[request.get_type().value, + request.source_filename] + if not source_file.exists(): + if source_file.suffix.lower() in (".smx", ".sld"): + # Some DE2 graphics files have the wrong extension + # Fall back to the SMP (beta) extension + other_filename = request.source_filename[:-3] + "smp" + source_file = sourcedir[ + request.get_type().value, + other_filename + ] + request.set_source_filename(other_filename) + + # The target path must be native + target_path = exportdir[request.targetdir, + request.target_filename].resolve_native_path() + + # Start an export call in a worker process + # The call is asynchronous, so the next worker can be + # started immediately + pool.apply_async( + _export_texture, + args=( + idx, + source_file.open("rb").read(), + outqueue, + request.source_filename, + target_path, + palettes, + compression_level, + cache_info + ) + ) + + # Log file information + if get_loglevel() <= logging.DEBUG: + MediaExporter.log_fileinfo( + source_file, + 
exportdir[request.targetdir, request.target_filename] + ) + + # Show progress + print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", + end = "\r", flush = True) + + # Close the pool since all workers have been started + pool.close() + + # Show progress for remaining workers + while outqueue.qsize() < len(requests): + print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", + end = "\r", flush = True) + + # Wait for all workers to finish + pool.join() + + # Collect the metadata from the workers and forward it to the + # export requests + while not outqueue.empty(): + idx, metadata = outqueue.get() + update_data = {requests[idx].target_filename: metadata} + requests[idx].set_changed() + requests[idx].notify_observers(update_data) + requests[idx].clear_changed() @staticmethod def _export_interface( From 2592ea9b7974ff8bbd1a94db306891be517e1ade Mon Sep 17 00:00:00 2001 From: heinezen Date: Sat, 20 Jan 2024 00:07:53 +0100 Subject: [PATCH 06/18] convert: Switch terrain export to multi-threading. 
--- .../processor/export/media_exporter.py | 218 ++++++++++++------ 1 file changed, 152 insertions(+), 66 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 7b859762a3..d4792763a6 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -65,12 +65,16 @@ def export( export_func = None kwargs = {} if media_type is MediaType.TERRAIN: - # Game version and palettes - kwargs["game_version"] = args.game_version - kwargs["palettes"] = args.palettes - kwargs["compression_level"] = args.compression_level - export_func = MediaExporter._export_terrain info("-- Exporting terrain files...") + MediaExporter._export_terrains( + cur_export_requests, + sourcedir, + exportdir, + args.palettes, + args.game_version, + args.compression_level + ) + continue elif media_type is MediaType.GRAPHICS: info("-- Exporting graphics files...") @@ -355,8 +359,8 @@ def _export_sound( ) @staticmethod - def _export_terrain( - export_request: MediaExportRequest, + def _export_terrains( + requests: list[MediaExportRequest], sourcedir: Path, exportdir: Path, palettes: dict[int, ColorTable], @@ -364,9 +368,9 @@ def _export_terrain( compression_level: int ) -> None: """ - Convert and export a terrain graphics file. + Convert and export terrain graphics files (multi-threaded). - :param export_request: Export request for a terrain graphics file. + :param requests: Export requests for terrain graphics files. :param sourcedir: Directory where all media assets are mounted. Source subfolder and source filename should be stored in the export request. :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder @@ -374,61 +378,82 @@ def _export_terrain( :param game_version: Game edition and expansion info. :param palettes: Palettes used by the game. :param compression_level: PNG compression level for the resulting image file. 
- :type export_request: MediaExportRequest + :type requests: list[MediaExportRequest] :type sourcedir: Directory :type exportdir: Directory :type palettes: dict :type game_version: GameVersion :type compression_level: int """ - source_file = sourcedir[ - export_request.get_type().value, - export_request.source_filename - ] + # Create a manager for sharing data between the workers and main process + with multiprocessing.Manager() as manager: + # Create a queue for data sharing + # it's not actually used for passing data, only for counting + # finished tasks + outqueue = manager.Queue() - if source_file.suffix.lower() == ".slp": - from ...value_object.read.media.slp import SLP - media_file = source_file.open("rb") - image = SLP(media_file.read()) + # Create a pool of workers + with multiprocessing.Pool() as pool: + for request in requests: + # Feed the worker with the source file data (bytes) from the + # main process + # + # This is necessary because some image files are inside an + # archive and cannot be accessed asynchronously + source_file = sourcedir[request.get_type().value, + request.source_filename] + if not source_file.exists(): + if source_file.suffix.lower() in (".smx", ".sld"): + # Some DE2 graphics files have the wrong extension + # Fall back to the SMP (beta) extension + other_filename = request.source_filename[:-3] + "smp" + source_file = sourcedir[ + request.get_type().value, + other_filename + ] + request.set_source_filename(other_filename) - elif source_file.suffix.lower() == ".dds": - # TODO: Implement - pass + # The target path must be native + target_path = exportdir[request.targetdir, + request.target_filename].resolve_native_path() - elif source_file.suffix.lower() == ".png": - from shutil import copyfileobj - src_path = source_file.open('rb') - dst_path = exportdir[export_request.targetdir, - export_request.target_filename].open('wb') - copyfileobj(src_path, dst_path) - return + # Start an export call in a worker process + # The call is 
asynchronous, so the next worker can be + # started immediately + pool.apply_async( + _export_terrain, + args=( + source_file.open("rb").read(), + outqueue, + request.source_filename, + target_path, + palettes, + compression_level, + game_version + ) + ) - else: - raise SyntaxError(f"Source file {source_file.name} has an unrecognized extension: " - f"{source_file.suffix.lower()}") + # Log file information + if get_loglevel() <= logging.DEBUG: + MediaExporter.log_fileinfo( + source_file, + exportdir[request.targetdir, request.target_filename] + ) - if game_version.edition.game_id in ("AOC", "SWGB"): - from .terrain_merge import merge_terrain - texture = Texture(image, palettes) - merge_terrain(texture) + # Show progress + print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", + end = "\r", flush = True) - else: - from .texture_merge import merge_frames - texture = Texture(image, palettes) - merge_frames(texture) + # Close the pool since all workers have been started + pool.close() - MediaExporter.save_png( - texture, - exportdir[export_request.targetdir], - export_request.target_filename, - compression_level, - ) + # Show progress for remaining workers + while outqueue.qsize() < len(requests): + print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", + end = "\r", flush = True) - if get_loglevel() <= logging.DEBUG: - MediaExporter.log_fileinfo( - source_file, - exportdir[export_request.targetdir, export_request.target_filename] - ) + # Wait for all workers to finish + pool.join() @staticmethod def _get_media_cache( @@ -599,8 +624,73 @@ def log_fileinfo( dbg(log) +def _export_terrain( + graphics_data: bytes, + outqueue: multiprocessing.Queue, + source_filename: str, + target_path: str, + palettes: dict[int, ColorTable], + compression_level: int, + game_version: GameVersion +) -> None: + """ + Convert and export a terrain graphics file. + + :param graphics_data: Raw file data of the graphics file. 
+ :param outqueue: Queue for passing the image metadata to the main process. + :param source_filename: Filename of the source file. + :param target_path: Path to the resulting image file. + :param palettes: Palettes used by the game. + :param compression_level: PNG compression level for the resulting image file. + :param game_version: Game edition and expansion info. + :type graphics_data: bytes + :type outqueue: multiprocessing.Queue + :type source_filename: str + :type target_path: str + :type palettes: dict + :type compression_level: int + :type game_version: GameVersion + """ + file_ext = source_filename.split('.')[-1].lower() + if file_ext == "slp": + from ...value_object.read.media.slp import SLP + image = SLP(graphics_data) + + elif file_ext == "dds": + # TODO: Implement + pass + + elif file_ext == "png": + with open(target_path, "wb") as imagefile: + imagefile.write(graphics_data) + + return + + else: + raise SyntaxError(f"Source file {source_filename} has an unrecognized extension: " + f"{file_ext}") + + if game_version.edition.game_id in ("AOC", "SWGB"): + from .terrain_merge import merge_terrain + texture = Texture(image, palettes) + merge_terrain(texture) + + else: + from .texture_merge import merge_frames + texture = Texture(image, palettes) + merge_frames(texture) + + _save_png( + texture, + target_path, + compression_level=compression_level + ) + + outqueue.put(0) + + def _export_texture( - export_request_idx: int, + export_request_id: int, graphics_data: bytes, outqueue: multiprocessing.Queue, source_filename: str, @@ -612,17 +702,19 @@ def _export_texture( """ Convert and export a graphics file to a PNG texture. - :param export_request: Export request for a graphics file. - :param sourcedir: Directory where all media assets are mounted. Source subfolder and - source filename should be stored in the export request. - :param exportdir: Directory the resulting file(s) will be exported to. 
Target subfolder - and target filename should be stored in the export request. + :param export_request_id: ID of the export request. + :param graphics_data: Raw file data of the graphics file. + :param outqueue: Queue for passing the image metadata to the main process. + :param source_filename: Filename of the source file. + :param target_path: Path to the resulting image file. :param palettes: Palettes used by the game. :param compression_level: PNG compression level for the resulting image file. :param cache_info: Media cache information with compression parameters from a previous run. - :type export_request: MediaExportRequest - :type sourcedir: Path - :type exportdir: Path + :type export_request_id: int + :type graphics_data: bytes + :type outqueue: multiprocessing.Queue + :type source_filename: str + :type target_path: str :type palettes: dict :type compression_level: int :type cache_info: tuple @@ -668,15 +760,9 @@ def _export_texture( compression_level=compression_level, cache=compr_cache ) - metadata = (export_request_idx, texture.get_metadata().copy()) + metadata = (export_request_id, texture.get_metadata().copy()) outqueue.put(metadata) - # if get_loglevel() <= logging.DEBUG: - # MediaExporter.log_fileinfo( - # source_file, - # exportdir[export_request.targetdir, export_request.target_filename] - # ) - def _save_png( texture: Texture, From 34ec340c84ed6607beeb01ed1a4543498c6d82d4 Mon Sep 17 00:00:00 2001 From: heinezen Date: Sat, 20 Jan 2024 00:22:47 +0100 Subject: [PATCH 07/18] convert: Switch sound export to multi-threading. 
--- .../processor/export/media_exporter.py | 131 +++++++++++++----- 1 file changed, 99 insertions(+), 32 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index d4792763a6..fcee47a7ef 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -89,10 +89,16 @@ def export( continue elif media_type is MediaType.SOUNDS: - kwargs["loglevel"] = args.debug_info kwargs["debugdir"] = args.debugdir - export_func = MediaExporter._export_sound + kwargs["loglevel"] = args.debug_info info("-- Exporting sound files...") + MediaExporter._export_sound( + cur_export_requests, + sourcedir, + exportdir, + **kwargs + ) + continue elif media_type is MediaType.BLEND: kwargs["blend_mode_count"] = args.blend_mode_count @@ -306,57 +312,91 @@ def _export_palette( @staticmethod def _export_sound( - export_request: MediaExportRequest, + requests: list[MediaExportRequest], sourcedir: Path, exportdir: Path, **kwargs ) -> None: """ - Convert and export a sound file. + Convert and export sound files (multi-threaded). - :param export_request: Export request for a sound file. + :param requests: Export requests for sound files. :param sourcedir: Directory where all media assets are mounted. Source subfolder and source filename should be stored in the export request. :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder and target filename should be stored in the export request. 
- :type export_request: MediaExportRequest + :type requests: list[MediaExportRequest] :type sourcedir: Path :type exportdir: Path """ - source_file = sourcedir[ - export_request.get_type().value, - export_request.source_filename - ] + # Create a manager for sharing data between the workers and main process + with multiprocessing.Manager() as manager: + # Create a queue for data sharing + # it's not actually used for passing data, only for counting + # finished tasks + outqueue = manager.Queue() + + # Create a pool of workers + with multiprocessing.Pool() as pool: + sound_count = len(requests) + for request in requests: + # Feed the worker with the source file data (bytes) from the + # main process + # + # This is necessary because some image files are inside an + # archive and cannot be accessed asynchronously + source_file = sourcedir[request.get_type().value, + request.source_filename] - if source_file.is_file(): - with source_file.open_r() as infile: - media_file = infile.read() + if source_file.is_file(): + with source_file.open_r() as infile: + media_file = infile.read() - else: - # TODO: Filter files that do not exist out sooner - debug_info.debug_not_found_sounds(kwargs["debugdir"], kwargs["loglevel"], source_file) - return + else: + # TODO: Filter files that do not exist out sooner + debug_info.debug_not_found_sounds(kwargs["debugdir"], + kwargs["loglevel"], + source_file) + sound_count -= 1 + continue + + # The target path must be native + target_path = exportdir[request.targetdir, + request.target_filename].resolve_native_path() - from ...service.export.opus.opusenc import encode + # Start an export call in a worker process + # The call is asynchronous, so the next worker can be + # started immediately + pool.apply_async( + _export_sound, + args=( + media_file, + outqueue, + target_path + ) + ) - soundata = encode(media_file) + # Log file information + if get_loglevel() <= logging.DEBUG: + MediaExporter.log_fileinfo( + source_file, + 
exportdir[request.targetdir, request.target_filename] + ) - if isinstance(soundata, (str, int)): - raise RuntimeError(f"opusenc failed: {soundata}") + # Show progress + print(f"-- Files done: {format_progress(outqueue.qsize(), sound_count)}", + end = "\r", flush = True) - export_file = exportdir[ - export_request.targetdir, - export_request.target_filename - ] + # Close the pool since all workers have been started + pool.close() - with export_file.open_w() as outfile: - outfile.write(soundata) + # Show progress for remaining workers + while outqueue.qsize() < sound_count: + print(f"-- Files done: {format_progress(outqueue.qsize(), sound_count)}", + end = "\r", flush = True) - if get_loglevel() <= logging.DEBUG: - MediaExporter.log_fileinfo( - source_file, - exportdir[export_request.targetdir, export_request.target_filename] - ) + # Wait for all workers to finish + pool.join() @staticmethod def _export_terrains( @@ -624,6 +664,33 @@ def log_fileinfo( dbg(log) +def _export_sound( + sound_data: bytes, + outqueue: multiprocessing.Queue, + target_path: str +) -> None: + """ + Convert and export a sound file. + + :param sound_data: Raw file data of the sound file. + :param outqueue: Queue for passing metadata to the main process. + :param target_path: Path to the resulting sound file. + :type sound_data: bytes + :type outqueue: multiprocessing.Queue + :type target_path: str + """ + from ...service.export.opus.opusenc import encode + encoded = encode(sound_data) + + if isinstance(encoded, (str, int)): + raise RuntimeError(f"opusenc failed: {encoded}") + + with open(target_path, "wb") as outfile: + outfile.write(encoded) + + outqueue.put(0) + + def _export_terrain( graphics_data: bytes, outqueue: multiprocessing.Queue, From 1e08bf2006af11dfdd1e50e4b6110c90648c8ee9 Mon Sep 17 00:00:00 2001 From: heinezen Date: Sat, 20 Jan 2024 00:42:55 +0100 Subject: [PATCH 08/18] convert: Switch blend export to multi-threading. 
--- .../processor/export/media_exporter.py | 127 +++++++++++++----- 1 file changed, 94 insertions(+), 33 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index fcee47a7ef..3270536f2f 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -62,7 +62,6 @@ def export( for media_type in export_requests.keys(): cur_export_requests = export_requests[media_type] - export_func = None kwargs = {} if media_type is MediaType.TERRAIN: info("-- Exporting terrain files...") @@ -74,7 +73,6 @@ def export( args.game_version, args.compression_level ) - continue elif media_type is MediaType.GRAPHICS: info("-- Exporting graphics files...") @@ -86,7 +84,6 @@ def export( args.compression_level, cache_info ) - continue elif media_type is MediaType.SOUNDS: kwargs["debugdir"] = args.debugdir @@ -98,18 +95,15 @@ def export( exportdir, **kwargs ) - continue elif media_type is MediaType.BLEND: - kwargs["blend_mode_count"] = args.blend_mode_count - export_func = MediaExporter._export_blend info("-- Exporting blend files...") - - total_count = len(cur_export_requests) - for count, request in enumerate(cur_export_requests, start = 1): - export_func(request, sourcedir, exportdir, **kwargs) - print(f"-- Files done: {format_progress(count, total_count)}", - end = "\r", flush = True) + MediaExporter._export_blend( + cur_export_requests, + sourcedir, + exportdir, + args.blend_mode_count + ) if args.debug_info > 5: cachedata = {} @@ -137,7 +131,7 @@ def export( @staticmethod def _export_blend( - export_request: MediaExportRequest, + requests: list[MediaExportRequest], sourcedir: Path, exportdir: Path, blend_mode_count: int = None @@ -145,39 +139,73 @@ def _export_blend( """ Convert and export a blending mode. - :param export_request: Export request for a blending mask. + :param requests: Export requests for blending masks. 
:param sourcedir: Directory where all media assets are mounted. Source subfolder and source filename should be stored in the export request. :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder and target filename should be stored in the export request. :param blend_mode_count: Number of blending modes extracted from the source file. - :type export_request: MediaExportRequest + :type requests: list[MediaExportRequest] :type sourcedir: Path :type exportdir: Path :type blend_mode_count: int """ - source_file = sourcedir.joinpath(export_request.source_filename) + # Create a manager for sharing data between the workers and main process + with multiprocessing.Manager() as manager: + # Create a queue for data sharing + # it's not actually used for passing data, only for counting + # finished tasks + outqueue = manager.Queue() - media_file = source_file.open("rb") - blend_data = Blendomatic(media_file, blend_mode_count) + # Create a pool of workers + with multiprocessing.Pool() as pool: + for request in requests: + # Feed the worker with the source file data (bytes) from the + # main process + # + # This is necessary because some image files are inside an + # archive and cannot be accessed asynchronously + source_file = sourcedir[request.get_type().value, + request.source_filename] - from .texture_merge import merge_frames + # The target path must be native + target_path = exportdir[request.targetdir, + request.target_filename].resolve_native_path() - textures = blend_data.get_textures() - for idx, texture in enumerate(textures): - merge_frames(texture) - MediaExporter.save_png( - texture, - exportdir[export_request.targetdir], - f"{export_request.target_filename}{idx}.png" - ) + # Start an export call in a worker process + # The call is asynchronous, so the next worker can be + # started immediately + pool.apply_async( + _export_blend, + args=( + source_file.open("rb").read(), + outqueue, + target_path, + blend_mode_count + ) + ) - if 
get_loglevel() <= logging.DEBUG: - MediaExporter.log_fileinfo( - source_file, - exportdir[export_request.targetdir, - f"{export_request.target_filename}{idx}.png"] - ) + # Log file information + if get_loglevel() <= logging.DEBUG: + MediaExporter.log_fileinfo( + source_file, + exportdir[request.targetdir, request.target_filename] + ) + + # Show progress + print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", + end = "\r", flush = True) + + # Close the pool since all workers have been started + pool.close() + + # Show progress for remaining workers + while outqueue.qsize() < len(requests): + print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", + end = "\r", flush = True) + + # Wait for all workers to finish + pool.join() @staticmethod def _export_graphics( @@ -664,6 +692,39 @@ def log_fileinfo( dbg(log) +def _export_blend( + blendfile_data: bytes, + outqueue: multiprocessing.Queue, + target_path: str, + blend_mode_count: int = None +) -> None: + """ + Convert and export a blending mode. + + :param blendfile_data: Raw file data of the blending mask. + :param outqueue: Queue for passing metadata to the main process. + :param target_path: Path to the resulting image file. + :param blend_mode_count: Number of blending modes extracted from the source file. 
+ :type blendfile_data: bytes + :type outqueue: multiprocessing.Queue + :type target_path: str + :type blend_mode_count: int + """ + blend_data = Blendomatic(blendfile_data, blend_mode_count) + + from .texture_merge import merge_frames + + textures = blend_data.get_textures() + for idx, texture in enumerate(textures): + merge_frames(texture) + _save_png( + texture, + f"{target_path}{idx}.png" + ) + + outqueue.put(0) + + def _export_sound( sound_data: bytes, outqueue: multiprocessing.Queue, From d891e0e7060d0135c28102f1f6d2bc9c64bc6db4 Mon Sep 17 00:00:00 2001 From: heinezen Date: Sat, 20 Jan 2024 00:55:55 +0100 Subject: [PATCH 09/18] convert: Reorder media type checks in export(). --- .../processor/export/media_exporter.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 3270536f2f..d019be4f00 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -63,15 +63,14 @@ def export( cur_export_requests = export_requests[media_type] kwargs = {} - if media_type is MediaType.TERRAIN: - info("-- Exporting terrain files...") - MediaExporter._export_terrains( + + if media_type is MediaType.BLEND: + info("-- Exporting blend files...") + MediaExporter._export_blend( cur_export_requests, sourcedir, exportdir, - args.palettes, - args.game_version, - args.compression_level + args.blend_mode_count ) elif media_type is MediaType.GRAPHICS: @@ -96,13 +95,15 @@ def export( **kwargs ) - elif media_type is MediaType.BLEND: - info("-- Exporting blend files...") - MediaExporter._export_blend( + elif media_type is MediaType.TERRAIN: + info("-- Exporting terrain files...") + MediaExporter._export_terrains( cur_export_requests, sourcedir, exportdir, - args.blend_mode_count + args.palettes, + args.game_version, + args.compression_level ) if args.debug_info > 5: From 
55f4b026146b34de7ff15cdb531d5e4afa09cc04 Mon Sep 17 00:00:00 2001 From: heinezen Date: Sat, 20 Jan 2024 01:59:01 +0100 Subject: [PATCH 10/18] convert: Fix queue not being fed when a terrain texture is copied. --- openage/convert/processor/export/media_exporter.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index d019be4f00..60ead219e4 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -471,16 +471,6 @@ def _export_terrains( # archive and cannot be accessed asynchronously source_file = sourcedir[request.get_type().value, request.source_filename] - if not source_file.exists(): - if source_file.suffix.lower() in (".smx", ".sld"): - # Some DE2 graphics files have the wrong extension - # Fall back to the SMP (beta) extension - other_filename = request.source_filename[:-3] + "smp" - source_file = sourcedir[ - request.get_type().value, - other_filename - ] - request.set_source_filename(other_filename) # The target path must be native target_path = exportdir[request.targetdir, @@ -793,6 +783,7 @@ def _export_terrain( with open(target_path, "wb") as imagefile: imagefile.write(graphics_data) + outqueue.put(0) return else: From 9e518721e6d8abb856aa8678281875bb6249a510 Mon Sep 17 00:00:00 2001 From: heinezen Date: Sun, 21 Jan 2024 01:57:25 +0100 Subject: [PATCH 11/18] convert: Remove synchronizer lock for export dir. --- openage/convert/main.py | 7 ++- .../processor/export/media_exporter.py | 45 +++++++++++-------- openage/convert/tool/api_export.py | 5 +-- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/openage/convert/main.py b/openage/convert/main.py index a0bd7a927b..9bf3f920ba 100644 --- a/openage/convert/main.py +++ b/openage/convert/main.py @@ -1,4 +1,4 @@ -# Copyright 2015-2023 the openage authors. See copying.md for legal info. 
+# Copyright 2015-2024 the openage authors. See copying.md for legal info. # # pylint: disable=too-many-branches """ @@ -64,7 +64,7 @@ def convert_assets( # add a dir for debug info debug_log_path = converted_path / "debug" / datetime.now().strftime("%Y-%m-%d-%H-%M-%S") debugdir = DirectoryCreator(debug_log_path).root - args.debugdir = AccessSynchronizer(debugdir).root + args.debugdir = debugdir # Create CLI args info debug_cli_args(args.debugdir, args.debug_info, args) @@ -93,9 +93,8 @@ def convert_assets( if not data_dir: return None - # make srcdir and targetdir safe for threaded conversion args.srcdir = AccessSynchronizer(data_dir).root - args.targetdir = AccessSynchronizer(targetdir).root + args.targetdir = targetdir # Create mountpoint info debug_mounts(args.debugdir, args.debug_info, args) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 60ead219e4..9aade3e96a 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -266,24 +266,33 @@ def _export_graphics( request.set_source_filename(other_filename) # The target path must be native - target_path = exportdir[request.targetdir, - request.target_filename].resolve_native_path() + target_path = exportdir[request.targetdir, request.target_filename] # Start an export call in a worker process # The call is asynchronous, so the next worker can be # started immediately - pool.apply_async( - _export_texture, - args=( - idx, - source_file.open("rb").read(), - outqueue, - request.source_filename, - target_path, - palettes, - compression_level, - cache_info - ) + # pool.apply_async( + # _export_texture, + # args=( + # idx, + # source_file.open("rb").read(), + # outqueue, + # request.source_filename, + # target_path, + # palettes, + # compression_level, + # cache_info + # ) + # ) + _export_texture( + idx, + source_file.open("rb").read(), + outqueue, + request.source_filename, + 
target_path, + palettes, + compression_level, + cache_info ) # Log file information @@ -915,12 +924,12 @@ def _save_png( } if not dry_run: - _, ext = os.path.splitext(target_path) + ext = target_path.suffix.lower() # only allow png - if ext != b".png": + if ext != ".png": raise ValueError("Filename invalid, a texture must be saved" - f" as '*.png', not '*.{ext}'") + f" as '*.png', not '*{ext}'") compression_method = compression_levels.get( compression_level, @@ -933,7 +942,7 @@ def _save_png( ) if not dry_run: - with open(target_path, "wb") as imagefile: + with target_path.open("wb") as imagefile: imagefile.write(png_data) if compr_params: diff --git a/openage/convert/tool/api_export.py b/openage/convert/tool/api_export.py index 1124da8771..b1cd767898 100644 --- a/openage/convert/tool/api_export.py +++ b/openage/convert/tool/api_export.py @@ -12,8 +12,7 @@ from openage.nyan.import_tree import ImportTree from openage.util.fslike.directory import Directory from openage.util.fslike.union import Union, UnionPath -from openage.util.fslike.wrapper import (DirectoryCreator, - Synchronizer as AccessSynchronizer) +from openage.util.fslike.wrapper import DirectoryCreator from ...log import info @@ -50,7 +49,7 @@ def export_api(exportdir: UnionPath) -> None: info("Dumping info file...") targetdir = DirectoryCreator(exportdir).root - outdir = AccessSynchronizer(targetdir).root / "engine" + outdir = targetdir / "engine" # Modpack info file DataExporter.export([modpack.info], outdir) From 1e7e25cda0313e4f86201baa726ed49cd53483df Mon Sep 17 00:00:00 2001 From: heinezen Date: Sun, 21 Jan 2024 02:10:31 +0100 Subject: [PATCH 12/18] convert: Use fslike paths for export again. 
--- .../processor/export/media_exporter.py | 74 ++++++++----------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 9aade3e96a..0976980411 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -170,8 +170,8 @@ def _export_blend( request.source_filename] # The target path must be native - target_path = exportdir[request.targetdir, - request.target_filename].resolve_native_path() + targetdir = exportdir[request.targetdir] + target_filename = request.target_filename # Start an export call in a worker process # The call is asynchronous, so the next worker can be @@ -181,7 +181,8 @@ def _export_blend( args=( source_file.open("rb").read(), outqueue, - target_path, + targetdir, + target_filename, blend_mode_count ) ) @@ -271,28 +272,18 @@ def _export_graphics( # Start an export call in a worker process # The call is asynchronous, so the next worker can be # started immediately - # pool.apply_async( - # _export_texture, - # args=( - # idx, - # source_file.open("rb").read(), - # outqueue, - # request.source_filename, - # target_path, - # palettes, - # compression_level, - # cache_info - # ) - # ) - _export_texture( - idx, - source_file.open("rb").read(), - outqueue, - request.source_filename, - target_path, - palettes, - compression_level, - cache_info + pool.apply_async( + _export_texture, + args=( + idx, + source_file.open("rb").read(), + outqueue, + request.source_filename, + target_path, + palettes, + compression_level, + cache_info + ) ) # Log file information @@ -399,8 +390,7 @@ def _export_sound( continue # The target path must be native - target_path = exportdir[request.targetdir, - request.target_filename].resolve_native_path() + target_path = exportdir[request.targetdir, request.target_filename] # Start an export call in a worker process # The call is asynchronous, so the 
next worker can be @@ -482,8 +472,7 @@ def _export_terrains( request.source_filename] # The target path must be native - target_path = exportdir[request.targetdir, - request.target_filename].resolve_native_path() + target_path = exportdir[request.targetdir, request.target_filename] # Start an export call in a worker process # The call is asynchronous, so the next worker can be @@ -695,7 +684,8 @@ def log_fileinfo( def _export_blend( blendfile_data: bytes, outqueue: multiprocessing.Queue, - target_path: str, + targetdir: Path, + target_filename: str, blend_mode_count: int = None ) -> None: """ @@ -707,7 +697,7 @@ def _export_blend( :param blend_mode_count: Number of blending modes extracted from the source file. :type blendfile_data: bytes :type outqueue: multiprocessing.Queue - :type target_path: str + :type target_path: openage.util.fslike.path.Path :type blend_mode_count: int """ blend_data = Blendomatic(blendfile_data, blend_mode_count) @@ -719,7 +709,7 @@ def _export_blend( merge_frames(texture) _save_png( texture, - f"{target_path}{idx}.png" + targetdir.joinpath(f"{target_filename}_{idx}.png") ) outqueue.put(0) @@ -728,7 +718,7 @@ def _export_blend( def _export_sound( sound_data: bytes, outqueue: multiprocessing.Queue, - target_path: str + target_path: Path ) -> None: """ Convert and export a sound file. @@ -738,7 +728,7 @@ def _export_sound( :param target_path: Path to the resulting sound file. 
:type sound_data: bytes :type outqueue: multiprocessing.Queue - :type target_path: str + :type target_path: openage.util.fslike.path.Path """ from ...service.export.opus.opusenc import encode encoded = encode(sound_data) @@ -746,7 +736,7 @@ def _export_sound( if isinstance(encoded, (str, int)): raise RuntimeError(f"opusenc failed: {encoded}") - with open(target_path, "wb") as outfile: + with target_path.open("wb") as outfile: outfile.write(encoded) outqueue.put(0) @@ -756,7 +746,7 @@ def _export_terrain( graphics_data: bytes, outqueue: multiprocessing.Queue, source_filename: str, - target_path: str, + target_path: Path, palettes: dict[int, ColorTable], compression_level: int, game_version: GameVersion @@ -774,7 +764,7 @@ def _export_terrain( :type graphics_data: bytes :type outqueue: multiprocessing.Queue :type source_filename: str - :type target_path: str + :type target_path: openage.util.fslike.path.Path :type palettes: dict :type compression_level: int :type game_version: GameVersion @@ -789,7 +779,7 @@ def _export_terrain( pass elif file_ext == "png": - with open(target_path, "wb") as imagefile: + with target_path.open("wb") as imagefile: imagefile.write(graphics_data) outqueue.put(0) @@ -823,7 +813,7 @@ def _export_texture( graphics_data: bytes, outqueue: multiprocessing.Queue, source_filename: str, - target_path: str, + target_path: Path, palettes: dict[int, ColorTable], compression_level: int, cache_info: dict = None @@ -843,7 +833,7 @@ def _export_texture( :type graphics_data: bytes :type outqueue: multiprocessing.Queue :type source_filename: str - :type target_path: str + :type target_path: openage.util.fslike.path.Path :type palettes: dict :type compression_level: int :type cache_info: tuple @@ -895,7 +885,7 @@ def _export_texture( def _save_png( texture: Texture, - target_path: str, + target_path: Path, compression_level: int = 1, cache: dict = None, dry_run: bool = False @@ -909,7 +899,7 @@ def _save_png( :param compression_level: PNG compression level 
used for the resulting image file. :param dry_run: If True, create the PNG but don't save it as a file. :type texture: Texture - :type target_path: str + :type target_path: openage.util.fslike.path.Path :type compression_level: int :type dry_run: bool """ From ff651a7772697d2106223d9be8f99739ee57e51f Mon Sep 17 00:00:00 2001 From: heinezen Date: Sun, 21 Jan 2024 02:16:32 +0100 Subject: [PATCH 13/18] convert: Allow specifying number of workers in pool. --- .../processor/export/media_exporter.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 0976980411..23ac515900 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -135,7 +135,8 @@ def _export_blend( requests: list[MediaExportRequest], sourcedir: Path, exportdir: Path, - blend_mode_count: int = None + blend_mode_count: int = None, + jobs: int = None ) -> None: """ Convert and export a blending mode. @@ -146,10 +147,12 @@ def _export_blend( :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder and target filename should be stored in the export request. :param blend_mode_count: Number of blending modes extracted from the source file. + :param jobs: Number of worker processes to use (default: number of CPU cores). 
:type requests: list[MediaExportRequest] :type sourcedir: Path :type exportdir: Path :type blend_mode_count: int + :type jobs: int """ # Create a manager for sharing data between the workers and main process with multiprocessing.Manager() as manager: @@ -159,7 +162,7 @@ def _export_blend( outqueue = manager.Queue() # Create a pool of workers - with multiprocessing.Pool() as pool: + with multiprocessing.Pool(jobs) as pool: for request in requests: # Feed the worker with the source file data (bytes) from the # main process @@ -216,7 +219,8 @@ def _export_graphics( exportdir: Path, palettes: dict[int, ColorTable], compression_level: int, - cache_info: dict = None + cache_info: dict = None, + jobs: int = None ) -> None: """ Convert and export graphics file requests (multi-threaded). @@ -229,12 +233,14 @@ def _export_graphics( :param palettes: Palettes used by the game. :param compression_level: PNG compression level for the resulting image file. :param cache_info: Media cache information with compression parameters from a previous run. + :param jobs: Number of worker processes to use (default: number of CPU cores). :type requests: list[MediaExportRequest] :type sourcedir: Path :type exportdir: Path :type palettes: dict :type compression_level: int :type cache_info: tuple + :type jobs: int """ # Create a manager for sharing data between the workers and main process with multiprocessing.Manager() as manager: @@ -246,7 +252,7 @@ def _export_graphics( outqueue = manager.Queue() # Create a pool of workers - with multiprocessing.Pool() as pool: + with multiprocessing.Pool(jobs) as pool: for idx, request in enumerate(requests): # Feed the worker with the source file data (bytes) from the # main process @@ -344,6 +350,7 @@ def _export_sound( requests: list[MediaExportRequest], sourcedir: Path, exportdir: Path, + jobs: int = None, **kwargs ) -> None: """ @@ -354,9 +361,11 @@ def _export_sound( source filename should be stored in the export request. 
:param exportdir: Directory the resulting file(s) will be exported to. Target subfolder and target filename should be stored in the export request. + :param jobs: Number of worker processes to use (default: number of CPU cores). :type requests: list[MediaExportRequest] :type sourcedir: Path :type exportdir: Path + :type jobs: int """ # Create a manager for sharing data between the workers and main process with multiprocessing.Manager() as manager: @@ -366,7 +375,7 @@ def _export_sound( outqueue = manager.Queue() # Create a pool of workers - with multiprocessing.Pool() as pool: + with multiprocessing.Pool(jobs) as pool: sound_count = len(requests) for request in requests: # Feed the worker with the source file data (bytes) from the @@ -433,7 +442,8 @@ def _export_terrains( exportdir: Path, palettes: dict[int, ColorTable], game_version: GameVersion, - compression_level: int + compression_level: int, + jobs: int = None ) -> None: """ Convert and export terrain graphics files (multi-threaded). @@ -446,12 +456,14 @@ def _export_terrains( :param game_version: Game edition and expansion info. :param palettes: Palettes used by the game. :param compression_level: PNG compression level for the resulting image file. + :param jobs: Number of worker processes to use (default: number of CPU cores). 
:type requests: list[MediaExportRequest] :type sourcedir: Directory :type exportdir: Directory :type palettes: dict :type game_version: GameVersion :type compression_level: int + :type jobs: int """ # Create a manager for sharing data between the workers and main process with multiprocessing.Manager() as manager: @@ -461,7 +473,7 @@ def _export_terrains( outqueue = manager.Queue() # Create a pool of workers - with multiprocessing.Pool() as pool: + with multiprocessing.Pool(jobs) as pool: for request in requests: # Feed the worker with the source file data (bytes) from the # main process From 20b4cdb2e1bb31e31eee3790518fc43b47e69aa2 Mon Sep 17 00:00:00 2001 From: heinezen Date: Sun, 21 Jan 2024 04:55:32 +0100 Subject: [PATCH 14/18] convert: Move pool into main loop. Reduce code duplication. --- .../processor/export/media_exporter.py | 275 +++++++++++++++--- 1 file changed, 237 insertions(+), 38 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 23ac515900..a6e959f796 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -31,6 +31,8 @@ class MediaExporter: """ Provides functions for converting media files and writing them to a targetdir. + + TODO: Avoid code duplication in the export functions. 
""" @staticmethod @@ -62,49 +64,108 @@ def export( for media_type in export_requests.keys(): cur_export_requests = export_requests[media_type] + read_data_func = None + export_func = None + handle_outqueue_func = None kwargs = {} - if media_type is MediaType.BLEND: + read_data_func = MediaExporter._get_blend_data + export_func = _export_blend + itargs = (sourcedir, exportdir) + kwargs["blend_mode_count"] = args.blend_mode_count info("-- Exporting blend files...") - MediaExporter._export_blend( - cur_export_requests, - sourcedir, - exportdir, - args.blend_mode_count - ) elif media_type is MediaType.GRAPHICS: + read_data_func = MediaExporter._get_graphics_data + export_func = _export_texture + handle_outqueue_func = MediaExporter._handle_graphics_outqueue + itargs = (args.palettes, args.compression_level) + kwargs["cache_info"] = cache_info info("-- Exporting graphics files...") - MediaExporter._export_graphics( - cur_export_requests, - sourcedir, - exportdir, - args.palettes, - args.compression_level, - cache_info - ) elif media_type is MediaType.SOUNDS: + read_data_func = MediaExporter._get_sound_data + export_func = _export_sound + itargs = tuple() kwargs["debugdir"] = args.debugdir kwargs["loglevel"] = args.debug_info info("-- Exporting sound files...") - MediaExporter._export_sound( - cur_export_requests, - sourcedir, - exportdir, - **kwargs - ) elif media_type is MediaType.TERRAIN: + read_data_func = MediaExporter._get_terrain_data + export_func = _export_terrain + itargs = (args.palettes, args.compression_level, args.game_version) info("-- Exporting terrain files...") - MediaExporter._export_terrains( - cur_export_requests, - sourcedir, - exportdir, - args.palettes, - args.game_version, - args.compression_level - ) + + # Create a manager for sharing data between the workers and main process + with multiprocessing.Manager() as manager: + # Workers write the image metadata to this queue + # so that it can be forwarded to the export requests + # + # we cannot do 
this in a worker process directly + # because the export requests cannot be pickled + outqueue = manager.Queue() + + expected_size = len(cur_export_requests) + + worker_count = args.jobs + if worker_count is None: + worker_count = min(multiprocessing.cpu_count(), expected_size) + + # Create a pool of workers + with multiprocessing.Pool(worker_count) as pool: + for idx, request in enumerate(cur_export_requests): + # Feed the worker with the source file data (bytes) from the + # main process + # + # This is necessary because some image files are inside an + # archive and cannot be accessed asynchronously + source_data = read_data_func(request, sourcedir, **kwargs) + if source_data is None: + expected_size -= 1 + continue + + # The target path must be native + target_path = exportdir[request.targetdir, request.target_filename] + + # Start an export call in a worker process + # The call is asynchronous, so the next worker can be + # started immediately + pool.apply_async( + export_func, + args=( + idx, + source_data, + outqueue, + request.source_filename, + target_path, + *itargs + ), + kwds=kwargs + ) + + # Log file information + if get_loglevel() <= logging.DEBUG: + MediaExporter.log_fileinfo( + sourcedir[request.get_type().value, request.source_filename], + exportdir[request.targetdir, request.target_filename] + ) + + # Show progress + MediaExporter._show_progress(outqueue.qsize(), expected_size) + + # Close the pool since all workers have been started + pool.close() + + # Show progress for remaining workers + while outqueue.qsize() < expected_size: + MediaExporter._show_progress(outqueue.qsize(), expected_size) + + # Wait for all workers to finish + pool.join() + + if handle_outqueue_func: + handle_outqueue_func(outqueue, cur_export_requests) if args.debug_info > 5: cachedata = {} @@ -130,6 +191,138 @@ def export( args.game_version ) + @staticmethod + def _get_blend_data( + request: MediaExportRequest, + sourcedir: Path, + **kwargs # pylint: disable=unused-argument 
+ ) -> bytes: + """ + Get the raw file data of a blending mask. + + :param request: Export request for a blending mask. + :param sourcedir: Directory where all media assets are mounted. + :type request: MediaExportRequest + :type sourcedir: Path + """ + source_file = sourcedir[request.get_type().value, + request.source_filename] + + return source_file.open("rb").read() + + @staticmethod + def _get_graphics_data( + request: MediaExportRequest, + sourcedir: Path, + **kwargs # pylint: disable=unused-argument + ) -> bytes: + """ + Get the raw file data of a graphics file. + + :param request: Export request for a graphics file. + :param sourcedir: Directory where all media assets are mounted. + :type request: MediaExportRequest + :type sourcedir: Path + """ + source_file = sourcedir[request.get_type().value, + request.source_filename] + if not source_file.exists(): + if source_file.suffix.lower() in (".smx", ".sld"): + # Some DE2 graphics files have the wrong extension + # Fall back to the SMP (beta) extension + other_filename = request.source_filename[:-3] + "smp" + source_file = sourcedir[ + request.get_type().value, + other_filename + ] + request.set_source_filename(other_filename) + + return source_file.open("rb").read() + + @staticmethod + def _get_sound_data( + request: MediaExportRequest, + sourcedir: Path, + **kwargs + ) -> bytes | None: + """ + Get the raw file data of a sound file. + + :param request: Export request for a sound file. + :param sourcedir: Directory where all media assets are mounted. 
+ :type request: MediaExportRequest + :type sourcedir: Path + """ + source_file = sourcedir[request.get_type().value, + request.source_filename] + + if not source_file.is_file(): + # TODO: Filter files that do not exist out sooner + debug_info.debug_not_found_sounds(kwargs["debugdir"], + kwargs["loglevel"], + source_file) + return None + + return source_file.open("rb").read() + + @staticmethod + def _get_terrain_data( + request: MediaExportRequest, + sourcedir: Path, + **kwargs # pylint: disable=unused-argument + ) -> bytes: + """ + Get the raw file data of a terrain graphics file. + + :param request: Export request for a terrain graphics file. + :param sourcedir: Directory where all media assets are mounted. + :type request: MediaExportRequest + :type sourcedir: Path + """ + source_file = sourcedir[request.get_type().value, + request.source_filename] + + return source_file.open("rb").read() + + @staticmethod + def _handle_graphics_outqueue( + outqueue: multiprocessing.Queue, + requests: list[MediaExportRequest] + ): + """ + Collect the metadata from the workers and forward it to the + export requests. + + This must be called before the manager of the queue is shutdown! + + :param outqueue: Queue for passing metadata to the main process. + :param requests: Export requests for graphics files. + :type outqueue: multiprocessing.Queue + :type requests: list[MediaExportRequest] + """ + while not outqueue.empty(): + idx, metadata = outqueue.get() + update_data = {requests[idx].target_filename: metadata} + requests[idx].set_changed() + requests[idx].notify_observers(update_data) + requests[idx].clear_changed() + + @staticmethod + def _show_progress( + current_size: int, + total_size: int, + ): + """ + Show the progress of the export process. + + :param current_size: Number of files that have been exported. + :param total_size: Total number of files to export. 
+ :type current_size: int + :type total_size: int + """ + print(f"-- Files done: {format_progress(current_size, total_size)}", + end = "\r", flush = True) + @staticmethod def _export_blend( requests: list[MediaExportRequest], @@ -694,8 +887,10 @@ def log_fileinfo( def _export_blend( + request_id: int, blendfile_data: bytes, outqueue: multiprocessing.Queue, + source_filename: str, # pylint: disable=unused-argument targetdir: Path, target_filename: str, blend_mode_count: int = None @@ -724,13 +919,16 @@ def _export_blend( targetdir.joinpath(f"{target_filename}_{idx}.png") ) - outqueue.put(0) + outqueue.put(request_id) def _export_sound( + request_id: int, sound_data: bytes, outqueue: multiprocessing.Queue, - target_path: Path + source_filename: str, # pylint: disable=unused-argument + target_path: Path, + **kwargs # pylint: disable=unused-argument ) -> None: """ Convert and export a sound file. @@ -751,10 +949,11 @@ def _export_sound( with target_path.open("wb") as outfile: outfile.write(encoded) - outqueue.put(0) + outqueue.put(request_id) def _export_terrain( + request_id: int, graphics_data: bytes, outqueue: multiprocessing.Queue, source_filename: str, @@ -794,7 +993,7 @@ def _export_terrain( with target_path.open("wb") as imagefile: imagefile.write(graphics_data) - outqueue.put(0) + outqueue.put(request_id) return else: @@ -817,11 +1016,11 @@ def _export_terrain( compression_level=compression_level ) - outqueue.put(0) + outqueue.put(request_id) def _export_texture( - export_request_id: int, + request_id: int, graphics_data: bytes, outqueue: multiprocessing.Queue, source_filename: str, @@ -833,7 +1032,7 @@ def _export_texture( """ Convert and export a graphics file to a PNG texture. - :param export_request_id: ID of the export request. + :param request_id: ID of the export request. :param graphics_data: Raw file data of the graphics file. :param outqueue: Queue for passing the image metadata to the main process. :param source_filename: Filename of the source file. 
@@ -841,7 +1040,7 @@ def _export_texture( :param palettes: Palettes used by the game. :param compression_level: PNG compression level for the resulting image file. :param cache_info: Media cache information with compression parameters from a previous run. - :type export_request_id: int + :type request_id: int :type graphics_data: bytes :type outqueue: multiprocessing.Queue :type source_filename: str @@ -891,7 +1090,7 @@ def _export_texture( compression_level=compression_level, cache=compr_cache ) - metadata = (export_request_id, texture.get_metadata().copy()) + metadata = (request_id, texture.get_metadata().copy()) outqueue.put(metadata) From 2473a5901fba74570e63ca5f7719a9ba3a88d410 Mon Sep 17 00:00:00 2001 From: heinezen Date: Sun, 21 Jan 2024 04:58:17 +0100 Subject: [PATCH 15/18] refactor: Remove obsolete export methods. --- .../processor/export/media_exporter.py | 394 ------------------ 1 file changed, 394 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index a6e959f796..275f1e0708 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -323,400 +323,6 @@ def _show_progress( print(f"-- Files done: {format_progress(current_size, total_size)}", end = "\r", flush = True) - @staticmethod - def _export_blend( - requests: list[MediaExportRequest], - sourcedir: Path, - exportdir: Path, - blend_mode_count: int = None, - jobs: int = None - ) -> None: - """ - Convert and export a blending mode. - - :param requests: Export requests for blending masks. - :param sourcedir: Directory where all media assets are mounted. Source subfolder and - source filename should be stored in the export request. - :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder - and target filename should be stored in the export request. - :param blend_mode_count: Number of blending modes extracted from the source file. 
- :param jobs: Number of worker processes to use (default: number of CPU cores). - :type requests: list[MediaExportRequest] - :type sourcedir: Path - :type exportdir: Path - :type blend_mode_count: int - :type jobs: int - """ - # Create a manager for sharing data between the workers and main process - with multiprocessing.Manager() as manager: - # Create a queue for data sharing - # it's not actually used for passing data, only for counting - # finished tasks - outqueue = manager.Queue() - - # Create a pool of workers - with multiprocessing.Pool(jobs) as pool: - for request in requests: - # Feed the worker with the source file data (bytes) from the - # main process - # - # This is necessary because some image files are inside an - # archive and cannot be accessed asynchronously - source_file = sourcedir[request.get_type().value, - request.source_filename] - - # The target path must be native - targetdir = exportdir[request.targetdir] - target_filename = request.target_filename - - # Start an export call in a worker process - # The call is asynchronous, so the next worker can be - # started immediately - pool.apply_async( - _export_blend, - args=( - source_file.open("rb").read(), - outqueue, - targetdir, - target_filename, - blend_mode_count - ) - ) - - # Log file information - if get_loglevel() <= logging.DEBUG: - MediaExporter.log_fileinfo( - source_file, - exportdir[request.targetdir, request.target_filename] - ) - - # Show progress - print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", - end = "\r", flush = True) - - # Close the pool since all workers have been started - pool.close() - - # Show progress for remaining workers - while outqueue.qsize() < len(requests): - print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", - end = "\r", flush = True) - - # Wait for all workers to finish - pool.join() - - @staticmethod - def _export_graphics( - requests: list[MediaExportRequest], - sourcedir: Path, - exportdir: Path, - 
palettes: dict[int, ColorTable], - compression_level: int, - cache_info: dict = None, - jobs: int = None - ) -> None: - """ - Convert and export graphics file requests (multi-threaded). - - :param requests: Export requests for graphics files. - :param sourcedir: Directory where all media assets are mounted. Source subfolder and - source filename should be stored in the export request. - :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder - and target filename should be stored in the export request. - :param palettes: Palettes used by the game. - :param compression_level: PNG compression level for the resulting image file. - :param cache_info: Media cache information with compression parameters from a previous run. - :param jobs: Number of worker processes to use (default: number of CPU cores). - :type requests: list[MediaExportRequest] - :type sourcedir: Path - :type exportdir: Path - :type palettes: dict - :type compression_level: int - :type cache_info: tuple - :type jobs: int - """ - # Create a manager for sharing data between the workers and main process - with multiprocessing.Manager() as manager: - # Workers write the image metadata to this queue - # so that it can be forwarded to the export requests - # - # we cannot do this in a worker process directly - # because the export requests cannot be pickled - outqueue = manager.Queue() - - # Create a pool of workers - with multiprocessing.Pool(jobs) as pool: - for idx, request in enumerate(requests): - # Feed the worker with the source file data (bytes) from the - # main process - # - # This is necessary because some image files are inside an - # archive and cannot be accessed asynchronously - source_file = sourcedir[request.get_type().value, - request.source_filename] - if not source_file.exists(): - if source_file.suffix.lower() in (".smx", ".sld"): - # Some DE2 graphics files have the wrong extension - # Fall back to the SMP (beta) extension - other_filename = 
request.source_filename[:-3] + "smp" - source_file = sourcedir[ - request.get_type().value, - other_filename - ] - request.set_source_filename(other_filename) - - # The target path must be native - target_path = exportdir[request.targetdir, request.target_filename] - - # Start an export call in a worker process - # The call is asynchronous, so the next worker can be - # started immediately - pool.apply_async( - _export_texture, - args=( - idx, - source_file.open("rb").read(), - outqueue, - request.source_filename, - target_path, - palettes, - compression_level, - cache_info - ) - ) - - # Log file information - if get_loglevel() <= logging.DEBUG: - MediaExporter.log_fileinfo( - source_file, - exportdir[request.targetdir, request.target_filename] - ) - - # Show progress - print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", - end = "\r", flush = True) - - # Close the pool since all workers have been started - pool.close() - - # Show progress for remaining workers - while outqueue.qsize() < len(requests): - print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", - end = "\r", flush = True) - - # Wait for all workers to finish - pool.join() - - # Collect the metadata from the workers and forward it to the - # export requests - while not outqueue.empty(): - idx, metadata = outqueue.get() - update_data = {requests[idx].target_filename: metadata} - requests[idx].set_changed() - requests[idx].notify_observers(update_data) - requests[idx].clear_changed() - - @staticmethod - def _export_interface( - export_request: MediaExportRequest, - sourcedir: Path, - **kwargs - ) -> None: - """ - Convert and export a sprite file. - """ - # TODO: Implement - - @staticmethod - def _export_palette( - export_request: MediaExportRequest, - sourcedir: Path, - **kwargs - ) -> None: - """ - Convert and export a palette file. 
- """ - # TODO: Implement - - @staticmethod - def _export_sound( - requests: list[MediaExportRequest], - sourcedir: Path, - exportdir: Path, - jobs: int = None, - **kwargs - ) -> None: - """ - Convert and export sound files (multi-threaded). - - :param requests: Export requests for sound files. - :param sourcedir: Directory where all media assets are mounted. Source subfolder and - source filename should be stored in the export request. - :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder - and target filename should be stored in the export request. - :param jobs: Number of worker processes to use (default: number of CPU cores). - :type requests: list[MediaExportRequest] - :type sourcedir: Path - :type exportdir: Path - :type jobs: int - """ - # Create a manager for sharing data between the workers and main process - with multiprocessing.Manager() as manager: - # Create a queue for data sharing - # it's not actually used for passing data, only for counting - # finished tasks - outqueue = manager.Queue() - - # Create a pool of workers - with multiprocessing.Pool(jobs) as pool: - sound_count = len(requests) - for request in requests: - # Feed the worker with the source file data (bytes) from the - # main process - # - # This is necessary because some image files are inside an - # archive and cannot be accessed asynchronously - source_file = sourcedir[request.get_type().value, - request.source_filename] - - if source_file.is_file(): - with source_file.open_r() as infile: - media_file = infile.read() - - else: - # TODO: Filter files that do not exist out sooner - debug_info.debug_not_found_sounds(kwargs["debugdir"], - kwargs["loglevel"], - source_file) - sound_count -= 1 - continue - - # The target path must be native - target_path = exportdir[request.targetdir, request.target_filename] - - # Start an export call in a worker process - # The call is asynchronous, so the next worker can be - # started immediately - pool.apply_async( - 
_export_sound, - args=( - media_file, - outqueue, - target_path - ) - ) - - # Log file information - if get_loglevel() <= logging.DEBUG: - MediaExporter.log_fileinfo( - source_file, - exportdir[request.targetdir, request.target_filename] - ) - - # Show progress - print(f"-- Files done: {format_progress(outqueue.qsize(), sound_count)}", - end = "\r", flush = True) - - # Close the pool since all workers have been started - pool.close() - - # Show progress for remaining workers - while outqueue.qsize() < sound_count: - print(f"-- Files done: {format_progress(outqueue.qsize(), sound_count)}", - end = "\r", flush = True) - - # Wait for all workers to finish - pool.join() - - @staticmethod - def _export_terrains( - requests: list[MediaExportRequest], - sourcedir: Path, - exportdir: Path, - palettes: dict[int, ColorTable], - game_version: GameVersion, - compression_level: int, - jobs: int = None - ) -> None: - """ - Convert and export terrain graphics files (multi-threaded). - - :param requests: Export requests for terrain graphics files. - :param sourcedir: Directory where all media assets are mounted. Source subfolder and - source filename should be stored in the export request. - :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder - and target filename should be stored in the export request. - :param game_version: Game edition and expansion info. - :param palettes: Palettes used by the game. - :param compression_level: PNG compression level for the resulting image file. - :param jobs: Number of worker processes to use (default: number of CPU cores). 
- :type requests: list[MediaExportRequest] - :type sourcedir: Directory - :type exportdir: Directory - :type palettes: dict - :type game_version: GameVersion - :type compression_level: int - :type jobs: int - """ - # Create a manager for sharing data between the workers and main process - with multiprocessing.Manager() as manager: - # Create a queue for data sharing - # it's not actually used for passing data, only for counting - # finished tasks - outqueue = manager.Queue() - - # Create a pool of workers - with multiprocessing.Pool(jobs) as pool: - for request in requests: - # Feed the worker with the source file data (bytes) from the - # main process - # - # This is necessary because some image files are inside an - # archive and cannot be accessed asynchronously - source_file = sourcedir[request.get_type().value, - request.source_filename] - - # The target path must be native - target_path = exportdir[request.targetdir, request.target_filename] - - # Start an export call in a worker process - # The call is asynchronous, so the next worker can be - # started immediately - pool.apply_async( - _export_terrain, - args=( - source_file.open("rb").read(), - outqueue, - request.source_filename, - target_path, - palettes, - compression_level, - game_version - ) - ) - - # Log file information - if get_loglevel() <= logging.DEBUG: - MediaExporter.log_fileinfo( - source_file, - exportdir[request.targetdir, request.target_filename] - ) - - # Show progress - print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", - end = "\r", flush = True) - - # Close the pool since all workers have been started - pool.close() - - # Show progress for remaining workers - while outqueue.qsize() < len(requests): - print(f"-- Files done: {format_progress(outqueue.qsize(), len(requests))}", - end = "\r", flush = True) - - # Wait for all workers to finish - pool.join() - @staticmethod def _get_media_cache( export_request: MediaExportRequest, From 
477557051aa067a516655dacf4fd7955d60b4d43 Mon Sep 17 00:00:00 2001 From: heinezen Date: Sun, 21 Jan 2024 05:54:26 +0100 Subject: [PATCH 16/18] convert: Fix job count not in args when started from 'main' entrypoint. --- openage/convert/main.py | 4 ++++ openage/convert/processor/export/media_exporter.py | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/openage/convert/main.py b/openage/convert/main.py index 9bf3f920ba..f89356402f 100644 --- a/openage/convert/main.py +++ b/openage/convert/main.py @@ -53,6 +53,10 @@ def convert_assets( if "compression_level" not in vars(args): args.compression_level = 1 + # Set worker count for multi-threading if it was not set + if "jobs" not in vars(args): + args.jobs = None + # Set verbosity for debug output if "debug_info" not in vars(args) or not args.debug_info: if args.devmode: diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index 275f1e0708..c4a14b1ab9 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -64,9 +64,15 @@ def export( for media_type in export_requests.keys(): cur_export_requests = export_requests[media_type] + # Function for reading the source file data read_data_func = None + + # Multi-threaded function for exporting the source file data export_func = None + + # Optional function for handling data in the outqueue handle_outqueue_func = None + kwargs = {} if media_type is MediaType.BLEND: read_data_func = MediaExporter._get_blend_data From 4dbab823bbeb899a0ad60850d472dd4796cea170 Mon Sep 17 00:00:00 2001 From: heinezen Date: Sun, 21 Jan 2024 07:10:08 +0100 Subject: [PATCH 17/18] convert: Allow both multi-threaded and single-threaded export. 
--- .../processor/export/media_exporter.py | 260 +++++++++++++----- 1 file changed, 192 insertions(+), 68 deletions(-) diff --git a/openage/convert/processor/export/media_exporter.py b/openage/convert/processor/export/media_exporter.py index c4a14b1ab9..1d824a532e 100644 --- a/openage/convert/processor/export/media_exporter.py +++ b/openage/convert/processor/export/media_exporter.py @@ -10,6 +10,7 @@ import logging import os import multiprocessing +import queue from openage.convert.entity_object.export.texture import Texture from openage.convert.service import debug_info @@ -103,75 +104,30 @@ def export( itargs = (args.palettes, args.compression_level, args.game_version) info("-- Exporting terrain files...") - # Create a manager for sharing data between the workers and main process - with multiprocessing.Manager() as manager: - # Workers write the image metadata to this queue - # so that it can be forwarded to the export requests - # - # we cannot do this in a worker process directly - # because the export requests cannot be pickled - outqueue = manager.Queue() - - expected_size = len(cur_export_requests) - - worker_count = args.jobs - if worker_count is None: - worker_count = min(multiprocessing.cpu_count(), expected_size) - - # Create a pool of workers - with multiprocessing.Pool(worker_count) as pool: - for idx, request in enumerate(cur_export_requests): - # Feed the worker with the source file data (bytes) from the - # main process - # - # This is necessary because some image files are inside an - # archive and cannot be accessed asynchronously - source_data = read_data_func(request, sourcedir, **kwargs) - if source_data is None: - expected_size -= 1 - continue - - # The target path must be native - target_path = exportdir[request.targetdir, request.target_filename] - - # Start an export call in a worker process - # The call is asynchronous, so the next worker can be - # started immediately - pool.apply_async( - export_func, - args=( - idx, - source_data, - 
outqueue, - request.source_filename, - target_path, - *itargs - ), - kwds=kwargs - ) - - # Log file information - if get_loglevel() <= logging.DEBUG: - MediaExporter.log_fileinfo( - sourcedir[request.get_type().value, request.source_filename], - exportdir[request.targetdir, request.target_filename] - ) - - # Show progress - MediaExporter._show_progress(outqueue.qsize(), expected_size) - - # Close the pool since all workers have been started - pool.close() - - # Show progress for remaining workers - while outqueue.qsize() < expected_size: - MediaExporter._show_progress(outqueue.qsize(), expected_size) - - # Wait for all workers to finish - pool.join() + if args.jobs == 1: + MediaExporter._export_singlethreaded( + cur_export_requests, + sourcedir, + exportdir, + read_data_func, + export_func, + handle_outqueue_func, + itargs, + kwargs + ) - if handle_outqueue_func: - handle_outqueue_func(outqueue, cur_export_requests) + else: + MediaExporter._export_multithreaded( + cur_export_requests, + sourcedir, + exportdir, + read_data_func, + export_func, + handle_outqueue_func, + itargs, + kwargs, + args.jobs + ) if args.debug_info > 5: cachedata = {} @@ -197,6 +153,174 @@ def export( args.game_version ) + @staticmethod + def _export_singlethreaded( + requests: list[MediaExportRequest], + sourcedir: Path, + exportdir: Path, + read_data_func: typing.Callable, + export_func: typing.Callable, + handle_outqueue_func: typing.Callable | None, + itargs: tuple, + kwargs: dict + ): + """ + Export media files in a single thread. + + :param requests: Export requests for media files. + :param sourcedir: Directory where all media assets are mounted. Source subfolder and + source filename should be stored in the export request. + :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder + and target filename should be stored in the export request. + :param read_data_func: Function for reading the source file data. 
+ :param export_func: Function for exporting media files. + :param handle_outqueue_func: Optional function for handling data in the outqueue. + :param itargs: Arguments for the export function. + :param kwargs: Keyword arguments for the export function. + :type requests: list[MediaExportRequest] + :type sourcedir: Path + :type exportdir: Path + :type read_data_func: typing.Callable + :type export_func: typing.Callable + :type handle_outqueue_func: typing.Callable + :type itargs: tuple + :type kwargs: dict + """ + single_queue = queue.Queue() + for idx, request in enumerate(requests): + source_data = read_data_func(request, sourcedir, **kwargs) + if source_data is None: + continue + + target_path = exportdir[request.targetdir, request.target_filename] + + export_func( + idx, + source_data, + single_queue, + request.source_filename, + target_path, + *itargs, + **kwargs + ) + + if get_loglevel() <= logging.DEBUG: + MediaExporter.log_fileinfo( + sourcedir[request.get_type().value, request.source_filename], + exportdir[request.targetdir, request.target_filename] + ) + + MediaExporter._show_progress(idx + 1, len(requests)) + + if handle_outqueue_func: + handle_outqueue_func(single_queue, requests) + + @staticmethod + def _export_multithreaded( + requests: list[MediaExportRequest], + sourcedir: Path, + exportdir: Path, + read_data_func: typing.Callable, + export_func: typing.Callable, + handle_outqueue_func: typing.Callable | None, + itargs: tuple, + kwargs: dict, + job_count: int = None + ): + """ + Export media files in multiple threads. + + :param requests: Export requests for media files. + :param sourcedir: Directory where all media assets are mounted. Source subfolder and + source filename should be stored in the export request. + :param exportdir: Directory the resulting file(s) will be exported to. Target subfolder + and target filename should be stored in the export request. + :param read_data_func: Function for reading the source file data. 
+ :param export_func: Function for exporting media files. + :param handle_outqueue_func: Optional function for handling data in the outqueue. + :param itargs: Arguments for the export function. + :param kwargs: Keyword arguments for the export function. + :param job_count: Number of worker processes to use. + :type requests: list[MediaExportRequest] + :type sourcedir: Path + :type exportdir: Path + :type read_data_func: typing.Callable + :type export_func: typing.Callable + :type handle_outqueue_func: typing.Callable + :type itargs: tuple + :type kwargs: dict + :type job_count: int + """ + worker_count = job_count + if worker_count is None: + # Small optimization that saves some time for small exports + worker_count = min(multiprocessing.cpu_count(), len(requests)) + + # Create a manager for sharing data between the workers and main process + with multiprocessing.Manager() as manager: + # Workers write the image metadata to this queue + # so that it can be forwarded to the export requests + # + # we cannot do this in a worker process directly + # because the export requests cannot be pickled + outqueue = manager.Queue() + + expected_size = len(requests) + + # Create a pool of workers + with multiprocessing.Pool(worker_count) as pool: + for idx, request in enumerate(requests): + # Feed the worker with the source file data (bytes) from the + # main process + # + # This is necessary because some image files are inside an + # archive and cannot be accessed asynchronously + source_data = read_data_func(request, sourcedir, **kwargs) + if source_data is None: + expected_size -= 1 + continue + + target_path = exportdir[request.targetdir, request.target_filename] + + # Start an export call in a worker process + # The call is asynchronous, so the next worker can be + # started immediately + pool.apply_async( + export_func, + args=( + idx, + source_data, + outqueue, + request.source_filename, + target_path, + *itargs + ), + kwds=kwargs + ) + + # Log file information + if 
get_loglevel() <= logging.DEBUG: + MediaExporter.log_fileinfo( + sourcedir[request.get_type().value, request.source_filename], + exportdir[request.targetdir, request.target_filename] + ) + + # Show progress + MediaExporter._show_progress(outqueue.qsize(), expected_size) + + # Close the pool since all workers have been started + pool.close() + + # Show progress for remaining workers + while outqueue.qsize() < expected_size: + MediaExporter._show_progress(outqueue.qsize(), expected_size) + + # Wait for all workers to finish + pool.join() + + if handle_outqueue_func: + handle_outqueue_func(outqueue, requests) + @staticmethod def _get_blend_data( request: MediaExportRequest, From 97a67c7d83b0ed41ea2a3764e29725926e739acb Mon Sep 17 00:00:00 2001 From: heinezen Date: Sun, 21 Jan 2024 15:03:04 +0100 Subject: [PATCH 18/18] util: Replace file collection lambdas with file entry objects. --- openage/cabextract/cab.py | 43 +++++++---- .../convert/value_object/read/media/drs.py | 32 +++++--- openage/util/fslike/filecollection.py | 76 +++++++++++++------ 3 files changed, 103 insertions(+), 48 deletions(-) diff --git a/openage/cabextract/cab.py b/openage/cabextract/cab.py index 3f04d45947..dde6d09aac 100644 --- a/openage/cabextract/cab.py +++ b/openage/cabextract/cab.py @@ -1,4 +1,4 @@ -# Copyright 2015-2023 the openage authors. See copying.md for legal info. +# Copyright 2015-2024 the openage authors. See copying.md for legal info. """ Provides CABFile, an extractor for the MSCAB format. 
@@ -18,7 +18,7 @@ from ..util.filelike.readonly import PosSavingReadOnlyFileLikeObject from ..util.filelike.stream import StreamFragment from ..util.files import read_guaranteed, read_nullterminated_string -from ..util.fslike.filecollection import FileCollection +from ..util.fslike.filecollection import FileCollection, FileEntry from ..util.math import INF from ..util.strings import try_decode from ..util.struct import NamedStruct, Flags @@ -216,6 +216,28 @@ def verify_checksum(self) -> Union[None, NoReturn]: raise ValueError("checksum error in MSCAB data block") +class CABEntry(FileEntry): + """ + Entry in a CAB file. + """ + + def __init__(self, fileobj: CFFile): + self.fileobj = fileobj + + def open_r(self): + return StreamFragment( + self.fileobj.folder.plain_stream, + self.fileobj.pos, + self.fileobj.size + ) + + def size(self) -> int: + return self.fileobj.size + + def mtime(self) -> float: + return self.fileobj.timestamp + + class CABFile(FileCollection): """ The actual file system-like CAB object. @@ -275,20 +297,9 @@ def __init__(self, cab: FileLikeObject, offset: int = 0): "CABFile has multiple entries with the same path: " + b'/'.join(fileobj.path).decode()) - def open_r(fileobj=fileobj): - """ Returns a opened ('rb') file-like object for fileobj. """ - return StreamFragment( - fileobj.folder.plain_stream, - fileobj.pos, - fileobj.size - ) - - self.add_fileentry(fileobj.path, ( - open_r, - None, - lambda fileobj=fileobj: fileobj.size, - lambda fileobj=fileobj: fileobj.timestamp - )) + file_entry = CABEntry(fileobj) + + self.add_fileentry(fileobj.path, file_entry) def __repr__(self): return "CABFile" diff --git a/openage/convert/value_object/read/media/drs.py b/openage/convert/value_object/read/media/drs.py index 72bf6a0679..8e3410f614 100644 --- a/openage/convert/value_object/read/media/drs.py +++ b/openage/convert/value_object/read/media/drs.py @@ -1,4 +1,4 @@ -# Copyright 2013-2022 the openage authors. See copying.md for legal info. 
+# Copyright 2013-2024 the openage authors. See copying.md for legal info. """ Code for reading Genie .DRS archives. @@ -12,7 +12,7 @@ from .....log import spam, dbg from .....util.filelike.stream import StreamFragment -from .....util.fslike.filecollection import FileCollection +from .....util.fslike.filecollection import FileCollection, FileEntry from .....util.strings import decode_until_null from .....util.struct import NamedStruct @@ -87,6 +87,23 @@ class DRSFileInfo(NamedStruct): file_size = "i" +class DRSEntry(FileEntry): + """ + Entry in a DRS archive. + """ + + def __init__(self, fileobj: GuardedFile, offset: int, size: int): + self.fileobj = fileobj + self.offset = offset + self.entry_size = size + + def open_r(self): + return StreamFragment(self.fileobj, self.offset, self.entry_size) + + def size(self) -> int: + return self.entry_size + + class DRS(FileCollection): """ represents a file archive in DRS format. @@ -133,14 +150,9 @@ def __init__(self, fileobj: GuardedFile, game_version: GameVersion): self.tables.append(table_header) for filename, offset, size in self.read_tables(): - def open_r(offset=offset, size=size): - """ Returns a opened ('rb') file-like object for fileobj. """ - return StreamFragment(self.fileobj, offset, size) - - self.add_fileentry( - [filename.encode()], - (open_r, None, lambda size=size: size, None) - ) + file_entry = DRSEntry(self.fileobj, offset, size) + + self.add_fileentry([filename.encode()], file_entry) def read_tables(self) -> typing.Generator[tuple[str, str, str], None, None]: """ diff --git a/openage/util/fslike/filecollection.py b/openage/util/fslike/filecollection.py index 86df01533b..f77b034099 100644 --- a/openage/util/fslike/filecollection.py +++ b/openage/util/fslike/filecollection.py @@ -1,9 +1,11 @@ -# Copyright 2015-2022 the openage authors. See copying.md for legal info. +# Copyright 2015-2024 the openage authors. See copying.md for legal info. 
""" Provides Filecollection, a utility class for combining multiple file-like objects to a FSLikeObject. """ +from __future__ import annotations +import typing from collections import OrderedDict from io import UnsupportedOperation @@ -12,6 +14,9 @@ from .abstract import FSLikeObject from .path import Path +if typing.TYPE_CHECKING: + from openage.util.filelike.stream import StreamFragment + class FileCollection(FSLikeObject): """ @@ -59,14 +64,12 @@ def get_direntries(self, parts=None, create: bool = False) -> tuple[OrderedDict, return entries - def add_fileentry(self, parts, fileentry): + def add_fileentry(self, parts, fileentry: FileEntry): """ Adds a file entry (and parent directory entries, if needed). This method should not be called directly; instead, use the add_file method of Path objects that were obtained from this. - - fileentry must be open_r, open_w, size, mtime. """ if not parts: raise IsADirectoryError("FileCollection.root is a directory") @@ -79,11 +82,9 @@ def add_fileentry(self, parts, fileentry): entries[0][name] = fileentry - def get_fileentry(self, parts): + def get_fileentry(self, parts) -> FileEntry: """ Gets a file entry. Helper method for internal use. 
- - Returns open_r, open_w, size, mtime """ if not parts: raise IsADirectoryError( @@ -101,24 +102,30 @@ def get_fileentry(self, parts): return entries[0][name] - def open_r(self, parts) -> None: - open_r, _, _, _ = self.get_fileentry(parts) + def open_r(self, parts: list[bytes]) -> StreamFragment: + entry = self.get_fileentry(parts) + + open_r = entry.open_r() if open_r is None: raise UnsupportedOperation( "not readable: " + b"/".join(parts).decode(errors='replace')) - return open_r() + return open_r + + def open_w(self, parts: list[bytes]): + entry = self.get_fileentry(parts) - def open_w(self, parts) -> None: - _, open_w, _, _ = self.get_fileentry(parts) + open_w = entry.open_w() if open_w is None: raise UnsupportedOperation( "not writable: " + b"/".join(parts).decode(errors='replace')) + return open_w + def list(self, parts): fileentries, subdirs = self.get_direntries(parts) @@ -126,20 +133,14 @@ def list(self, parts): yield from fileentries def filesize(self, parts) -> int: - _, _, filesize, _ = self.get_fileentry(parts) + entry = self.get_fileentry(parts) - if filesize is None: - return None - - return filesize() + return entry.size() def mtime(self, parts) -> float: - _, _, _, mtime = self.get_fileentry(parts) - - if mtime is None: - return None + entry = self.get_fileentry(parts) - return mtime() + return entry.mtime() def mkdirs(self, parts) -> None: self.get_direntries(parts, create=True) @@ -248,3 +249,34 @@ def add_file_from_path(self, path: Path) -> None: open_w = None self.add_file(path.open_r, open_w, path.filesize, path.mtime) + + +class FileEntry: + """ + Entry in a file collection archive. + """ + # pylint: disable=no-self-use + + def open_r(self) -> StreamFragment: + """ + Returns a file-like object for reading. + """ + raise UnsupportedOperation("FileEntry.open_r") + + def open_w(self): + """ + Returns a file-like object for writing. 
+        """
+        raise UnsupportedOperation("FileEntry.open_w")
+
+    def size(self) -> int:
+        """
+        Returns the size of the entry.
+        """
+        raise UnsupportedOperation("FileEntry.size")
+
+    def mtime(self) -> float:
+        """
+        Returns the modification time of the entry.
+        """
+        raise UnsupportedOperation("FileEntry.mtime")