First version of file Execution model #891

Merged 5 commits on Dec 17, 2024
1 change: 1 addition & 0 deletions backend/backend/settings/base.py
@@ -232,6 +232,7 @@ def get_required_setting(
"connector_v2",
"adapter_processor_v2",
"file_management",
"workflow_manager.file_execution",
"workflow_manager.endpoint_v2",
"workflow_manager.workflow_v2",
"tool_instance_v2",
7 changes: 6 additions & 1 deletion backend/scheduler/tasks.py
@@ -112,8 +112,13 @@ def execute_pipeline_task_v2(
PipelineProcessor.update_pipeline(
pipeline_id, Pipeline.PipelineStatus.INPROGRESS
)
# Mark the File in file history to avoid duplicate execution
# only for ETL and TASK execution
use_file_history: bool = True
execution_response = WorkflowHelper.complete_execution(
workflow=workflow, pipeline_id=pipeline_id
workflow=workflow,
pipeline_id=pipeline_id,
use_file_history=use_file_history,
)
execution_response.remove_result_metadata_keys()
logger.info(
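The hunk above only threads the new use_file_history flag into WorkflowHelper.complete_execution; the helper itself is not part of this diff. A minimal sketch of how such a flag could gate duplicate detection downstream (the function name and callable-based lookup are illustrative; in the PR the actual lookup is FileHistoryHelper.get_file_history(workflow=..., cache_key=...), as seen later in source.py):

from typing import Callable, Optional


def should_skip_file(
    file_hash: str,
    use_file_history: bool,
    lookup_history: Callable[[str], Optional[object]],
) -> bool:
    """Skip a file only when history tracking is on and the hash was seen before."""
    if not use_file_history:
        # API deployments re-run every file; only ETL and TASK pipelines consult history.
        return False
    # Hypothetical lookup; stands in for FileHistoryHelper.get_file_history().
    return lookup_history(file_hash) is not None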
4 changes: 4 additions & 0 deletions backend/workflow_manager/endpoint_v2/dto.py
@@ -9,6 +9,8 @@ class FileHash:
file_hash: str
file_name: str
source_connection_type: str
file_size: Optional[int] = None
mime_type: Optional[str] = None
file_destination: Optional[tuple[str, str]] = (
None # To which destination this file wants to go for MRQ percentage
)
@@ -22,6 +24,8 @@ def to_json(self) -> dict[str, Any]:
"source_connection_type": self.source_connection_type,
"file_destination": self.file_destination,
"is_executed": self.is_executed,
"file_size": self.file_size,
"mime_type": self.mime_type,
}

@staticmethod
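With the two new optional fields, a FileHash now carries size and MIME information into to_json(). A quick construction example (the import path is assumed from the file location, and all values are samples, not taken from the PR):

from workflow_manager.endpoint_v2.dto import FileHash  # module path assumed

fh = FileHash(
    file_path="/data/in/invoice.pdf",        # sample path
    file_name="invoice.pdf",
    file_hash="9f86d081884c7d65...",          # sample SHA-256 digest (truncated)
    source_connection_type="FILESYSTEM",      # sample connection type
    file_size=104_857,                        # new field: size in bytes
    mime_type="application/pdf",              # new field: sniffed MIME type
)
print(fh.to_json()["mime_type"])              # -> application/pdf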
57 changes: 42 additions & 15 deletions backend/workflow_manager/endpoint_v2/source.py
@@ -8,6 +8,7 @@
from typing import Any, Optional

import fsspec
import magic
from connector_processor.constants import ConnectorKeys
from connector_v2.models import ConnectorInstance
from django.core.files.uploadedfile import UploadedFile
@@ -313,10 +314,19 @@ def _get_matched_files(
break
if self._should_process_file(file, patterns):
file_path = str(os.path.join(root, file))
file_content, file_size = self.get_file_content(
input_file_path=file_path
)
if self._is_new_file(
file_path=file_path, workflow=self.endpoint.workflow
file_path=file_path,
file_content=file_content,
workflow=self.endpoint.workflow,
):
matched_files[file_path] = self._create_file_hash(file_path)
matched_files[file_path] = self._create_file_hash(
file_path=file_path,
file_content=file_content,
file_size=file_size,
)
count += 1

return matched_files, count
@@ -327,10 +337,11 @@ def _should_process_file(self, file: str, patterns: list[str]) -> bool:
fnmatch.fnmatchcase(file.lower(), pattern.lower()) for pattern in patterns
)

def _is_new_file(self, file_path: str, workflow: Workflow) -> bool:
def _is_new_file(
self, file_path: str, file_content: bytes, workflow: Workflow
) -> bool:
"""Check if the file is new or already processed."""
file_content = self.get_file_content(input_file_path=file_path)
file_hash = self.get_hash_value(file_content)
file_hash = self.get_file_content_hash(file_content)
file_history = FileHistoryHelper.get_file_history(
workflow=workflow, cache_key=file_hash
)
@@ -350,18 +361,21 @@ def _is_new_file(self, file_path: str, workflow: Workflow) -> bool:

return True

def _create_file_hash(self, file_path: str) -> FileHash:
def _create_file_hash(
self, file_path: str, file_content: bytes, file_size: Optional[int]
) -> FileHash:
"""Create a FileHash object for the matched file."""
file_name = os.path.basename(file_path)
file_content = self.get_file_content(input_file_path=file_path)
file_hash = self.get_hash_value(file_content)
file_hash = self.get_file_content_hash(file_content)
connection_type = self.endpoint.connection_type

file_type = magic.from_buffer(file_content, mime=True)
return FileHash(
file_path=file_path,
source_connection_type=connection_type,
file_name=file_name,
file_hash=file_hash,
file_size=file_size,
mime_type=file_type,
)

def list_files_from_source(
@@ -408,7 +422,9 @@ def hash_str(cls, string_to_hash: Any, hash_method: str = "sha256") -> str:
else:
raise ValueError(f"Unsupported hash_method: {hash_method}")

def get_file_content(self, input_file_path: str, chunk_size: int = 8192) -> bytes:
def get_file_content(
self, input_file_path: str, chunk_size: int = 8192
) -> tuple[bytes, Optional[int]]:
"""Read the content of a file from a remote filesystem in chunks.

Args:
@@ -417,21 +433,30 @@ def get_file_content(self, input_file_path: str, chunk_size: int = 8192) -> bytes:
(default is 8192 bytes).

Returns:
bytes: The content of the file.
tuple[bytes, int]:
A tuple containing the content of the file as bytes and
its size in bytes.
"""
connector: ConnectorInstance = self.endpoint.connector_instance
connector_settings: dict[str, Any] = connector.connector_metadata
source_fs = self.get_fsspec(
settings=connector_settings, connector_id=connector.connector_id
)

# Get file size
file_metadata = source_fs.stat(input_file_path)
file_size: Optional[int] = file_metadata.get("size")
if file_size is None:
logger.warning(f"File size for {input_file_path} could not be determined.")

file_content = bytearray() # Use bytearray for efficient byte concatenation
with source_fs.open(input_file_path, "rb") as remote_file:
while chunk := remote_file.read(chunk_size):
file_content.extend(chunk)

return bytes(file_content)
return bytes(file_content), file_size

def get_hash_value(self, file_content: bytes) -> str:
def get_file_content_hash(self, file_content: bytes) -> str:
"""Generate a hash value from the file content.

Args:
@@ -469,8 +494,8 @@ def add_input_from_connector_to_volume(self, input_file_path: str) -> str:
source_file = f"file://{source_file_path}"

# Get file content and hash value
file_content = self.get_file_content(input_file_path)
hash_value_of_file_content = self.get_hash_value(file_content)
file_content, _ = self.get_file_content(input_file_path)
hash_value_of_file_content = self.get_file_content_hash(file_content)

logger.info(
f"hash_value_of_file {source_file} is : {hash_value_of_file_content}"
@@ -695,6 +720,8 @@ def add_input_file_to_api_storage(
file_name=file_name,
file_hash=file_hash,
is_executed=is_executed,
file_size=file.size,
mime_type=file.content_type,
)
file_hashes.update({file_name: file_hash})
return file_hashes
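Taken together, the source.py changes make _get_matched_files read each remote file once and pass the bytes down: _is_new_file hashes the content it is given instead of re-fetching it, and _create_file_hash reuses the same bytes to compute the hash, record the size returned by get_file_content, and sniff the MIME type with python-magic. A standalone sketch of that single-read flow (describe_file is an illustrative helper, and the SHA-256 digest is an assumption about what get_file_content_hash produces):

import hashlib
from typing import Any, Optional

import magic  # python-magic, the same library imported in source.py


def describe_file(
    file_path: str, file_content: bytes, file_size: Optional[int]
) -> dict[str, Any]:
    """Hash and MIME-sniff content that was already read once from the source."""
    return {
        "file_path": file_path,
        "file_hash": hashlib.sha256(file_content).hexdigest(),
        "file_size": file_size,
        "mime_type": magic.from_buffer(file_content, mime=True),
    }

Reading the content once and threading it through avoids opening every matched remote file twice (once for the history check, once for hashing), which matters when the source is a slow object store.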
Empty file.
6 changes: 6 additions & 0 deletions backend/workflow_manager/file_execution/apps.py
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class FileExecutionConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "workflow_manager.file_execution"
122 changes: 122 additions & 0 deletions backend/workflow_manager/file_execution/migrations/0001_initial.py
@@ -0,0 +1,122 @@
# Generated by Django 4.2.1 on 2024-12-12 05:41

import uuid

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

initial = True

dependencies = [
("workflow_v2", "0002_remove_workflow_llm_response_and_more"),
]

operations = [
migrations.CreateModel(
name="WorkflowFileExecution",
fields=[
("created_at", models.DateTimeField(auto_now_add=True)),
("modified_at", models.DateTimeField(auto_now=True)),
(
"id",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
(
"file_name",
models.CharField(db_comment="Name of the file", max_length=255),
),
(
"file_path",
models.CharField(
db_comment="Full Path of the file", max_length=255, null=True
),
),
(
"file_size",
models.BigIntegerField(
db_comment="Size of the file in bytes", null=True
),
),
(
"file_hash",
models.CharField(
db_comment="Hash of the file content", max_length=64
),
),
(
"mime_type",
models.CharField(
blank=True,
db_comment="MIME type of the file",
max_length=128,
null=True,
),
),
(
"status",
models.TextField(
choices=[
("PENDING", "PENDING"),
("INITIATED", "INITIATED"),
("QUEUED", "QUEUED"),
("READY", "READY"),
("EXECUTING", "EXECUTING"),
("COMPLETED", "COMPLETED"),
("STOPPED", "STOPPED"),
("ERROR", "ERROR"),
],
db_comment="Current status of the execution",
),
),
(
"execution_time",
models.FloatField(
db_comment="Execution time in seconds", null=True
),
),
(
"execution_error",
models.TextField(
blank=True,
db_comment="Error message if execution failed",
null=True,
),
),
(
"workflow_execution",
models.ForeignKey(
db_comment="Foreign key from WorkflowExecution model",
editable=False,
on_delete=django.db.models.deletion.CASCADE,
to="workflow_v2.workflowexecution",
),
),
],
options={
"verbose_name": "Workflow File Execution",
"verbose_name_plural": "Workflow File Executions",
"db_table": "workflow_file_execution",
"indexes": [
models.Index(
fields=["workflow_execution", "file_hash"],
name="workflow_file_hash_idx",
)
],
},
),
migrations.AddConstraint(
model_name="workflowfileexecution",
constraint=models.UniqueConstraint(
fields=("workflow_execution", "file_hash", "file_path"),
name="unique_workflow_file_hash_path",
),
),
]
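
The new app's models.py is not shown in this excerpt, but the initial migration implies a model roughly like the sketch below (the created_at/modified_at pair and the status choices most likely come from a shared base model and an existing execution-status enum, so treat those details as assumptions):

import uuid

from django.db import models


class WorkflowFileExecution(models.Model):
    """Reconstruction from 0001_initial.py: a per-file execution record."""

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    workflow_execution = models.ForeignKey(
        "workflow_v2.WorkflowExecution",
        on_delete=models.CASCADE,
        editable=False,
        db_comment="Foreign key from WorkflowExecution model",
    )
    file_name = models.CharField(max_length=255, db_comment="Name of the file")
    file_path = models.CharField(max_length=255, null=True, db_comment="Full Path of the file")
    file_size = models.BigIntegerField(null=True, db_comment="Size of the file in bytes")
    file_hash = models.CharField(max_length=64, db_comment="Hash of the file content")
    mime_type = models.CharField(max_length=128, blank=True, null=True, db_comment="MIME type of the file")
    status = models.TextField(db_comment="Current status of the execution")  # choices: PENDING ... ERROR
    execution_time = models.FloatField(null=True, db_comment="Execution time in seconds")
    execution_error = models.TextField(blank=True, null=True, db_comment="Error message if execution failed")
    created_at = models.DateTimeField(auto_now_add=True)
    modified_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "workflow_file_execution"
        indexes = [
            models.Index(fields=["workflow_execution", "file_hash"], name="workflow_file_hash_idx")
        ]
        constraints = [
            models.UniqueConstraint(
                fields=("workflow_execution", "file_hash", "file_path"),
                name="unique_workflow_file_hash_path",
            )
        ]

The max_length=64 on file_hash lines up with a SHA-256 hex digest, and the (workflow_execution, file_hash, file_path) unique constraint lets the same content be tracked separately per path within a single workflow execution.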
Empty file.