Skip to content

Commit

Permalink
Collapse branches, Universal tag support (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
exdysa authored Jan 6, 2025
2 parents 5fc0628 + 1c1f277 commit 2e742f8
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 216 deletions.
56 changes: 28 additions & 28 deletions dataset_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,43 @@
""" 初始化"""
# pylint: disable=line-too-long

# 初始化

from importlib.metadata import version, PackageNotFoundError
from re import I # setuptools-scm versioning
try:
__version__ = version("dataset-tools")
except PackageNotFoundError:
# package is not installed
pass

# from re import I # setuptools-scm versioning
import os
import sys
if "pytest" not in sys.modules:
import argparse
from typing import Literal
from importlib.metadata import version, PackageNotFoundError
import logging
import argparse
from typing import Literal

levels = {"d": "DEBUG", "w": "WARNING", "e": "ERROR", "c": "CRITICAL", "i": "INFO"}
from rich.logging import RichHandler
from rich.console import Console

if "pytest" not in sys.modules:
parser = argparse.ArgumentParser(description="Set logging level.")
group = parser.add_mutually_exclusive_group()

choices = list(levels.keys()) + [v for v in levels.values()] + [v.upper() for v in levels.values()]

levels = {"d": "DEBUG", "w": "WARNING", "e": "ERROR", "c": "CRITICAL", "i": "INFO"}
choices = list(levels.keys()) + list(levels.values()) + [v.upper() for v in levels.values()]
for short, long in levels.items():
group.add_argument(f'-{short}', f'--{long.lower()}', f'--{long}',
action='store_true', help=f"Set logging level {long}")

group.add_argument('--log-level', default='i', type=str,
choices=choices, help=f"Set the logging level ({choices})")
choices=choices, help=f"Set the logging level ({choices})")

args = parser.parse_args()

# Resolve log_level from args dynamically
log_level = levels[next(iter([k for k,v in levels.items() if getattr(args, v.lower(), False)]), args.log_level)]
LOG_LEVEL = levels[next(iter([k for k,v in levels.items() if getattr(args, v.lower(), False)]), args.log_level)]
else:
log_level = "DEBUG"
EXC_INFO: bool = log_level != "i"
LOG_LEVEL = "DEBUG"

import logging
from logging import Logger
import sys
EXC_INFO: bool = LOG_LEVEL != "i"

#begin routine
msg_init = None # pylint: disable=invalid-name

msg_init = None
from rich.logging import RichHandler
from rich.console import Console
from rich.logging import RichHandler
handler = RichHandler(console=Console(stderr=True))

if handler is None:
Expand All @@ -54,12 +49,17 @@
datefmt="%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(formatter)
logging.root.setLevel(log_level)
logging.root.setLevel(LOG_LEVEL)
logging.root.addHandler(handler)

if msg_init is not None:
logger = logging.getLogger(__name__)
logger.info(msg_init)

log_level = getattr(logging, log_level)
log_level = getattr(logging, LOG_LEVEL)
logger = logging.getLogger(__name__)

try:
__version__ = version("dataset-tools")
except PackageNotFoundError:
logger.info("dataset-tools package is not installed. Did you run `pip install .`?", exc_info=EXC_INFO)
11 changes: 9 additions & 2 deletions dataset_tools/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
"""啟動程式,退出程式"""

import sys

from PyQt6 import QtWidgets # ignore

from dataset_tools import logger
from dataset_tools.ui import MainWindow # Import our main window class

def main():
    """Launch the Qt application and run its event loop until exit.

    Builds a ``QApplication`` from ``sys.argv``, shows the main window, and
    exits the process with the event loop's return code.
    """
    logger.info("%s", "Launching application...")

    # Diff-paste left two conflicting bodies here (old local QApplication
    # import vs. new module-level QtWidgets); this is the post-commit form.
    app = QtWidgets.QApplication(sys.argv)  # pylint: disable=c-extension-no-member
    window = MainWindow()  # Initialize our main window.
    window.show()
    # exec() blocks until the UI quits; propagate its status to the OS.
    sys.exit(app.exec())
Expand Down
142 changes: 101 additions & 41 deletions dataset_tools/metadata_parser.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
"""為使用者介面清理和安排元資料"""
# pylint: disable=line-too-long

import re
import json
from collections import defaultdict
from dataset_tools import logger
from PIL import Image


from PIL import Image
from PIL.ExifTags import TAGS
from dataset_tools import logger

