Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: authenticated file upload SDK #57

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions examples/store.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from pathlib import Path
from typing import Optional, Literal

import click
from aleph_message.models import StoreMessage
Expand All @@ -8,8 +10,11 @@
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AuthenticatedAlephClient
from aleph.sdk.conf import settings
from aleph.sdk.types import Account, StorageEnum
from aleph_message.utils import Mebibytes

DEFAULT_SERVER = "https://api2.aleph.im"
MiB = 2**20


async def print_output_hash(message: StoreMessage, status: MessageStatus):
Expand All @@ -22,7 +27,13 @@ async def print_output_hash(message: StoreMessage, status: MessageStatus):
)


async def do_upload(account, engine, channel, filename=None, file_hash=None):
async def do_upload(
account: Account,
engine: Literal["STORAGE", "IPFS"],
channel: Optional[str] = None,
filename: Optional[str] = None,
file_hash: Optional[str] = None,
):
async with AuthenticatedAlephClient(
account=account, api_server=settings.API_HOST
) as session:
Expand All @@ -38,7 +49,7 @@ async def do_upload(account, engine, channel, filename=None, file_hash=None):
message, status = await session.create_store(
file_content=content,
channel=channel,
storage_engine=engine.lower(),
storage_engine=StorageEnum(engine.lower()),
)
except IOError:
print("File not accessible")
Expand All @@ -48,7 +59,7 @@ async def do_upload(account, engine, channel, filename=None, file_hash=None):
message, status = await session.create_store(
file_hash=file_hash,
channel=channel,
storage_engine=engine.lower(),
storage_engine=StorageEnum(engine.lower()),
)

