diff --git a/Makefile b/Makefile index 36485e5..a1723e7 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,8 @@ TMP_DIR := build all: wheels build -build: __init__.py importer.py exporter.py chunky.py textures.py utils.py operators.py props.py slpp.py blender_manifest.toml \ +build: __init__.py importer.py exporter.py chunky.py textures.py utils.py operators.py props.py \ + slpp.py blender_manifest.toml sga.py dow_layout.py \ LICENSE README.md docs/export.md docs/magic.md docs/tts_import.md default_badge.tga default_banner.tga mkdir $(TMP_DIR); \ cp -r ./wheels $^ $(TMP_DIR); \ diff --git a/dow_layout.py b/dow_layout.py new file mode 100644 index 0000000..9bb7941 --- /dev/null +++ b/dow_layout.py @@ -0,0 +1,304 @@ +import abc +import configparser +import contextlib +import dataclasses +import enum +import pathlib +import typing + +from .sga import SgaArchive, SgaPath + + +@enum.unique +class TextureLevel(str, enum.Enum): + HIGH = 'Full' + + +@enum.unique +class SoundLevel(str, enum.Enum): + HIGH = 'Full' + MEDIUM = 'Med' + LOW = 'Low' + + +@enum.unique +class ModelLevel(str, enum.Enum): + HIGH = 'High' + MEDIUM = 'Medium' + LOW = 'Low' + + +T = typing.TypeVar('T') + + +class AbstractSource(abc.ABC): + @abc.abstractmethod + def make_path(self, path: str | pathlib.PurePath) -> 'LayoutPath': + raise NotImplementedError + + @abc.abstractmethod + def exists(self) -> bool: + raise NotImplementedError + + +@dataclasses.dataclass +class DirectoryPath: + full_path: pathlib.Path + root: pathlib.PurePosixPath + + def __getattr__(self, key): + return getattr(self.full_path, key) + + def __truediv__(self, other) -> 'DirectoryPath': + return DirectoryPath(self.full_path / other, self.root) + + def __rtruediv__(self, other) -> 'DirectoryPath': + return DirectoryPath(other / self.full_path, self.root) + + def iterdir(self) -> 'typing.Generator[DirectoryPath, None, None]': + for c in self.full_path.iterdir(): + yield DirectoryPath(c, self.root) + + def __str__(self) -> str: + return str(self.layout_path()) + + @property + def data_size(self): + return self.full_path.stat().st_size + + def layout_path(self) -> pathlib.PurePosixPath: + return self.full_path.relative_to(self.root) + + def __getstate__(self): + return vars(self) + + def __setstate__(self, state): + vars(self).update(state) + +LayoutPath = SgaPath | DirectoryPath + + +class DirectorySource(AbstractSource): + def __init__(self, root: str | pathlib.Path, name: str): + self.root = pathlib.Path(root) + self.name = name + + def make_path(self, path: str | pathlib.PurePath) -> DirectoryPath: + path = pathlib.PurePath(path) + if path.is_absolute(): + path = path.relative_to('/') + return DirectoryPath(self.root / path, self.root) + + def exists(self) -> bool: + return self.root.exists() + + @contextlib.contextmanager + def open(self): + yield self + + def __repr__(self) -> str: + return f'DirectorySource({self.root})' + + +class SgaSource(AbstractSource): + def __init__(self, path: str | pathlib.Path, name: str): + self.path = path + self.name = name + self._archive = None + + @property + def archive(self): + if self._archive is None and self.path.exists(): + self._archive = SgaArchive.parse(self.path) + return self._archive + + def make_path(self, path: str | pathlib.PurePath) -> SgaPath: + return self.archive.make_path(path) + + def exists(self) -> bool: + return self.path.exists() + + @contextlib.contextmanager + def open(self): + archive = self.archive + if archive is not None: + with archive.open(): + yield self + return + yield self + + def __repr__(self) -> str: + return f'SgaSource({self.path})' + + +def iter_path_candidates(part: str) -> typing.Generator[str, None, None]: + yield part + yield part.lower() + yield part.upper() + yield part.title() + + +def try_find_path(root: pathlib.Path, *parts: str) -> pathlib.Path: + curr = root + for part in parts: + for part_case in iter_path_candidates(part): + if (candidate := curr / part_case).exists(): + curr = candidate + break + else: + if curr.is_dir(): + for c in curr.iterdir(): + if c.name.lower() == part.lower(): + curr = c + break + else: + for p in parts: + root /= p + return root + return curr + + +@dataclasses.dataclass +class DowLayout: + default_lang: str = 'english' + default_texture_level: TextureLevel = TextureLevel.HIGH + default_sound_level: SoundLevel = SoundLevel.HIGH + default_model_level: ModelLevel = ModelLevel.HIGH + sources: list[AbstractSource] = dataclasses.field(default_factory=list) + + @classmethod + def from_mod_folder(cls, path: str | pathlib.Path, include_movies: bool = True, include_locale: bool = True) -> 'DowLayout': + path = pathlib.Path(path) + dow_folder = path.parent + res = cls._initilize_defaults(dow_folder) + mod_configs = cls.load_mod_configs_options(dow_folder) + required_mods = [mod_configs.get(path.name.lower(), cls._make_default_mod_config(path.name))] + for required_mod_name in required_mods[0].get('requiredmods', ['dxp2', 'w40k']) + ['engine']: + required_mods.append(mod_configs.get(required_mod_name.lower(), cls._make_default_mod_config(required_mod_name))) + for mod in required_mods: + res.sources.append(DirectorySource(try_find_path(dow_folder, mod['modfolder'], 'Data'), name=mod['modfolder'])) + for folder in mod.get('datafolders', []): + folder = res.interpolate_path(folder) + res.sources.append(SgaSource(try_find_path(dow_folder, mod['modfolder'], f'{folder}.sga'), name=mod['modfolder'])) + for file in mod.get('archivefiles', []): + file = res.interpolate_path(file) + res.sources.append(SgaSource(try_find_path(dow_folder, mod['modfolder'], f'{file}.sga'), name=mod['modfolder'])) + for mod in required_mods: + if include_movies: + res.sources.append(DirectorySource(try_find_path(dow_folder, mod['modfolder'], 'Movies'), name=mod['modfolder'])) + if include_locale: + res.sources.append(DirectorySource(try_find_path(dow_folder, mod['modfolder'], res.interpolate_path('%LOCALE%')), name=mod['modfolder'])) + return res + + @classmethod + def _initilize_defaults(cls, root: pathlib.Path) -> 'DowLayout': + lang_config = cls.load_lang(root) + game_config = cls.load_game_options(root) + res = cls() + res.default_lang = lang_config.get('default', res.default_lang) + res.default_texture_level = game_config.get('texture_level', res.default_texture_level) + res.default_sound_level = game_config.get('texture_level', res.default_sound_level) + res.default_model_level = game_config.get('model_level', res.default_model_level) + return res + + @classmethod + def _make_default_mod_config(cls, folder_name: str) -> dict: + return { + 'modfolder': folder_name, + 'datafolders': ['Data'] + } + + @classmethod + def load_lang(cls, path: pathlib.Path) -> dict: + conf_path = path / 'regions.ini' + if not conf_path.is_file(): + return {} + try: + config = configparser.ConfigParser() + config.read(conf_path) + return { + **{k.lower(): v for k, v in config['mods'].items()}, + 'default': config['global']['lang'], + } + except Exception: + return {} + + @classmethod + def load_game_options(cls, path: pathlib.Path) -> dict: + return {} # TODO + + @classmethod + def load_mod_configs_options(cls, path: pathlib.Path) -> dict: + result = {} + for file in path.iterdir(): + if file.suffix.lower() != '.module' or not file.is_file(): + continue + config = configparser.ConfigParser(interpolation=None, comment_prefixes=('#', ';', '--')) + config.read(file) + config = config['global'] + result[file.stem.lower()] = { + **{k: config[k] for k in ('uiname', 'description', 'modfolder')}, + **{ + f'{key}s': [ + i for _, i in sorted([(k, v) + for k, v in config.items() + if k.startswith(f'{key}.') + ], key=lambda x: int(x[0].rsplit('.')[1])) + ] + for key in ('datafolder', 'archivefile', 'requiredmod') + }, + } + if 'engine' not in result: + result['engine'] = { + 'modfolder': 'engine', + 'archivefiles': ['%LOCALE%\EnginLoc', 'Engine', 'Engine-New'], + } + return result + + def interpolate_path( + self, + path: str, + lang: str = None, + texture_level: TextureLevel = None, + sound_level: SoundLevel = None, + model_level: ModelLevel = None, + ) -> str: + path = path.replace('%LOCALE%', 'Locale/' + (lang or self.default_lang).title()) + path = path.replace('%TEXTURE-LEVEL%', texture_level or self.default_texture_level) + path = path.replace('%SOUND-LEVEL%', sound_level or self.default_sound_level) + path = path.replace('%MODEL-LEVEL%', model_level or self.default_model_level) + return pathlib.PureWindowsPath(path).as_posix() + + def iter_paths(self, path: str | pathlib.PurePath, return_missing: bool = False) -> typing.Generator[LayoutPath, None, None]: + path = pathlib.PurePath(path) + for source in self.sources: + if not source.exists(): + continue + source_path = try_find_path(source.make_path('.'), *path.parts) + if return_missing or source_path.exists(): + yield source_path + + def find(self, path: str | pathlib.PurePath, default: T = None) -> LayoutPath | T: + for p in self.iter_paths(path): + return p + return default + + def iterdir(self, path: str | pathlib.PurePath) -> typing.Generator[LayoutPath, None, None]: + seen_files = set() + for source in self.sources: + if not source.exists(): + continue + source_path = source.make_path(path) + if source_path.exists(): + for i in source_path.iterdir(): + if i.name.lower() not in seen_files: + seen_files.add(i.name.lower()) + yield i + + @contextlib.contextmanager + def open(self): + with contextlib.ExitStack() as stack: + for source in self.sources: + if source.exists(): + stack.enter_context(source.open()) + yield self diff --git a/importer.py b/importer.py index 99d72d1..a3a014a 100644 --- a/importer.py +++ b/importer.py @@ -1,4 +1,5 @@ import dataclasses +import io import pathlib import math import tempfile @@ -8,15 +9,12 @@ from . import textures, utils, props from .chunky import ChunkReader +from .dow_layout import DowLayout, LayoutPath, DirectoryPath from .utils import print -# class BonePropGroup(bpy.types.PropertyGroup): -# stale: bpy.props.BoolProperty(name="Stale", default=False) - -# bpy.utils.register_class(BonePropGroup) - -# bpy.types.PoseBone.dow_settings = bpy.props.PointerProperty(name='DOW settings', type=BonePropGroup) +def open_reader(path: LayoutPath) -> ChunkReader: + return ChunkReader(io.BytesIO(path.read_bytes())) @dataclasses.dataclass @@ -39,6 +37,7 @@ class WhmLoader: def __init__(self, root: pathlib.Path, load_wtp: bool = True, stric_mode: bool = True, context=None): self.root = root + self.layout = DowLayout.from_mod_folder(root) self.wtp_load_enabled = load_wtp self.stric_mode = stric_mode self.bpy_context = context @@ -79,19 +78,19 @@ def CH_DATASSHR(self, reader: ChunkReader): # CH_DATASSHR > - Chunk Handler - M material_path = reader.read_str() # -- Read Texture Path if material_path not in self.loaded_material_paths: - full_material_path = self.root / 'Data' / f'{material_path}.rsh' - if not full_material_path.exists(): + full_material_path = self.layout.find(f'{material_path}.rsh') + material_data = self.layout.find(f'{material_path}.rsh') + if not material_data: self.messages.append(('WARNING', f'Cannot find texture {full_material_path}')) return - with full_material_path.open('rb') as f: - material = self.load_rsh(ChunkReader(f), material_path) # -- create new material + material = self.load_rsh(open_reader(material_data), material_path) # -- create new material if self.wtp_load_enabled: - full_teamcolor_path = self.root / 'Data' / f'{material_path}_default.wtp' - if not full_teamcolor_path.exists(): - self.messages.append(('INFO', f'Cannot find {full_teamcolor_path}')) + teamcolor_path = f'{material_path}_default.wtp' + teamcolor_data = self.layout.find(teamcolor_path) + if not teamcolor_data: + self.messages.append(('INFO', f'Cannot find {teamcolor_path}')) else: - with full_teamcolor_path.open('rb') as f: - self.load_wtp(ChunkReader(f), material_path, material) + self.load_wtp(open_reader(teamcolor_data), material_path, material) self.loaded_material_paths.add(material_path) def load_rsh(self, reader: ChunkReader, material_path: str): @@ -119,8 +118,6 @@ def CH_FOLDTXTR(self, reader: ChunkReader, texture_path: str): # Chunk Handler image_format, width, height, num_mips = reader.read_struct('<4l') current_chunk = reader.read_header('DATADATA') - import tempfile - texture_name = pathlib.Path(texture_path).name with tempfile.TemporaryDirectory() as tmpdir: with open(f'{tmpdir}/{texture_name}.dds', 'wb') as f: @@ -961,33 +958,33 @@ def CH_DATADATA(self, reader: ChunkReader): # - Chunk Handler - Sub Chunk Of FO mesh_name = reader.read_str() # -- Read Mesh Name mesh_path: pathlib.Path = pathlib.Path(reader.read_str()) # -- Read Mesh Path if mesh_path and mesh_path != pathlib.Path(''): - filename = self.root / 'Data' / mesh_path.with_suffix('.whm') - if filename.exists(): - if filename not in loaded_messages: - loaded_messages.add(filename) - self.messages.append(('INFO', f'Loading {filename}')) - with filename.open('rb') as f: - xreffile = ChunkReader(f) - xreffile.skip_relic_chunky() - chunk = xreffile.read_header('DATAFBIF') # -- Read 'File Burn Info' Header - xreffile.skip(chunk.size) # -- Skip 'File Burn Info' Chunk - chunk = xreffile.read_header('FOLDRSGM') # -- Skip 'Folder SGM' Header - group_name = f'xref_{chunk.name}' - for current_chunk in xreffile.iter_chunks(): # -- Read Chunks Until End Of File - match current_chunk.typeid: - case 'DATASSHR': self.CH_DATASSHR(xreffile) # -- DATASSHR - Texture Data - case 'DATASKEL': self.CH_DATASKEL(xreffile, xref=True) # -- FOLDMSLC - Skeleton Data - case 'FOLDMSGR': # -- Read FOLDMSLC Chunks - for current_chunk in xreffile.iter_chunks(): # -- Read FOLDMSLC Chunks - if current_chunk.typeid == 'FOLDMSLC' and current_chunk.name.lower() == mesh_name.lower(): - mesh_obj = self.CH_FOLDMSLC(xreffile, mesh_name, xref=True, group_name=group_name) - props.setup_property(mesh_obj, 'xref_source', str(mesh_path)) - else: - xreffile.skip(current_chunk.size) - if current_chunk.typeid == 'DATABVOL': - break - # case 'DATAMARK': self.CH_DATAMARK(xreffile) - case _: xreffile.skip(current_chunk.size) + filename = mesh_path.with_suffix('.whm') + file_data = self.layout.find(filename) + if file_data: + if mesh_path not in loaded_messages: + loaded_messages.add(mesh_path) + self.messages.append(('INFO', f'Loading {mesh_path}')) + xreffile = open_reader(file_data) + xreffile.skip_relic_chunky() + chunk = xreffile.read_header('DATAFBIF') # -- Read 'File Burn Info' Header + xreffile.skip(chunk.size) # -- Skip 'File Burn Info' Chunk + chunk = xreffile.read_header('FOLDRSGM') # -- Skip 'Folder SGM' Header + group_name = f'xref_{chunk.name}' + for current_chunk in xreffile.iter_chunks(): # -- Read Chunks Until End Of File + match current_chunk.typeid: + case 'DATASSHR': self.CH_DATASSHR(xreffile) # -- DATASSHR - Texture Data + case 'DATASKEL': self.CH_DATASKEL(xreffile, xref=True) # -- FOLDMSLC - Skeleton Data + case 'FOLDMSGR': # -- Read FOLDMSLC Chunks + for current_chunk in xreffile.iter_chunks(): # -- Read FOLDMSLC Chunks + if current_chunk.typeid == 'FOLDMSLC' and current_chunk.name.lower() == mesh_name.lower(): + mesh_obj = self.CH_FOLDMSLC(xreffile, mesh_name, xref=True, group_name=group_name) + props.setup_property(mesh_obj, 'xref_source', str(mesh_path)) + else: + xreffile.skip(current_chunk.size) + if current_chunk.typeid == 'DATABVOL': + break + # case 'DATAMARK': self.CH_DATAMARK(xreffile) + case _: xreffile.skip(current_chunk.size) else: self.messages.append(('WARNING', f'Cannot find file {filename}')) mesh_parent_idx = reader.read_one(' dict: path = teamcolor.get('LocalInfo', {}).get(f'{k}_name') if path is None: continue - path = self.root / 'data/art' / f'{k}s' / f'{path}.tga' - if not path.exists(): + path = f'art/{k}s/{path}.tga' + data = self.layout.find(path) + if not data: self.messages.append(('WARNING', f'Cannot find {k} {path}')) continue + if isinstance(data, DirectoryPath): + path = data.full_path res[k] = path return res def apply_teamcolor(self, teamcolor: dict): color_node_names = {f'color_{i}' for i in self.TEAMCOLORABLE_LAYERS} + images = {} + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = pathlib.Path(tmpdir) + for key in self.TEAMCOLORABLE_IMAGES: + if (img_path := teamcolor.get(key)) is None: + continue + data_path = pathlib.Path(img_path) + if not data_path.exists(): + data_path = self.layout.find(data_path) + if not data_path: + continue + tmpfile = tmpdir / pathlib.Path(img_path).name + tmpfile.write_bytes(data_path.read_bytes()) + images[key] = image = bpy.data.images.load(str(tmpfile)) + image.pack() for mat in bpy.data.materials: if mat.node_tree is None: continue @@ -1073,9 +1088,8 @@ def apply_teamcolor(self, teamcolor: dict): continue node.color_ramp.elements[-1].color[:3] = teamcolor[key][:3] continue - if node.bl_idname == 'ShaderNodeTexImage' and node.label in self.TEAMCOLORABLE_IMAGES and teamcolor.get(node.label) is not None: - node.image = bpy.data.images.load(str(teamcolor[node.label])) - node.image.pack() + if node.bl_idname == 'ShaderNodeTexImage' and node.label in self.TEAMCOLORABLE_IMAGES and images.get(node.label) is not None: + node.image = images[node.label] def import_whm(module_root: pathlib.Path, target_path: pathlib.Path, teamcolor_path: pathlib.Path = None): diff --git a/sga.py b/sga.py new file mode 100644 index 0000000..f87fdfc --- /dev/null +++ b/sga.py @@ -0,0 +1,402 @@ +import collections +import contextlib +import dataclasses +import pathlib +import struct +import typing +import zlib + + +def read_struct(fmt: str, stream) -> tuple: + size = struct.calcsize(fmt) + buf = stream.read(size) + if len(buf) < size: + return None + return struct.unpack(fmt, buf) + + +def read_one(fmt: str, stream) -> typing.Any: + fields = read_struct(fmt, stream) + if fields is None: + return None + assert len(fields) == 1, 'Need to parse exactly 1 value' + return fields[0] + + +@dataclasses.dataclass +class Header2: + table_of_content_offset: int + data_offset: int + + @classmethod + def parse(cls, stream) -> 'Header2': + checksum1, name, checksum2, table_of_content_size, data_offset = read_struct('<16s128s16sLL', stream) + assert data_offset == table_of_content_size + 180, f'{data_offset=} {table_of_content_size=}' + return cls(180, data_offset) + + +@dataclasses.dataclass +class Toc25: + """Table of content + """ + virtual_drive_offset: int + virtual_drive_count: int + folder_offset: int + folder_count: int + file_offset: int + file_count: int + name_buffer_offset: int + name_buffer_count: int + + @classmethod + def parse(cls, stream) -> 'Toc25': + data = read_struct(' 'VirtualDrive25': + path, name, first_folder, last_folder, first_file, last_file, unk = read_struct('<64s64sHHHH2s', stream) + return cls(str(path.rstrip(b'\0'), 'ascii'), str(name.rstrip(b'\0'), 'ascii'), + first_folder, last_folder, first_file, last_file) + + +@dataclasses.dataclass +class FolderHeader25: + name_offset: int + sub_folder_start_index: int + sub_folder_end_index: int + file_start_index: int + file_end_index: int + + @classmethod + def parse(cls, stream) -> 'FolderHeader25': + data = read_struct(' 'FileHeader2': + data = read_struct('<5L', stream) + return cls(*data) + + +@dataclasses.dataclass +class IndexFolder(collections.abc.MutableMapping): + name: str + children: dict[str, 'IndexItem'] = dataclasses.field(default_factory=dict) + parent: 'IndexFolder | None' = dataclasses.field(default=None, repr=False) + + def __getitem__(self, key: str) -> 'IndexItem': + return self.children[key] + + def __setitem__(self, key: str, value: 'IndexItem'): + self.children[key] = value + value.parent = self + + def __delitem__(self, key: str): + child = self.children.get(key) + if child is not None: + child.parent = None + del self.children[key] + + def __iter__(self) -> typing.Iterator[str]: + return iter(self.children) + + def __len__(self) -> int: + return len(self.children) + + @staticmethod + def is_file(): + return False + + @staticmethod + def is_dir(): + return True + + def iterdir(self) -> typing.Iterator['IndexItem']: + yield from self.children.values() + + +@dataclasses.dataclass +class IndexFile: + name: str + data_offset: int + compressed_size: int + compression_flag: int + data_size: int + parent: 'IndexFolder | None' = dataclasses.field(default=None, repr=False) + + @staticmethod + def is_file(): + return True + + @staticmethod + def is_dir(): + return False + + +IndexItem = IndexFolder | IndexFile + + +# See https://github.com/ModernMAK/Relic-Game-Tool/wiki/SGA-Archive +class SgaArchive: + def __init__(self, index: dict, path: pathlib.Path): + self.index = index + self.path = path + self.stream = None + + @classmethod + def parse(cls, sga_path: str | pathlib.PurePath) -> 'SgaArchive': + with open(sga_path, 'rb') as f: + magic, version_major, version_minor = read_struct('<8s2H', f) + assert magic == b'_ARCHIVE' + header_cls = { + (2, 0): Header2 + }[version_major, version_minor] + header = header_cls.parse(f) + + toc_cls = { + (2, 0): Toc25, + (5, 0): Toc25, + }[version_major, version_minor] + f.seek(header.table_of_content_offset) + toc = toc_cls.parse(f) + + virtual_drive_cls = { + (2, 0): VirtualDrive25, + (5, 0): VirtualDrive25, + }[version_major, version_minor] + f.seek(header.table_of_content_offset + toc.virtual_drive_offset) + drives = [virtual_drive_cls.parse(f) for _ in range(toc.virtual_drive_count)] + + folder_cls = { + (2, 0): FolderHeader25, + (5, 0): FolderHeader25, + }[version_major, version_minor] + f.seek(header.table_of_content_offset + toc.folder_offset) + folders = [folder_cls.parse(f) for _ in range(toc.folder_count)] + + file_cls = { + (2, 0): FileHeader2, + }[version_major, version_minor] + f.seek(header.table_of_content_offset + toc.file_offset) + files = [file_cls.parse(f) for _ in range(toc.file_count)] + + max_name_offset = max(i.name_offset for s in [folders, files] for i in s) + f.seek(header.table_of_content_offset + toc.name_buffer_offset + max_name_offset + 1) + last_name_size = 1 + while True: + last_name_size += 1 + if f.read(1) in (b'\0', b''): + break + f.seek(header.table_of_content_offset + toc.name_buffer_offset) + name_buffer = f.read(max_name_offset + last_name_size) + name_buffer_splits = [idx for idx, c in enumerate(name_buffer) if c == 0] + + def find_name(start: int): + l, r = 0, len(name_buffer_splits) + if name_buffer_splits[l] == start: + return '' + while l + 1 != r: + m = (l + r) // 2 + if name_buffer_splits[m] <= start: + l = m + else: + r = m + return str(name_buffer[start: name_buffer_splits[r]], 'utf8') + + index = {} + folder_infos = [IndexFolder(name=find_name(f.name_offset)) for f in folders] + file_infos = [IndexFile( + name=find_name(f.name_offset), + data_offset=header.data_offset + f.data_offset, + compressed_size=f.compressed_size, + compression_flag=f.compression_flag, + data_size=f.decompressed_size, + ) for f in files] + for drive in drives: + drive_index = index.setdefault(drive.name, {}) + drive_folders = folders[drive.first_folder:drive.last_folder] + + def create_path(path: str) -> pathlib.PurePosixPath: + return pathlib.PurePosixPath(pathlib.PureWindowsPath(path.lower()).as_posix()) + + folder_paths = [create_path(i.name) for i in folder_infos] + for folder, info, path in zip(drive_folders, folder_infos, folder_paths): + for idx in range(folder.sub_folder_start_index, folder.sub_folder_end_index): + child_info = folder_infos[idx] + child_path = folder_paths[idx] + child_info.name = str(child_path.relative_to(path)) + info[child_info.name] = child_info + for idx in range(folder.file_start_index, folder.file_end_index): + child_info = file_infos[idx] + info[child_info.name] = child_info + for info in folder_infos: + if info.parent is None: + drive_index[info.name] = info + + return cls(index, pathlib.Path(sga_path)) + + @contextlib.contextmanager + def open(self): + if self.stream is not None: + yield self + return + + with self.path.open('rb') as f: + self.stream = f + try: + yield self + finally: + self.stream = None + + def resolve_path(self, path: str | pathlib.PurePath) -> IndexItem | None: + path_str = str(path).lower() + vdrive = None + if ':' in path_str: + vdrive, path_str = path_str.split(':', 1) + path = pathlib.PureWindowsPath(path_str) + normalized_path = pathlib.PurePosixPath(path.as_posix()) + parts = normalized_path.parts + if parts and parts[0] == '/': + parts = parts[1:] + for drive, index in self.index.items(): + index = index[''] + for part in parts: + index = index.get(part) + if index is None: + break + else: + return index + return None + + def read_file(self, file: IndexFile, retries: int = 3) -> bytes: + with self.open(): + for retry in range(retries): + try: + self.stream.seek(file.data_offset) + data = self.stream.read(file.compressed_size) + if file.compression_flag != 0: + data = zlib.decompress(data) + return data + except zlib.error: + if retry + 1 == retries: + raise + + def read_bytes(self, path: str | pathlib.PurePath) -> bytes: + file = self.resolve_path(path) + if file is None: + raise FileNotFoundError(path) + if file.is_dir(): + raise IsADirectoryError(path) + return self.read_file(file) + + def make_path(self, path: str | pathlib.PurePath = '') -> 'SgaPath': + return SgaPath(self, path) + + +@dataclasses.dataclass(repr=False) +class SgaPath: + SENTINEL = object() + + sga: SgaArchive = dataclasses.field(repr=False) + impl: pathlib.PurePath + _cached_item: IndexItem = dataclasses.field(default=SENTINEL, init=False) + + def __post_init__(self): + self.impl = pathlib.PurePath(self.impl) + + @property + def _sga_item(self): + if self._cached_item is SgaPath.SENTINEL: + self._cached_item = self.sga.resolve_path(self.impl) + return self._cached_item + + def read_bytes(self) -> bytes: + item = self._sga_item + if item is None: + raise FileNotFoundError(self.impl) + if item.is_dir(): + raise IsADirectoryError(self.impl) + return self.sga.read_file(item) + + def read_text(self, encoding: str = 'utf-8', errors: str = 'strict') -> str: + data = self.read_bytes() + return str(data, encoding=encoding, errors=errors) + + def exists(self) -> None: + return self._sga_item is not None + + def is_file(self) -> bool: + item = self._sga_item + return item is not None and item.is_file() + + def is_dir(self) -> bool: + item = self._sga_item + return item is not None and item.is_dir() + + def iterdir(self) -> 'typing.Iterator[SgaPath]': + item = self._sga_item + if item is None: + raise FileNotFoundError(self.impl) + if not item.is_dir(): + raise NotADirectoryError(self.impl) + for key, child_data in item.children.items(): + child = SgaPath(self.sga, self.impl / key) + child._cached_item = child_data + yield child + + def __str__(self) -> str: + return str(self.impl) + + def __repr__(self): + return f'''SgaPath("{self.impl}", "{self.sga.path}")''' + + def __hash__(self): + return hash(self.impl) + + def __lt__(self, other: 'SgaPath') -> bool: + return self.impl < other.impl + + def __truediv__(self, other) -> 'SgaPath': + return SgaPath(self.sga, self.impl / other) + + def __rtruediv__(self, other) -> 'SgaPath': + return SgaPath(self.sga, other / self.impl) + + @property + def name(self) -> str: + return self.impl.name + + @property + def suffix(self) -> str: + return self.impl.suffix + + @property + def stem(self) -> str: + return self.impl.stem + + @property + def data_size(self): + return self._sga_item.data_size + + def layout_path(self)-> pathlib.PurePosixPath: + return self.impl \ No newline at end of file