def open_jpg_header(file_path_named: str) -> dict:
    """
    Open jpg format files and read their header metadata.\n
    :param file_path_named: `str` The path and file name of the jpg file
    :return: `dict` Header tags keyed by EXIF tag name where known, or None
    """
    # Diff-paste interleaved the old `_getexif()` body with the new
    # `Image.info` body; this is the post-commit form.
    pil_image = Image.open(file_path_named)
    info = pil_image.info
    if info is None:
        return None
    # Map numeric tag ids to readable names; unknown ids keep their raw key.
    # NOTE(review): TAGS maps EXIF ids, but `info` holds PIL's generic
    # header dict whose keys may already be strings — confirm intended source.
    exif = {TAGS.get(k, k): v for k, v in info.items()}
    return exif


def open_png_header(file_path_named: str) -> dict:
    """
    Open png format files and read their header text chunks.\n
    :param file_path_named: `str` The path and file name of the png file
    :return: `dict` The png text-chunk metadata (``PIL.Image.info``)
    """
    pil_img = Image.open(file_path_named)
    # NOTE(review): Image.open raises on failure rather than returning None,
    # so this branch never executes; kept to preserve the original control
    # flow — confirm whether the intended guard was `if not pil_img.info:`.
    if pil_img is None:  # We dont need to load completely unless totally necessary
        pil_img.load()  # This is the case when we have no choice but to load (slower)
    text_chunks = pil_img.info
    logger.debug("%s", f"{text_chunks}")
    return text_chunks


Expand All @@ -47,10 +48,9 @@ def format_chunk(text_chunks: dict) -> dict:
clean_segments = [seg.replace(buffer, '') for seg in segmented_string]
cleaned_text = ' '.join(map(str,clean_segments)).replace('\n',',')

logger.debug(f"{cleaned_text}")
logger.debug("%s",f"{cleaned_text}")
return cleaned_text


def extract_enclosed_values(cleaned_text: str) -> tuple[str, list]:
"""
Split string by pre-delineated tag information\n
Expand All @@ -63,15 +63,28 @@ def extract_enclosed_values(cleaned_text: str) -> tuple[str, list]:
structured_dict = {}
for item in prestructured_data:
# Only keep non-empty groups
if item[1]: structured_dict[item[0]] = item[1]
elif item[3]: structured_dict[item[2]] = item[3]
else: structured_dict['Hashes'] = item[4]
if item[1]:
structured_dict[item[0]] = item[1]
elif item[3]:
structured_dict[item[2]] = item[3]
else:
structured_dict['Hashes'] = item[4]

dehashed_text = re.sub(pattern, ',', cleaned_text).strip()
logger.debug(f"{dehashed_text}")
logger.debug(f"{structured_dict}")
logger.debug("%s",f"{dehashed_text}")
logger.debug("%s",f"{structured_dict}")
return dehashed_text, structured_dict

def clean_with_json(prestructured_data: dict, key_name: str) -> dict:
    """
    Deserialize the JSON string stored under one key of a dict.\n
    :param prestructured_data: `dict` A dict with a single working key
    :param key_name: `str` The single working key name
    :return: `dict` A formatted dictionary object
    """
    return json.loads(prestructured_data[key_name])

def structured_metadata_list_to_dict(prestructured_data: list) -> dict:
"""
Convert delineated metadata into a dictionary\n
Expand All @@ -80,14 +93,15 @@ def structured_metadata_list_to_dict(prestructured_data: list) -> dict:
"""
system_metadata = {}
for key in prestructured_data:
logger.debug(key)
if key == 'Hashes':
system_metadata[key] = json.loads(prestructured_data[key])
system_metadata[key] = clean_with_json(prestructured_data, key)
elif ': "' in key and ':' in key: # Handle TI hashes, split by colon and quote
ti_hash_key = re.sub(r': .*$', '', key).strip()
system_metadata[ti_hash_key] = prestructured_data[key]
else: # Hardware Info, strip quotes"""
system_metadata[key.split(' ', 1)[0]] = prestructured_data[key].strip('"')
logger.debug(f"{system_metadata}")
logger.debug("%s", f"{system_metadata}")
return system_metadata


Expand All @@ -104,45 +118,91 @@ def dehashed_metadata_str_to_dict(dehashed_text: str) -> dict:

# Rest of dehashed as a dict:
dehashed_pairs = [p for p in dehashed_text.split(',') if ': ' in p and p.strip()]

neg_side = dehashed_pairs[0].split('Steps:')
match = re.search(r'Negative prompt:\s*(.*?)Steps', dehashed_pairs[0])
negative = match.group(1) if match else None
logger.debug(negative)
logger.debug(neg_side)
logger.debug("%s",f"{negative}")
logger.debug("%s",f"{neg_side}")

logger.debug(negative)
logger.debug(f"{dehashed_pairs}")
logger.debug("%s",f"{negative}")
logger.debug("%s",f"{dehashed_pairs}")

generation_metadata = {k: v for k, v in (pair.split(': ', 1) for pair in dehashed_pairs)}
generation_metadata = dict((pair.split(': ', 1) for pair in dehashed_pairs))
logger.debug(generation_metadata)

