generated from kyegomez/Python-Package-Template
-
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
81 additions
and
66 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
from logging.handlers import RotatingFileHandler | ||
import concurrent.futures | ||
from functools import lru_cache | ||
import os | ||
|
||
class PHICategory(Enum): | ||
"""Enumeration of Protected Health Information (PHI) categories according to HIPAA.""" | ||
|
@@ -52,17 +53,8 @@ class PatternError(DeidentificationError): | |
class Deidentifier: | ||
""" | ||
Production-grade medical data de-identification system. | ||
Features: | ||
- HIPAA compliance-focused patterns | ||
- Configurable replacement strategies | ||
- Input validation | ||
- Comprehensive logging | ||
- Performance optimization | ||
- Error handling | ||
- Audit trail | ||
""" | ||
|
||
DEFAULT_PATTERNS = { | ||
PHICategory.NAME: r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,3}\b", | ||
PHICategory.SSN: r"\b\d{3}-?\d{2}-?\d{4}\b", | ||
|
@@ -74,37 +66,63 @@ class Deidentifier: | |
PHICategory.PROVIDER: r"(?:Dr\.|Doctor|Provider)\s*[A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,2}", | ||
PHICategory.DEVICE_ID: r"\b(?:Device|Serial)\s*(?:ID|Number)?:?\s*[\w\-]{6,}\b", | ||
PHICategory.IP_ADDRESS: r"\b(?:\d{1,3}\.){3}\d{1,3}\b", | ||
PHICategory.URL: r"https?://(?:[\w-]\.)+[\w-]+(?:/[\w-./?%&=]*)?", | ||
# Fixed URL pattern | ||
PHICategory.URL: r"https?://[\w\.-]+\.\w+(?:/[\w\-./?%&=]*)?", | ||
PHICategory.ACCOUNT: r"\b(?:Acct|Account)\s*(?:#|Number)?:?\s*\d{6,12}\b" | ||
} | ||
|
||
def __init__(self, | ||
config_path: Optional[Path] = None, | ||
log_path: Optional[Path] = None, | ||
replacement_strategy: str = "hash", | ||
max_threads: int = 4) -> None: | ||
max_threads: int = 4, | ||
enable_file_logging: bool = False) -> None: | ||
""" | ||
Initialize the Deidentifier with configuration and logging setup. | ||
Args: | ||
config_path: Path to configuration file | ||
log_path: Path to log directory | ||
replacement_strategy: Strategy for replacing PHI ('hash', 'fixed', 'random') | ||
max_threads: Maximum number of threads for parallel processing | ||
""" | ||
self.patterns: Dict[PHICategory, DeidentificationPattern] = {} | ||
self.replacement_strategy = replacement_strategy | ||
self.max_threads = max_threads | ||
self._setup_logging(log_path) | ||
self._setup_logging(log_path if enable_file_logging else None) | ||
self._load_configuration(config_path) | ||
self._compile_patterns() | ||
self.audit_trail = [] | ||
|
||
def _create_default_patterns(self) -> Dict[PHICategory, DeidentificationPattern]: | ||
"""Create default pattern dictionary with metadata.""" | ||
patterns = {} | ||
for category, pattern in self.DEFAULT_PATTERNS.items(): | ||
patterns[category] = DeidentificationPattern( | ||
category=category, | ||
pattern=pattern, | ||
replacement=f"[{category.value}]", | ||
description=f"Default pattern for {category.value}", | ||
sensitivity=4 if category in [PHICategory.SSN, PHICategory.MRN] else 3 | ||
) | ||
return patterns | ||
|
||
def _create_patterns_from_config(self, config: Dict) -> Dict[PHICategory, DeidentificationPattern]: | ||
"""Create patterns from configuration file.""" | ||
patterns = {} | ||
for pattern_config in config.get('patterns', []): | ||
category = PHICategory[pattern_config['category']] | ||
patterns[category] = DeidentificationPattern( | ||
category=category, | ||
pattern=pattern_config['pattern'], | ||
replacement=pattern_config.get('replacement', f"[{category.value}]"), | ||
description=pattern_config.get('description', ''), | ||
sensitivity=pattern_config.get('sensitivity', 3) | ||
) | ||
return patterns | ||
|
||
def _setup_logging(self, log_path: Optional[Path]) -> None: | ||
"""Configure rotating file and console logging.""" | ||
self.logger = logging.getLogger(__name__) | ||
self.logger.setLevel(logging.INFO) | ||
|
||
# Remove any existing handlers | ||
self.logger.handlers = [] | ||
|
||
formatter = logging.Formatter( | ||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s' | ||
) | ||
|
@@ -114,15 +132,25 @@ def _setup_logging(self, log_path: Optional[Path]) -> None: | |
console_handler.setFormatter(formatter) | ||
self.logger.addHandler(console_handler) | ||
|
||
# File handler with rotation | ||
if log_path: | ||
file_handler = RotatingFileHandler( | ||
log_path / 'deidentification.log', | ||
maxBytes=10485760, # 10MB | ||
backupCount=5 | ||
) | ||
file_handler.setFormatter(formatter) | ||
self.logger.addHandler(file_handler) | ||
# File handler with rotation (only if log_path is provided) | ||
if log_path is not None: | ||
try: | ||
# Create log directory if it doesn't exist | ||
log_path = Path(log_path) | ||
log_path.mkdir(parents=True, exist_ok=True) | ||
|
||
log_file = log_path / 'deidentification.log' | ||
file_handler = RotatingFileHandler( | ||
log_file, | ||
maxBytes=10485760, # 10MB | ||
backupCount=5 | ||
) | ||
file_handler.setFormatter(formatter) | ||
self.logger.addHandler(file_handler) | ||
self.logger.info(f"File logging enabled at: {log_file}") | ||
except Exception as e: | ||
self.logger.warning(f"Failed to set up file logging: {str(e)}") | ||
self.logger.warning("Continuing with console logging only") | ||
|
||
def _load_configuration(self, config_path: Optional[Path]) -> None: | ||
"""Load patterns and configuration from file or use defaults.""" | ||
|
@@ -151,7 +179,7 @@ def _compile_patterns(self) -> None: | |
def _generate_replacement(self, text: str, category: PHICategory) -> str: | ||
"""Generate replacement text based on strategy and category.""" | ||
if self.replacement_strategy == "hash": | ||
return hashlib.sha256(text.encode()).hexdigest()[:8] | ||
return f"[{category.value}_{hashlib.sha256(text.encode()).hexdigest()[:8]}]" | ||
elif self.replacement_strategy == "fixed": | ||
return f"[{category.value}]" | ||
else: # random | ||
|
@@ -160,16 +188,6 @@ def _generate_replacement(self, text: str, category: PHICategory) -> str: | |
def deidentify(self, text: Union[str, List[str]]) -> Union[str, List[str]]: | ||
""" | ||
De-identify text containing PHI. | ||
Args: | ||
text: Input text or list of texts to de-identify | ||
Returns: | ||
De-identified text or list of texts | ||
Raises: | ||
ValidationError: If input validation fails | ||
DeidentificationError: If processing fails | ||
""" | ||
start_time = datetime.now() | ||
self.logger.info(f"Starting de-identification process at {start_time}") | ||
|
@@ -242,33 +260,30 @@ def validate_patterns(cls, patterns: Dict[PHICategory, DeidentificationPattern]) | |
except Exception as e: | ||
raise ValidationError(f"Pattern validation failed: {str(e)}") | ||
|
||
if __name__ == "__main__": | ||
# Example usage with error handling | ||
try: | ||
deidentifier = Deidentifier( | ||
log_path=Path("./logs"), | ||
replacement_strategy="hash" | ||
) | ||
|
||
sample_text = """ | ||
Patient Name: John Smith | ||
DOB: 01/15/1980 | ||
SSN: 123-45-6789 | ||
MRN: 12345678 | ||
Provider: Dr. Jane Wilson | ||
Email: [email protected] | ||
Phone: (555) 123-4567 | ||
Address: 123 Main Street, Anytown, NY 12345 | ||
""" | ||
# if __name__ == "__main__": | ||
# # Example usage with error handling | ||
# try: | ||
# deidentifier = Deidentifier( | ||
# replacement_strategy="hash", | ||
# enable_file_logging=False # Set to True if you want file logging | ||
# ) | ||
|
||
deidentified_text = deidentifier.deidentify(sample_text) | ||
print("\nDe-identified text:") | ||
print(deidentified_text) | ||
# sample_text = """ | ||
# Patient Name: John Smith | ||
# DOB: 01/15/1980 | ||
# SSN: 123-45-6789 | ||
# MRN: 12345678 | ||
# Provider: Dr. Jane Wilson | ||
# Email: [email protected] | ||
# Phone: (555) 123-4567 | ||
# Address: 123 Main Street, Anytown, NY 12345 | ||
# """ | ||
|
||
# Export audit trail | ||
deidentifier.export_audit_trail(Path("./audit_trail.json")) | ||
# deidentified_text = deidentifier.deidentify(sample_text) | ||
# print("\nDe-identified text:") | ||
# print(deidentified_text) | ||
|
||
except DeidentificationError as e: | ||
print(f"De-identification error: {str(e)}") | ||
except Exception as e: | ||
print(f"Unexpected error: {str(e)}") | ||
# except DeidentificationError as e: | ||
# print(f"De-identification error: {str(e)}") | ||
# except Exception as e: | ||
# print(f"Unexpected error: {str(e)}") |