diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py
index f8341cdaf..025da5eb5 100644
--- a/backend/backend/settings/base.py
+++ b/backend/backend/settings/base.py
@@ -232,6 +232,7 @@ def get_required_setting(
     "connector_v2",
     "adapter_processor_v2",
     "file_management",
+    "workflow_manager.file_execution",
    "workflow_manager.endpoint_v2",
     "workflow_manager.workflow_v2",
     "tool_instance_v2",
diff --git a/backend/scheduler/tasks.py b/backend/scheduler/tasks.py
index e0c89e04c..4a4f17849 100644
--- a/backend/scheduler/tasks.py
+++ b/backend/scheduler/tasks.py
@@ -112,8 +112,13 @@ def execute_pipeline_task_v2(
         PipelineProcessor.update_pipeline(
             pipeline_id, Pipeline.PipelineStatus.INPROGRESS
         )
+        # Mark the file in file history to avoid duplicate execution;
+        # applies only to ETL and TASK executions
+        use_file_history: bool = True
         execution_response = WorkflowHelper.complete_execution(
-            workflow=workflow, pipeline_id=pipeline_id
+            workflow=workflow,
+            pipeline_id=pipeline_id,
+            use_file_history=use_file_history,
         )
         execution_response.remove_result_metadata_keys()
         logger.info(
diff --git a/backend/workflow_manager/endpoint_v2/dto.py b/backend/workflow_manager/endpoint_v2/dto.py
index 0e7aa6fd9..6c701f2e9 100644
--- a/backend/workflow_manager/endpoint_v2/dto.py
+++ b/backend/workflow_manager/endpoint_v2/dto.py
@@ -9,6 +9,8 @@ class FileHash:
     file_hash: str
     file_name: str
     source_connection_type: str
+    file_size: Optional[int] = None
+    mime_type: Optional[str] = None
     file_destination: Optional[tuple[str, str]] = (
         None  # To which destination this file wants to go for MRQ percentage
     )
@@ -22,6 +24,8 @@ def to_json(self) -> dict[str, Any]:
             "source_connection_type": self.source_connection_type,
             "file_destination": self.file_destination,
             "is_executed": self.is_executed,
+            "file_size": self.file_size,
+            "mime_type": self.mime_type,
         }
 
     @staticmethod
diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py
index c004f3397..273962787 100644
--- a/backend/workflow_manager/endpoint_v2/source.py
+++ b/backend/workflow_manager/endpoint_v2/source.py
@@ -8,6 +8,7 @@ from typing import Any, Optional
 
 import fsspec
+import magic
 from connector_processor.constants import ConnectorKeys
 from connector_v2.models import ConnectorInstance
 from django.core.files.uploadedfile import UploadedFile
@@ -313,10 +314,19 @@ def _get_matched_files(
                     break
                 if self._should_process_file(file, patterns):
                     file_path = str(os.path.join(root, file))
+                    file_content, file_size = self.get_file_content(
+                        input_file_path=file_path
+                    )
                     if self._is_new_file(
-                        file_path=file_path, workflow=self.endpoint.workflow
+                        file_path=file_path,
+                        file_content=file_content,
+                        workflow=self.endpoint.workflow,
                     ):
-                        matched_files[file_path] = self._create_file_hash(file_path)
+                        matched_files[file_path] = self._create_file_hash(
+                            file_path=file_path,
+                            file_content=file_content,
+                            file_size=file_size,
+                        )
                         count += 1
         return matched_files, count
@@ -327,10 +337,11 @@ def _should_process_file(self, file: str, patterns: list[str]) -> bool:
             fnmatch.fnmatchcase(file.lower(), pattern.lower()) for pattern in patterns
         )
 
-    def _is_new_file(self, file_path: str, workflow: Workflow) -> bool:
+    def _is_new_file(
+        self, file_path: str, file_content: bytes, workflow: Workflow
+    ) -> bool:
         """Check if the file is new or already processed."""
-        file_content = self.get_file_content(input_file_path=file_path)
-        file_hash = self.get_hash_value(file_content)
+        file_hash = self.get_file_content_hash(file_content)
         file_history = FileHistoryHelper.get_file_history(
             workflow=workflow, cache_key=file_hash
         )
@@ -350,18 +361,21 @@ def _is_new_file(self, file_path: str, workflow: Workflow) -> bool:
 
         return True
 
-    def _create_file_hash(self, file_path: str) -> FileHash:
+    def _create_file_hash(
+        self, file_path: str, file_content: bytes, file_size: Optional[int]
+    ) -> FileHash:
         """Create a FileHash object for the matched file."""
         file_name = os.path.basename(file_path)
-        file_content = self.get_file_content(input_file_path=file_path)
-        file_hash = self.get_hash_value(file_content)
+        file_hash = self.get_file_content_hash(file_content)
         connection_type = self.endpoint.connection_type
-
+        file_type = magic.from_buffer(file_content, mime=True)
         return FileHash(
             file_path=file_path,
             source_connection_type=connection_type,
             file_name=file_name,
             file_hash=file_hash,
+            file_size=file_size,
+            mime_type=file_type,
         )
 
     def list_files_from_source(
@@ -408,7 +422,9 @@ def hash_str(cls, string_to_hash: Any, hash_method: str = "sha256") -> str:
         else:
             raise ValueError(f"Unsupported hash_method: {hash_method}")
 
-    def get_file_content(self, input_file_path: str, chunk_size: int = 8192) -> bytes:
+    def get_file_content(
+        self, input_file_path: str, chunk_size: int = 8192
+    ) -> tuple[bytes, Optional[int]]:
         """Read the content of a file from a remote filesystem in chunks.
 
         Args:
@@ -417,21 +433,30 @@ def get_file_content(self, input_file_path: str, chunk_size: int = 8192) -> byte
                 (default is 8192 bytes).
 
         Returns:
-            bytes: The content of the file.
+            tuple[bytes, Optional[int]]:
+                A tuple containing the content of the file as bytes and its
+                size in bytes, or None if the size could not be determined.
         """
         connector: ConnectorInstance = self.endpoint.connector_instance
         connector_settings: dict[str, Any] = connector.connector_metadata
         source_fs = self.get_fsspec(
             settings=connector_settings, connector_id=connector.connector_id
         )
+
+        # Get file size
+        file_metadata = source_fs.stat(input_file_path)
+        file_size: Optional[int] = file_metadata.get("size")
+        if file_size is None:
+            logger.warning(f"File size for {input_file_path} could not be determined.")
+
         file_content = bytearray()  # Use bytearray for efficient byte concatenation
         with source_fs.open(input_file_path, "rb") as remote_file:
             while chunk := remote_file.read(chunk_size):
                 file_content.extend(chunk)
 
-        return bytes(file_content)
+        return bytes(file_content), file_size
 
-    def get_hash_value(self, file_content: bytes) -> str:
+    def get_file_content_hash(self, file_content: bytes) -> str:
         """Generate a hash value from the file content.
 
         Args:
@@ -469,8 +494,8 @@ def add_input_from_connector_to_volume(self, input_file_path: str) -> str:
         source_file = f"file://{source_file_path}"
 
         # Get file content and hash value
-        file_content = self.get_file_content(input_file_path)
-        hash_value_of_file_content = self.get_hash_value(file_content)
+        file_content, _ = self.get_file_content(input_file_path)
+        hash_value_of_file_content = self.get_file_content_hash(file_content)
 
         logger.info(
             f"hash_value_of_file {source_file} is : {hash_value_of_file_content}"
@@ -695,6 +720,8 @@ def add_input_file_to_api_storage(
                 file_name=file_name,
                 file_hash=file_hash,
                 is_executed=is_executed,
+                file_size=file.size,
+                mime_type=file.content_type,
             )
             file_hashes.update({file_name: file_hash})
         return file_hashes
diff --git a/backend/workflow_manager/file_execution/__init__.py b/backend/workflow_manager/file_execution/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/backend/workflow_manager/file_execution/apps.py b/backend/workflow_manager/file_execution/apps.py
new file mode 100644
index 000000000..1beb32531
--- /dev/null
+++ b/backend/workflow_manager/file_execution/apps.py
@@ -0,0 +1,6 @@
+from django.apps import AppConfig
+
+
+class FileExecutionConfig(AppConfig):
+    default_auto_field = "django.db.models.BigAutoField"
+    name = "workflow_manager.file_execution"
diff --git a/backend/workflow_manager/file_execution/migrations/0001_initial.py b/backend/workflow_manager/file_execution/migrations/0001_initial.py
new file mode 100644
index 000000000..1b8a06d68
--- /dev/null
+++ b/backend/workflow_manager/file_execution/migrations/0001_initial.py
@@ -0,0 +1,122 @@
+# Generated by Django 4.2.1 on 2024-12-12 05:41
+
+import uuid
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    initial = True
+
+    dependencies = [
+        ("workflow_v2", "0002_remove_workflow_llm_response_and_more"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="WorkflowFileExecution",
+            fields=[
+                ("created_at", models.DateTimeField(auto_now_add=True)),
+                ("modified_at", models.DateTimeField(auto_now=True)),
+                (
+                    "id",
+                    models.UUIDField(
+                        default=uuid.uuid4,
+                        editable=False,
+                        primary_key=True,
+                        serialize=False,
+                    ),
+                ),
+                (
+                    "file_name",
+                    models.CharField(db_comment="Name of the file", max_length=255),
+                ),
+                (
+                    "file_path",
+                    models.CharField(
+                        db_comment="Full Path of the file", max_length=255, null=True
+                    ),
+                ),
+                (
+                    "file_size",
+                    models.BigIntegerField(
+                        db_comment="Size of the file in bytes", null=True
+                    ),
+                ),
+                (
+                    "file_hash",
+                    models.CharField(
+                        db_comment="Hash of the file content", max_length=64
+                    ),
+                ),
+                (
+                    "mime_type",
+                    models.CharField(
+                        blank=True,
+                        db_comment="MIME type of the file",
+                        max_length=128,
+                        null=True,
+                    ),
+                ),
+                (
+                    "status",
+                    models.TextField(
+                        choices=[
+                            ("PENDING", "PENDING"),
+                            ("INITIATED", "INITIATED"),
+                            ("QUEUED", "QUEUED"),
+                            ("READY", "READY"),
+                            ("EXECUTING", "EXECUTING"),
+                            ("COMPLETED", "COMPLETED"),
+                            ("STOPPED", "STOPPED"),
+                            ("ERROR", "ERROR"),
+                        ],
+                        db_comment="Current status of the execution",
+                    ),
+                ),
+                (
+                    "execution_time",
+                    models.FloatField(
+                        db_comment="Execution time in seconds", null=True
+                    ),
+                ),
+                (
+                    "execution_error",
+                    models.TextField(
+                        blank=True,
+                        db_comment="Error message if execution failed",
+                        null=True,
+                    ),
+                ),
+                (
+                    "workflow_execution",
+                    models.ForeignKey(
+                        db_comment="Foreign key from WorkflowExecution model",
+                        editable=False,
+                        on_delete=django.db.models.deletion.CASCADE,
+                        to="workflow_v2.workflowexecution",
+                    ),
+                ),
+            ],
+            options={
+                "verbose_name": "Workflow File Execution",
+                "verbose_name_plural": "Workflow File Executions",
+                "db_table": "workflow_file_execution",
+                "indexes": [
+                    models.Index(
+                        fields=["workflow_execution", "file_hash"],
+                        name="workflow_file_hash_idx",
+                    )
+                ],
+            },
+        ),
+        migrations.AddConstraint(
+            model_name="workflowfileexecution",
+            constraint=models.UniqueConstraint(
+                fields=("workflow_execution", "file_hash", "file_path"),
+                name="unique_workflow_file_hash_path",
+            ),
+        ),
+    ]
diff --git a/backend/workflow_manager/file_execution/migrations/__init__.py b/backend/workflow_manager/file_execution/migrations/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/backend/workflow_manager/file_execution/models.py b/backend/workflow_manager/file_execution/models.py
new file mode 100644
index 000000000..9f5edaaa1
--- /dev/null
+++ b/backend/workflow_manager/file_execution/models.py
@@ -0,0 +1,139 @@
+import uuid
+from typing import Optional
+
+from django.db import models
+from utils.models.base_model import BaseModel
+from workflow_manager.workflow_v2.enums import ExecutionStatus
+from workflow_manager.workflow_v2.models.execution import WorkflowExecution
+
+FILE_NAME_LENGTH = 255
+FILE_PATH_LENGTH = 255
+HASH_LENGTH = 64
+MIME_TYPE_LENGTH = 128
+
+
+class WorkflowFileExecutionManager(models.Manager):
+    def get_or_create_file_execution(
+        self,
+        workflow_execution: WorkflowExecution,
+        file_name: str,
+        file_size: int,
+        file_hash: str,
+        file_path: Optional[str] = None,
+        mime_type: Optional[str] = None,
+    ) -> "WorkflowFileExecution":
+        """
+        Retrieves or creates an input file record for a workflow execution.
+
+        Args:
+            workflow_execution: The `WorkflowExecution` object associated with this file
+            file_name: The name of the input file
+            file_size: The size of the file in bytes
+            file_hash: The hash of the file content
+            file_path: (Optional) The full path of the input file
+            mime_type: (Optional) MIME type of the file
+
+        Returns:
+            The `WorkflowFileExecution` object
+        """
+        execution_file: WorkflowFileExecution
+        execution_file, is_created = self.get_or_create(
+            workflow_execution=workflow_execution,
+            file_hash=file_hash,
+            file_path=file_path,
+        )
+
+        if is_created:
+            execution_file.file_name = file_name
+            execution_file.file_size = file_size
+            execution_file.mime_type = mime_type
+            execution_file.save()
+        return execution_file
+
+
+class WorkflowFileExecution(BaseModel):
+    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
+    workflow_execution = models.ForeignKey(
+        WorkflowExecution,
+        on_delete=models.CASCADE,
+        db_index=True,
+        editable=False,
+        db_comment="Foreign key from WorkflowExecution model",
+    )
+    file_name = models.CharField(
+        max_length=FILE_NAME_LENGTH, db_comment="Name of the file"
+    )
+    file_path = models.CharField(
+        max_length=FILE_PATH_LENGTH, null=True, db_comment="Full Path of the file"
+    )
+    file_size = models.BigIntegerField(
+        null=True, db_comment="Size of the file in bytes"
+    )
+    file_hash = models.CharField(
+        max_length=HASH_LENGTH, db_comment="Hash of the file content"
+    )
+    mime_type = models.CharField(
+        max_length=MIME_TYPE_LENGTH,
+        blank=True,
+        null=True,
+        db_comment="MIME type of the file",
+    )
+    status = models.TextField(
+        choices=ExecutionStatus.choices(),
+        db_comment="Current status of the execution",
+    )
+    execution_time = models.FloatField(
+        null=True, db_comment="Execution time in seconds"
+    )
+    execution_error = models.TextField(
+        blank=True, null=True, db_comment="Error message if execution failed"
+    )
+
+    # Custom manager
+    objects = WorkflowFileExecutionManager()
+
+    def __str__(self):
+        return (
+            f"WorkflowFileExecution: {self.file_name} "
+            f"(WorkflowExecution: {self.workflow_execution})"
+        )
+
+    def update_status(
+        self,
+        status: ExecutionStatus,
+        execution_time: float = 0,
+        execution_error: Optional[str] = None,
+    ) -> None:
+        """
+        Updates the status and execution details of this file execution.
+
+        Args:
+            status: The new status of the file execution
+            execution_time: The execution time for processing the file,
+                in seconds
+            execution_error: (Optional) Error message if processing failed
+
+        Returns:
+            None
+        """
+        self.status = status
+        self.execution_time = execution_time
+        self.execution_error = execution_error
+        self.save()
+
+    class Meta:
+        verbose_name = "Workflow File Execution"
+        verbose_name_plural = "Workflow File Executions"
+        db_table = "workflow_file_execution"
+        indexes = [
+            models.Index(
+                fields=["workflow_execution", "file_hash"],
+                name="workflow_file_hash_idx",
+            ),
+        ]
+        constraints = [
+            models.UniqueConstraint(
+                fields=["workflow_execution", "file_hash", "file_path"],
+                name="unique_workflow_file_hash_path",
+            ),
+        ]
diff --git a/backend/workflow_manager/file_execution/views.py b/backend/workflow_manager/file_execution/views.py
new file mode 100644
index 000000000..60f00ef0e
--- /dev/null
+++ b/backend/workflow_manager/file_execution/views.py
@@ -0,0 +1 @@
+# Create your views here.
diff --git a/backend/workflow_manager/workflow_v2/execution.py b/backend/workflow_manager/workflow_v2/execution.py
index 7d302f03a..994300d01 100644
--- a/backend/workflow_manager/workflow_v2/execution.py
+++ b/backend/workflow_manager/workflow_v2/execution.py
@@ -14,6 +14,7 @@ from unstract.workflow_execution.exceptions import StopExecution
 from utils.local_context import StateStore
 from utils.user_context import UserContext
+from workflow_manager.file_execution.models import WorkflowFileExecution
 from workflow_manager.workflow_v2.constants import WorkflowKey
 from workflow_manager.workflow_v2.enums import ExecutionStatus
 from workflow_manager.workflow_v2.exceptions import WorkflowExecutionError
@@ -84,6 +85,7 @@ def __init__(
             )
             workflow_execution.save()
         else:
+            self.workflow_execution = workflow_execution
             self.execution_mode = workflow_execution.execution_mode
             self.execution_method = workflow_execution.execution_method
             self.execution_type = workflow_execution.execution_type
@@ -309,6 +311,7 @@ def execute_input_file(
         run_id: str,
         file_name: str,
         single_step: bool,
+        workflow_file_execution: WorkflowFileExecution,
     ) -> None:
         """Executes the input file.
 
@@ -330,6 +333,8 @@ def execute_input_file(
             message=f"{file_name} Sent for execution",
             component=LogComponent.SOURCE,
         )
+        workflow_file_execution.update_status(ExecutionStatus.EXECUTING)
+
         self.execute(run_id, file_name, single_step)
         self.publish_log(f"Tool executed successfully for '{file_name}'")
         self._handle_execution_type(execution_type)
diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py
index 3e1e4c80b..afb7962db 100644
--- a/backend/workflow_manager/workflow_v2/workflow_helper.py
+++ b/backend/workflow_manager/workflow_v2/workflow_helper.py
@@ -3,7 +3,6 @@
 import os
 import traceback
 from typing import Any, Optional
-from uuid import uuid4
 
 from account_v2.constants import Common
 from api_v2.models import APIDeployment
@@ -27,7 +26,9 @@
 from utils.user_context import UserContext
 from workflow_manager.endpoint_v2.destination import DestinationConnector
 from workflow_manager.endpoint_v2.dto import FileHash
+from workflow_manager.endpoint_v2.models import WorkflowEndpoint
 from workflow_manager.endpoint_v2.source import SourceConnector
+from workflow_manager.file_execution.models import WorkflowFileExecution
 from workflow_manager.workflow_v2.constants import (
     CeleryConfigurations,
     WorkflowErrors,
@@ -109,7 +110,27 @@ def build_workflow_execution_service(
         return workflow_execution_service
 
     @staticmethod
+    def _get_or_create_workflow_execution_file(
+        execution_service: WorkflowExecutionServiceHelper,
+        file_hash: FileHash,
+        source: SourceConnector,
+    ) -> WorkflowFileExecution:
+        is_api = source.endpoint.connection_type == WorkflowEndpoint.ConnectionType.API
+        # Determine file path based on connection type
+        execution_file_path = file_hash.file_path if not is_api else None
+        # Create or retrieve the workflow execution file
+        return WorkflowFileExecution.objects.get_or_create_file_execution(
+            workflow_execution=execution_service.workflow_execution,
+            file_name=file_hash.file_name,
+            file_hash=file_hash.file_hash,
+            file_path=execution_file_path,
+            file_size=file_hash.file_size,
+            mime_type=file_hash.mime_type,
+        )
+
+    @classmethod
     def process_input_files(
+        cls,
         workflow: Workflow,
         source: SourceConnector,
         destination: DestinationConnector,
@@ -129,6 +150,12 @@ def process_input_files(
         q_file_no_list = WorkflowUtil.get_q_no_list(workflow, total_files)
 
         for index, (file_path, file_hash) in enumerate(input_files.items()):
+            # Get or create the workflow execution file
+            workflow_execution_file = cls._get_or_create_workflow_execution_file(
+                execution_service=execution_service,
+                file_hash=file_hash,
+                source=source,
+            )
             file_number = index + 1
             file_hash = WorkflowUtil.add_file_destination_filehash(
                 file_number,
@@ -136,7 +163,7 @@ def process_input_files(
                 file_hash,
             )
             try:
-                error = WorkflowHelper.process_file(
+                error = cls._process_file(
                     current_file_idx=file_number,
                     total_files=total_files,
                     input_file=file_hash.file_path,
@@ -146,20 +173,33 @@ def process_input_files(
                     execution_service=execution_service,
                     single_step=single_step,
                     file_hash=file_hash,
+                    workflow_file_execution=workflow_execution_file,
                 )
                 if error:
                     failed_files += 1
+                    workflow_execution_file.update_status(
+                        status=ExecutionStatus.ERROR,
+                        execution_error=error,
+                    )
                 else:
                     successful_files += 1
+                    workflow_execution_file.update_status(ExecutionStatus.COMPLETED)
             except StopExecution as e:
                 execution_service.update_execution(
                     ExecutionStatus.STOPPED, error=str(e)
                 )
+                workflow_execution_file.update_status(
+                    status=ExecutionStatus.STOPPED, execution_error=str(e)
+                )
                 break
             except Exception as e:
                 failed_files += 1
                 error_message = f"Error processing file '{file_path}'. {e}"
                 logger.error(error_message, stack_info=True, exc_info=True)
+                workflow_execution_file.update_status(
+                    status=ExecutionStatus.ERROR,
+                    execution_error=error_message,
+                )
                 execution_service.publish_log(
                     message=error_message, level=LogLevel.ERROR
                 )
@@ -178,7 +218,7 @@ def process_input_files(
         return execution_service.get_execution_instance()
 
     @staticmethod
-    def process_file(
+    def _process_file(
         current_file_idx: int,
         total_files: int,
         input_file: str,
@@ -188,6 +228,7 @@ def process_input_files(
         execution_service: WorkflowExecutionServiceHelper,
         single_step: bool,
         file_hash: FileHash,
+        workflow_file_execution: WorkflowFileExecution,
     ) -> Optional[str]:
         error: Optional[str] = None
         file_name = source.add_file_to_volume(
@@ -197,14 +238,17 @@ def process_file(
             execution_service.initiate_tool_execution(
                 current_file_idx, total_files, file_name, single_step
             )
+            workflow_file_execution.update_status(status=ExecutionStatus.INITIATED)
             if not file_hash.is_executed:
                 # Multiple run_ids are linked to an execution_id
                 # Each run_id corresponds to workflow runs for a single file
-                run_id = str(uuid4())
+                # It should be the UUID of the workflow_file_execution record
+                run_id = str(workflow_file_execution.id)
                 execution_service.execute_input_file(
                     run_id=run_id,
                     file_name=file_name,
                     single_step=single_step,
+                    workflow_file_execution=workflow_file_execution,
                 )
         except StopExecution:
             raise
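
Note (reviewer sketch, not part of the diff): a minimal example of how the model and manager added above might be exercised, assuming an existing WorkflowExecution row and a FileHash produced by the source connector; the track_file helper below is hypothetical and only illustrates the intended INITIATED -> EXECUTING -> COMPLETED/ERROR/STOPPED flow driven from workflow_helper.py and execution.py.

    # Illustrative only: uses the APIs introduced in this diff.
    from workflow_manager.file_execution.models import WorkflowFileExecution
    from workflow_manager.workflow_v2.enums import ExecutionStatus


    def track_file(workflow_execution, file_hash):
        # Create or fetch the per-file record; uniqueness is enforced on
        # (workflow_execution, file_hash, file_path) by the new constraint.
        file_execution = WorkflowFileExecution.objects.get_or_create_file_execution(
            workflow_execution=workflow_execution,
            file_name=file_hash.file_name,
            file_size=file_hash.file_size,
            file_hash=file_hash.file_hash,
            file_path=file_hash.file_path,
            mime_type=file_hash.mime_type,
        )
        # The run_id for this file's workflow run is the record's UUID,
        # replacing the previous uuid4() value.
        run_id = str(file_execution.id)
        file_execution.update_status(ExecutionStatus.INITIATED)
        return file_execution, run_id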