From cefcc233013978d8651151da58b1ecd50ece09c4 Mon Sep 17 00:00:00 2001 From: 1yam Date: Thu, 25 Apr 2024 18:08:29 +0200 Subject: [PATCH 01/18] Refactor: Multiplart MultipartUploadedFile will read with chunck to avoid sending huge data --- src/aleph/web/controllers/storage.py | 51 ++++++++++++++++++---------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 341f4dc16..1b656d4de 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -116,29 +116,43 @@ class StorageMetadata(pydantic.BaseModel): class UploadedFile(Protocol): @property - def size(self) -> int: - ... + def size(self) -> int: ... @property - def content(self) -> Union[str, bytes]: - ... + def content(self) -> Union[str, bytes]: ... class MultipartUploadedFile: _content: Optional[bytes] size: int - def __init__(self, file_field: FileField, size: int): + def __init__(self, file_field: FileField, max_size: int): self.file_field = file_field - self.size = size + self.max_size = max_size + self.size = 0 self._content = None @property def content(self) -> bytes: - # Only read the stream once + """Lazily loads the content, ensuring it is only read once and within the max size limit.""" if self._content is None: - self.file_field.file.seek(0) - self._content = self.file_field.file.read(self.size) + self._content = b"" + total_read = 0 + chunk_size = 8192 + + while total_read < self.max_size: + chunk = self.file_field.file.read(chunk_size) + if not chunk: + break + self.size += len(chunk) + total_read += len(chunk) + if total_read > self.max_size: + raise web.HTTPRequestEntityTooLarge( + reason="File size exceeds the maximum limit.", + max_size=self.max_size, + actual_size=total_read, + ) + self._content += chunk return self._content @@ -237,13 +251,18 @@ async def storage_add_file(request: web.Request): except KeyError: raise web.HTTPUnprocessableEntity(reason="Missing 'file' in multipart form.") + metadata = post.get("metadata") + max_upload_size = ( + MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE if not metadata else MAX_FILE_SIZE + ) + if isinstance(file_field, FileField): - uploaded_file: UploadedFile = MultipartUploadedFile(file_field, len(file_field.file.read())) + uploaded_file: UploadedFile = MultipartUploadedFile( + file_field, + max_size=max_upload_size, + ) else: uploaded_file = RawUploadedFile(file_field) - - metadata = post.get("metadata") - status_code = 200 if metadata: @@ -259,17 +278,13 @@ async def storage_add_file(request: web.Request): message = storage_metadata.message sync = storage_metadata.sync - max_upload_size = MAX_UPLOAD_FILE_SIZE - else: - # User did not provide a message in the `metadata` field message = None sync = False - max_upload_size = MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE if uploaded_file.size > max_upload_size: raise web.HTTPRequestEntityTooLarge( - actual_size=uploaded_file.size, max_size=MAX_UPLOAD_FILE_SIZE + actual_size=uploaded_file.size, max_size=max_upload_size ) with session_factory() as session: From 885c59cd709f33ec5c54dd2abcd807147a600dfb Mon Sep 17 00:00:00 2001 From: 1yam Date: Thu, 2 May 2024 12:11:35 +0200 Subject: [PATCH 02/18] Feature: Upload storage endpoint using tempfile to reduce memory usage --- src/aleph/web/controllers/storage.py | 139 ++++++++++++++++----------- 1 file changed, 82 insertions(+), 57 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 1b656d4de..fd23d06be 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -1,5 +1,7 @@ import base64 import logging +import os +import tempfile from decimal import Decimal from typing import Optional, Protocol, Union @@ -92,7 +94,7 @@ async def add_storage_json_controller(request: web.Request): async def _verify_message_signature( - pending_message: BasePendingMessage, signature_verifier: SignatureVerifier + pending_message: BasePendingMessage, signature_verifier: SignatureVerifier ) -> None: try: await signature_verifier.verify_signature(pending_message) @@ -114,56 +116,81 @@ class StorageMetadata(pydantic.BaseModel): sync: bool -class UploadedFile(Protocol): - @property - def size(self) -> int: ... +class UploadedFile: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.cleanup() + + def cleanup(self): + pass @property - def content(self) -> Union[str, bytes]: ... + def size(self) -> int: + raise NotImplementedError + @property + def content(self) -> bytes: + raise NotImplementedError -class MultipartUploadedFile: - _content: Optional[bytes] - size: int +class MultipartUploadedFile(UploadedFile): def __init__(self, file_field: FileField, max_size: int): self.file_field = file_field self.max_size = max_size - self.size = 0 - self._content = None + self._temp_file = tempfile.NamedTemporaryFile(delete=False) + + def __exit__(self, exc_type, exc_value, traceback): + super().__exit__(exc_type, exc_value, traceback) + os.unlink(self._temp_file.name) @property def content(self) -> bytes: - """Lazily loads the content, ensuring it is only read once and within the max size limit.""" - if self._content is None: - self._content = b"" - total_read = 0 - chunk_size = 8192 - - while total_read < self.max_size: - chunk = self.file_field.file.read(chunk_size) - if not chunk: - break - self.size += len(chunk) - total_read += len(chunk) - if total_read > self.max_size: - raise web.HTTPRequestEntityTooLarge( - reason="File size exceeds the maximum limit.", - max_size=self.max_size, - actual_size=total_read, - ) - self._content += chunk - - return self._content - - -class RawUploadedFile: + total_read = 0 + chunk_size = 8192 + while total_read < self.max_size: + chunk = self.file_field.file.read(chunk_size) + if not chunk: + break + total_read += len(chunk) + if total_read > self.max_size: + raise web.HTTPRequestEntityTooLarge( + reason="File size exceeds the maximum limit.", + max_size=self.max_size, + actual_size=total_read, + ) + self._temp_file.write(chunk) + self._temp_file.seek(0) + return self._temp_file.read() + + @property + def size(self) -> int: + return os.path.getsize(self._temp_file.name) + + +class RawUploadedFile(UploadedFile): def __init__(self, content: Union[bytes, str]): - self.content = content + self._temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w+b') + # Encode only if the content is a string + if isinstance(content, str): + content = content.encode('utf-8') # Explicitly encode str to bytes + self._temp_file.write(content) + self._temp_file.seek(0) + + def __exit__(self, exc_type, exc_value, traceback): + super().__exit__(exc_type, exc_value, traceback) + os.unlink(self._temp_file.name) + + @property + def content(self) -> bytes: + self._temp_file.seek(0) + return self._temp_file.read() @property def size(self) -> int: - return len(self.content) + self._temp_file.seek(0, os.SEEK_END) + return self._temp_file.tell() async def _check_and_add_file( @@ -197,9 +224,6 @@ async def _check_and_add_file( # TODO: this can still reach 1 GiB in memory. We should look into streaming. file_content = file.content - file_bytes = ( - file_content.encode("utf-8") if isinstance(file_content, str) else file_content - ) file_hash = get_sha256(file_content) if message_content: @@ -210,7 +234,7 @@ async def _check_and_add_file( await storage_service.add_file_content_to_local_storage( session=session, - file_content=file_bytes, + file_content=file_content, file_hash=file_hash, ) @@ -224,9 +248,9 @@ async def _check_and_add_file( async def _make_mq_queue( - request: web.Request, - sync: bool, - routing_key: Optional[str] = None, + request: web.Request, + sync: bool, + routing_key: Optional[str] = None, ) -> Optional[aio_pika.abc.AbstractQueue]: if not sync: return None @@ -282,21 +306,22 @@ async def storage_add_file(request: web.Request): message = None sync = False - if uploaded_file.size > max_upload_size: - raise web.HTTPRequestEntityTooLarge( - actual_size=uploaded_file.size, max_size=max_upload_size - ) + with uploaded_file: + if uploaded_file.size > max_upload_size: + raise web.HTTPRequestEntityTooLarge( + actual_size=uploaded_file.size, max_size=max_upload_size + ) - with session_factory() as session: - file_hash = await _check_and_add_file( - session=session, - signature_verifier=signature_verifier, - storage_service=storage_service, - message=message, - file=uploaded_file, - grace_period=grace_period, - ) - session.commit() + with session_factory() as session: + file_hash = await _check_and_add_file( + session=session, + signature_verifier=signature_verifier, + storage_service=storage_service, + message=message, + file=uploaded_file, + grace_period=grace_period, + ) + session.commit() if message: broadcast_status = await broadcast_and_process_message( From e62da4b4246ac8e6dc5256b7bb117e84810d76c7 Mon Sep 17 00:00:00 2001 From: 1yam Date: Wed, 15 May 2024 12:27:00 +0200 Subject: [PATCH 03/18] Refacto: using request.read() when content type is not multipart/form-data instead of request.post() --- src/aleph/web/controllers/storage.py | 37 ++++++++++++++++++---------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index fd23d06be..4cbd4625d 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -94,7 +94,7 @@ async def add_storage_json_controller(request: web.Request): async def _verify_message_signature( - pending_message: BasePendingMessage, signature_verifier: SignatureVerifier + pending_message: BasePendingMessage, signature_verifier: SignatureVerifier ) -> None: try: await signature_verifier.verify_signature(pending_message) @@ -171,10 +171,10 @@ def size(self) -> int: class RawUploadedFile(UploadedFile): def __init__(self, content: Union[bytes, str]): - self._temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w+b') + self._temp_file = tempfile.NamedTemporaryFile(delete=False, mode="w+b") # Encode only if the content is a string if isinstance(content, str): - content = content.encode('utf-8') # Explicitly encode str to bytes + content = content.encode("utf-8") # Explicitly encode str to bytes self._temp_file.write(content) self._temp_file.seek(0) @@ -248,9 +248,9 @@ async def _check_and_add_file( async def _make_mq_queue( - request: web.Request, - sync: bool, - routing_key: Optional[str] = None, + request: web.Request, + sync: bool, + routing_key: Optional[str] = None, ) -> Optional[aio_pika.abc.AbstractQueue]: if not sync: return None @@ -269,24 +269,35 @@ async def storage_add_file(request: web.Request): config = get_config_from_request(request) grace_period = config.storage.grace_period.value - post = await request.post() - try: - file_field = post["file"] - except KeyError: - raise web.HTTPUnprocessableEntity(reason="Missing 'file' in multipart form.") + if request.content_type == "multipart/form-data": + post = await request.post() + try: + file_field = post["file"] + except KeyError: + raise web.HTTPUnprocessableEntity( + reason="Missing 'file' in multipart form." + ) + + metadata = post.get("metadata") + else: + file_content = await request.read() + file_field = file_content + metadata = None - metadata = post.get("metadata") max_upload_size = ( MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE if not metadata else MAX_FILE_SIZE ) - if isinstance(file_field, FileField): + if request.content_type == "multipart/form-data" and isinstance( + file_field, FileField + ): uploaded_file: UploadedFile = MultipartUploadedFile( file_field, max_size=max_upload_size, ) else: uploaded_file = RawUploadedFile(file_field) + status_code = 200 if metadata: From ff83b6d66a8feced1f395d3abd655d9fae1bb064 Mon Sep 17 00:00:00 2001 From: 1yam Date: Wed, 15 May 2024 12:33:27 +0200 Subject: [PATCH 04/18] Fix: mypy issue --- src/aleph/web/controllers/storage.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 4cbd4625d..783f8989a 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -296,6 +296,8 @@ async def storage_add_file(request: web.Request): max_size=max_upload_size, ) else: + if not isinstance(file_field, (bytes, str)): + raise web.HTTPUnprocessableEntity(reason="Invalid file content type.") uploaded_file = RawUploadedFile(file_field) status_code = 200 From 68601a05105083e3e8d8786f486ac43f2bea543a Mon Sep 17 00:00:00 2001 From: lyam Date: Tue, 28 May 2024 19:31:07 +0200 Subject: [PATCH 05/18] Refactor: Upload endpoints using multipart instead of post and raw upload --- src/aleph/web/controllers/storage.py | 205 +++++++++++++++++---------- 1 file changed, 128 insertions(+), 77 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 783f8989a..f43b27d8b 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -1,13 +1,14 @@ import base64 +import hashlib import logging import os import tempfile from decimal import Decimal -from typing import Optional, Protocol, Union +from typing import Optional import aio_pika import pydantic -from aiohttp import web +from aiohttp import BodyPartReader, web from aiohttp.web_request import FileField from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.balances import get_total_balance @@ -118,7 +119,7 @@ class StorageMetadata(pydantic.BaseModel): class UploadedFile: def __enter__(self): - return self + raise NotImplementedError def __exit__(self, exc_type, exc_value, traceback): self.cleanup() @@ -126,6 +127,9 @@ def __exit__(self, exc_type, exc_value, traceback): def cleanup(self): pass + async def read_and_validate(self): + pass + @property def size(self) -> int: raise NotImplementedError @@ -134,23 +138,45 @@ def size(self) -> int: def content(self) -> bytes: raise NotImplementedError + @property + def file(self): + raise NotImplementedError + + @property + def get_hash(self) -> str: + raise NotImplementedError + class MultipartUploadedFile(UploadedFile): - def __init__(self, file_field: FileField, max_size: int): + def __init__(self, file_field: BodyPartReader, max_size: int, file_hash: str = None): self.file_field = file_field self.max_size = max_size - self._temp_file = tempfile.NamedTemporaryFile(delete=False) + self.file_hash = file_hash + try: + self._temp_file = tempfile.NamedTemporaryFile(delete=False) + self._file_content = bytearray() + except Exception as e: + web.HTTPInternalServerError(reason="Cannot create tempfile") + + def __enter__(self): + self._temp_file.seek(0) + return self def __exit__(self, exc_type, exc_value, traceback): - super().__exit__(exc_type, exc_value, traceback) - os.unlink(self._temp_file.name) + try: + self._temp_file.close() + if os.path.exists(self._temp_file.name): + os.unlink(self._temp_file.name) + except Exception as e: + web.HTTPInternalServerError(reason="Cannot create tempfile") - @property - def content(self) -> bytes: + async def read_and_validate(self): total_read = 0 chunk_size = 8192 + hash_sha256 = hashlib.sha256() + while total_read < self.max_size: - chunk = self.file_field.file.read(chunk_size) + chunk = await self.file_field.read_chunk(chunk_size) if not chunk: break total_read += len(chunk) @@ -161,46 +187,88 @@ def content(self) -> bytes: actual_size=total_read, ) self._temp_file.write(chunk) + self._file_content.extend(chunk) + hash_sha256.update(chunk) + self.file_hash = hash_sha256.hexdigest() self._temp_file.seek(0) - return self._temp_file.read() @property def size(self) -> int: return os.path.getsize(self._temp_file.name) + @property + def content(self) -> bytes: + return bytes(self._file_content) + + @property + def file(self) -> str: + return self._temp_file.name + + @property + def get_hash(self) -> str: + return self.file_hash + class RawUploadedFile(UploadedFile): - def __init__(self, content: Union[bytes, str]): - self._temp_file = tempfile.NamedTemporaryFile(delete=False, mode="w+b") - # Encode only if the content is a string - if isinstance(content, str): - content = content.encode("utf-8") # Explicitly encode str to bytes - self._temp_file.write(content) + def __init__(self, request: web.Request, max_size: int): + self.request = request + self.max_size = max_size + self._temp_file = tempfile.NamedTemporaryFile(delete=False) + self._hasher = hashlib.sha256() + self._size = 0 + self._hash = None + + async def read_and_validate(self): + async for chunk in self.request.content.iter_chunked(8192): + self._temp_file.write(chunk) + self._hasher.update(chunk) + self._size += len(chunk) + if self._size > self.max_size: + raise web.HTTPRequestEntityTooLarge( + reason="File size exceeds the maximum limit.", + max_size=self.max_size, + actual_size=self._size, + ) self._temp_file.seek(0) + self._hash = self._hasher.hexdigest() + + def __enter__(self): + self._temp_file.seek(0) + return self._temp_file def __exit__(self, exc_type, exc_value, traceback): - super().__exit__(exc_type, exc_value, traceback) + self._temp_file.close() os.unlink(self._temp_file.name) + @property + def size(self) -> int: + return self._size + @property def content(self) -> bytes: self._temp_file.seek(0) return self._temp_file.read() @property - def size(self) -> int: - self._temp_file.seek(0, os.SEEK_END) - return self._temp_file.tell() + def file(self): + return self._temp_file.name + + @property + def get_hash(self) -> str: + if self._hash is None: + raise ValueError("Hash has not been computed yet") + return self._hash async def _check_and_add_file( - session: DbSession, - signature_verifier: SignatureVerifier, - storage_service: StorageService, - message: Optional[PendingStoreMessage], - file: UploadedFile, - grace_period: int, + session: DbSession, + signature_verifier: SignatureVerifier, + storage_service: StorageService, + message: Optional[PendingStoreMessage], + file: UploadedFile, + grace_period: int, ) -> str: + file_hash = file.get_hash # Perform authentication and balance checks if message: await _verify_message_signature( @@ -208,6 +276,10 @@ async def _check_and_add_file( ) try: message_content = StoreContent.parse_raw(message.item_content) + if message_content.item_hash != file_hash: + raise web.HTTPUnprocessableEntity( + reason=f"File hash does not match ({file_hash} != {message_content.item_hash})" + ) except ValidationError as e: raise web.HTTPUnprocessableEntity( reason=f"Invalid store message content: {e.json()}" @@ -218,23 +290,17 @@ async def _check_and_add_file( address=message_content.address, size=file.size, ) - else: message_content = None - # TODO: this can still reach 1 GiB in memory. We should look into streaming. file_content = file.content - file_hash = get_sha256(file_content) - - if message_content: - if message_content.item_hash != file_hash: - raise web.HTTPUnprocessableEntity( - reason=f"File hash does not match ({file_hash} != {message_content.item_hash})" - ) + file_bytes = ( + file_content.encode("utf-8") if isinstance(file_content, str) else file_content + ) await storage_service.add_file_content_to_local_storage( session=session, - file_content=file_content, + file_content=file_bytes, file_hash=file_hash, ) @@ -268,38 +334,25 @@ async def storage_add_file(request: web.Request): signature_verifier = get_signature_verifier_from_request(request) config = get_config_from_request(request) grace_period = config.storage.grace_period.value + metadata = None + uploaded_file: Optional[UploadedFile] = None if request.content_type == "multipart/form-data": - post = await request.post() - try: - file_field = post["file"] - except KeyError: - raise web.HTTPUnprocessableEntity( - reason="Missing 'file' in multipart form." - ) - - metadata = post.get("metadata") + reader = await request.multipart() + async for part in reader: + if part.name == 'file': + uploaded_file = MultipartUploadedFile(part, MAX_FILE_SIZE) + await uploaded_file.read_and_validate() + elif part.name == 'metadata': + metadata = await part.read(decode=True) else: - file_content = await request.read() - file_field = file_content - metadata = None + uploaded_file = RawUploadedFile(request=request, max_size=MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE) + await uploaded_file.read_and_validate() max_upload_size = ( MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE if not metadata else MAX_FILE_SIZE ) - if request.content_type == "multipart/form-data" and isinstance( - file_field, FileField - ): - uploaded_file: UploadedFile = MultipartUploadedFile( - file_field, - max_size=max_upload_size, - ) - else: - if not isinstance(file_field, (bytes, str)): - raise web.HTTPUnprocessableEntity(reason="Invalid file content type.") - uploaded_file = RawUploadedFile(file_field) - status_code = 200 if metadata: @@ -319,23 +372,21 @@ async def storage_add_file(request: web.Request): message = None sync = False - with uploaded_file: - if uploaded_file.size > max_upload_size: - raise web.HTTPRequestEntityTooLarge( - actual_size=uploaded_file.size, max_size=max_upload_size - ) - - with session_factory() as session: - file_hash = await _check_and_add_file( - session=session, - signature_verifier=signature_verifier, - storage_service=storage_service, - message=message, - file=uploaded_file, - grace_period=grace_period, - ) - session.commit() + if uploaded_file.size > max_upload_size: + raise web.HTTPRequestEntityTooLarge( + actual_size=uploaded_file.size, max_size=max_upload_size + ) + with session_factory() as session: + file_hash = await _check_and_add_file( + session=session, + signature_verifier=signature_verifier, + storage_service=storage_service, + message=message, + file=uploaded_file, + grace_period=grace_period, + ) + session.commit() if message: broadcast_status = await broadcast_and_process_message( pending_message=message, sync=sync, request=request, logger=logger From 372c839acb551ae1b725253121b52e5b25350599 Mon Sep 17 00:00:00 2001 From: lyam Date: Tue, 28 May 2024 19:32:40 +0200 Subject: [PATCH 06/18] Feature: Unit test for raw upload --- tests/api/test_storage.py | 47 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/api/test_storage.py b/tests/api/test_storage.py index 382eb2ca2..8f3486862 100644 --- a/tests/api/test_storage.py +++ b/tests/api/test_storage.py @@ -92,6 +92,41 @@ async def api_client(ccn_test_aiohttp_app, mocker, aiohttp_client): return client +async def add_file_raw_upload( + api_client, + session_factory: DbSessionFactory, + uri: str, + file_content: bytes, + expected_file_hash: str, +): + # Send the file content as raw bytes in the request body + headers = { + 'Content-Type': 'application/octet-stream' + } + post_response = await api_client.post(uri, data=file_content, headers=headers) + response_text = await post_response.text() + assert post_response.status == 200, response_text + post_response_json = await post_response.json() + assert post_response_json["status"] == "success" + file_hash = post_response_json["hash"] + assert file_hash == expected_file_hash + + # Assert that the file is downloadable + get_file_response = await api_client.get(f"{GET_STORAGE_RAW_URI}/{file_hash}") + assert get_file_response.status == 200, await get_file_response.text() + response_data = await get_file_response.read() + + # Check that the file appears in the DB + with session_factory() as session: + file = get_file(session=session, file_hash=file_hash) + assert file is not None + assert file.hash == file_hash + assert file.type == FileType.FILE + assert file.size == len(file_content) + + assert response_data == file_content + + async def add_file( api_client, session_factory: DbSessionFactory, @@ -101,6 +136,7 @@ async def add_file( ): form_data = aiohttp.FormData() form_data.add_field("file", file_content) + print(file_content) post_response = await api_client.post(uri, data=form_data) response_text = await post_response.text() @@ -220,6 +256,16 @@ async def test_storage_add_file(api_client, session_factory: DbSessionFactory): expected_file_hash=EXPECTED_FILE_SHA256, ) +@pytest.mark.asyncio +async def test_storage_add_file_raw_upload(api_client, session_factory: DbSessionFactory): + await add_file_raw_upload( + api_client, + session_factory, + uri=STORAGE_ADD_FILE_URI, + file_content=FILE_CONTENT, + expected_file_hash=EXPECTED_FILE_SHA256, + ) + @pytest.mark.parametrize( "file_content, expected_hash, size, error_code, balance", @@ -371,3 +417,4 @@ async def test_ipfs_add_json(api_client, session_factory: DbSessionFactory): # creating a second fixture. expected_file_hash=ItemHash(EXPECTED_FILE_CID), ) + From b9e39b68787dc3c583a41971a498f15828bf51c7 Mon Sep 17 00:00:00 2001 From: lyam Date: Tue, 28 May 2024 19:39:35 +0200 Subject: [PATCH 07/18] Fix: mypy error --- src/aleph/web/controllers/storage.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index f43b27d8b..d4b65012e 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -148,10 +148,10 @@ def get_hash(self) -> str: class MultipartUploadedFile(UploadedFile): - def __init__(self, file_field: BodyPartReader, max_size: int, file_hash: str = None): + def __init__(self, file_field: BodyPartReader, max_size: int): self.file_field = file_field self.max_size = max_size - self.file_hash = file_hash + self._hash = None try: self._temp_file = tempfile.NamedTemporaryFile(delete=False) self._file_content = bytearray() @@ -189,7 +189,7 @@ async def read_and_validate(self): self._temp_file.write(chunk) self._file_content.extend(chunk) hash_sha256.update(chunk) - self.file_hash = hash_sha256.hexdigest() + self._hash = hash_sha256.hexdigest() self._temp_file.seek(0) @property @@ -206,7 +206,7 @@ def file(self) -> str: @property def get_hash(self) -> str: - return self.file_hash + return self._hash class RawUploadedFile(UploadedFile): From c87263ecc0f781d7e7f8c4a0512a9d16f12800c7 Mon Sep 17 00:00:00 2001 From: lyam Date: Tue, 28 May 2024 20:14:15 +0200 Subject: [PATCH 08/18] Fix: last mypy erro --- src/aleph/web/controllers/storage.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index d4b65012e..cda3b2de1 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -142,7 +142,6 @@ def content(self) -> bytes: def file(self): raise NotImplementedError - @property def get_hash(self) -> str: raise NotImplementedError @@ -204,8 +203,9 @@ def content(self) -> bytes: def file(self) -> str: return self._temp_file.name - @property def get_hash(self) -> str: + if self._hash is None: + raise ValueError("Hash has not been computed yet") return self._hash @@ -253,7 +253,6 @@ def content(self) -> bytes: def file(self): return self._temp_file.name - @property def get_hash(self) -> str: if self._hash is None: raise ValueError("Hash has not been computed yet") @@ -268,7 +267,7 @@ async def _check_and_add_file( file: UploadedFile, grace_period: int, ) -> str: - file_hash = file.get_hash + file_hash = file.get_hash() # Perform authentication and balance checks if message: await _verify_message_signature( @@ -294,9 +293,13 @@ async def _check_and_add_file( message_content = None file_content = file.content - file_bytes = ( - file_content.encode("utf-8") if isinstance(file_content, str) else file_content - ) + + if isinstance(file_content, bytes): + file_bytes = file_content + elif isinstance(file_content, str): + file_bytes = file_content.encode("utf-8") + else: + raise web.HTTPUnprocessableEntity(reason="Invalid file content type") await storage_service.add_file_content_to_local_storage( session=session, @@ -349,6 +352,9 @@ async def storage_add_file(request: web.Request): uploaded_file = RawUploadedFile(request=request, max_size=MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE) await uploaded_file.read_and_validate() + if uploaded_file is None: + raise web.HTTPBadRequest(reason="No file uploaded") + max_upload_size = ( MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE if not metadata else MAX_FILE_SIZE ) From e77361a790c12b2709419a554b1d520bfea619af Mon Sep 17 00:00:00 2001 From: lyam Date: Thu, 30 May 2024 13:10:28 +0200 Subject: [PATCH 09/18] Refactor: Uploaded_file & storage_upload --- setup.cfg | 2 + src/aleph/web/controllers/storage.py | 217 ++++++++++----------------- 2 files changed, 81 insertions(+), 138 deletions(-) diff --git a/setup.cfg b/setup.cfg index 1b37e0054..933510ef5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -77,6 +77,8 @@ install_requires = urllib3==2.0.7 uvloop==0.19.0 web3==6.11.2 + aiofiles==23.2.1 + types-aiofiles==23.2.0.20240403 dependency_links = https://github.com/aleph-im/py-libp2p/tarball/0.1.4-1-use-set#egg=libp2p diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index cda3b2de1..9aab9a275 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -7,6 +7,7 @@ from typing import Optional import aio_pika +import aiofiles import pydantic from aiohttp import BodyPartReader, web from aiohttp.web_request import FileField @@ -118,145 +119,85 @@ class StorageMetadata(pydantic.BaseModel): class UploadedFile: - def __enter__(self): - raise NotImplementedError + def __init__(self, max_size: int): + self.max_size = max_size + self._hasher = hashlib.sha256() + self._hash = "" + self._size = 0 + self._temp_file_path = None + self._temp_file = None + + async def __aenter__(self): + if not self._temp_file_path: + raise ValueError("File content has not been validated and read yet.") + self._temp_file = await aiofiles.open(self._temp_file_path, 'rb') + return self._temp_file - def __exit__(self, exc_type, exc_value, traceback): - self.cleanup() + async def __aexit__(self, exc_type, exc_value, traceback): + await self.cleanup() - def cleanup(self): - pass + async def cleanup(self): + if self._temp_file: + await self._temp_file.close() + if self._temp_file_path: + os.remove(self._temp_file_path) + self._temp_file_path = None async def read_and_validate(self): - pass + total_read = 0 + chunk_size = 8192 - @property - def size(self) -> int: - raise NotImplementedError + with tempfile.NamedTemporaryFile('w+b', delete=False) as temp_file: + self._temp_file_path = temp_file.name - @property - def content(self) -> bytes: - raise NotImplementedError + async with aiofiles.open(self._temp_file_path, 'w+b') as f: + async for chunk in self._read_chunks(chunk_size): + total_read += len(chunk) + if total_read > self.max_size: + raise web.HTTPRequestEntityTooLarge( + reason="File size exceeds the maximum limit.", + max_size=self.max_size, + actual_size=total_read, + ) + self._hasher.update(chunk) + await f.write(chunk) + + self._hash = self._hasher.hexdigest() + self._size = total_read + await f.seek(0) + + async def _read_chunks(self, chunk_size): + raise NotImplementedError("Subclasses must implement this method") @property - def file(self): - raise NotImplementedError + async def size(self) -> int: + return self._size def get_hash(self) -> str: - raise NotImplementedError + return self._hash class MultipartUploadedFile(UploadedFile): def __init__(self, file_field: BodyPartReader, max_size: int): + super().__init__(max_size) self.file_field = file_field - self.max_size = max_size - self._hash = None - try: - self._temp_file = tempfile.NamedTemporaryFile(delete=False) - self._file_content = bytearray() - except Exception as e: - web.HTTPInternalServerError(reason="Cannot create tempfile") - - def __enter__(self): - self._temp_file.seek(0) - return self - def __exit__(self, exc_type, exc_value, traceback): - try: - self._temp_file.close() - if os.path.exists(self._temp_file.name): - os.unlink(self._temp_file.name) - except Exception as e: - web.HTTPInternalServerError(reason="Cannot create tempfile") - - async def read_and_validate(self): - total_read = 0 - chunk_size = 8192 - hash_sha256 = hashlib.sha256() - - while total_read < self.max_size: + async def _read_chunks(self, chunk_size): + while True: chunk = await self.file_field.read_chunk(chunk_size) if not chunk: break - total_read += len(chunk) - if total_read > self.max_size: - raise web.HTTPRequestEntityTooLarge( - reason="File size exceeds the maximum limit.", - max_size=self.max_size, - actual_size=total_read, - ) - self._temp_file.write(chunk) - self._file_content.extend(chunk) - hash_sha256.update(chunk) - self._hash = hash_sha256.hexdigest() - self._temp_file.seek(0) - - @property - def size(self) -> int: - return os.path.getsize(self._temp_file.name) - - @property - def content(self) -> bytes: - return bytes(self._file_content) - - @property - def file(self) -> str: - return self._temp_file.name - - def get_hash(self) -> str: - if self._hash is None: - raise ValueError("Hash has not been computed yet") - return self._hash + yield chunk class RawUploadedFile(UploadedFile): def __init__(self, request: web.Request, max_size: int): + super().__init__(max_size) self.request = request - self.max_size = max_size - self._temp_file = tempfile.NamedTemporaryFile(delete=False) - self._hasher = hashlib.sha256() - self._size = 0 - self._hash = None - - async def read_and_validate(self): - async for chunk in self.request.content.iter_chunked(8192): - self._temp_file.write(chunk) - self._hasher.update(chunk) - self._size += len(chunk) - if self._size > self.max_size: - raise web.HTTPRequestEntityTooLarge( - reason="File size exceeds the maximum limit.", - max_size=self.max_size, - actual_size=self._size, - ) - self._temp_file.seek(0) - self._hash = self._hasher.hexdigest() - - def __enter__(self): - self._temp_file.seek(0) - return self._temp_file - def __exit__(self, exc_type, exc_value, traceback): - self._temp_file.close() - os.unlink(self._temp_file.name) - - @property - def size(self) -> int: - return self._size - - @property - def content(self) -> bytes: - self._temp_file.seek(0) - return self._temp_file.read() - - @property - def file(self): - return self._temp_file.name - - def get_hash(self) -> str: - if self._hash is None: - raise ValueError("Hash has not been computed yet") - return self._hash + async def _read_chunks(self, chunk_size): + async for chunk in self.request.content.iter_chunked(chunk_size): + yield chunk async def _check_and_add_file( @@ -287,39 +228,37 @@ async def _check_and_add_file( await _verify_user_balance( session=session, address=message_content.address, - size=file.size, + size=await file.size, ) else: message_content = None - file_content = file.content - - if isinstance(file_content, bytes): - file_bytes = file_content - elif isinstance(file_content, str): - file_bytes = file_content.encode("utf-8") - else: - raise web.HTTPUnprocessableEntity(reason="Invalid file content type") - - await storage_service.add_file_content_to_local_storage( - session=session, - file_content=file_bytes, - file_hash=file_hash, - ) + async with file as uploaded_file: + file_content = await uploaded_file.read() + if isinstance(file_content, bytes): + file_bytes = file_content + elif isinstance(file_content, str): + file_bytes = file_content.encode("utf-8") + else: + raise web.HTTPUnprocessableEntity(reason="Invalid file content type") + await storage_service.add_file_content_to_local_storage( + session=session, + file_content=file_bytes, + file_hash=file_hash + ) # For files uploaded without authenticated upload, add a grace period of 1 day. if not message_content: add_grace_period_for_file( session=session, file_hash=file_hash, hours=grace_period ) - return file_hash async def _make_mq_queue( - request: web.Request, - sync: bool, - routing_key: Optional[str] = None, + request: web.Request, + sync: bool, + routing_key: Optional[str] = None, ) -> Optional[aio_pika.abc.AbstractQueue]: if not sync: return None @@ -358,6 +297,13 @@ async def storage_add_file(request: web.Request): max_upload_size = ( MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE if not metadata else MAX_FILE_SIZE ) + file_size : int = await uploaded_file.size + if file_size > max_upload_size: + raise web.HTTPRequestEntityTooLarge( + actual_size=file_size, max_size=max_upload_size + ) + + uploaded_file.max_size = max_upload_size status_code = 200 @@ -378,11 +324,6 @@ async def storage_add_file(request: web.Request): message = None sync = False - if uploaded_file.size > max_upload_size: - raise web.HTTPRequestEntityTooLarge( - actual_size=uploaded_file.size, max_size=max_upload_size - ) - with session_factory() as session: file_hash = await _check_and_add_file( session=session, From 74375069db9891a470f40921015104abbcec06fb Mon Sep 17 00:00:00 2001 From: lyam Date: Thu, 30 May 2024 20:11:22 +0200 Subject: [PATCH 10/18] Refactor: remove the context manager from UploadedFile class to just use tempfile --- src/aleph/web/controllers/storage.py | 61 ++++++++++++++-------------- tests/api/test_storage.py | 1 - 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 9aab9a275..a70676639 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -96,7 +96,7 @@ async def add_storage_json_controller(request: web.Request): async def _verify_message_signature( - pending_message: BasePendingMessage, signature_verifier: SignatureVerifier + pending_message: BasePendingMessage, signature_verifier: SignatureVerifier ) -> None: try: await signature_verifier.verify_signature(pending_message) @@ -127,18 +127,19 @@ def __init__(self, max_size: int): self._temp_file_path = None self._temp_file = None - async def __aenter__(self): + async def open_temp_file(self): if not self._temp_file_path: raise ValueError("File content has not been validated and read yet.") self._temp_file = await aiofiles.open(self._temp_file_path, 'rb') return self._temp_file - async def __aexit__(self, exc_type, exc_value, traceback): - await self.cleanup() - - async def cleanup(self): + async def close_temp_file(self): if self._temp_file: await self._temp_file.close() + self._temp_file = None + + async def cleanup(self): + await self.close_temp_file() if self._temp_file_path: os.remove(self._temp_file_path) self._temp_file_path = None @@ -147,8 +148,9 @@ async def read_and_validate(self): total_read = 0 chunk_size = 8192 - with tempfile.NamedTemporaryFile('w+b', delete=False) as temp_file: - self._temp_file_path = temp_file.name + temp_file = tempfile.NamedTemporaryFile('w+b', delete=False) + self._temp_file_path = temp_file.name + temp_file.close() async with aiofiles.open(self._temp_file_path, 'w+b') as f: async for chunk in self._read_chunks(chunk_size): @@ -159,7 +161,7 @@ async def read_and_validate(self): max_size=self.max_size, actual_size=total_read, ) - self._hasher.update(chunk) + self._hasher.update(chunk) # Update file hash while reading the file await f.write(chunk) self._hash = self._hasher.hexdigest() @@ -170,7 +172,7 @@ async def _read_chunks(self, chunk_size): raise NotImplementedError("Subclasses must implement this method") @property - async def size(self) -> int: + def size(self) -> int: return self._size def get_hash(self) -> str: @@ -183,10 +185,7 @@ def __init__(self, file_field: BodyPartReader, max_size: int): self.file_field = file_field async def _read_chunks(self, chunk_size): - while True: - chunk = await self.file_field.read_chunk(chunk_size) - if not chunk: - break + async for chunk in self.file_field.__aiter__(): yield chunk @@ -228,25 +227,27 @@ async def _check_and_add_file( await _verify_user_balance( session=session, address=message_content.address, - size=await file.size, + size=file.size, ) else: message_content = None - async with file as uploaded_file: - file_content = await uploaded_file.read() - if isinstance(file_content, bytes): - file_bytes = file_content - elif isinstance(file_content, str): - file_bytes = file_content.encode("utf-8") - else: - raise web.HTTPUnprocessableEntity(reason="Invalid file content type") + temp_file = await file.open_temp_file() + file_content = await temp_file.read() + + if isinstance(file_content, bytes): + file_bytes = file_content + elif isinstance(file_content, str): + file_bytes = file_content.encode("utf-8") + else: + raise web.HTTPUnprocessableEntity(reason="Invalid file content type") + await storage_service.add_file_content_to_local_storage( + session=session, + file_content=file_bytes, + file_hash=file_hash + ) + await file.cleanup() - await storage_service.add_file_content_to_local_storage( - session=session, - file_content=file_bytes, - file_hash=file_hash - ) # For files uploaded without authenticated upload, add a grace period of 1 day. if not message_content: add_grace_period_for_file( @@ -292,12 +293,12 @@ async def storage_add_file(request: web.Request): await uploaded_file.read_and_validate() if uploaded_file is None: - raise web.HTTPBadRequest(reason="No file uploaded") + raise web.HTTPBadRequest(reason="File should be sent as FormData or Raw Upload") max_upload_size = ( MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE if not metadata else MAX_FILE_SIZE ) - file_size : int = await uploaded_file.size + file_size: int = uploaded_file.size if file_size > max_upload_size: raise web.HTTPRequestEntityTooLarge( actual_size=file_size, max_size=max_upload_size diff --git a/tests/api/test_storage.py b/tests/api/test_storage.py index 8f3486862..7fc383dd3 100644 --- a/tests/api/test_storage.py +++ b/tests/api/test_storage.py @@ -136,7 +136,6 @@ async def add_file( ): form_data = aiohttp.FormData() form_data.add_field("file", file_content) - print(file_content) post_response = await api_client.post(uri, data=form_data) response_text = await post_response.text() From f8d4b3f02959cbea65328acc45b7ce473556bd7b Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Wed, 5 Jun 2024 17:57:29 +0200 Subject: [PATCH 11/18] refactor(storage): avoid using private variables when not needed --- src/aleph/web/controllers/storage.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index a70676639..858590157 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -121,9 +121,9 @@ class StorageMetadata(pydantic.BaseModel): class UploadedFile: def __init__(self, max_size: int): self.max_size = max_size + self.hash = "" + self.size = 0 self._hasher = hashlib.sha256() - self._hash = "" - self._size = 0 self._temp_file_path = None self._temp_file = None @@ -164,19 +164,15 @@ async def read_and_validate(self): self._hasher.update(chunk) # Update file hash while reading the file await f.write(chunk) - self._hash = self._hasher.hexdigest() - self._size = total_read + self.hash = self._hasher.hexdigest() + self.size = total_read await f.seek(0) async def _read_chunks(self, chunk_size): raise NotImplementedError("Subclasses must implement this method") - @property - def size(self) -> int: - return self._size - def get_hash(self) -> str: - return self._hash + return self._hasher.hexdigest() class MultipartUploadedFile(UploadedFile): From beda20b316ae361df7b9a8c74b2d6d1dac64a366 Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Wed, 5 Jun 2024 17:58:06 +0200 Subject: [PATCH 12/18] feat(storage): make UploadedFile.cleanup callable multiple times --- src/aleph/web/controllers/storage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 858590157..b5667aed6 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -134,13 +134,13 @@ async def open_temp_file(self): return self._temp_file async def close_temp_file(self): - if self._temp_file: + if self._temp_file is not None: await self._temp_file.close() self._temp_file = None async def cleanup(self): await self.close_temp_file() - if self._temp_file_path: + if self._temp_file_path and os.path.exists(self._temp_file_path): os.remove(self._temp_file_path) self._temp_file_path = None From f7fc4311fcd9deba03265a07aac7b5a37b6a15ea Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Wed, 5 Jun 2024 17:58:37 +0200 Subject: [PATCH 13/18] doc(storage): add comment regarding NamedTemporaryFile API change in python 3.12 --- src/aleph/web/controllers/storage.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index b5667aed6..614e99e0c 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -148,6 +148,16 @@ async def read_and_validate(self): total_read = 0 chunk_size = 8192 + # From aiofiles changelog: + # On Python 3.12, aiofiles.tempfile.NamedTemporaryFile now accepts a + # delete_on_close argument, just like the stdlib version. + # On Python 3.12, aiofiles.tempfile.NamedTemporaryFile no longer + # exposes a delete attribute, just like the stdlib version. + # + # so we might need to modify this code for python 3.12 at some point + + # it would be ideal to uses aiofiles.tempfile.NamedTemporaryFile but it + # doesn't seems to be able to support our current workflow temp_file = tempfile.NamedTemporaryFile('w+b', delete=False) self._temp_file_path = temp_file.name temp_file.close() From 3d49f4cd8e5365812cdd6291addde611ecbfe8fe Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Wed, 5 Jun 2024 17:59:08 +0200 Subject: [PATCH 14/18] refactor(storage): rename inner function variable for better readability --- src/aleph/web/controllers/storage.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 614e99e0c..7f3ee4365 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -210,10 +210,10 @@ async def _check_and_add_file( signature_verifier: SignatureVerifier, storage_service: StorageService, message: Optional[PendingStoreMessage], - file: UploadedFile, + uploaded_file: UploadedFile, grace_period: int, ) -> str: - file_hash = file.get_hash() + file_hash = uploaded_file.get_hash() # Perform authentication and balance checks if message: await _verify_message_signature( @@ -233,12 +233,12 @@ async def _check_and_add_file( await _verify_user_balance( session=session, address=message_content.address, - size=file.size, + size=uploaded_file.size, ) else: message_content = None - temp_file = await file.open_temp_file() + temp_file = await uploaded_file.open_temp_file() file_content = await temp_file.read() if isinstance(file_content, bytes): @@ -252,7 +252,7 @@ async def _check_and_add_file( file_content=file_bytes, file_hash=file_hash ) - await file.cleanup() + await uploaded_file.cleanup() # For files uploaded without authenticated upload, add a grace period of 1 day. if not message_content: From 1049a661968dd51660d18615a95d015d83bf03db Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Wed, 5 Jun 2024 18:00:26 +0200 Subject: [PATCH 15/18] feat(storage): make web.HTTPUnprocessableEntity exception message more useful --- src/aleph/web/controllers/storage.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 7f3ee4365..d0e85dce4 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -246,7 +246,8 @@ async def _check_and_add_file( elif isinstance(file_content, str): file_bytes = file_content.encode("utf-8") else: - raise web.HTTPUnprocessableEntity(reason="Invalid file content type") + raise web.HTTPUnprocessableEntity(reason=f"Invalid file content type, got {type(file_content)}") + await storage_service.add_file_content_to_local_storage( session=session, file_content=file_bytes, From e69d550741be0a97722540e46195c2befc520023 Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Wed, 5 Jun 2024 18:00:56 +0200 Subject: [PATCH 16/18] refactor(storage): make if more pythonic --- src/aleph/web/controllers/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index d0e85dce4..177fe1779 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -256,7 +256,7 @@ async def _check_and_add_file( await uploaded_file.cleanup() # For files uploaded without authenticated upload, add a grace period of 1 day. - if not message_content: + if message_content is None: add_grace_period_for_file( session=session, file_hash=file_hash, hours=grace_period ) From 110c4ee28f9d2bc2d9985d659c770e14266dfc79 Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Wed, 5 Jun 2024 18:01:51 +0200 Subject: [PATCH 17/18] fix(storage): ensure that uploaded temporary file is **always** cleanup --- src/aleph/web/controllers/storage.py | 114 ++++++++++++++------------- 1 file changed, 59 insertions(+), 55 deletions(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 177fe1779..9e66c690d 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -287,69 +287,73 @@ async def storage_add_file(request: web.Request): metadata = None uploaded_file: Optional[UploadedFile] = None - if request.content_type == "multipart/form-data": - reader = await request.multipart() - async for part in reader: - if part.name == 'file': - uploaded_file = MultipartUploadedFile(part, MAX_FILE_SIZE) - await uploaded_file.read_and_validate() - elif part.name == 'metadata': - metadata = await part.read(decode=True) - else: - uploaded_file = RawUploadedFile(request=request, max_size=MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE) - await uploaded_file.read_and_validate() - - if uploaded_file is None: - raise web.HTTPBadRequest(reason="File should be sent as FormData or Raw Upload") - - max_upload_size = ( - MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE if not metadata else MAX_FILE_SIZE - ) - file_size: int = uploaded_file.size - if file_size > max_upload_size: - raise web.HTTPRequestEntityTooLarge( - actual_size=file_size, max_size=max_upload_size + try: + if request.content_type == "multipart/form-data": + reader = await request.multipart() + async for part in reader: + if part.name == 'file': + uploaded_file = MultipartUploadedFile(part, MAX_FILE_SIZE) + await uploaded_file.read_and_validate() + elif part.name == 'metadata': + metadata = await part.read(decode=True) + else: + uploaded_file = RawUploadedFile(request=request, max_size=MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE) + await uploaded_file.read_and_validate() + + if uploaded_file is None: + raise web.HTTPBadRequest(reason="File should be sent as FormData or Raw Upload") + + max_upload_size = ( + MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE if not metadata else MAX_FILE_SIZE ) + if uploaded_file.size > max_upload_size: + raise web.HTTPRequestEntityTooLarge( + actual_size=uploaded_file.size, max_size=max_upload_size + ) - uploaded_file.max_size = max_upload_size + uploaded_file.max_size = max_upload_size - status_code = 200 + status_code = 200 - if metadata: - metadata_bytes = ( - metadata.file.read() if isinstance(metadata, FileField) else metadata - ) - try: - storage_metadata = StorageMetadata.parse_raw(metadata_bytes) - except ValidationError as e: - raise web.HTTPUnprocessableEntity( - reason=f"Could not decode metadata: {e.json()}" + if metadata: + metadata_bytes = ( + metadata.file.read() if isinstance(metadata, FileField) else metadata ) + try: + storage_metadata = StorageMetadata.parse_raw(metadata_bytes) + except ValidationError as e: + raise web.HTTPUnprocessableEntity( + reason=f"Could not decode metadata: {e.json()}" + ) - message = storage_metadata.message - sync = storage_metadata.sync - else: - message = None - sync = False + message = storage_metadata.message + sync = storage_metadata.sync + else: + message = None + sync = False + + with session_factory() as session: + file_hash = await _check_and_add_file( + session=session, + signature_verifier=signature_verifier, + storage_service=storage_service, + message=message, + file=uploaded_file, + grace_period=grace_period, + ) + session.commit() + if message: + broadcast_status = await broadcast_and_process_message( + pending_message=message, sync=sync, request=request, logger=logger + ) + status_code = broadcast_status_to_http_status(broadcast_status) - with session_factory() as session: - file_hash = await _check_and_add_file( - session=session, - signature_verifier=signature_verifier, - storage_service=storage_service, - message=message, - file=uploaded_file, - grace_period=grace_period, - ) - session.commit() - if message: - broadcast_status = await broadcast_and_process_message( - pending_message=message, sync=sync, request=request, logger=logger - ) - status_code = broadcast_status_to_http_status(broadcast_status) + output = {"status": "success", "hash": file_hash} + return web.json_response(data=output, status=status_code) - output = {"status": "success", "hash": file_hash} - return web.json_response(data=output, status=status_code) + finally: + if uploaded_file is not None: + await uploaded_file.cleanup() def assert_file_is_downloadable(session: DbSession, file_hash: str) -> None: From 526de081bc1d2cc016a57f68ad0b8de9dbc1e281 Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Wed, 5 Jun 2024 18:12:15 +0200 Subject: [PATCH 18/18] fix(storage): function argument name has changed --- src/aleph/web/controllers/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 9e66c690d..ac90d8ca4 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -338,7 +338,7 @@ async def storage_add_file(request: web.Request): signature_verifier=signature_verifier, storage_service=storage_service, message=message, - file=uploaded_file, + uploaded_file=uploaded_file, grace_period=grace_period, ) session.commit()