Skip to content

Commit

Permalink
[PROGRESS UPDATE]:
Browse files Browse the repository at this point in the history
  • Loading branch information
Your Name committed Sep 24, 2024
1 parent 5140963 commit 6842ac8
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 82 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ downloads/
eggs/
.eggs/
lib/
agent_workspace
lib64/
parts/
.ruff_cache/
Expand Down
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@

# MedGuard


[![Join our Discord](https://img.shields.io/badge/Discord-Join%20our%20server-5865F2?style=for-the-badge&logo=discord&logoColor=white)](https://discord.gg/agora-999382051935506503) [![Subscribe on YouTube](https://img.shields.io/badge/YouTube-Subscribe-red?style=for-the-badge&logo=youtube&logoColor=white)](https://www.youtube.com/@kyegomez3242) [![Connect on LinkedIn](https://img.shields.io/badge/LinkedIn-Connect-blue?style=for-the-badge&logo=linkedin&logoColor=white)](https://www.linkedin.com/in/kye-g-38759a207/) [![Follow on X.com](https://img.shields.io/badge/X.com-Follow-1DA1F2?style=for-the-badge&logo=x&logoColor=white)](https://x.com/kyegomezb)

**MedGuard** is a robust, production-grade Python library that ensures HIPAA compliance for large language model (LLM) agents. Designed for enterprise applications in healthcare, MedGuard provides comprehensive security, privacy, and compliance frameworks that integrate seamlessly into your AI-driven workflows. The library guarantees that your AI models and agents operate within strict regulatory boundaries, particularly the Health Insurance Portability and Accountability Act (HIPAA), ensuring the protection of sensitive health data.
Expand All @@ -23,7 +21,7 @@
To install MedGuard, use the following pip command:

```bash
pip install medguard
pip3 install -U medguard
```

## Quick Start
Expand Down
41 changes: 41 additions & 0 deletions aes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import json
from medguard.secure_communications import SecureCommunicationAES

if __name__ == "__main__":
secure_comm = SecureCommunicationAES()

# Encrypt a simple string with an authorized role
user_role = "doctor"
message = "This is sensitive health information."
encrypted_message = secure_comm.encrypt(message, user_role)
print(f"Encrypted payload: {encrypted_message}")

# Decrypt the message with the same authorized role
decrypted_message = secure_comm.decrypt(
encrypted_message, user_role
)
print(f"Decrypted message: {decrypted_message}")

# Attempt to encrypt with an unauthorized role
try:
unauthorized_role = "patient"
encrypted_message = secure_comm.encrypt(
message, unauthorized_role
)
except PermissionError as e:
print(e)

# Encrypt a JSON object
sample_json = json.dumps(
{
"name": "John Doe",
"ssn": "123-45-6789",
"diagnosis": "Hypertension",
}
)
encrypted_json = secure_comm.encrypt(sample_json, user_role)
print(f"Encrypted JSON payload: {encrypted_json}")

# Decrypt the JSON object
decrypted_json = secure_comm.decrypt(encrypted_json, user_role)
print(f"Decrypted JSON: {decrypted_json}")
19 changes: 19 additions & 0 deletions agent_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dotenv import load_dotenv
from medinsight.agent import MedInsightPro

from medguard.main import MedGuard

load_dotenv()

# Initialize the MedInsight Pro agent
med_agent = MedInsightPro(max_articles=10)


# MedGuard
model = MedGuard(agent=med_agent, user="admin")

print(
model.run_agent(
"Patient Information: Name - Kye Gomez, Social Security Number - 888-233-2322, Email - [email protected]. Diagnosis: Pennuenmoia. Treatment Query: What is the most effective treatment plan for a patient diagnosed with Pennuenmoia?"
)
)
45 changes: 22 additions & 23 deletions medguard/de_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ def __init__(self, patterns: List[str] = None) -> None:
:param patterns: A list of regex patterns to identify sensitive data.
"""
if patterns is None:
patterns = [
r"\b\d{3}-\d{2}-\d{4}\b", # SSN pattern
r"\b[A-Z][a-z]*\s[A-Z][a-z]*\b", # Names pattern (simple)
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", # Email pattern
r"\b\d{3}-\d{3}-\d{4}\b", # Phone number pattern
r"\b\d{1,2}/\d{1,2}/\d{2,4}\b", # Date pattern (MM/DD/YYYY or MM/DD/YY)
r"\b\d{4}-\d{4}-\d{4}-\d{4}\b", # Credit card number pattern
r"\b\d{3}-\d{4}-\d{4}-\d{4}\b", # Credit card number pattern with dashes
r"\b\d{16}\b", # Credit card number pattern without dashes
]
patterns = [
r"\b\d{3}-\d{2}-\d{4}\b", # SSN pattern
r"\b[A-Z][a-z]*\s[A-Z][a-z]*\b", # Names pattern (simple)
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", # Email pattern
r"\b\d{3}-\d{3}-\d{4}\b", # Phone number pattern
r"\b\d{1,2}/\d{1,2}/\d{2,4}\b", # Date pattern (MM/DD/YYYY or MM/DD/YY)
r"\b\d{4}-\d{4}-\d{4}-\d{4}\b", # Credit card number pattern
r"\b\d{3}-\d{4}-\d{4}-\d{4}\b", # Credit card number pattern with dashes
r"\b\d{16}\b", # Credit card number pattern without dashes
]
self.patterns = patterns

def deidentify(self, text: str) -> str:
Expand All @@ -45,16 +44,16 @@ def deidentify(self, text: str) -> str:
return text


import time
# import time

# Example usage
if __name__ == "__main__":
deid = Deidentifier()
sample_text = "John Doe's SSN is 123-45-6789 and his email is [email protected]."
start_time = time.time()
clean_text = deid.deidentify(sample_text)
end_time = time.time()
print(
f"Deidentification completed in {end_time - start_time} seconds."
)
print(clean_text)
# # Example usage
# if __name__ == "__main__":
# deid = Deidentifier()
# sample_text = "John Doe's SSN is 123-45-6789 and his email is [email protected]."
# start_time = time.time()
# clean_text = deid.deidentify(sample_text)
# end_time = time.time()
# print(
# f"Deidentification completed in {end_time - start_time} seconds."
# )
# print(clean_text)
72 changes: 56 additions & 16 deletions medguard/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from typing import List, Dict, Any

from loguru import logger
Expand All @@ -6,21 +7,33 @@
from medguard.de_identifier import Deidentifier
from medguard.secure_communications import SecureCommunicationAES
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from pydantic import BaseModel, Field

import time
from typing import List, Dict, Any

class MedGuardInputLog(BaseModel):
agent_id: str
agent_config: Dict[Any, Any]
user_id: str
user_role: str
task: str
data_sources: str
output_data: str
timestamp: str = Field(
...,
)
id: str = Field(default_factory=uuid.uuid4.hex)
agent_id: str = Field(default="")
agent_config: Dict[Any, Any] = Field(default={})
user_id: str = Field(default="")
user_role: str = Field(default="")
task: str = Field(default="")
data_sources: str = Field(default="")
timestamp: str = Field(default_factory=lambda: time.strftime("%Y-%m-%d %H:%M:%S"))

class MedGuardInputLogs(BaseModel):
output: str = Field(default="")
timestamp: str = Field(default_factory=lambda: time.strftime("%Y-%m-%d %H:%M:%S"))
cleaned_task: str = Field(default="")
encrypted_input: str = Field(default="")
time: str = Field(default_factory=lambda: time.strftime("%Y-%m-%d %H:%M:%S"))

class MedGuardOutputLog(BaseModel):
id: str = Field(default_factory=uuid.uuid4.hex)
input_config: MedGuardInputLog
outputs: List[MedGuardInputLogs] = Field(default=[])
time_stamp: str = Field(default_factory=lambda: time.strftime("%Y-%m-%d %H:%M:%S"))


class MedGuard:
Expand All @@ -35,6 +48,7 @@ def __init__(
agent: Agent = None,
agents: List[Agent] = None,
user: str = None,
data_sources: List[str] = None, # PDFS, CSVs, any medical data sources available
*args,
**kwargs,
):
Expand All @@ -49,11 +63,21 @@ def __init__(
self.agent = agent
self.agents = agents
self.user = user
self.key = SecureCommunicationAES()
self.data_sources = data_sources
self.key = SecureCommunicationAES(roles=[self.user])
self.deidentifer = Deidentifier()
logger.info(f"MedGuard initialized with agent: {agent}")
logger.info(
f"MedGuard initialized with agents: {len(agents)}"

# self.logs = MedGuardInputLog
self.logs = MedGuardOutputLog(
input_config=MedGuardInputLog(
agent.id,
agent.to_dict(),
user_id=uuid.uuid4.hex(),
user_role=user,
task="",
data_sources=data_sources,
),
outputs=[],
)

def run_agent(self, task: str, *args, **kwargs):
Expand All @@ -62,26 +86,42 @@ def run_agent(self, task: str, *args, **kwargs):
This method implements the actual logic to run the MedGuard instance with the configured agent(s) while adhering to HIPAA compliance guidelines.
"""
self.logs.input_config.task = task

logger.info(
"Running MedGuard instance with HIPAA compliance..."
)

# De-identify sensitive information in the task before encryption
deidentified_task = self.deidentifer.deidentify(task)
logger.info(f"De-identified task: {deidentified_task}")

# Encrypt the de-identified task for secure transmission
encrypted_task = self.key.encrypt(
deidentified_task, self.user
)
logger.info(f"Encrypted task: {encrypted_task}")

# Decrypt the task for processing by the agent
decrypted_task = self.key.decrypt(encrypted_task)
decrypted_task = self.key.decrypt(encrypted_task, self.user)
logger.info(f"Decrypted task: {decrypted_task}")

# Run the task with the agent
output = self.agent.run(decrypted_task, *args, **kwargs)
logger.info(f"Agent output: {output}")

# De-identify sensitive information in the output before returning
deidentified_output = self.deidentifer.deidentify(output)
logger.info(f"De-identified output: {deidentified_output}")

# Update the loogs
self.logs.outputs.append(
MedGuardInputLogs(
output = output,
cleaned_task=deidentified_task,
encrypted_input=encrypted_task,
)
)

return deidentified_output

Expand Down
77 changes: 38 additions & 39 deletions medguard/secure_communications.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import base64
import hashlib
import hmac
import json
import os
from typing import Dict, List

Expand Down Expand Up @@ -187,41 +186,41 @@ def decrypt(

# Example usage: Encrypting and decrypting a string or JSON with access control

if __name__ == "__main__":
secure_comm = SecureCommunicationAES()

# Encrypt a simple string with an authorized role
user_role = "doctor"
message = "This is sensitive health information."
encrypted_message = secure_comm.encrypt(message, user_role)
print(f"Encrypted payload: {encrypted_message}")

# Decrypt the message with the same authorized role
decrypted_message = secure_comm.decrypt(
encrypted_message, user_role
)
print(f"Decrypted message: {decrypted_message}")

# Attempt to encrypt with an unauthorized role
try:
unauthorized_role = "patient"
encrypted_message = secure_comm.encrypt(
message, unauthorized_role
)
except PermissionError as e:
print(e)

# Encrypt a JSON object
sample_json = json.dumps(
{
"name": "John Doe",
"ssn": "123-45-6789",
"diagnosis": "Hypertension",
}
)
encrypted_json = secure_comm.encrypt(sample_json, user_role)
print(f"Encrypted JSON payload: {encrypted_json}")

# Decrypt the JSON object
decrypted_json = secure_comm.decrypt(encrypted_json, user_role)
print(f"Decrypted JSON: {decrypted_json}")
# if __name__ == "__main__":
# secure_comm = SecureCommunicationAES()

# # Encrypt a simple string with an authorized role
# user_role = "doctor"
# message = "This is sensitive health information."
# encrypted_message = secure_comm.encrypt(message, user_role)
# print(f"Encrypted payload: {encrypted_message}")

# # Decrypt the message with the same authorized role
# decrypted_message = secure_comm.decrypt(
# encrypted_message, user_role
# )
# print(f"Decrypted message: {decrypted_message}")

# # Attempt to encrypt with an unauthorized role
# try:
# unauthorized_role = "patient"
# encrypted_message = secure_comm.encrypt(
# message, unauthorized_role
# )
# except PermissionError as e:
# print(e)

# # Encrypt a JSON object
# sample_json = json.dumps(
# {
# "name": "John Doe",
# "ssn": "123-45-6789",
# "diagnosis": "Hypertension",
# }
# )
# encrypted_json = secure_comm.encrypt(sample_json, user_role)
# print(f"Encrypted JSON payload: {encrypted_json}")

# # Decrypt the JSON object
# decrypted_json = secure_comm.decrypt(encrypted_json, user_role)
# print(f"Decrypted JSON: {decrypted_json}")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "medguard"
version = "0.0.1"
version = "0.0.2"
description = "medguard - Swarms"
license = "MIT"
authors = ["Kye Gomez <[email protected]>"]
Expand Down

0 comments on commit 6842ac8

Please sign in to comment.