positive = next(iter(pos_key_val)) # Sample positive prompt
positive_prompt = pos_key_val[positive] # Separate prompts
positive = next(iter(pos_key_val), "") # Sample positive prompt
positive_prompt = pos_key_val.get(positive,None) # Separate prompts

prompt_metadata = {"Positive prompt" : positive_prompt } | generation_metadata
logger.debug(f"{prompt_metadata}")
logger.debug("%s",f"{prompt_metadata}")
return prompt_metadata, generation_metadata

def arrange_webui_metadata(header_chunks: str) -> dict:
    """
    Pipe an A1111/webui file header through cleaning, splitting and parsing.\n
    Return format : {"Prompts": , "Settings": , "System": } \n
    :param header_chunks: `str` Header data from a file
    :return: `dict` Metadata in a standardized format
    """
    flattened_text = format_chunk(header_chunks)
    remaining_text, tagged_values = extract_enclosed_values(flattened_text)
    system_section = structured_metadata_list_to_dict(tagged_values)
    prompt_section, settings_section = dehashed_metadata_str_to_dict(remaining_text)
    logger.debug("%s", f"{prompt_section, settings_section, system_section}")
    logger.debug("%s", f"{type(prompt_section), type(settings_section), type(system_section)}")
    return {
        "Prompts": prompt_section,
        "Settings": settings_section,
        "System": system_section,
    }

def arrange_nodeui_metadata(header_chunks: dict) -> dict:
    """
    Using the header from a file, run formatting and parsing processes \n
    Return format : {"Prompts": , "Settings": , "System": } \n
    :param header_chunks: `dict` Header data from a file (png text chunks
        holding a JSON string under the 'prompt' key)
    :return: `dict` Metadata in a standardized format
    """
    # Parse the ComfyUI node graph: a dict of node-id -> node description.
    test_metadata = clean_with_json(header_chunks, 'prompt')
    search_keys = ["text"]  # NOTE(review): unused except in the disabled condition below
    prompt_data = {}    # CLIPTextEncode nodes (prompt text)
    between_items= {}   # first few non-prompt nodes, treated as settings
    misc_items = {}     # everything else, treated as system info

    logger.debug(test_metadata)
    #or any(x in value.get('inputs') for x in search_keys):
    for i, key in enumerate(test_metadata):
        logger.debug(i)
        value = test_metadata[key]
        logger.debug(value)
        # class_type names the node kind, e.g. "CLIPTextEncode".
        # NOTE(review): value.get('class_type') may be None, which would make
        # the `in` test below raise TypeError — confirm every node has it.
        key_name = value.get('class_type')
        if "CLIPTextEncode" in key_name:
            existing = prompt_data.get(key_name,{})
            logger.debug(existing)
            # NOTE(review): setdefault only stores when key_name is absent, so
            # the merge with `existing` never adds anything — later nodes of
            # the same class_type are dropped; confirm this is intended.
            prompt_data.setdefault(key_name, existing | test_metadata[key].get('inputs'))
        elif i <= 2:
            # Heuristic: the first three nodes encountered are "Settings".
            between_items[key_name] = value['inputs']
        else:
            misc_items[key_name] = value['inputs']
    prompt_items = {"Prompt": prompt_data }

    return {'Prompts': prompt_items, 'Settings': between_items, "System": misc_items}


def parse_metadata(file_path_named: str) -> dict:
    """
    Extract the metadata from the header of an image file\n
    :param file_path_named: `str` The file to open
    :return: `dict` The metadata from the header of the file, or None when
        the header is empty or its format is not recognized
    """
    # Diff-paste interleaved the old inline-parsing body with the new
    # dispatch body; this is the post-commit dispatch form.
    header_chunks = open_png_header(file_path_named)
    # Guard: an empty header would make next(iter(...)) raise StopIteration.
    if not header_chunks:
        return None

    # Dispatch on the first header key to choose the right parser.
    first_key = next(iter(header_chunks))
    logger.debug("%s", f"{first_key}")
    metadata = None

    if first_key == 'parameters':  # A1111 format
        metadata = arrange_webui_metadata(header_chunks)
    elif first_key == 'prompt':  # ComfyUI format
        metadata = arrange_nodeui_metadata(header_chunks)

    return metadata

# hash_sample = re.search(r', cleaned_text)
# hash_sample_structure = eval(hash_sample.group(1)) # Return result 1 if found else 0
#dehashed_text = re.sub(r'Hashes: \{.*?\}', '', cleaned_text).strip()
#dehashed_text = re.sub(r'Hashes: \{.*?\}', '', cleaned_text).strip()
Loading

0 comments on commit 2e742f8

Please sign in to comment.