From 11cfba67f4e7fa419d2fe681daba54d0056f6e45 Mon Sep 17 00:00:00 2001 From: ali-zipstack Date: Thu, 12 Dec 2024 13:10:36 +0530 Subject: [PATCH 1/2] First version of file Execution model --- backend/backend/settings/base.py | 1 + backend/workflow_manager/endpoint_v2/dto.py | 4 + .../workflow_manager/endpoint_v2/source.py | 53 ++++++-- .../file_execution/__init__.py | 0 .../workflow_manager/file_execution/apps.py | 6 + .../file_execution/file_execution_helper.py | 89 +++++++++++++ .../file_execution/migrations/0001_initial.py | 122 ++++++++++++++++++ .../file_execution/migrations/__init__.py | 0 .../workflow_manager/file_execution/models.py | 73 +++++++++++ .../workflow_manager/file_execution/views.py | 1 + .../workflow_manager/workflow_v2/execution.py | 7 + .../workflow_v2/workflow_helper.py | 56 +++++++- 12 files changed, 396 insertions(+), 16 deletions(-) create mode 100644 backend/workflow_manager/file_execution/__init__.py create mode 100644 backend/workflow_manager/file_execution/apps.py create mode 100644 backend/workflow_manager/file_execution/file_execution_helper.py create mode 100644 backend/workflow_manager/file_execution/migrations/0001_initial.py create mode 100644 backend/workflow_manager/file_execution/migrations/__init__.py create mode 100644 backend/workflow_manager/file_execution/models.py create mode 100644 backend/workflow_manager/file_execution/views.py diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index f8341cdaf..025da5eb5 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -232,6 +232,7 @@ def get_required_setting( "connector_v2", "adapter_processor_v2", "file_management", + "workflow_manager.file_execution", "workflow_manager.endpoint_v2", "workflow_manager.workflow_v2", "tool_instance_v2", diff --git a/backend/workflow_manager/endpoint_v2/dto.py b/backend/workflow_manager/endpoint_v2/dto.py index 0e7aa6fd9..6c701f2e9 100644 --- a/backend/workflow_manager/endpoint_v2/dto.py +++ b/backend/workflow_manager/endpoint_v2/dto.py @@ -9,6 +9,8 @@ class FileHash: file_hash: str file_name: str source_connection_type: str + file_size: Optional[int] = None + mime_type: Optional[str] = None file_destination: Optional[tuple[str, str]] = ( None # To which destination this file wants to go for MRQ percentage ) @@ -22,6 +24,8 @@ def to_json(self) -> dict[str, Any]: "source_connection_type": self.source_connection_type, "file_destination": self.file_destination, "is_executed": self.is_executed, + "file_size": self.file_size, + "mime_type": self.mime_type, } @staticmethod diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index c004f3397..e84e2d0fa 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -8,6 +8,7 @@ from typing import Any, Optional import fsspec +import magic from connector_processor.constants import ConnectorKeys from connector_v2.models import ConnectorInstance from django.core.files.uploadedfile import UploadedFile @@ -313,10 +314,19 @@ def _get_matched_files( break if self._should_process_file(file, patterns): file_path = str(os.path.join(root, file)) + file_content, file_size = self.get_file_content( + input_file_path=file_path + ) if self._is_new_file( - file_path=file_path, workflow=self.endpoint.workflow + file_path=file_path, + file_content=file_content, + workflow=self.endpoint.workflow, ): - matched_files[file_path] = self._create_file_hash(file_path) + 
matched_files[file_path] = self._create_file_hash( + file_path=file_path, + file_content=file_content, + file_size=file_size, + ) count += 1 return matched_files, count @@ -327,10 +337,11 @@ def _should_process_file(self, file: str, patterns: list[str]) -> bool: fnmatch.fnmatchcase(file.lower(), pattern.lower()) for pattern in patterns ) - def _is_new_file(self, file_path: str, workflow: Workflow) -> bool: + def _is_new_file( + self, file_path: str, file_content: bytes, workflow: Workflow + ) -> bool: """Check if the file is new or already processed.""" - file_content = self.get_file_content(input_file_path=file_path) - file_hash = self.get_hash_value(file_content) + file_hash = self.get_file_content_hash(file_content) file_history = FileHistoryHelper.get_file_history( workflow=workflow, cache_key=file_hash ) @@ -350,18 +361,21 @@ def _is_new_file(self, file_path: str, workflow: Workflow) -> bool: return True - def _create_file_hash(self, file_path: str) -> FileHash: + def _create_file_hash( + self, file_path: str, file_content: bytes, file_size: int + ) -> FileHash: """Create a FileHash object for the matched file.""" file_name = os.path.basename(file_path) - file_content = self.get_file_content(input_file_path=file_path) - file_hash = self.get_hash_value(file_content) + file_hash = self.get_file_content_hash(file_content) connection_type = self.endpoint.connection_type - + file_type = magic.from_buffer(file_content, mime=True) return FileHash( file_path=file_path, source_connection_type=connection_type, file_name=file_name, file_hash=file_hash, + file_size=file_size, + mime_type=file_type, ) def list_files_from_source( @@ -408,7 +422,9 @@ def hash_str(cls, string_to_hash: Any, hash_method: str = "sha256") -> str: else: raise ValueError(f"Unsupported hash_method: {hash_method}") - def get_file_content(self, input_file_path: str, chunk_size: int = 8192) -> bytes: + def get_file_content( + self, input_file_path: str, chunk_size: int = 8192 + ) -> tuple[bytes, int]: """Read the content of a file from a remote filesystem in chunks. Args: @@ -417,21 +433,28 @@ def get_file_content(self, input_file_path: str, chunk_size: int = 8192) -> byte (default is 8192 bytes). Returns: - bytes: The content of the file. + tuple[bytes, int]: + A tuple containing the content of the file as bytes and + its size in bytes. """ connector: ConnectorInstance = self.endpoint.connector_instance connector_settings: dict[str, Any] = connector.connector_metadata source_fs = self.get_fsspec( settings=connector_settings, connector_id=connector.connector_id ) + + # Get file size + file_metadata = source_fs.stat(input_file_path) + file_size = file_metadata.get("size", 0) + file_content = bytearray() # Use bytearray for efficient byte concatenation with source_fs.open(input_file_path, "rb") as remote_file: while chunk := remote_file.read(chunk_size): file_content.extend(chunk) - return bytes(file_content) + return bytes(file_content), file_size - def get_hash_value(self, file_content: bytes) -> str: + def get_file_content_hash(self, file_content: bytes) -> str: """Generate a hash value from the file content. 
Args: @@ -470,7 +493,7 @@ def add_input_from_connector_to_volume(self, input_file_path: str) -> str: # Get file content and hash value file_content = self.get_file_content(input_file_path) - hash_value_of_file_content = self.get_hash_value(file_content) + hash_value_of_file_content = self.get_file_content_hash(file_content) logger.info( f"hash_value_of_file {source_file} is : {hash_value_of_file_content}" @@ -695,6 +718,8 @@ def add_input_file_to_api_storage( file_name=file_name, file_hash=file_hash, is_executed=is_executed, + file_size=file.size, + mime_type=file.content_type, ) file_hashes.update({file_name: file_hash}) return file_hashes diff --git a/backend/workflow_manager/file_execution/__init__.py b/backend/workflow_manager/file_execution/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/workflow_manager/file_execution/apps.py b/backend/workflow_manager/file_execution/apps.py new file mode 100644 index 000000000..1beb32531 --- /dev/null +++ b/backend/workflow_manager/file_execution/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class FileExecutionConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "workflow_manager.file_execution" diff --git a/backend/workflow_manager/file_execution/file_execution_helper.py b/backend/workflow_manager/file_execution/file_execution_helper.py new file mode 100644 index 000000000..405197cef --- /dev/null +++ b/backend/workflow_manager/file_execution/file_execution_helper.py @@ -0,0 +1,89 @@ +from typing import Optional + +from workflow_manager.file_execution.models import WorkflowFileExecution +from workflow_manager.workflow_v2.enums import ExecutionStatus +from workflow_manager.workflow_v2.models.execution import WorkflowExecution + + +class FileExecutionHelper: + """ + Helper class for handling operations related to `WorkflowExecutionFile` model. + """ + + @staticmethod + def get_or_create_file_execution( + workflow_execution: WorkflowExecution, + file_name: str, + file_size: int, + file_hash: str, + file_path: Optional[str] = None, + mime_type: Optional[str] = None, + ) -> WorkflowFileExecution: + """ + Retrieves or creates a new input file record for a workflow execution. + + Args: + workflow_execution: The `WorkflowExecution` object associated with this file + file_name: The name of the input file + file_size: The size of the file in bytes + file_hash: The hash of the file content + file_path: (Optional) The full path of the input file + mime_type: (Optional) MIME type of the file + + return: + The `WorkflowExecutionInputFile` object + """ + execution_file, is_created = WorkflowFileExecution.objects.get_or_create( + workflow_execution=workflow_execution, + file_hash=file_hash, + file_path=file_path, + ) + if is_created: + execution_file.file_name = file_name + execution_file.file_size = file_size + execution_file.mime_type = mime_type + execution_file.save() + return execution_file + + @staticmethod + def update_status( + execution_file: WorkflowFileExecution, + status: ExecutionStatus, + execution_time: int = 0, + execution_error: str = None, + ) -> WorkflowFileExecution: + """ + Updates the status and execution details of an input file. 
+ + Args: + execution_file: The `WorkflowExecutionFile` object to update + status: The new status of the file + execution_time: The execution time for processing the file + execution_error: (Optional) Error message if processing failed + + return: + The updated `WorkflowExecutionInputFile` object + """ + execution_file.status = status + execution_file.execution_time = execution_time + execution_file.execution_error = execution_error + execution_file.save() + return execution_file + + @staticmethod + def update_execution_error( + execution_file: WorkflowFileExecution, error_message: str + ) -> None: + """ + Updates the execution error for a file in case of failure. + + Args: + execution_file: The `WorkflowExecutionFile` object to update + error_message: The error message to set + + return: + None + """ + execution_file.execution_error = error_message + execution_file.status = ExecutionStatus.ERROR + execution_file.save() diff --git a/backend/workflow_manager/file_execution/migrations/0001_initial.py b/backend/workflow_manager/file_execution/migrations/0001_initial.py new file mode 100644 index 000000000..1b8a06d68 --- /dev/null +++ b/backend/workflow_manager/file_execution/migrations/0001_initial.py @@ -0,0 +1,122 @@ +# Generated by Django 4.2.1 on 2024-12-12 05:41 + +import uuid + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ("workflow_v2", "0002_remove_workflow_llm_response_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="WorkflowFileExecution", + fields=[ + ("created_at", models.DateTimeField(auto_now_add=True)), + ("modified_at", models.DateTimeField(auto_now=True)), + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ( + "file_name", + models.CharField(db_comment="Name of the file", max_length=255), + ), + ( + "file_path", + models.CharField( + db_comment="Full Path of the file", max_length=255, null=True + ), + ), + ( + "file_size", + models.BigIntegerField( + db_comment="Size of the file in bytes", null=True + ), + ), + ( + "file_hash", + models.CharField( + db_comment="Hash of the file content", max_length=64 + ), + ), + ( + "mime_type", + models.CharField( + blank=True, + db_comment="MIME type of the file", + max_length=128, + null=True, + ), + ), + ( + "status", + models.TextField( + choices=[ + ("PENDING", "PENDING"), + ("INITIATED", "INITIATED"), + ("QUEUED", "QUEUED"), + ("READY", "READY"), + ("EXECUTING", "EXECUTING"), + ("COMPLETED", "COMPLETED"), + ("STOPPED", "STOPPED"), + ("ERROR", "ERROR"), + ], + db_comment="Current status of the execution", + ), + ), + ( + "execution_time", + models.FloatField( + db_comment="Execution time in seconds", null=True + ), + ), + ( + "execution_error", + models.TextField( + blank=True, + db_comment="Error message if execution failed", + null=True, + ), + ), + ( + "workflow_execution", + models.ForeignKey( + db_comment="Foreign key from WorkflowExecution model", + editable=False, + on_delete=django.db.models.deletion.CASCADE, + to="workflow_v2.workflowexecution", + ), + ), + ], + options={ + "verbose_name": "Workflow File Execution", + "verbose_name_plural": "Workflow File Executions", + "db_table": "workflow_file_execution", + "indexes": [ + models.Index( + fields=["workflow_execution", "file_hash"], + name="workflow_file_hash_idx", + ) + ], + }, + ), + migrations.AddConstraint( + model_name="workflowfileexecution", + 
constraint=models.UniqueConstraint( + fields=("workflow_execution", "file_hash", "file_path"), + name="unique_workflow_file_hash_path", + ), + ), + ] diff --git a/backend/workflow_manager/file_execution/migrations/__init__.py b/backend/workflow_manager/file_execution/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/workflow_manager/file_execution/models.py b/backend/workflow_manager/file_execution/models.py new file mode 100644 index 000000000..bbaf613a5 --- /dev/null +++ b/backend/workflow_manager/file_execution/models.py @@ -0,0 +1,73 @@ +import uuid + +from django.db import models +from utils.models.base_model import BaseModel +from workflow_manager.workflow_v2.enums import ExecutionStatus +from workflow_manager.workflow_v2.models.execution import WorkflowExecution + +FILE_NAME_LENGTH = 255 +FILE_PATH_LENGTH = 255 +HASH_LENGTH = 64 +MIME_TYPE_LENGTH = 128 + + +class WorkflowFileExecution(BaseModel): + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + workflow_execution = models.ForeignKey( + WorkflowExecution, + on_delete=models.CASCADE, + db_index=True, + editable=False, + db_comment="Foreign key from WorkflowExecution model", + ) + file_name = models.CharField( + max_length=FILE_NAME_LENGTH, db_comment="Name of the file" + ) + file_path = models.CharField( + max_length=FILE_PATH_LENGTH, null=True, db_comment="Full Path of the file" + ) + file_size = models.BigIntegerField( + null=True, db_comment="Size of the file in bytes" + ) + file_hash = models.CharField( + max_length=HASH_LENGTH, db_comment="Hash of the file content" + ) + mime_type = models.CharField( + max_length=MIME_TYPE_LENGTH, + blank=True, + null=True, + db_comment="MIME type of the file", + ) + status = models.TextField( + choices=ExecutionStatus.choices(), + db_comment="Current status of the execution", + ) + execution_time = models.FloatField( + null=True, db_comment="Execution time in seconds" + ) + execution_error = models.TextField( + blank=True, null=True, db_comment="Error message if execution failed" + ) + + def __str__(self): + return ( + f"WorkflowFileExecution: {self.file_name} " + f"(WorkflowExecution: {self.workflow_execution})" + ) + + class Meta: + verbose_name = "Workflow File Execution" + verbose_name_plural = "Workflow File Executions" + db_table = "workflow_file_execution" + indexes = [ + models.Index( + fields=["workflow_execution", "file_hash"], + name="workflow_file_hash_idx", + ), + ] + constraints = [ + models.UniqueConstraint( + fields=["workflow_execution", "file_hash", "file_path"], + name="unique_workflow_file_hash_path", + ), + ] diff --git a/backend/workflow_manager/file_execution/views.py b/backend/workflow_manager/file_execution/views.py new file mode 100644 index 000000000..60f00ef0e --- /dev/null +++ b/backend/workflow_manager/file_execution/views.py @@ -0,0 +1 @@ +# Create your views here. 
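Taken together, the new `file_execution` app gives each input file of a workflow run its own tracked row, keyed by the `unique_workflow_file_hash_path` constraint. A minimal sketch of the intended call pattern at this point in the series (illustrative file values only; assumes a configured Django environment with these apps installed):

```python
from workflow_manager.file_execution.file_execution_helper import FileExecutionHelper
from workflow_manager.workflow_v2.enums import ExecutionStatus
from workflow_manager.workflow_v2.models.execution import WorkflowExecution


def track_one_file(workflow_execution: WorkflowExecution) -> None:
    # get_or_create keys on (workflow_execution, file_hash, file_path), so a
    # retry of the same file reuses the existing row instead of violating the
    # unique_workflow_file_hash_path constraint added by the migration.
    file_record = FileExecutionHelper.get_or_create_file_execution(
        workflow_execution=workflow_execution,
        file_name="invoice.pdf",  # illustrative values only
        file_size=1024,
        file_hash="a" * 64,  # SHA-256 hex digest is 64 chars, matching HASH_LENGTH
        file_path="/input/invoice.pdf",
        mime_type="application/pdf",
    )
    FileExecutionHelper.update_status(file_record, ExecutionStatus.EXECUTING)
    # ... run the workflow tools for this file ...
    FileExecutionHelper.update_status(
        file_record, ExecutionStatus.COMPLETED, execution_time=12
    )
```

Note that `file_name`, `file_size` and `mime_type` are only written when the row is first created; a later call for the same hash and path returns the existing record unchanged.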
diff --git a/backend/workflow_manager/workflow_v2/execution.py b/backend/workflow_manager/workflow_v2/execution.py index 7d302f03a..9de25b754 100644 --- a/backend/workflow_manager/workflow_v2/execution.py +++ b/backend/workflow_manager/workflow_v2/execution.py @@ -14,6 +14,8 @@ from unstract.workflow_execution.exceptions import StopExecution from utils.local_context import StateStore from utils.user_context import UserContext +from workflow_manager.file_execution.file_execution_helper import FileExecutionHelper +from workflow_manager.file_execution.models import WorkflowFileExecution from workflow_manager.workflow_v2.constants import WorkflowKey from workflow_manager.workflow_v2.enums import ExecutionStatus from workflow_manager.workflow_v2.exceptions import WorkflowExecutionError @@ -84,6 +86,7 @@ def __init__( ) workflow_execution.save() else: + self.workflow_execution = workflow_execution self.execution_mode = workflow_execution.execution_mode self.execution_method = workflow_execution.execution_method self.execution_type = workflow_execution.execution_type @@ -309,6 +312,7 @@ def execute_input_file( run_id: str, file_name: str, single_step: bool, + workflow_file_execution: WorkflowFileExecution, ) -> None: """Executes the input file. @@ -330,6 +334,9 @@ def execute_input_file( message=f"{file_name} Sent for execution", component=LogComponent.SOURCE, ) + FileExecutionHelper.update_status( + workflow_file_execution, ExecutionStatus.EXECUTING + ) self.execute(run_id, file_name, single_step) self.publish_log(f"Tool executed successfully for '{file_name}'") self._handle_execution_type(execution_type) diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index 8df5fc575..7a0604f45 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -27,7 +27,10 @@ from utils.user_context import UserContext from workflow_manager.endpoint_v2.destination import DestinationConnector from workflow_manager.endpoint_v2.dto import FileHash +from workflow_manager.endpoint_v2.models import WorkflowEndpoint from workflow_manager.endpoint_v2.source import SourceConnector +from workflow_manager.file_execution.file_execution_helper import FileExecutionHelper +from workflow_manager.file_execution.models import WorkflowFileExecution from workflow_manager.workflow_v2.constants import ( CeleryConfigurations, WorkflowErrors, @@ -109,7 +112,28 @@ def build_workflow_execution_service( return workflow_execution_service @staticmethod + def _get_or_create_workflow_execution_file( + execution_service: WorkflowExecutionServiceHelper, + file_hash: FileHash, + source: SourceConnector, + ) -> WorkflowFileExecution: + is_api = source.endpoint.connection_type == WorkflowEndpoint.ConnectionType.API + # Determine file path based on connection type + execution_file_path = file_hash.file_path if not is_api else None + + # Create or retrieve the workflow execution file + return FileExecutionHelper.get_or_create_file_execution( + workflow_execution=execution_service.workflow_execution, + file_name=file_hash.file_name, + file_hash=file_hash.file_hash, + file_path=execution_file_path, + file_size=file_hash.file_size, + mime_type=file_hash.mime_type, + ) + + @classmethod def process_input_files( + cls, workflow: Workflow, source: SourceConnector, destination: DestinationConnector, @@ -129,6 +153,12 @@ def process_input_files( q_file_no_list = WorkflowUtil.get_q_no_list(workflow, total_files) for 
index, (file_path, file_hash) in enumerate(input_files.items()): + # Get workflow execution file + workflow_execution_file = cls._get_or_create_workflow_execution_file( + execution_service=execution_service, + file_hash=file_hash, + source=source, + ) file_number = index + 1 file_hash = WorkflowUtil.add_file_destination_filehash( file_number, @@ -136,7 +166,7 @@ def process_input_files( file_hash, ) try: - error = WorkflowHelper.process_file( + error = cls._process_file( current_file_idx=file_number, total_files=total_files, input_file=file_hash.file_path, @@ -146,20 +176,37 @@ def process_input_files( execution_service=execution_service, single_step=single_step, file_hash=file_hash, + workflow_file_execution=workflow_execution_file, ) if error: failed_files += 1 + FileExecutionHelper.update_execution_error( + workflow_execution_file, error + ) else: successful_files += 1 + FileExecutionHelper.update_status( + workflow_execution_file, ExecutionStatus.COMPLETED + ) except StopExecution as e: execution_service.update_execution( ExecutionStatus.STOPPED, error=str(e) ) + FileExecutionHelper.update_status( + execution_file=workflow_execution_file, + status=ExecutionStatus.STOPPED, + execution_error=str(e), + ) break except Exception as e: failed_files += 1 error_message = f"Error processing file '{file_path}'. {e}" logger.error(error_message, stack_info=True, exc_info=True) + FileExecutionHelper.update_status( + execution_file=workflow_execution_file, + status=ExecutionStatus.ERROR, + execution_error=error_message, + ) execution_service.publish_log( message=error_message, level=LogLevel.ERROR ) @@ -178,7 +225,7 @@ def process_input_files( return execution_service.get_execution_instance() @staticmethod - def process_file( + def _process_file( current_file_idx: int, total_files: int, input_file: str, @@ -188,6 +235,7 @@ def process_file( execution_service: WorkflowExecutionServiceHelper, single_step: bool, file_hash: FileHash, + workflow_file_execution: WorkflowFileExecution, ) -> Optional[str]: error: Optional[str] = None file_name = source.add_file_to_volume( @@ -197,6 +245,9 @@ def process_file( execution_service.initiate_tool_execution( current_file_idx, total_files, file_name, single_step ) + FileExecutionHelper.update_status( + workflow_file_execution, ExecutionStatus.INITIATED + ) if not file_hash.is_executed: # Multiple run_ids are linked to an execution_id # Each run_id corresponds to workflow runs for a single file @@ -205,6 +256,7 @@ def process_file( run_id=run_id, file_name=file_name, single_step=single_step, + workflow_file_execution=workflow_file_execution, ) except StopExecution: raise From 685765bbaa2198b0243ea71fe270452e870cc0f8 Mon Sep 17 00:00:00 2001 From: ali-zipstack Date: Fri, 13 Dec 2024 15:28:13 +0530 Subject: [PATCH 2/2] Replaced helper class with model methods and a custom manager. 
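This follows Django's convention of keeping table-level behaviour on a custom `Manager` and row-level behaviour on the model instance, so call sites no longer import a separate helper module. A minimal before/after sketch of one call site, assuming a configured Django environment:

```python
from workflow_manager.file_execution.models import WorkflowFileExecution
from workflow_manager.workflow_v2.enums import ExecutionStatus


def mark_completed(file_record: WorkflowFileExecution, seconds: float) -> None:
    # PATCH 1 style: FileExecutionHelper.update_status(file_record, ...)
    # PATCH 2 style: the same behaviour travels with the instance and
    # persists status, execution time and error in a single save().
    file_record.update_status(ExecutionStatus.COMPLETED, execution_time=seconds)
```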
--- backend/scheduler/tasks.py | 7 +- .../workflow_manager/endpoint_v2/source.py | 10 ++- .../file_execution/file_execution_helper.py | 89 ------------------- .../workflow_manager/file_execution/models.py | 66 ++++++++++++++ .../workflow_manager/workflow_v2/execution.py | 6 +- .../workflow_v2/workflow_helper.py | 30 +++---- 6 files changed, 91 insertions(+), 117 deletions(-) delete mode 100644 backend/workflow_manager/file_execution/file_execution_helper.py diff --git a/backend/scheduler/tasks.py b/backend/scheduler/tasks.py index e0c89e04c..4a4f17849 100644 --- a/backend/scheduler/tasks.py +++ b/backend/scheduler/tasks.py @@ -112,8 +112,13 @@ def execute_pipeline_task_v2( PipelineProcessor.update_pipeline( pipeline_id, Pipeline.PipelineStatus.INPROGRESS ) + # Mark the File in file history to avoid duplicate execution + # only for ETL and TASK execution + use_file_history: bool = True execution_response = WorkflowHelper.complete_execution( - workflow=workflow, pipeline_id=pipeline_id + workflow=workflow, + pipeline_id=pipeline_id, + use_file_history=use_file_history, ) execution_response.remove_result_metadata_keys() logger.info( diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index e84e2d0fa..273962787 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -362,7 +362,7 @@ def _is_new_file( return True def _create_file_hash( - self, file_path: str, file_content: bytes, file_size: int + self, file_path: str, file_content: bytes, file_size: Optional[int] ) -> FileHash: """Create a FileHash object for the matched file.""" file_name = os.path.basename(file_path) @@ -424,7 +424,7 @@ def hash_str(cls, string_to_hash: Any, hash_method: str = "sha256") -> str: def get_file_content( self, input_file_path: str, chunk_size: int = 8192 - ) -> tuple[bytes, int]: + ) -> tuple[bytes, Optional[int]]: """Read the content of a file from a remote filesystem in chunks. Args: @@ -445,7 +445,9 @@ def get_file_content( # Get file size file_metadata = source_fs.stat(input_file_path) - file_size = file_metadata.get("size", 0) + file_size: Optional[int] = file_metadata.get("size") + if file_size is None: + logger.warning(f"File size for {input_file_path} could not be determined.") file_content = bytearray() # Use bytearray for efficient byte concatenation with source_fs.open(input_file_path, "rb") as remote_file: @@ -492,7 +494,7 @@ def add_input_from_connector_to_volume(self, input_file_path: str) -> str: source_file = f"file://{source_file_path}" # Get file content and hash value - file_content = self.get_file_content(input_file_path) + file_content, _ = self.get_file_content(input_file_path) hash_value_of_file_content = self.get_file_content_hash(file_content) logger.info( diff --git a/backend/workflow_manager/file_execution/file_execution_helper.py b/backend/workflow_manager/file_execution/file_execution_helper.py deleted file mode 100644 index 405197cef..000000000 --- a/backend/workflow_manager/file_execution/file_execution_helper.py +++ /dev/null @@ -1,89 +0,0 @@ -from typing import Optional - -from workflow_manager.file_execution.models import WorkflowFileExecution -from workflow_manager.workflow_v2.enums import ExecutionStatus -from workflow_manager.workflow_v2.models.execution import WorkflowExecution - - -class FileExecutionHelper: - """ - Helper class for handling operations related to `WorkflowExecutionFile` model. 
- """ - - @staticmethod - def get_or_create_file_execution( - workflow_execution: WorkflowExecution, - file_name: str, - file_size: int, - file_hash: str, - file_path: Optional[str] = None, - mime_type: Optional[str] = None, - ) -> WorkflowFileExecution: - """ - Retrieves or creates a new input file record for a workflow execution. - - Args: - workflow_execution: The `WorkflowExecution` object associated with this file - file_name: The name of the input file - file_size: The size of the file in bytes - file_hash: The hash of the file content - file_path: (Optional) The full path of the input file - mime_type: (Optional) MIME type of the file - - return: - The `WorkflowExecutionInputFile` object - """ - execution_file, is_created = WorkflowFileExecution.objects.get_or_create( - workflow_execution=workflow_execution, - file_hash=file_hash, - file_path=file_path, - ) - if is_created: - execution_file.file_name = file_name - execution_file.file_size = file_size - execution_file.mime_type = mime_type - execution_file.save() - return execution_file - - @staticmethod - def update_status( - execution_file: WorkflowFileExecution, - status: ExecutionStatus, - execution_time: int = 0, - execution_error: str = None, - ) -> WorkflowFileExecution: - """ - Updates the status and execution details of an input file. - - Args: - execution_file: The `WorkflowExecutionFile` object to update - status: The new status of the file - execution_time: The execution time for processing the file - execution_error: (Optional) Error message if processing failed - - return: - The updated `WorkflowExecutionInputFile` object - """ - execution_file.status = status - execution_file.execution_time = execution_time - execution_file.execution_error = execution_error - execution_file.save() - return execution_file - - @staticmethod - def update_execution_error( - execution_file: WorkflowFileExecution, error_message: str - ) -> None: - """ - Updates the execution error for a file in case of failure. - - Args: - execution_file: The `WorkflowExecutionFile` object to update - error_message: The error message to set - - return: - None - """ - execution_file.execution_error = error_message - execution_file.status = ExecutionStatus.ERROR - execution_file.save() diff --git a/backend/workflow_manager/file_execution/models.py b/backend/workflow_manager/file_execution/models.py index bbaf613a5..9f5edaaa1 100644 --- a/backend/workflow_manager/file_execution/models.py +++ b/backend/workflow_manager/file_execution/models.py @@ -1,4 +1,5 @@ import uuid +from typing import Optional from django.db import models from utils.models.base_model import BaseModel @@ -11,6 +12,45 @@ MIME_TYPE_LENGTH = 128 +class WorkflowFileExecutionManager(models.Manager): + def get_or_create_file_execution( + self, + workflow_execution: WorkflowExecution, + file_name: str, + file_size: int, + file_hash: str, + file_path: Optional[str] = None, + mime_type: Optional[str] = None, + ): + """ + Retrieves or creates a new input file record for a workflow execution. 
+
+        Args:
+            workflow_execution: The `WorkflowExecution` object associated with this file
+            file_name: The name of the input file
+            file_size: The size of the file in bytes
+            file_hash: The hash of the file content
+            file_path: (Optional) The full path of the input file
+            mime_type: (Optional) MIME type of the file
+
+        return:
+            The `WorkflowFileExecution` object
+        """
+        execution_file: WorkflowFileExecution
+        execution_file, is_created = self.get_or_create(
+            workflow_execution=workflow_execution,
+            file_hash=file_hash,
+            file_path=file_path,
+        )
+
+        if is_created:
+            execution_file.file_name = file_name
+            execution_file.file_size = file_size
+            execution_file.mime_type = mime_type
+            execution_file.save()
+        return execution_file
+
+
 class WorkflowFileExecution(BaseModel):
     id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
     workflow_execution = models.ForeignKey(
@@ -49,12 +89,37 @@ class WorkflowFileExecution(BaseModel):
         blank=True, null=True, db_comment="Error message if execution failed"
     )
 
+    # Custom manager
+    objects = WorkflowFileExecutionManager()
+
     def __str__(self):
         return (
             f"WorkflowFileExecution: {self.file_name} "
             f"(WorkflowExecution: {self.workflow_execution})"
         )
 
+    def update_status(
+        self,
+        status: ExecutionStatus,
+        execution_time: float = 0,
+        execution_error: Optional[str] = None,
+    ) -> None:
+        """
+        Updates the status and execution details of this file execution.
+
+        Args:
+            status: The new status of the file
+            execution_time: The execution time for processing the file, in seconds
+            execution_error: (Optional) Error message if processing failed
+
+        return:
+            None
+        """
+        self.status = status
+        self.execution_time = execution_time
+        self.execution_error = execution_error
+        self.save()
+
     class Meta:
         verbose_name = "Workflow File Execution"
         verbose_name_plural = "Workflow File Executions"
         db_table = "workflow_file_execution"
diff --git a/backend/workflow_manager/workflow_v2/execution.py b/backend/workflow_manager/workflow_v2/execution.py
index 9de25b754..994300d01 100644
--- a/backend/workflow_manager/workflow_v2/execution.py
+++ b/backend/workflow_manager/workflow_v2/execution.py
@@ -14,7 +14,6 @@
 from unstract.workflow_execution.exceptions import StopExecution
 from utils.local_context import StateStore
 from utils.user_context import UserContext
-from workflow_manager.file_execution.file_execution_helper import FileExecutionHelper
 from workflow_manager.file_execution.models import WorkflowFileExecution
 from workflow_manager.workflow_v2.constants import WorkflowKey
 from workflow_manager.workflow_v2.enums import ExecutionStatus
@@ -334,9 +333,8 @@ def execute_input_file(
             message=f"{file_name} Sent for execution",
             component=LogComponent.SOURCE,
         )
-        FileExecutionHelper.update_status(
-            workflow_file_execution, ExecutionStatus.EXECUTING
-        )
+        workflow_file_execution.update_status(ExecutionStatus.EXECUTING)
+
         self.execute(run_id, file_name, single_step)
         self.publish_log(f"Tool executed successfully for '{file_name}'")
         self._handle_execution_type(execution_type)
diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py
index 7a0604f45..41076d827 100644
--- a/backend/workflow_manager/workflow_v2/workflow_helper.py
+++ b/backend/workflow_manager/workflow_v2/workflow_helper.py
@@ -3,7 +3,6 @@
 import os
 import traceback
 from typing import Any, Optional
-from uuid import uuid4
 
 from account_v2.constants import Common
 from api_v2.models import APIDeployment
@@ -29,7 +28,6 @@
 from workflow_manager.endpoint_v2.dto import FileHash
 from workflow_manager.endpoint_v2.models import WorkflowEndpoint
 from workflow_manager.endpoint_v2.source import SourceConnector
-from workflow_manager.file_execution.file_execution_helper import FileExecutionHelper
 from workflow_manager.file_execution.models import WorkflowFileExecution
 from workflow_manager.workflow_v2.constants import (
     CeleryConfigurations,
@@ -120,9 +118,8 @@ def _get_or_create_workflow_execution_file(
         is_api = source.endpoint.connection_type == WorkflowEndpoint.ConnectionType.API
         # Determine file path based on connection type
         execution_file_path = file_hash.file_path if not is_api else None
-
         # Create or retrieve the workflow execution file
-        return FileExecutionHelper.get_or_create_file_execution(
+        return WorkflowFileExecution.objects.get_or_create_file_execution(
             workflow_execution=execution_service.workflow_execution,
             file_name=file_hash.file_name,
             file_hash=file_hash.file_hash,
@@ -180,30 +177,26 @@ def process_input_files(
                 )
                 if error:
                     failed_files += 1
-                    FileExecutionHelper.update_execution_error(
-                        workflow_execution_file, error
+                    workflow_execution_file.update_status(
+                        status=ExecutionStatus.ERROR,
+                        execution_error=error,
                     )
                 else:
                     successful_files += 1
-                    FileExecutionHelper.update_status(
-                        workflow_execution_file, ExecutionStatus.COMPLETED
-                    )
+                    workflow_execution_file.update_status(ExecutionStatus.COMPLETED)
             except StopExecution as e:
                 execution_service.update_execution(
                     ExecutionStatus.STOPPED, error=str(e)
                 )
-                FileExecutionHelper.update_status(
-                    execution_file=workflow_execution_file,
-                    status=ExecutionStatus.STOPPED,
-                    execution_error=str(e),
+                workflow_execution_file.update_status(
+                    status=ExecutionStatus.STOPPED, execution_error=str(e)
                 )
                 break
             except Exception as e:
                 failed_files += 1
                 error_message = f"Error processing file '{file_path}'. {e}"
                 logger.error(error_message, stack_info=True, exc_info=True)
-                FileExecutionHelper.update_status(
-                    execution_file=workflow_execution_file,
+                workflow_execution_file.update_status(
                     status=ExecutionStatus.ERROR,
                     execution_error=error_message,
                 )
@@ -245,13 +238,12 @@ def _process_file(
         execution_service.initiate_tool_execution(
             current_file_idx, total_files, file_name, single_step
         )
-        FileExecutionHelper.update_status(
-            workflow_file_execution, ExecutionStatus.INITIATED
-        )
+        workflow_file_execution.update_status(status=ExecutionStatus.INITIATED)
         if not file_hash.is_executed:
             # Multiple run_ids are linked to an execution_id
             # Each run_id corresponds to workflow runs for a single file
-            run_id = str(uuid4())
+            # It should be the UUID of workflow_file_execution
+            run_id = str(workflow_file_execution.id)
             execution_service.execute_input_file(
                 run_id=run_id,
                 file_name=file_name,
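A consequence of the final hunk worth noting: `run_id` is no longer a fresh `uuid4()` but the stringified primary key of the per-file record, so anything downstream that is keyed by `run_id` can be joined back to its `WorkflowFileExecution` row. A minimal sketch of such a lookup (hypothetical helper, not part of the patch; assumes a configured Django environment):

```python
from workflow_manager.file_execution.models import WorkflowFileExecution


def file_record_for_run(run_id: str) -> WorkflowFileExecution:
    # run_id is the stringified UUID primary key of the per-file row, so tool
    # logs keyed by run_id resolve directly to file metadata (name, size,
    # MIME type, status, error).
    return WorkflowFileExecution.objects.get(id=run_id)
```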