From 81a567bf7935e445e1cc3243f0c384224a6baf0f Mon Sep 17 00:00:00 2001 From: amraboelnaga Date: Mon, 8 Jul 2024 13:54:09 -0400 Subject: [PATCH 1/2] refactoring for modularization --- sip-validator.py | 426 +++++++++++++++++++++++++---------------------- 1 file changed, 230 insertions(+), 196 deletions(-) diff --git a/sip-validator.py b/sip-validator.py index 49fb5fa..3100fec 100644 --- a/sip-validator.py +++ b/sip-validator.py @@ -2,203 +2,237 @@ import csv import re from datetime import datetime -from collections import OrderedDict - -def validate_date_format(date_text): - try: - datetime.strptime(date_text, '%Y/%m/%d') - return True - except ValueError: - return False - -def validate_special_characters(string): - return bool(re.match(r'^[a-zA-Z0-9@#$%&*/!\']+$', string.lower())) - -def check_directory_structure(root_path): - required_folders = ['Data', 'Manifest', 'Metadata'] - errors = OrderedDict() - try: - existing_folders = os.listdir(root_path) - except Exception as e: - errors[f"Error accessing directory {root_path}: {str(e)}"] = None - return errors - - extra_folders = [folder for folder in existing_folders if folder not in required_folders and os.path.isdir(os.path.join(root_path, folder))] - for folder in required_folders: - matched_folders = [f for f in existing_folders if re.match(rf'{folder}[^a-zA-Z0-9]*$', f, re.IGNORECASE)] - if not matched_folders: - errors[f"Missing required folder: {folder}"] = None - else: - for matched_folder in matched_folders: - if matched_folder != folder: - errors[f"Folder name should be {folder} but found {matched_folder}"] = None - elif not validate_special_characters(matched_folder): - errors[f"Folder name contains special characters: {matched_folder}"] = None - if extra_folders: - errors[f"Extra folders found: {', '.join(extra_folders)}"] = None - if not any(fname.startswith('README') and fname.split('.')[-1] in ['txt', 'md'] for fname in existing_folders): - errors["Missing README file with .txt or .md extension"] = 
None - if not errors: - errors["Directory structure is valid."] = None - return errors - -def check_files(root_path): - manifest_path = os.path.join(root_path, 'Manifest') - metadata_path = os.path.join(root_path, 'Metadata') - - errors = OrderedDict() - # Checking manifest - if not os.path.exists(manifest_path): - errors["Missing required folder: Manifest"] = None - else: - try: - manifest_files = os.listdir(manifest_path) - except Exception as e: - errors[f"Error accessing Manifest folder: {str(e)}"] = None - return errors - if 'checksumsha1.csv' not in manifest_files: - errors["Missing required file: checksumsha1.csv in Manifest folder"] = None - extra_files = [file for file in manifest_files if file != 'checksumsha1.csv'] - if extra_files: - errors[f"Extra files found in Manifest folder: {', '.join(extra_files)}"] = None - - # Checking metadata - if not os.path.exists(metadata_path): - errors["Missing required folder: Metadata"] = None - else: +from urllib.parse import urlparse +import time +from PIL import Image + +class SIPValidator: + def __init__(self): + self.errors = [] + self.non_unique_rows = [] + + ## make dynamic for these variables + self.required_collection_fields = ['identifier', 'title', 'description', 'visibility', 'rights_holder', 'rights'] + self.required_item_fields = ['identifier', 'title', 'description', 'display_date', 'start_date', 'end_date', + 'rights_holder', 'rights', 'tags','type', 'language','visibility'] + + def has_special_characters(self, text): + special_characters = r'[@#$%&*/!]' + + if text is None: + return False + + parsed_url = urlparse(text) + if parsed_url.scheme and parsed_url.netloc: + return False + + return bool(re.search(special_characters, text)) + + def check_naming_convention(self, name, folder): + if self.has_special_characters(name): + self.errors.append(f"Error: Special characters found in '{name}' of folder '{folder}'") + + def check_special_characters_in_fields(self, row, row_number, identifier, file_name, 
fields): + field_errors = [] + for field in fields: + value = row.get(field) + if self.has_special_characters(value): + field_errors.append(f"Error: Special characters found in '{file_name}', Row {row_number}, Identifier {identifier}, Field '{field}'") + return field_errors + + def validate_date_format(self, date_string, row_number, identifier, file_name): try: - metadata_files = os.listdir(metadata_path) - except Exception as e: - errors[f"Error accessing Metadata folder: {str(e)}"] = None - return errors - pattern_collection = re.compile(r'.*collection_metadata\.csv') - pattern_item = re.compile(r'.*item_metadata\.csv') - collection_found = any(pattern_collection.match(file.lower()) for file in metadata_files) - item_found = any(pattern_item.match(file.lower()) for file in metadata_files) - - if not collection_found: - errors["Missing required *collection_metadata.csv file in Metadata folder"] = None - if not item_found: - errors["Missing required *item_metadata.csv file in Metadata folder"] = None - required_files = {'collection_metadata': collection_found, 'item_metadata': item_found} - for file_type, found in required_files.items(): - if found: - extra_files = [file for file in metadata_files if not pattern_collection.match(file.lower()) and not pattern_item.match(file.lower())] - if extra_files: - errors[f"Extra files found in Metadata folder: {', '.join(extra_files)}"] = None - - if not errors: - errors["All required files inside the folders are present."] = None - return errors - -def read_csv_file(file_path): - try: - with open(file_path, mode='r', newline='', encoding='utf-8') as file: - reader = csv.DictReader((line.lower() for line in file)) - return list(reader) - except UnicodeDecodeError: + datetime.strptime(date_string, '%Y-%m-%d') + except ValueError: + self.errors.append(f"Error: Incorrect date format in '{file_name}', Row {row_number}") + + def validate_csv_file(self, file_path, required_fields): try: - with open(file_path, mode='r', newline='', 
encoding='latin1') as file: - reader = csv.DictReader((line.lower() for line in file)) - return list(reader) + with open(file_path, 'r', encoding='utf-8-sig', newline='') as csv_file: + reader = csv.DictReader(csv_file) + fields = reader.fieldnames + + if sorted(required_fields) != sorted(fields): + incorrect_missing_fields = [field for field in fields if field not in required_fields] + + for field in required_fields: + if field not in fields: + self.errors.append(f"Error: Missing required field '{field}' in '{os.path.basename(file_path)}'") + + identifiers = set() + for row_number, row in enumerate(reader, start=2): + identifier = row.get('identifier') + if identifier in identifiers: + self.non_unique_rows.append(row) + else: + identifiers.add(identifier) + + start_date = row.get('start_date') + if start_date: + self.validate_date_format(start_date, row_number, identifier, os.path.basename(file_path)) + + field_errors = self.check_special_characters_in_fields(row, row_number, identifier, os.path.basename(file_path), fields) + self.errors.extend(field_errors) except Exception as e: - return str(e) - except Exception as e: - return str(e) - -def validate_metadata_files(root_path): - metadata_path = os.path.join(root_path, 'Metadata') - receipt = OrderedDict() - required_fields = ['identifier', 'title', 'description', 'visibility', 'rights_holder'] - - if not os.path.exists(metadata_path): - receipt["Missing required folder: Metadata"] = None - return receipt - - # Scan for all metadata files that match the patterns - for file_name in os.listdir(metadata_path): - if re.search(r'(collection_metadata\.csv|item_metadata\.csv)$', file_name.lower()): - file_path = os.path.join(metadata_path, file_name) - rows = read_csv_file(file_path) - if isinstance(rows, str): # An error message was returned - receipt[f"Validation error in {file_name}: {rows}"] = None - continue - if not rows: - receipt[f"Validation error in {file_name}: Could not read file or file is empty"] = None - 
continue - - fieldnames_lower = [field.strip() for field in rows[0].keys()] - - # Check field names once - missing_field = False - for field in required_fields: - matched_fields = [f for f in fieldnames_lower if re.match(rf'{field}[^a-zA-Z0-9]*$', f, re.IGNORECASE)] - if not matched_fields: - receipt[f"Validation error in {file_name}: Missing required column {field}"] = None - missing_field = True + self.errors.append(f"Error reading CSV file '{os.path.basename(file_path)}': {str(e)}") + + return self.errors, self.non_unique_rows + + def validate_folder_structure(self, sip_root_path, expected_structure): + for folder, items in expected_structure.items(): + folder_path = os.path.join(sip_root_path, folder) + self.check_naming_convention(folder, "") + + if not os.path.exists(folder_path): + self.errors.append(f"Error: Missing folder '{folder}'") + + for item in items: + if item.lower().endswith('.csv'): + item_regex = re.compile(r".*?" + re.escape(item) + r"$", re.IGNORECASE) + csv_files = [file for file in os.listdir(folder_path) if item_regex.match(file)] + for csv_file in csv_files: + item_path = os.path.join(folder_path, csv_file) + self.validate_csv_file(item_path, self.required_collection_fields if csv_file.lower() == 'collection_metadata.csv' else self.required_item_fields) else: - for matched_field in matched_fields: - if matched_field != field: - receipt[f"Validation error in {file_name}: Field name should be {field} but found {matched_field}"] = None - missing_field = True - elif field != 'rights_holder' and not validate_special_characters(matched_field): - receipt[f"Validation error in {file_name}: Field name contains special characters: {matched_field}"] = None - missing_field = True - - # If any required field is missing or incorrect, skip row validation - if missing_field: - continue - - # Check each row for content - for row in rows: - identifier = row.get('identifier', 'unknown').strip() - - # Validate identifier - if not 
validate_special_characters(identifier): - receipt[f"Validation error in {file_name} (identifier {identifier}): Invalid identifier."] = None - - # Validate date format - date = row.get('date', '').strip() - if date and not validate_date_format(date): - receipt[f"Validation error in {file_name} (identifier {identifier}): Invalid date format."] = None - - # Required fields check - for field in required_fields: - if field not in row or not row[field].strip(): - receipt[f"Validation error in {file_name} (identifier {identifier}): Missing or invalid {field}."] = None - - return receipt - -def write_validation_receipt(receipt, root_path): - receipt_path = os.path.join(root_path, 'validation_receipt.txt') - with open(receipt_path, 'w') as file: - for line in receipt: - file.write(line + '\n') - if any("Missing required folder" in line or "Validation error" in line or "Folder name should be" in line for line in receipt): - file.write("Validation failed.\n") - -def main(): - root_path = input("Enter the path to the SIP directory: ") - - issues = OrderedDict() - - # Validate directory structure - issues.update(check_directory_structure(root_path)) - - # Check files presence - issues.update(check_files(root_path)) - - # Validate metadata - validation_receipt = validate_metadata_files(root_path) - issues.update(validation_receipt) - - if issues: - for issue in issues: - print(issue) - write_validation_receipt(issues, root_path) - print(f"Validation receipt has been written to {os.path.join(root_path, 'validation_receipt.txt')}") - else: - print("All checks passed. 
No issues found.") - -if __name__ == "__main__": - main() + item_path = os.path.join(folder_path, item) + self.check_naming_convention(item, folder) + + if not os.path.exists(item_path): + self.errors.append(f"Error: Missing item '{item}' in folder '{folder}'") + + return self.errors + + def validate_metadata_files(self, metadata_folder): + collection_metadata_files = [file for file in os.listdir(metadata_folder) if file.lower().endswith('_collection_metadata.csv')] + if not collection_metadata_files: + self.errors.append(f"Error: Missing collection metadata file with the expected naming convention in the Metadata folder") + else: + for collection_metadata_file in collection_metadata_files: + self.validate_csv_file(os.path.join(metadata_folder, collection_metadata_file), self.required_collection_fields) + + item_metadata_files = [file for file in os.listdir(metadata_folder) if file.lower().endswith('_item_metadata.csv')] + if not item_metadata_files: + self.errors.append(f"Error: Missing item metadata file with the expected naming convention in the Metadata folder") + else: + for item_metadata_file in item_metadata_files: + self.validate_csv_file(os.path.join(metadata_folder, item_metadata_file), self.required_item_fields) + + return self.errors + + def validate_digital_objects(self, digital_objects_folder, current_metadata_level): + + #make dynamic + acceptable_formats = ['TIFF', 'PDF', 'JPG', 'JPEG', 'PNG', 'GIF', 'WAVE', 'WAV', 'MP3', 'MOV', 'MKV', 'MP4', + 'AVI', 'CSV', 'XML', 'XLSX', 'TXT', 'HTML', 'SGML', 'RTF', 'X3D', 'GLB', 'STL', 'CGM', + 'PDF/A', 'TIFF', 'SHP', 'SVG','PPTX', 'MBOX' , 'TIF'] + + file_formats = set() + for root, _, files in os.walk(digital_objects_folder): + for file in files: + _, file_extension = os.path.splitext(file) + file_format = file_extension.upper()[1:] + file_formats.add(file_format) + + if not file_formats: + self.errors.append(f"Images in Data folder do not have the acceptable file formats.") + + return self.errors + + def 
validate_special_characters_in_files(self, sip_root_path): + for root, dirs, files in os.walk(sip_root_path): + for dir in dirs: + dir_path = os.path.join(root, dir) + self.check_naming_convention(dir, os.path.relpath(dir_path, sip_root_path)) + + for file in files: + file_path = os.path.join(root, file) + if self.has_special_characters(file): + self.errors.append(f"Error: Special characters found in file name '{file_path}'") + + def validate_sip_structure(self, sip_root_path, expected_structure): + current_metadata_level = None + collection_metadata_name = None + existing_collection_metadata = False # Flag to track if collection_metadata.csv existed before + for error in self.errors: + if "Bronze" in error: + current_metadata_level = "Bronze" + elif "Silver" in error: + current_metadata_level = "Silver" + elif "Gold" in error: + current_metadata_level = "Gold" + + folder_structure_errors = self.validate_folder_structure(sip_root_path, expected_structure) + self.errors.extend(folder_structure_errors) + + metadata_folder = os.path.join(sip_root_path, 'Metadata') + metadata_errors = self.validate_metadata_files(metadata_folder) + self.errors.extend(metadata_errors) + + digital_objects_folder = os.path.join(sip_root_path, 'Data') + digital_objects_errors = self.validate_digital_objects(digital_objects_folder, current_metadata_level) + self.errors.extend(digital_objects_errors) + + self.validate_special_characters_in_files(sip_root_path) + + if 'collection_metadata.csv' in expected_structure.get('Metadata', []): + collection_metadata_name = 'collection_metadata.csv' + metadata_files = [file.lower() for file in os.listdir(metadata_folder)] + if any(file.endswith('collection_metadata.csv') for file in metadata_files): + existing_collection_metadata = True # Marking existing collection_metadata.csv + else: + supporting_info_folder = os.path.join(sip_root_path, 'Supporting Information') + if not os.path.exists(supporting_info_folder): + self.errors.append("Error: Missing 
Supporting Information folder") + + associated_image_found = False + for file in os.listdir(supporting_info_folder): + if file.lower().endswith(('.jpg', '.jpeg', '.png', '.tiff')): + associated_image_found = True + break + + if not associated_image_found: + self.errors.append(f"Error: No associated image found in Supporting Information folder for the '{collection_metadata_name}'") + + return self.errors, collection_metadata_name, existing_collection_metadata + + def generate_receipt(self, sip_root_path, collection_metadata_name, existing_collection_metadata): + timestamp = datetime.now().strftime('%Y%m%d%H%M%S') + receipt_path = os.path.join(sip_root_path, f'validation_receipt_{timestamp}.txt') + + for file in os.listdir(sip_root_path): + if file.startswith('validation_receipt_') and file.endswith('.txt'): + os.remove(os.path.join(sip_root_path, file)) + + with open(receipt_path, 'w', encoding='utf-8') as receipt_file: + error_set = set(self.errors) + self.errors = list(error_set) + for error in self.errors: + receipt_file.write(f"{error}\n") + + if collection_metadata_name and existing_collection_metadata: + receipt_file.write(f"collection_metadata.csv existed before.\n") + + receipt_file.write("\nValidation completed successfully.") + + return receipt_path + +# # Example usage +# if __name__ == "__main__": +# validator = SIPValidator() +# sip_root_path = 'path_to_sip_root_directory' +# expected_structure = { +# 'Metadata': ['collection_metadata.csv', 'item_metadata.csv'], +# 'Data': ['data_item_1.jpg', 'data_item_2.pdf'], +# 'Supporting Information': ['supporting_doc_1.pdf'] +# } + +# errors, collection_metadata_name, existing_collection_metadata = validator.validate_sip_structure(sip_root_path, expected_structure) +# if errors: +# print("Errors found during validation:") +# for error in errors: +# print(error) +# else: +# print("No errors found.") + +# receipt_path = validator.generate_receipt(sip_root_path, collection_metadata_name, 
existing_collection_metadata) +# print(f"Validation receipt generated at: {receipt_path}") From 06b3dc390a7093c7ec4b6e8bff15c4ae63e7e4cf Mon Sep 17 00:00:00 2001 From: amraboelnaga Date: Thu, 18 Jul 2024 23:57:08 -0400 Subject: [PATCH 2/2] fixing project structure --- __init__.py | 0 __pycache__/__init__.cpython-310.pyc | Bin 0 -> 162 bytes __pycache__/sip_validator.cpython-310.pyc | Bin 0 -> 8632 bytes sip-validator.py | 238 ---------------------- sip_validator.py | 207 +++++++++++++++++++ 5 files changed, 207 insertions(+), 238 deletions(-) create mode 100644 __init__.py create mode 100644 __pycache__/__init__.cpython-310.pyc create mode 100644 __pycache__/sip_validator.cpython-310.pyc delete mode 100644 sip-validator.py create mode 100644 sip_validator.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/__pycache__/__init__.cpython-310.pyc b/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..aae84e23e79bf3236d1a2a5efb4b24bf4b4ee2b0 GIT binary patch literal 162 zcmd1j<>g`k0^hqc(?IlN5P=LBfgA@QE@lA|DGb33nv8xc8Hzx{2;!HkerR!OQL%nv zZc$=Verir$VtS&!OKNd;Nq&KTR%%{eW?E{IesO+jQF5w&ab`h$Sz=CRN@7WVk$!x9 fW?p7Ve7s&k7)TcJAu#nd$lCpD6x`wp(jS)@ZGv?M*f^mutwdB*WQ=&YHFqk1bC|(?xNp znI5vLDT<9ATMHQw4Q#O4OMs0bV0aX;KoBHA&N&ANkZb1@K zSvVx7`lnu1z4z+}>)Bc?vhW{))T*gnnK;vi*YiqUW%4$qK z>or|njhcb0-ZndC&16Jlv@@M-Ez7j`HOF)^Uu#b0pECLPb)sbu+Ck#waUeU>~4uBUr4Q* zeyVQlgjT)Ya$8=#p2y6RmdY}VjWGY@u;kT*s>wi+{Lww}rBnc8*ZjPv^`Qob?deQ| zmFX+6GEd(*kyw{wtZ3U>#a#BSv!5=U?JS&iKE75l1Zgs~q$f7W3c><(zQwPSJ}Ru1 zC+nzt{Jy8;kD4Nln2hzw>xyKxD>@Bt1U<5ZM%PO$%jlK$9tsxps58FLOVv(LEb^rX)!Wt9`?Ki=@M|vB!Gsrskf;zL`5!ruDFs{z->uGmfG_Y%siR z01d#q%_Z#fr-)8H{Y%8HmH2L(ZEKI%BYjm@k;)3#6}V;3*v0kMrEj!H+A4Fhh+#>O z8JP0t@hZ7pjpD|Vw)vC51pO<$Opw{t?iz>+5g!&&IQbwGaj?0vtM_w3W{*h`QxsGm z&9Q=64DpNjJsiJp1&}gY!OMe3rh-7s$wMZ89n+33snz7S@$=Xcq$3Hj^W)3mnBbdR zE$~!d?Y0rJq(7%D)^0SLTuN-O?V!7f$gyKT*B9;7X3$QO^PHf~cmvHHM2lOKJr=NC 
z?H&`lr>$d4R13AOn*p3^5AOr^Ykd=rrEI5?3H7cFO^PC6rX{gpXs&l#u8JBJJ=Cjj zgxQF{iY~~81bO<94;7Z$E$KD8osD;XK)euRZ)pBv%EX#$ym_}i5T_3F-D-MUf>)Q? z-DaaL-@#2X%^^8+n3Cjtb`Fh_bF{}ORH$vlRtI=m*Y^+y66u~Wf8?_-2A*~6!kH(A z7{_oifhH_`@c#0LA6ULp#zaY6Axyo}}ye$Pm#bDBSUy%lXt zvUPT(nnl~6x~4YKb{idT%eC%S+p%wR`+3p6z2$*y%XMtOGE9Up?KN7iv}3ok_k9)f zoxF;0)rjYp(FBVygVzt6feGe*gr9s1%`XOK(1)e%vF>ER`V0{jNp4o#g_;~=R=cO& zK(^h4cfl4jwd_Y%-@S7EYRIZjzr$Z`T=0g|;dO*e?#fozmCHyc1?e?}C%0QE)ZU62B6nN^bg2|H*Y`K3GD<@YzlI}5}lajQRog}nX$wvc2Vznbhr#J9cLYIB?T zva8;G&+ZDl+Hvdw_x?*)297kaJi8@r6)vllf|5hbfU_`1W(Y%74iM(A-fwc`l&&kt z2txzI#Yx( zsa!RCtPjnB75QT{1*WqxB;R@btOxpwNz-Bz%*4;qEjGgn=+z(SzcVvaYzj|V(3#)) z=f2-Yq}-D7;~*DYf_p^ahB5$UZ*orxKk8Y>a}-b963wxdj|5x zOUN6!oy*7@z^wjwOlf*nV5)q9h#cn&^Gh?%7r3K*fwl5Z2B676`1?2>nN8jz-sc`; zMHR0tCy&fwsGNe_Al75*f>Z44ln3JIPBNGQ!lt|zQF{MQF~E`J}M zy+XeToXft^)ZHlVm{(HSZOj=5k6GdonJKEM>PT zk~Xm9zaHwJ-tQkC6XXU@pa`s%-pdF1etA!$9QwK5EA)y%@h_Ped4$}I%sivCs5qny zOFi*gvuQnU{x@buznqKv=Y}%jspXn=y+N2$dZBfldktW*hJQM7@k60DA@)xnOqWT| zAS#Dbop|PYGEJRgg+G0GcDzQ|Jz=+8`@lRY!>3G;!B6E~$`2|Z6{EZhIHb)I&=}+) z z-=o}t{tqcP?inc8pQ~PIcH3(=)<8B*y*5PgK-vnc7zYyJkL|!sVOxH({!YAW&KVrzue z`I#+m_2MNe$#l974|6WxZns<>THI}R9e~D^hA9E4LpH9@2$ibPn4=ui*Y*~3&3G_Y)~K-IdBzy!znlMr|~@rm1lYWNjStx87w zZcf!N;wV_^F+n89?GdOCT}sOgewS7IFoyNe|%2^Or=?hnB|{Svz!cy3K(YqjA_KQGZT*}1tswmqb7n9 zMm;@Z)a+v#MH-p^2FK{2aF5CV2@1Qwd$cCNZSIIsb4u1zk8>y_{paT2LEG5l%KWmR zP+prEB!;1U!rP7%m(cB3&kv=QghIiq;4rqTobe}bMAc!SF&mzMQ!pTdn5L=DE!yv? 
zl7!p!kc1mf1Svp5ZbV06-r4YWQua(yh{6Ry7fB^dg(b2{6dJA~G1LQ4klct%0Z3~t zh;hITRW;=iO-3_MYW}5^V+~4|iKJ1s912E*0+E76*A6jDCv=L0;uUIaYED!0DjEdI zPw9R?8WH6DCA~U}#-CfX58*Osf2-r#AN$jb_B9gdU~xxK6seU(>&}`d>uah21-?S_ zF5i)=JQ5mrxF`OMrWr+3DMv-2_i$eu9az1x5*debDjtbXh`c#eT=zDP{=RPjG;bK(hC4f zBgO;(RssKjN~)Tovnky)qT0$7k_J^*=5d`MfJGIS=Vo4KmVr5tOva26)LtxOioHyI zFZ2iIzxZdqkL{-{oLrI4kfcMoizZG5gY@E|y5i{gW_PitBc)Ji%=ZAfP+c(`V;ZL^ zRQ$$004VzQ7g11cLA*sYPL}!!dL__|Le;@%K((I^gD4fwa?8)BVs+9%5G8Bg+VH3n z7qab;6;fghV;f0!sIU#{xrm9CI7?_JF5)@Sf^U3SVy|s&#`1w zTFj^w)M=$+M#q#=tjg#kAHf{4OidbBg)_(T_BSLC9hq@jfccIA;gs0O?{wn_|LdPk z8Q^gUH4kEu6CIVX0o<+-l$>C-n#d zXkiIK%WE}}Xx6%p3Q$o^cb`*=ZxOp|bl0M0U+V*&MdFl_YOoeq)iWr<<_d6kYffLl zb<|oaS&EMM>!9FYulr}Ox`^HC+)D`7_PXjG^6h`}!I8NbeaWq^OVppQVTG{B-Fq$3 zbvsBKbi$<%y>&mMDE!xk%r3SZd#xey8G^eu?J$b@v=qK^A!)oy=W%tol=enLN*JlV z+yxYnTS*0}TA7QCmTD;K>j^#%5ieu~A-GG5KT>^xR#J*}>dUm;jV&)au1^o~j53X4 zS>blFeL4X}rw-u$Zo>%+^c+!%ZoJH3NSKe~OjoEq3CH7Sa!3VvWSCLcFtdRVq@GaQ zw_iUvVh=-WDYBJ!$iCG#6xvAoFOj?G+E><(EyQ)o1_d_%qGT2j?@jb$dlC0by7^!K E2OZ3Kf&c&j literal 0 HcmV?d00001 diff --git a/sip-validator.py b/sip-validator.py deleted file mode 100644 index 3100fec..0000000 --- a/sip-validator.py +++ /dev/null @@ -1,238 +0,0 @@ -import os -import csv -import re -from datetime import datetime -from urllib.parse import urlparse -import time -from PIL import Image - -class SIPValidator: - def __init__(self): - self.errors = [] - self.non_unique_rows = [] - - ## make dynamic for these variables - self.required_collection_fields = ['identifier', 'title', 'description', 'visibility', 'rights_holder', 'rights'] - self.required_item_fields = ['identifier', 'title', 'description', 'display_date', 'start_date', 'end_date', - 'rights_holder', 'rights', 'tags','type', 'language','visibility'] - - def has_special_characters(self, text): - special_characters = r'[@#$%&*/!]' - - if text is None: - return False - - parsed_url = urlparse(text) - if parsed_url.scheme and parsed_url.netloc: - return False - - return 
bool(re.search(special_characters, text)) - - def check_naming_convention(self, name, folder): - if self.has_special_characters(name): - self.errors.append(f"Error: Special characters found in '{name}' of folder '{folder}'") - - def check_special_characters_in_fields(self, row, row_number, identifier, file_name, fields): - field_errors = [] - for field in fields: - value = row.get(field) - if self.has_special_characters(value): - field_errors.append(f"Error: Special characters found in '{file_name}', Row {row_number}, Identifier {identifier}, Field '{field}'") - return field_errors - - def validate_date_format(self, date_string, row_number, identifier, file_name): - try: - datetime.strptime(date_string, '%Y-%m-%d') - except ValueError: - self.errors.append(f"Error: Incorrect date format in '{file_name}', Row {row_number}") - - def validate_csv_file(self, file_path, required_fields): - try: - with open(file_path, 'r', encoding='utf-8-sig', newline='') as csv_file: - reader = csv.DictReader(csv_file) - fields = reader.fieldnames - - if sorted(required_fields) != sorted(fields): - incorrect_missing_fields = [field for field in fields if field not in required_fields] - - for field in required_fields: - if field not in fields: - self.errors.append(f"Error: Missing required field '{field}' in '{os.path.basename(file_path)}'") - - identifiers = set() - for row_number, row in enumerate(reader, start=2): - identifier = row.get('identifier') - if identifier in identifiers: - self.non_unique_rows.append(row) - else: - identifiers.add(identifier) - - start_date = row.get('start_date') - if start_date: - self.validate_date_format(start_date, row_number, identifier, os.path.basename(file_path)) - - field_errors = self.check_special_characters_in_fields(row, row_number, identifier, os.path.basename(file_path), fields) - self.errors.extend(field_errors) - except Exception as e: - self.errors.append(f"Error reading CSV file '{os.path.basename(file_path)}': {str(e)}") - - return 
self.errors, self.non_unique_rows - - def validate_folder_structure(self, sip_root_path, expected_structure): - for folder, items in expected_structure.items(): - folder_path = os.path.join(sip_root_path, folder) - self.check_naming_convention(folder, "") - - if not os.path.exists(folder_path): - self.errors.append(f"Error: Missing folder '{folder}'") - - for item in items: - if item.lower().endswith('.csv'): - item_regex = re.compile(r".*?" + re.escape(item) + r"$", re.IGNORECASE) - csv_files = [file for file in os.listdir(folder_path) if item_regex.match(file)] - for csv_file in csv_files: - item_path = os.path.join(folder_path, csv_file) - self.validate_csv_file(item_path, self.required_collection_fields if csv_file.lower() == 'collection_metadata.csv' else self.required_item_fields) - else: - item_path = os.path.join(folder_path, item) - self.check_naming_convention(item, folder) - - if not os.path.exists(item_path): - self.errors.append(f"Error: Missing item '{item}' in folder '{folder}'") - - return self.errors - - def validate_metadata_files(self, metadata_folder): - collection_metadata_files = [file for file in os.listdir(metadata_folder) if file.lower().endswith('_collection_metadata.csv')] - if not collection_metadata_files: - self.errors.append(f"Error: Missing collection metadata file with the expected naming convention in the Metadata folder") - else: - for collection_metadata_file in collection_metadata_files: - self.validate_csv_file(os.path.join(metadata_folder, collection_metadata_file), self.required_collection_fields) - - item_metadata_files = [file for file in os.listdir(metadata_folder) if file.lower().endswith('_item_metadata.csv')] - if not item_metadata_files: - self.errors.append(f"Error: Missing item metadata file with the expected naming convention in the Metadata folder") - else: - for item_metadata_file in item_metadata_files: - self.validate_csv_file(os.path.join(metadata_folder, item_metadata_file), self.required_item_fields) - - 
return self.errors - - def validate_digital_objects(self, digital_objects_folder, current_metadata_level): - - #make dynamic - acceptable_formats = ['TIFF', 'PDF', 'JPG', 'JPEG', 'PNG', 'GIF', 'WAVE', 'WAV', 'MP3', 'MOV', 'MKV', 'MP4', - 'AVI', 'CSV', 'XML', 'XLSX', 'TXT', 'HTML', 'SGML', 'RTF', 'X3D', 'GLB', 'STL', 'CGM', - 'PDF/A', 'TIFF', 'SHP', 'SVG','PPTX', 'MBOX' , 'TIF'] - - file_formats = set() - for root, _, files in os.walk(digital_objects_folder): - for file in files: - _, file_extension = os.path.splitext(file) - file_format = file_extension.upper()[1:] - file_formats.add(file_format) - - if not file_formats: - self.errors.append(f"Images in Data folder do not have the acceptable file formats.") - - return self.errors - - def validate_special_characters_in_files(self, sip_root_path): - for root, dirs, files in os.walk(sip_root_path): - for dir in dirs: - dir_path = os.path.join(root, dir) - self.check_naming_convention(dir, os.path.relpath(dir_path, sip_root_path)) - - for file in files: - file_path = os.path.join(root, file) - if self.has_special_characters(file): - self.errors.append(f"Error: Special characters found in file name '{file_path}'") - - def validate_sip_structure(self, sip_root_path, expected_structure): - current_metadata_level = None - collection_metadata_name = None - existing_collection_metadata = False # Flag to track if collection_metadata.csv existed before - for error in self.errors: - if "Bronze" in error: - current_metadata_level = "Bronze" - elif "Silver" in error: - current_metadata_level = "Silver" - elif "Gold" in error: - current_metadata_level = "Gold" - - folder_structure_errors = self.validate_folder_structure(sip_root_path, expected_structure) - self.errors.extend(folder_structure_errors) - - metadata_folder = os.path.join(sip_root_path, 'Metadata') - metadata_errors = self.validate_metadata_files(metadata_folder) - self.errors.extend(metadata_errors) - - digital_objects_folder = os.path.join(sip_root_path, 'Data') - 
digital_objects_errors = self.validate_digital_objects(digital_objects_folder, current_metadata_level) - self.errors.extend(digital_objects_errors) - - self.validate_special_characters_in_files(sip_root_path) - - if 'collection_metadata.csv' in expected_structure.get('Metadata', []): - collection_metadata_name = 'collection_metadata.csv' - metadata_files = [file.lower() for file in os.listdir(metadata_folder)] - if any(file.endswith('collection_metadata.csv') for file in metadata_files): - existing_collection_metadata = True # Marking existing collection_metadata.csv - else: - supporting_info_folder = os.path.join(sip_root_path, 'Supporting Information') - if not os.path.exists(supporting_info_folder): - self.errors.append("Error: Missing Supporting Information folder") - - associated_image_found = False - for file in os.listdir(supporting_info_folder): - if file.lower().endswith(('.jpg', '.jpeg', '.png', '.tiff')): - associated_image_found = True - break - - if not associated_image_found: - self.errors.append(f"Error: No associated image found in Supporting Information folder for the '{collection_metadata_name}'") - - return self.errors, collection_metadata_name, existing_collection_metadata - - def generate_receipt(self, sip_root_path, collection_metadata_name, existing_collection_metadata): - timestamp = datetime.now().strftime('%Y%m%d%H%M%S') - receipt_path = os.path.join(sip_root_path, f'validation_receipt_{timestamp}.txt') - - for file in os.listdir(sip_root_path): - if file.startswith('validation_receipt_') and file.endswith('.txt'): - os.remove(os.path.join(sip_root_path, file)) - - with open(receipt_path, 'w', encoding='utf-8') as receipt_file: - error_set = set(self.errors) - self.errors = list(error_set) - for error in self.errors: - receipt_file.write(f"{error}\n") - - if collection_metadata_name and existing_collection_metadata: - receipt_file.write(f"collection_metadata.csv existed before.\n") - - receipt_file.write("\nValidation completed 
import os
import csv
import re
import json
from datetime import datetime
from collections import OrderedDict


class SIPValidator:
    """Validate the layout and metadata of a SIP directory.

    Findings are collected in ``self.errors``, an ``OrderedDict`` whose keys
    are human-readable messages (values are always ``None``; the mapping is
    used as an insertion-ordered set).  Besides problems, two informational
    success messages (see ``_SUCCESS_MESSAGES``) may be stored there.

    Args:
        root_path: Path of the SIP root directory.
        config: Parsed configuration mapping; only its ``'sip_validator'``
            section is used.  The keys read by this class are
            ``required_folders``, ``readme_extensions``,
            ``manifest_required_file``, ``metadata_required_files`` and
            ``required_metadata_fields``.
    """

    _STRUCTURE_OK = "Directory structure is valid."
    _FILES_OK = "All required files inside the folders are present."
    # Entries of ``self.errors`` that do NOT denote a problem.
    _SUCCESS_MESSAGES = (_STRUCTURE_OK, _FILES_OK)

    # Lower-case file-name suffixes identifying the two metadata CSV kinds.
    _COLLECTION_SUFFIX = 'collection_metadata.csv'
    _ITEM_SUFFIX = 'item_metadata.csv'

    def __init__(self, root_path, config):
        self.root_path = root_path
        self.errors = OrderedDict()
        self.config = config['sip_validator']

    @staticmethod
    def validate_date_format(date_text):
        """Return True if *date_text* parses as ``YYYY/MM/DD``."""
        try:
            datetime.strptime(date_text, '%Y/%m/%d')
            return True
        except ValueError:
            return False

    @staticmethod
    def validate_special_characters(string):
        """Return True if *string* is non-empty and made only of allowed characters.

        Allowed are ASCII letters, digits and ``@ # $ % & * / ! '``.

        NOTE(review): underscores, hyphens, dots and spaces are rejected while
        the symbols above are accepted; confirm this is the intended set.
        """
        return bool(re.match(r'^[a-zA-Z0-9@#$%&*/!\']+$', string.lower()))

    def _record(self, problems, success_message=None):
        """Store *problems* in ``self.errors``; store *success_message* if there are none."""
        for problem in problems:
            self.errors[problem] = None
        if not problems and success_message is not None:
            self.errors[success_message] = None

    def check_directory_structure(self):
        """Check the top-level folders and README of the SIP root.

        Reports missing, misnamed and extra folders and a missing README.
        When this check finds nothing, a success message is recorded,
        independently of what other checks have already stored.
        """
        required_folders = self.config['required_folders']
        try:
            entries = os.listdir(self.root_path)
        except (OSError, ValueError) as e:
            self.errors[f"Error accessing directory {self.root_path}: {e}"] = None
            return

        problems = []
        for folder in required_folders:
            # The configured name optionally followed by non-alphanumerics,
            # compared case-insensitively, so near misses such as "data" or
            # "Data_" are reported as misnamed rather than as missing.
            pattern = re.compile(rf'{re.escape(folder)}[^a-zA-Z0-9]*', re.IGNORECASE)
            candidates = [entry for entry in entries if pattern.fullmatch(entry)]
            if not candidates:
                problems.append(f"Missing required folder: {folder}")
                continue
            # An entry equal to the configured name is valid by definition,
            # so only differing spellings are reported.
            problems.extend(
                f"Folder name should be {folder} but found {candidate}"
                for candidate in candidates
                if candidate != folder
            )

        extra_folders = sorted(
            entry for entry in entries
            if entry not in required_folders
            and os.path.isdir(os.path.join(self.root_path, entry))
        )
        if extra_folders:
            problems.append(f"Extra folders found: {', '.join(extra_folders)}")

        extensions = self.config['readme_extensions']
        has_readme = any(
            name.startswith('README') and name.split('.')[-1] in extensions
            for name in entries
        )
        if not has_readme:
            wanted = ' or '.join(f'.{ext}' for ext in extensions)
            problems.append(f"Missing README file with {wanted} extension")

        self._record(problems, self._STRUCTURE_OK)

    def check_files(self):
        """Check the contents of the ``Manifest`` and ``Metadata`` folders.

        Both folders are always inspected.  When neither yields a problem, a
        success message is recorded, independently of earlier checks.
        """
        problems = self._check_manifest_folder() + self._check_metadata_folder()
        self._record(problems, self._FILES_OK)

    def _check_manifest_folder(self):
        """Return the list of problems found in the ``Manifest`` folder."""
        manifest_path = os.path.join(self.root_path, 'Manifest')
        if not os.path.exists(manifest_path):
            return ["Missing required folder: Manifest"]
        try:
            manifest_files = os.listdir(manifest_path)
        except OSError as e:
            return [f"Error accessing Manifest folder: {e}"]

        required_file = self.config['manifest_required_file']
        problems = []
        if required_file not in manifest_files:
            problems.append(f"Missing required file: {required_file} in Manifest folder")
        extra_files = sorted(name for name in manifest_files if name != required_file)
        if extra_files:
            problems.append(f"Extra files found in Manifest folder: {', '.join(extra_files)}")
        return problems

    def _check_metadata_folder(self):
        """Return the list of problems found in the ``Metadata`` folder."""
        metadata_path = os.path.join(self.root_path, 'Metadata')
        if not os.path.exists(metadata_path):
            return ["Missing required folder: Metadata"]
        try:
            metadata_files = os.listdir(metadata_path)
        except OSError as e:
            return [f"Error accessing Metadata folder: {e}"]

        required_names = self.config['metadata_required_files']
        # A file only counts if its name ENDS with the expected suffix, the
        # same rule validate_metadata_files uses to pick files to parse.
        collection_found = any(
            name.lower().endswith(self._COLLECTION_SUFFIX) for name in metadata_files
        )
        item_found = any(
            name.lower().endswith(self._ITEM_SUFFIX) for name in metadata_files
        )

        problems = []
        if not collection_found:
            problems.append(f"Missing required *{required_names[0]} file in Metadata folder")
        if not item_found:
            problems.append(f"Missing required *{required_names[1]} file in Metadata folder")
        # Extra files are only reported when at least one metadata file exists.
        if collection_found or item_found:
            suffixes = (self._COLLECTION_SUFFIX, self._ITEM_SUFFIX)
            extra_files = sorted(
                name for name in metadata_files if not name.lower().endswith(suffixes)
            )
            if extra_files:
                problems.append(f"Extra files found in Metadata folder: {', '.join(extra_files)}")
        return problems

    @staticmethod
    def read_csv_file(file_path):
        """Read a CSV file into a list of row dicts.

        Column names are stripped and lower-cased; cell values are returned
        unchanged.  The file is decoded as UTF-8 (a leading byte-order mark is
        dropped), falling back to Latin-1 if that fails.

        Returns:
            A list of dicts (empty for an empty or header-only file), or a
            ``str`` holding the error message if the file could not be read.
        """
        failure = None
        for encoding in ('utf-8-sig', 'latin1'):
            try:
                with open(file_path, mode='r', newline='', encoding=encoding) as file:
                    reader = csv.DictReader(file)
                    header = reader.fieldnames  # None when the file has no rows at all
                    if header is None:
                        return []
                    reader.fieldnames = [name.strip().lower() for name in header]
                    return list(reader)
            except UnicodeDecodeError as e:
                failure = e  # try the next encoding
            except Exception as e:
                return str(e)
        return str(failure)

    def validate_metadata_files(self):
        """Validate every collection/item metadata CSV in the ``Metadata`` folder.

        For each file whose lower-cased name ends with
        ``collection_metadata.csv`` or ``item_metadata.csv`` the header is
        checked against ``required_metadata_fields``; only if the header is
        correct are the rows checked.
        """
        metadata_path = os.path.join(self.root_path, 'Metadata')
        required_fields = self.config['required_metadata_fields']

        if not os.path.exists(metadata_path):
            self.errors["Missing required folder: Metadata"] = None
            return
        try:
            file_names = sorted(os.listdir(metadata_path))
        except OSError as e:
            self.errors[f"Error accessing Metadata folder: {e}"] = None
            return

        suffixes = (self._COLLECTION_SUFFIX, self._ITEM_SUFFIX)
        for file_name in file_names:
            if not file_name.lower().endswith(suffixes):
                continue
            rows = self.read_csv_file(os.path.join(metadata_path, file_name))
            if isinstance(rows, str):  # An error message was returned
                self.errors[f"Validation error in {file_name}: {rows}"] = None
                continue
            if not rows:
                self.errors[f"Validation error in {file_name}: Could not read file or file is empty"] = None
                continue
            # Row validation is skipped when the header is missing or
            # misnames a required column.
            if self._validate_header(file_name, rows[0], required_fields):
                self._validate_rows(file_name, rows, required_fields)

    def _validate_header(self, file_name, first_row, required_fields):
        """Record header problems for *file_name*; return True if there are none."""
        # A None key holds surplus cells of an over-long row, not a column name.
        headers = [name.strip() for name in first_row if name is not None]
        header_ok = True
        for field in required_fields:
            pattern = re.compile(rf'{re.escape(field)}[^a-zA-Z0-9]*', re.IGNORECASE)
            candidates = [name for name in headers if pattern.fullmatch(name)]
            if not candidates:
                self.errors[f"Validation error in {file_name}: Missing required column {field}"] = None
                header_ok = False
                continue
            # A column equal to the configured name is valid by definition.
            for candidate in candidates:
                if candidate != field:
                    self.errors[f"Validation error in {file_name}: Field name should be {field} but found {candidate}"] = None
                    header_ok = False
        return header_ok

    def _validate_rows(self, file_name, rows, required_fields):
        """Record identifier, date and required-value problems for each row."""
        for row in rows:
            # Cells missing from a short row are None; treat them as empty.
            identifier = (row.get('identifier', 'unknown') or '').strip()
            prefix = f"Validation error in {file_name} (identifier {identifier})"

            if not self.validate_special_characters(identifier):
                self.errors[f"{prefix}: Invalid identifier."] = None

            date = (row.get('date') or '').strip()
            if date and not self.validate_date_format(date):
                self.errors[f"{prefix}: Invalid date format."] = None

            for field in required_fields:
                if not (row.get(field) or '').strip():
                    self.errors[f"{prefix}: Missing or invalid {field}."] = None

    def write_validation_receipt(self):
        """Write all recorded messages to ``validation_receipt.txt`` in the SIP root.

        A final ``Validation failed.`` line is appended when any recorded
        message is something other than one of the success messages.

        Returns:
            The path of the receipt file.
        """
        receipt_path = os.path.join(self.root_path, 'validation_receipt.txt')
        failed = any(message not in self._SUCCESS_MESSAGES for message in self.errors)
        with open(receipt_path, 'w', encoding='utf-8') as file:
            file.writelines(f"{message}\n" for message in self.errors)
            if failed:
                file.write("Validation failed.\n")
        return receipt_path

    def run_validation(self):
        """Run all checks in order and return ``self.errors``."""
        self.check_directory_structure()
        self.check_files()
        self.validate_metadata_files()
        return self.errors


# Example usage:
#
# if __name__ == "__main__":
#     root_path = input("Enter the path to the SIP directory: ")
#     environment_config_path = 'config.json'  # Path to your main configuration file
#
#     with open(environment_config_path, 'r') as env_file:
#         env_config = json.load(env_file)
#     environment = env_config["environment"]
#
#     # Select the environment configuration file and load it: SIPValidator
#     # expects the parsed configuration mapping, not the path to it.
#     config_path = f'config_{environment}.json'
#     with open(config_path, 'r') as config_file:
#         config = json.load(config_file)
#
#     validator = SIPValidator(root_path, config)
#     validation_errors = validator.run_validation()
#     for message in validation_errors:
#         print(message)
#     validator.write_validation_receipt()
#     print(f"Validation receipt has been written to {os.path.join(root_path, 'validation_receipt.txt')}")