await print_output_hash(message, status)
Expand Down Expand Up @@ -76,7 +87,12 @@ async def do_upload(account, engine, channel, filename=None, file_hash=None):
default="TEST",
help="Channel to write in (default: TEST)",
)
def main(filename, pkey=None, storage_engine="IPFS", channel="TEST"):
def main(
filename,
pkey=None,
storage_engine: Literal["IPFS", "STORAGE"] = "IPFS",
channel="TEST",
):
"""Uploads or store FILENAME.

If FILENAME is an IPFS multihash and IPFS is selected as an engine (default), don't try to upload, just pin it to the network.
Expand Down
89 changes: 88 additions & 1 deletion src/aleph/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,42 @@ async def storage_push_file(self, file_content) -> str:
resp.raise_for_status()
return (await resp.json()).get("hash")

async def _storage_push_file_with_message(
    self,
    file_content: bytes,
    store_content: StoreContent,
    channel: Optional[str] = None,
    sync: bool = False,
) -> Tuple[StoreMessage, MessageStatus]:
    """Upload a file and its STORE message in a single multipart request.

    The request carries two form fields: ``metadata`` (a JSON document holding
    the prepared message and the ``sync`` flag) and ``file`` (the raw bytes).

    :param file_content: Raw bytes of the file to upload.
    :param store_content: Content of the STORE message describing the file.
    :param channel: Channel passed through to the prepared message.
    :param sync: Forwarded to the server in the request metadata.
    :return: The prepared message and its status: ``PENDING`` when the server
        answers 202, ``PROCESSED`` for any other successful status.
    :raises: Whatever ``resp.raise_for_status()`` raises on an error response.
    """
    # Build the STORE message that travels alongside the file.
    store_message = await self._prepare_aleph_message(
        message_type=MessageType.store,
        content=store_content.dict(exclude_none=True),
        channel=channel,
    )

    form = aiohttp.FormData()
    form.add_field(
        "metadata",
        json.dumps({"message": store_message.dict(exclude_none=True), "sync": sync}),
        content_type="application/json",
    )
    # The file itself goes in its own form field.
    form.add_field("file", file_content)

    url = "/api/v0/storage/add_file"
    logger.debug(f"Posting file on {url}")

    async with self.http_session.post(url, data=form) as resp:
        resp.raise_for_status()
        if resp.status == 202:
            message_status = MessageStatus.PENDING
        else:
            message_status = MessageStatus.PROCESSED
        return store_message, message_status

@staticmethod
def _log_publication_status(publication_status: Mapping[str, Any]):
status = publication_status.get("status")
Expand Down Expand Up @@ -1181,6 +1217,45 @@ async def create_aggregate(
sync=sync,
)

async def _upload_file_native(
    self,
    address: str,
    file_content: bytes,
    guess_mime_type: bool = False,
    ref: Optional[str] = None,
    extra_fields: Optional[dict] = None,
    channel: Optional[str] = None,
    sync: bool = False,
) -> Tuple[StoreMessage, MessageStatus]:
    """Upload a file to native storage together with its STORE message.

    The file is identified by the hex SHA-256 digest of its content. The
    message is first sent with the file through the authenticated upload
    endpoint, then broadcast again so that nodes without that endpoint still
    receive it.

    :param address: Address recorded in the STORE content.
    :param file_content: Raw bytes of the file to upload.
    :param guess_mime_type: When true and the ``magic`` module is available,
        detect the MIME type from the file content; otherwise it is left unset.
    :param ref: Optional reference recorded in the STORE content.
    :param extra_fields: Optional additional fields merged into the STORE
        content. ``None`` (the default) means no extra fields.
    :param channel: Channel passed through to the message.
    :param sync: Forwarded to both the upload and the broadcast.
    :return: The STORE message and the status reported by the broadcast.
    """
    file_hash = hashlib.sha256(file_content).hexdigest()
    if magic and guess_mime_type:
        mime_type = magic.from_buffer(file_content, mime=True)
    else:
        mime_type = None

    store_content = StoreContent(
        address=address,
        ref=ref,
        item_type=StorageEnum.storage,
        item_hash=file_hash,
        mime_type=mime_type,
        time=time.time(),
        # extra_fields defaults to None and `**None` raises TypeError, so
        # fall back to an empty mapping when no extra fields are given.
        **(extra_fields or {}),
    )
    # The status of the upload itself is ignored: the status returned to the
    # caller is the one from the broadcast below.
    message, _ = await self._storage_push_file_with_message(
        file_content=file_content,
        store_content=store_content,
        channel=channel,
        sync=sync,
    )

    # Some nodes may not implement authenticated file upload yet. As we cannot detect
    # this easily, broadcast the message a second time to ensure publication on older
    # nodes.
    status = await self._broadcast(message=message, sync=sync)
    return message, status


async def create_store(
self,
address: Optional[str] = None,
Expand Down Expand Up @@ -1208,8 +1283,19 @@ async def create_store(
file_content = Path(file_path).read_bytes()

if storage_engine == StorageEnum.storage:
file_hash = await self.storage_push_file(file_content=file_content)
# Upload the file and message all at once using authenticated upload.
return await self._upload_file_native(
address=address,
file_content=file_content,
guess_mime_type=guess_mime_type,
ref=ref,
extra_fields=extra_fields,
channel=channel,
sync=sync,
)
elif storage_engine == StorageEnum.ipfs:
# We do not support authenticated upload for IPFS yet. Use the legacy method
# of uploading the file first then publishing the message using POST /messages.
file_hash = await self.ipfs_push_file(file_content=file_content)
else:
raise ValueError(f"Unknown storage engine: '{storage_engine}'")
Expand Down Expand Up @@ -1371,6 +1457,7 @@ async def _prepare_aleph_message(
allow_inlining: bool = True,
storage_engine: StorageEnum = StorageEnum.storage,
) -> AlephMessage:

message_dict: Dict[str, Any] = {
"sender": self.account.get_address(),
"chain": self.account.CHAIN,
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/test_asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
def status(self):
    """Return 200 when ``self.sync`` is truthy, 202 otherwise."""
    if self.sync:
        return 200
    return 202

def raise_for_status(self):
    """Do nothing: this implementation never raises."""
    return None

async def json(self):
message_status = "processed" if self.sync else "pending"
return {
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/test_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import hashlib

import pytest
from aleph_message.models import StoreMessage
from aleph_message.status import MessageStatus

from aleph.sdk import AuthenticatedAlephClient
from aleph.sdk.chains.common import get_fallback_private_key
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.types import StorageEnum


@pytest.mark.asyncio
async def test_upload_with_message():
# Store a small file through create_store() with the native storage engine,
# then check the returned message and read the file and message back by hash.
pkey = get_fallback_private_key()
account = ETHAccount(private_key=pkey)

content = b"Test pyaleph upload\n"
# Expected item hash: hex SHA-256 digest of the raw content.
file_hash = hashlib.sha256(content).hexdigest()

# NOTE(review): api_server=None presumably falls back to a default host; confirm.
async with AuthenticatedAlephClient(account=account, api_server=None) as client:
message, status = await client.create_store(
address=account.get_address(),
file_content=content,
storage_engine=StorageEnum.storage,
sync=True,
)
print(message, status)

# With sync=True the test expects the message to be processed, not pending.
assert status == MessageStatus.PROCESSED
assert message.content.item_hash == file_hash

# Round trip: the file downloaded by hash must equal the uploaded bytes.
server_content = await client.download_file(file_hash=file_hash)
assert server_content == content

# The STORE message fetched by its own item hash must reference the same file.
server_message = await client.get_message(
item_hash=message.item_hash, message_type=StoreMessage
)
assert server_message.content.item_hash == file_hash