Add azure storage support #11

Merged · 5 commits · Feb 22, 2023
1 change: 1 addition & 0 deletions README.md
@@ -81,6 +81,7 @@ The following **command line clients** are used to access the various databases:
| Local storage | unix shell | Included in standard distributions. |
| SFTP storage | `sftp`, `curl` | |
| Google Cloud Storage | `gsutil` | From [https://cloud.google.com/storage/docs/gsutil_install](https://cloud.google.com/storage/docs/gsutil_install). |
| Azure Storage | `azcopy` | From [https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10](https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10). |

 

3 changes: 2 additions & 1 deletion docs/index.rst
@@ -29,9 +29,10 @@ This section focuses on the supported storages.
:maxdepth: 2

storages-overview
storages/azure
storages/gcs
storages/local
storages/sftp
storages/gcs


API Reference
13 changes: 9 additions & 4 deletions docs/storages-overview.md
@@ -3,12 +3,16 @@ Overview

The following storages are supported

| Storage | Configuration class |
| ---------------------- | ------------------- |
| Local bash | LocalStorage |
| [Google Cloud Storage] | GoogleCloudStorage |
| Storage | Configuration class |
| ------------------------- | ------------------- |
| Local bash | LocalStorage |
| [Google Cloud Storage] | GoogleCloudStorage |
| [Azure Blob Storage] | AzureStorage |
| [Azure Data Lake Storage] | AzureStorage |

[Google Cloud Storage]: https://cloud.google.com/storage
[Azure Blob Storage]: https://azure.microsoft.com/en-us/products/storage/blobs
[Azure Data Lake Storage]: https://azure.microsoft.com/en-us/products/storage/data-lake-storage/


Function support matrix
@@ -18,6 +22,7 @@ Function support matrix
| --------------------- | ---- | ----- | ------ | ------ |
| LocalStorage | Yes | Yes | Yes | Yes
| GoogleCloudStorage | Yes | Yes | Yes | Yes
| AzureStorage | Yes | Yes | Yes | Yes

```{note}
A `Move` operation is not implemented by design. Most of the blob storages do not
75 changes: 75 additions & 0 deletions docs/storages/azure.rst
@@ -0,0 +1,75 @@
Azure Storage
=============

Accessing an Azure Blob Storage with the shell tool ``azcopy``.

Installation
------------

You need to install ``azcopy``. Take a look at `Get started with AzCopy <https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10>`_.


Configuration examples
----------------------

.. note::
    Currently some of the functions require a SAS token and some require an account key.
    It is recommended to provide both a SAS token and an account key.

.. tabs::

    .. group-tab:: SAS token

        .. code-block:: python

            import mara_storage.config
            import mara_storage.storages

            mara_storage.config.storages = lambda: {
                'data': mara_storage.storages.AzureStorage(
                    account_name='account-name',
                    container_name='container-name',
                    sas='sp=racwdlm&st=2022-05-11T10:04:05Z&se=2023-05-11T18:04:05Z&spr=https&sv=2020-08-04&sr=c&sig=u7tqxugyv5MbyrtFdEUp22tnou4wifBoUfIaLDazeRT%3D',

                    # optional
                    storage_type='dfs'  # use a dfs client instead of 'blob' (default value)
                ),
            }

    .. group-tab:: Account key

        .. code-block:: python

            import mara_storage.config
            import mara_storage.storages

            mara_storage.config.storages = lambda: {
                'data': mara_storage.storages.AzureStorage(
                    account_name='account-name',
                    container_name='container-name',
                    account_key='<key>',

                    # optional
                    storage_type='dfs'  # use a dfs client instead of 'blob' (default value)
                ),
            }

|

|

API reference
-------------

This section contains the storage specific API of the module.


Configuration
~~~~~~~~~~~~~

.. module:: mara_storage.storages
    :noindex:

.. autoclass:: AzureStorage
    :special-members: __init__
    :inherited-members:
    :members:
16 changes: 16 additions & 0 deletions mara_storage/.scripts/install.mk
@@ -0,0 +1,16 @@

install-azcopy:
	# install azcopy in the virtual environment
	# see also: https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10

	# download azcopy
	wget https://aka.ms/downloadazcopy-v10-linux
	tar -xvf downloadazcopy-v10-linux

	# install
	rm -f .venv/bin/azcopy
	mv ./azcopy_linux_amd64_*/azcopy .venv/bin/azcopy

	# clean up
	rm -rf ./azcopy_linux_amd64_*/ downloadazcopy-v10-linux
56 changes: 56 additions & 0 deletions mara_storage/azure.py
@@ -0,0 +1,56 @@
import datetime

from mara_storage.client import StorageClient
from . import storages

from azure.storage.blob import BlobClient, BlobServiceClient


def init_client(storage: storages.AzureStorage, path: str = None) -> BlobClient:
    client = BlobClient.from_blob_url(storage.build_uri(path))
    return client

def init_service_client(storage: storages.AzureStorage, path: str = None) -> BlobServiceClient:
    client = BlobServiceClient.from_connection_string(storage.connection_string())
    return client


class AzureStorageClient(StorageClient):
    def __init__(self, storage: storages.AzureStorage):
        super().__init__(storage)

        self.__blob_service_client: BlobServiceClient = None
        self.__container_client = None

    @property
    def _blob_service_client(self):
        if not self.__blob_service_client:
            self.__blob_service_client = init_service_client(self._storage)

        return self.__blob_service_client

    @property
    def _container_client(self):
        if not self.__container_client:
            self.__container_client = self._blob_service_client.get_container_client(self._storage.container_name)

        return self.__container_client

    def creation_timestamp(self, path: str) -> datetime.datetime:
        blob_client = self._container_client.get_blob_client(path)
        properties = blob_client.get_blob_properties()

        return properties.creation_time

    def last_modification_timestamp(self, path: str) -> datetime.datetime:
        blob_client = self._container_client.get_blob_client(path)
        properties = blob_client.get_blob_properties()

        return properties.last_modified

    def iterate_files(self, file_pattern: str):
        blobs = self._container_client.list_blobs(name_starts_with=file_pattern)

        for blob in blobs:
            if blob:
                yield blob.name
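
For orientation, a minimal usage sketch of the new `AzureStorageClient`; the account, container, and SAS values are placeholders, not part of this PR:

```python
# Hypothetical usage sketch - assumes a container reachable with the given SAS token.
from mara_storage import storages
from mara_storage.azure import AzureStorageClient

storage = storages.AzureStorage(
    account_name='my-account',          # placeholder values
    container_name='my-container',
    sas='sv=...&sig=...')

client = AzureStorageClient(storage)

# list blobs whose name starts with 'data/' and print their last modification time
for name in client.iterate_files('data/'):
    print(name, client.last_modification_timestamp(name))
```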
5 changes: 5 additions & 0 deletions mara_storage/client.py
@@ -55,3 +55,8 @@ def __(storage: storages.LocalStorage):
def __(storage: storages.GoogleCloudStorage):
    from .google_cloud_storage import GoogleCloudStorageClient
    return GoogleCloudStorageClient

@storage_client_type.register(storages.AzureStorage)
def __(storage: storages.AzureStorage):
    from .azure import AzureStorageClient
    return AzureStorageClient
7 changes: 7 additions & 0 deletions mara_storage/info.py
@@ -43,3 +43,10 @@ def __(storage: storages.GoogleCloudStorage, file_name: str):

    (exitcode, _) = subprocess.getstatusoutput(command)
    return exitcode == 0


@file_exists.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, file_name: str):
    from . import azure
    client = azure.init_client(storage, path=file_name)
    return client.exists()
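
As a sketch (names are placeholders), the new registration can be used through the generic `file_exists` dispatcher like this:

```python
# Hypothetical sketch - checks a single blob via the generic file_exists dispatcher.
from mara_storage import storages, info

storage = storages.AzureStorage(account_name='my-account',
                                container_name='my-container',
                                sas='sv=...&sig=...')

if info.file_exists(storage, 'exports/2023-02-22.csv.gz'):
    print('blob exists')
```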
20 changes: 20 additions & 0 deletions mara_storage/manage.py
@@ -43,6 +43,16 @@ def __(storage: storages.GoogleCloudStorage):
    assert exitcode == 0


@ensure_storage.register(storages.AzureStorage)
def __(storage: storages.AzureStorage):
    from . import azure
    client = azure.init_service_client(storage)
    container_client = client.get_container_client(container=storage.container_name)

    if not container_client.exists():
        container_client.create_container()


# -----------------------------------------------------------------------------


@@ -93,3 +103,13 @@ def __(storage: storages.GoogleCloudStorage, force: bool = False):
    if exitcode != 0:
        raise Exception(f'An error occured while dropping a GCS bucket. Stdout:\n{stdout}')
    assert exitcode == 0


@drop_storage.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, force: bool = False):
    from . import azure
    client = azure.init_service_client(storage)
    container_client = client.get_container_client(container=storage.container_name)

    if container_client.exists():
        container_client.delete_container()
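
A short sketch of how `ensure_storage` and `drop_storage` behave for an `AzureStorage` (placeholder credentials; typically only needed by test setup):

```python
# Hypothetical sketch - creates the container if missing, then drops it again.
from mara_storage import storages, manage

storage = storages.AzureStorage(account_name='my-account',
                                container_name='my-test-container',
                                account_key='<key>')

manage.ensure_storage(storage)             # creates the container when it does not exist yet
# ... run whatever needs the container ...
manage.drop_storage(storage, force=True)   # deletes the container if it exists
```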
51 changes: 51 additions & 0 deletions mara_storage/shell.py
@@ -61,6 +61,24 @@ def __(storage: storages.GoogleCloudStorage, file_name: str, compression: Compre
            + (f'\\\n | {uncompressor(compression)} - ' if compression != Compression.NONE else ''))


@read_file_command.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, file_name: str, compression: Compression = Compression.NONE):
    if storage.sas:
        return (f'curl -sf {shlex.quote(storage.build_uri(path=file_name))}'
                + (f'\\\n | {uncompressor(compression)} - ' if compression != Compression.NONE else ''))

    azlogin_env = ('AZCOPY_AUTO_LOGIN_TYPE=SPN '
                   + f'AZCOPY_TENANT_ID="{storage.spa_tenant}" '
                   + f'AZCOPY_SPA_APPLICATION_ID="{storage.spa_application}" '
                   + f'AZCOPY_SPA_CLIENT_SECRET="{storage.spa_client_secret}" '
                   ) if not storage.sas else ''

    return (f'{azlogin_env}azcopy cp '
            + shlex.quote(storage.build_uri(file_name))
            + ' --from-to BlobPipe'
            + (f'\\\n | {uncompressor(compression)} - ' if compression != Compression.NONE else ''))


# -----------------------------------------------------------------------------


@@ -135,6 +153,23 @@ def __(storage: storages.GoogleCloudStorage, file_name: str, compression: Compre
            + shlex.quote(storage.build_uri(file_name)))


@write_file_command.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, file_name: str, compression: Compression = Compression.NONE):
    if compression not in [Compression.NONE, Compression.GZIP]:
        raise ValueError(f'Only compression NONE and GZIP are supported for storage type "{storage.__class__.__name__}"')

    azlogin_env = ('AZCOPY_AUTO_LOGIN_TYPE=SPN '
                   + f'AZCOPY_TENANT_ID="{storage.spa_tenant}" '
                   + f'AZCOPY_SPA_APPLICATION_ID="{storage.spa_application}" '
                   + f'AZCOPY_SPA_CLIENT_SECRET="{storage.spa_client_secret}" '
                   ) if not storage.sas else ''

    return ((f'gzip \\\n | ' if compression == Compression.GZIP else '')
            + f'{azlogin_env}azcopy cp '
            + shlex.quote(storage.build_uri(file_name))
            + ' --from-to PipeBlob')


# -----------------------------------------------------------------------------


@@ -202,3 +237,19 @@ def __(storage: storages.GoogleCloudStorage, file_name: str, force: bool = True,
            + ('-f ' if force else '')
            + ('-r ' if recursive else '')
            + shlex.quote(storage.build_uri(file_name)))


@delete_file_command.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, file_name: str, force: bool = True, recursive: bool = False):
    if storage.sas and not force and not recursive:
        return (f'curl -sf -X DELETE {shlex.quote(storage.build_uri(path=file_name))}')

    azlogin_env = ('AZCOPY_AUTO_LOGIN_TYPE=SPN '
                   + f'AZCOPY_TENANT_ID="{storage.spa_tenant}" '
                   + f'AZCOPY_SPA_APPLICATION_ID="{storage.spa_application}" '
                   + f'AZCOPY_SPA_CLIENT_SECRET="{storage.spa_client_secret}" '
                   ) if not storage.sas else ''

    return (f'{azlogin_env}azcopy rm '
            + shlex.quote(storage.build_uri(file_name))
            + (' --recursive=true' if recursive else ''))
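
To illustrate what the new command builders return (placeholder storage; the exact output depends on whether a SAS token or a service principal is configured):

```python
# Hypothetical sketch - the builders return shell command strings, they do not execute anything.
from mara_storage import storages, shell

storage = storages.AzureStorage(account_name='my-account',
                                container_name='my-container',
                                sas='sv=...&sig=...')

print(shell.read_file_command(storage, 'raw/events.csv'))
# with a SAS token this is roughly:
#   curl -sf 'https://my-account.blob.core.windows.net/my-container/raw/events.csv?sv=...&sig=...'

print(shell.delete_file_command(storage, 'raw/old.csv'))
# with a SAS token and the default force=True this falls through to azcopy, roughly:
#   azcopy rm 'https://my-account.blob.core.windows.net/my-container/raw/old.csv?sv=...&sig=...'
```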
52 changes: 52 additions & 0 deletions mara_storage/storages.py
@@ -85,3 +85,55 @@ def base_uri(self):
    def build_uri(self, path: str):
        """Returns a URI for a path on the storage"""
        return f"{self.base_uri}/{path}"

class AzureStorage(Storage):
    def __init__(self, account_name: str, container_name: str, sas: str = None,
                 storage_type: str = 'blob', account_key: str = None,
                 spa_tenant: str = None, spa_application: str = None, spa_client_secret: str = None):
        """
        Connection information for an Azure storage container

        Possible authentication methods:
            SAS => "Shared access signature", see https://docs.microsoft.com/en-us/azure/storage/common/storage-sas-overview
            SPA => "Service principal"

        Args:
            account_name: The storage account name
            container_name: The container name within the storage
            sas: The SAS token
            storage_type: The storage type. Supports 'blob' or 'dfs'.
            account_key: The storage account key
            spa_tenant: The service principal tenant id
            spa_application: The service principal application id
            spa_client_secret: The service principal client secret
        """
        if sas is None and account_key is None and spa_client_secret is None:
            raise ValueError('You have to provide either parameter sas, account_key or spa_client_secret for type AzureStorage.')
        self.account_name = account_name
        self.account_key = account_key
        self.container_name = container_name
        self.storage_type = storage_type
        self.sas = (sas[1:] if sas.startswith('?') else sas) if sas else None
        self.spa_tenant = spa_tenant
        self.spa_application = spa_application
        self.spa_client_secret = spa_client_secret

    @property
    def base_uri(self):
        return f'https://{self.account_name}.{self.storage_type}.core.windows.net/{self.container_name}'

    def build_uri(self, path: str):
        """Returns a URI for a path on the storage"""
        if path and not path.startswith('/'):
            path = '/' + path
        return (f"{self.base_uri}{path}"
                + (f'?{self.sas}' if self.sas else ''))

    def connection_string(self):
        # see https://docs.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string
        if self.account_key:
            return f'DefaultEndpointsProtocol=https;AccountName={self.account_name};AccountKey={self.account_key}'
        else:
            return ('DefaultEndpointsProtocol=https'
                    + f';BlobEndpoint=https://{self.account_name}.{self.storage_type}.core.windows.net'
                    + f';SharedAccessSignature={self.sas}')
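
For clarity, a sketch of what `build_uri` and `connection_string` produce (placeholder account and token):

```python
# Hypothetical sketch - shows the URI and connection string assembled by AzureStorage.
from mara_storage import storages

storage = storages.AzureStorage(account_name='my-account',
                                container_name='my-container',
                                sas='sv=...&sig=...',
                                storage_type='blob')

print(storage.build_uri('raw/events.csv'))
# -> https://my-account.blob.core.windows.net/my-container/raw/events.csv?sv=...&sig=...

print(storage.connection_string())
# -> DefaultEndpointsProtocol=https;BlobEndpoint=https://my-account.blob.core.windows.net;SharedAccessSignature=sv=...&sig=...
```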
1 change: 1 addition & 0 deletions setup.cfg
@@ -29,3 +29,4 @@ install_requires =
test = pytest
sftp = pysftp
google-cloud-storage = google-cloud-storage; google-oauth
azure-blob = azure-storage-blob
8 changes: 7 additions & 1 deletion tests/local_config.py.example
@@ -12,4 +12,10 @@ SFTP_PUBLIC_IDENTITY_FILE = None
GCS_PROJECT_ID = ''

# required for GCS client
GCS_SERVICE_ACCOUNT_FILE = ''
GCS_SERVICE_ACCOUNT_FILE = ''


# the Azure Storage config used to perform the test
AZ_STORAGE_ACCOUNT_NAME = ''
AZ_STORAGE_TYPE = 'blob'
AZ_STORAGE_SAS = None