
Commit

Add azure storage support (#11)
* add Azure Storage WIP

* support Azure Storage connection strings with account key

* improve azure storage implementation, add AzureStorageClient

* update docs

* support recursive deletion
leo-schick authored Feb 22, 2023
1 parent 76f30bb commit 40a764f
Showing 14 changed files with 396 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -81,6 +81,7 @@ The following **command line clients** are used to access the various databases:
| Local storage | unix shell | Included in standard distributions. |
| SFTP storage | `sftp`, `curl` | |
| Google Cloud Storage | `gsutil` | From [https://cloud.google.com/storage/docs/gsutil_install](https://cloud.google.com/storage/docs/gsutil_install). |
| Azure Storage | `azcopy` | From [https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10](https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10). |

 

3 changes: 2 additions & 1 deletion docs/index.rst
@@ -29,9 +29,10 @@ This section focuses on the supported storages.
    :maxdepth: 2

    storages-overview
    storages/azure
    storages/gcs
    storages/local
    storages/sftp
    storages/gcs


API Reference
13 changes: 9 additions & 4 deletions docs/storages-overview.md
@@ -3,12 +3,16 @@ Overview

The following storages are supported

| Storage | Configuration class |
| ---------------------- | ------------------- |
| Local bash | LocalStorage |
| [Google Cloud Storage] | GoogleCloudStorage |
| Storage | Configuration class |
| ------------------------- | ------------------- |
| Local bash | LocalStorage |
| [Google Cloud Storage] | GoogleCloudStorage |
| [Azure Blob Storage] | AzureStorage |
| [Azure Data Lake Storage] | AzureStorage |

[Google Cloud Storage]: https://cloud.google.com/storage
[Azure Blob Storage]: https://azure.microsoft.com/en-us/products/storage/blobs
[Azure Data Lake Storage]: https://azure.microsoft.com/en-us/products/storage/data-lake-storage/


Function support matrix
@@ -18,6 +22,7 @@ Function support matrix
| --------------------- | ---- | ----- | ------ | ------ |
| LocalStorage | Yes | Yes | Yes | Yes
| GoogleCloudStorage | Yes | Yes | Yes | Yes
| AzureStorage | Yes | Yes | Yes | Yes

```{note}
A `Move` operation is not implemented by design. Most of the blob storages do not
75 changes: 75 additions & 0 deletions docs/storages/azure.rst
@@ -0,0 +1,75 @@
Azure Storage
=============

Accessing an Azure Blob Storage with the shell tool `azcopy`.

Installation
------------

You need to install `azcopy`. Take a look at `Get started with AzCopy <https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10>`_.


Configuration examples
----------------------

.. note::
    Currently some of the functions require a SAS token, and some of the
    functions require an account key. It is recommended to provide both
    a SAS token and an account key.

.. tabs::

    .. group-tab:: SAS token

        .. code-block:: python

            import mara_storage.storages

            mara_storage.config.storages = lambda: {
                'data': mara_storage.storages.AzureStorage(
                    account_name='account-name',
                    container_name='container-name',
                    sas='sp=racwdlm&st=2022-05-11T10:04:05Z&se=2023-05-11T18:04:05Z&spr=https&sv=2020-08-04&sr=c&sig=u7tqxugyv5MbyrtFdEUp22tnou4wifBoUfIaLDazeRT%3D',

                    # optional
                    storage_type='dfs',  # use a dfs client instead of 'blob' (default value)
                ),
            }
    .. group-tab:: Account key

        .. code-block:: python

            import mara_storage.storages

            mara_storage.config.storages = lambda: {
                'data': mara_storage.storages.AzureStorage(
                    account_name='account-name',
                    container_name='container-name',
                    account_key='<key>',

                    # optional
                    storage_type='dfs',  # use a dfs client instead of 'blob' (default value)
                ),
            }
|
|
API reference
-------------

This section contains the storage specific API of the module.


Configuration
~~~~~~~~~~~~~

.. module:: mara_storage.storages
    :noindex:

.. autoclass:: AzureStorage
    :special-members: __init__
    :inherited-members:
    :members:
16 changes: 16 additions & 0 deletions mara_storage/.scripts/install.mk
@@ -0,0 +1,16 @@

install-azcopy:
	# install azcopy in the virtual environment
	# see also: https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10

	# download azcopy
	wget https://aka.ms/downloadazcopy-v10-linux
	tar -xvf downloadazcopy-v10-linux

	# install
	rm -f .venv/bin/azcopy
	mv ./azcopy_linux_amd64_*/azcopy .venv/bin/azcopy

	# clean up
	rm -rf ./azcopy_linux_amd64_*/ downloadazcopy-v10-linux
56 changes: 56 additions & 0 deletions mara_storage/azure.py
@@ -0,0 +1,56 @@
import datetime

from mara_storage.client import StorageClient
from . import storages

from azure.storage.blob import BlobClient, BlobServiceClient


def init_client(storage: storages.AzureStorage, path: str = None) -> BlobClient:
    client = BlobClient.from_blob_url(storage.build_uri(path))
    return client


def init_service_client(storage: storages.AzureStorage, path: str = None) -> BlobServiceClient:
    client = BlobServiceClient.from_connection_string(storage.connection_string())
    return client


class AzureStorageClient(StorageClient):
    def __init__(self, storage: storages.AzureStorage):
        super().__init__(storage)

        self.__blob_service_client: BlobServiceClient = None
        self.__container_client = None

    @property
    def _blob_service_client(self):
        if not self.__blob_service_client:
            self.__blob_service_client = init_service_client(self._storage)

        return self.__blob_service_client

    @property
    def _container_client(self):
        if not self.__container_client:
            self.__container_client = self._blob_service_client.get_container_client(self._storage.container_name)

        return self.__container_client

    def creation_timestamp(self, path: str) -> datetime.datetime:
        blob_client = self._container_client.get_blob_client(path)
        properties = blob_client.get_blob_properties()

        return properties.creation_time

    def last_modification_timestamp(self, path: str) -> datetime.datetime:
        blob_client = self._container_client.get_blob_client(path)
        properties = blob_client.get_blob_properties()

        return properties.last_modified

    def iterate_files(self, file_pattern: str):
        blobs = self._container_client.list_blobs(name_starts_with=file_pattern)

        for blob in blobs:
            if blob:
                yield blob.name
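The `AzureStorageClient` above creates its service and container clients lazily: nothing talks to Azure until a property is first accessed, and the result is cached for subsequent calls. A minimal sketch of that pattern, with a hypothetical `FakeServiceClient` standing in for the Azure SDK client:

```python
class FakeServiceClient:
    """Hypothetical stand-in for azure.storage.blob.BlobServiceClient."""
    instances_created = 0

    def __init__(self):
        FakeServiceClient.instances_created += 1


class LazyClient:
    def __init__(self):
        self.__service_client = None  # created lazily, see property below

    @property
    def _service_client(self):
        # construct the client on first access only, then reuse it
        if not self.__service_client:
            self.__service_client = FakeServiceClient()
        return self.__service_client


client = LazyClient()
first = client._service_client
second = client._service_client
assert first is second                           # same cached instance
assert FakeServiceClient.instances_created == 1  # constructed exactly once
```

This keeps constructing an `AzureStorageClient` cheap when a caller only needs one of the two underlying clients.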
5 changes: 5 additions & 0 deletions mara_storage/client.py
@@ -55,3 +55,8 @@ def __(storage: storages.LocalStorage):
def __(storage: storages.GoogleCloudStorage):
    from .google_cloud_storage import GoogleCloudStorageClient
    return GoogleCloudStorageClient


@storage_client_type.register(storages.AzureStorage)
def __(storage: storages.AzureStorage):
    from .azure import AzureStorageClient
    return AzureStorageClient
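The registration above follows a dispatch-by-storage-type pattern built on `functools.singledispatch`: each storage configuration class maps to a client class, imported lazily inside the handler. A self-contained sketch with hypothetical stand-in classes (not the real `mara_storage` API):

```python
import functools


# hypothetical stand-ins for the storage configuration classes
class LocalStorage: pass
class AzureStorage: pass


@functools.singledispatch
def storage_client_type(storage):
    # fallback for storage types without a registered client
    raise NotImplementedError(f'No storage client for {type(storage).__name__}')


@storage_client_type.register(AzureStorage)
def __(storage):
    # in the real module this would be a lazy `from .azure import AzureStorageClient`
    class AzureStorageClient: pass
    return AzureStorageClient


print(storage_client_type(AzureStorage()).__name__)  # → AzureStorageClient
```

The lazy import inside each handler means optional dependencies such as `azure-storage-blob` are only loaded when that storage type is actually used.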
7 changes: 7 additions & 0 deletions mara_storage/info.py
@@ -43,3 +43,10 @@ def __(storage: storages.GoogleCloudStorage, file_name: str):

    (exitcode, _) = subprocess.getstatusoutput(command)
    return exitcode == 0


@file_exists.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, file_name: str):
    from . import azure
    client = azure.init_client(storage, path=file_name)
    return client.exists()
20 changes: 20 additions & 0 deletions mara_storage/manage.py
@@ -43,6 +43,16 @@ def __(storage: storages.GoogleCloudStorage):
    assert exitcode == 0


@ensure_storage.register(storages.AzureStorage)
def __(storage: storages.AzureStorage):
    from . import azure
    client = azure.init_service_client(storage)
    container_client = client.get_container_client(container=storage.container_name)

    if not container_client.exists():
        container_client.create_container()


# -----------------------------------------------------------------------------


@@ -93,3 +103,13 @@ def __(storage: storages.GoogleCloudStorage, force: bool = False):
    if exitcode != 0:
        raise Exception(f'An error occurred while dropping a GCS bucket. Stdout:\n{stdout}')
    assert exitcode == 0


@drop_storage.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, force: bool = False):
    from . import azure
    client = azure.init_service_client(storage)
    container_client = client.get_container_client(container=storage.container_name)

    if container_client.exists():
        container_client.delete_container()
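Both handlers above check for existence before acting, which makes `ensure_storage` and `drop_storage` idempotent: calling either twice is safe. A sketch of the pattern, with a hypothetical `FakeContainerClient` standing in for the Azure SDK container client:

```python
class FakeContainerClient:
    """Hypothetical stand-in for the Azure SDK container client."""
    def __init__(self):
        self._exists = False

    def exists(self) -> bool:
        return self._exists

    def create_container(self):
        self._exists = True

    def delete_container(self):
        self._exists = False


def ensure_storage(container: FakeContainerClient):
    # create only if missing, so repeated calls are no-ops
    if not container.exists():
        container.create_container()


def drop_storage(container: FakeContainerClient):
    # delete only if present, so repeated calls are no-ops
    if container.exists():
        container.delete_container()


c = FakeContainerClient()
ensure_storage(c)
ensure_storage(c)  # second call is a no-op
assert c.exists()
drop_storage(c)
assert not c.exists()
```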
51 changes: 51 additions & 0 deletions mara_storage/shell.py
@@ -61,6 +61,24 @@ def __(storage: storages.GoogleCloudStorage, file_name: str, compression: Compre
            + (f'\\\n | {uncompressor(compression)} - ' if compression != Compression.NONE else ''))


@read_file_command.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, file_name: str, compression: Compression = Compression.NONE):
    if storage.sas:
        return (f'curl -sf {shlex.quote(storage.build_uri(path=file_name))}'
                + (f'\\\n | {uncompressor(compression)} - ' if compression != Compression.NONE else ''))

    azlogin_env = ('AZCOPY_AUTO_LOGIN_TYPE=SPN '
                   + f'AZCOPY_TENANT_ID="{storage.spa_tenant}" '
                   + f'AZCOPY_SPA_APPLICATION_ID="{storage.spa_application}" '
                   + f'AZCOPY_SPA_CLIENT_SECRET="{storage.spa_client_secret}" '
                   ) if not storage.sas else ''

    return (f'{azlogin_env}azcopy cp '
            + shlex.quote(storage.build_uri(file_name))
            + ' --from-to BlobPipe'
            + (f'\\\n | {uncompressor(compression)} - ' if compression != Compression.NONE else ''))


# -----------------------------------------------------------------------------


@@ -135,6 +153,23 @@ def __(storage: storages.GoogleCloudStorage, file_name: str, compression: Compre
            + shlex.quote(storage.build_uri(file_name)))


@write_file_command.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, file_name: str, compression: Compression = Compression.NONE):
    if compression not in [Compression.NONE, Compression.GZIP]:
        raise ValueError(f'Only compression NONE and GZIP are supported for storage type "{storage.__class__.__name__}"')

    azlogin_env = ('AZCOPY_AUTO_LOGIN_TYPE=SPN '
                   + f'AZCOPY_TENANT_ID="{storage.spa_tenant}" '
                   + f'AZCOPY_SPA_APPLICATION_ID="{storage.spa_application}" '
                   + f'AZCOPY_SPA_CLIENT_SECRET="{storage.spa_client_secret}" '
                   ) if not storage.sas else ''

    return ((f'gzip \\\n | ' if compression == Compression.GZIP else '')
            + f'{azlogin_env}azcopy cp '
            + shlex.quote(storage.build_uri(file_name))
            + ' --from-to PipeBlob')


# -----------------------------------------------------------------------------


@@ -202,3 +237,19 @@ def __(storage: storages.GoogleCloudStorage, file_name: str, force: bool = True,
            + ('-f ' if force else '')
            + ('-r ' if recursive else '')
            + shlex.quote(storage.build_uri(file_name)))


@delete_file_command.register(storages.AzureStorage)
def __(storage: storages.AzureStorage, file_name: str, force: bool = True, recursive: bool = False):
    if storage.sas and not force and not recursive:
        return f'curl -sf -X DELETE {shlex.quote(storage.build_uri(path=file_name))}'

    azlogin_env = ('AZCOPY_AUTO_LOGIN_TYPE=SPN '
                   + f'AZCOPY_TENANT_ID="{storage.spa_tenant}" '
                   + f'AZCOPY_SPA_APPLICATION_ID="{storage.spa_application}" '
                   + f'AZCOPY_SPA_CLIENT_SECRET="{storage.spa_client_secret}" '
                   ) if not storage.sas else ''

    return (f'{azlogin_env}azcopy rm '
            + shlex.quote(storage.build_uri(file_name))
            + (' --recursive=true' if recursive else ''))
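The shell commands above share one composition idea: when no SAS token is present, service-principal credentials are passed to `azcopy` as environment-variable assignments prefixed to the command, and the target URI is shell-quoted. A simplified, self-contained sketch of the delete case (`azcopy_delete_command` is a hypothetical helper, not part of `mara_storage`):

```python
import shlex


def azcopy_delete_command(uri: str, sas: str = None, spa_tenant: str = None,
                          spa_application: str = None, spa_client_secret: str = None,
                          recursive: bool = False) -> str:
    # without a SAS token, azcopy authenticates as a service principal
    # via environment variables prefixed to the command line
    azlogin_env = ('AZCOPY_AUTO_LOGIN_TYPE=SPN '
                   f'AZCOPY_TENANT_ID="{spa_tenant}" '
                   f'AZCOPY_SPA_APPLICATION_ID="{spa_application}" '
                   f'AZCOPY_SPA_CLIENT_SECRET="{spa_client_secret}" ') if not sas else ''
    return (f'{azlogin_env}azcopy rm '
            + shlex.quote(uri)  # quoting matters: SAS URIs contain '?' and '&'
            + (' --recursive=true' if recursive else ''))


# a SAS-authenticated URI needs no login environment prefix
cmd = azcopy_delete_command('https://acct.blob.core.windows.net/data/raw?sv=2020-08-04&sig=x',
                            sas='sv=2020-08-04&sig=x', recursive=True)
print(cmd.startswith('azcopy rm '))  # → True
```

Note the `shlex.quote` call: a SAS URI contains `?` and `&`, which would otherwise be interpreted by the shell.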
52 changes: 52 additions & 0 deletions mara_storage/storages.py
@@ -85,3 +85,55 @@ def base_uri(self):
    def build_uri(self, path: str):
        """Returns a URI for a path on the storage"""
        return f"{self.base_uri}/{path}"


class AzureStorage(Storage):
    def __init__(self, account_name: str, container_name: str, sas: str = None,
                 storage_type: str = 'blob', account_key: str = None,
                 spa_tenant: str = None, spa_application: str = None, spa_client_secret: str = None):
        """
        Connection information for an Azure storage container

        Possible authentication methods:
            SAS => "Shared access signature", see https://docs.microsoft.com/en-us/azure/storage/common/storage-sas-overview
            SPA => "Service principal"

        Args:
            account_name: The storage account name
            container_name: The container name within the storage
            storage_type: The storage type. Supports 'blob' or 'dfs'.
            sas: The SAS token
            account_key: The storage account key
            spa_tenant: The service principal tenant id
            spa_application: The service principal application id
            spa_client_secret: The service principal client secret
        """
        if sas is None and account_key is None and spa_client_secret is None:
            raise ValueError('You have to provide either parameter sas, account_key or spa_client_secret for type AzureStorage.')
        self.account_name = account_name
        self.account_key = account_key
        self.container_name = container_name
        self.storage_type = storage_type
        self.sas = (sas[1:] if sas.startswith('?') else sas) if sas else None
        self.spa_tenant = spa_tenant
        self.spa_application = spa_application
        self.spa_client_secret = spa_client_secret

    @property
    def base_uri(self):
        return f'https://{self.account_name}.{self.storage_type}.core.windows.net/{self.container_name}'

    def build_uri(self, path: str):
        """Returns a URI for a path on the storage"""
        if path and not path.startswith('/'):
            path = '/' + path
        return (f"{self.base_uri}{path}"
                + (f'?{self.sas}' if self.sas else ''))

    def connection_string(self):
        # see https://docs.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string
        if self.account_key:
            return f'DefaultEndpointsProtocol=https;AccountName={self.account_name};AccountKey={self.account_key}'
        else:
            return ('DefaultEndpointsProtocol=https'
                    + f';BlobEndpoint=https://{self.account_name}.{self.storage_type}.core.windows.net'
                    + f';SharedAccessSignature={self.sas}')
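The URI scheme used by `AzureStorage.build_uri` can be illustrated with a standalone function that mirrors its logic (`build_azure_uri` is a hypothetical helper for illustration, not part of the package): account endpoint plus container plus path, with the SAS token, when present, appended as the query string.

```python
def build_azure_uri(account_name: str, container_name: str, path: str,
                    storage_type: str = 'blob', sas: str = None) -> str:
    """Mirrors AzureStorage.build_uri: endpoint + container + path [+ SAS query]."""
    base = f'https://{account_name}.{storage_type}.core.windows.net/{container_name}'
    if path and not path.startswith('/'):
        path = '/' + path  # normalize relative paths
    return base + path + (f'?{sas}' if sas else '')


print(build_azure_uri('myaccount', 'data', 'raw/file.csv'))
# → https://myaccount.blob.core.windows.net/data/raw/file.csv
```

The `storage_type` parameter switches between the Blob endpoint (`blob`) and the Data Lake Gen2 endpoint (`dfs`), which is why both Azure Blob Storage and Azure Data Lake Storage map to the same configuration class.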
1 change: 1 addition & 0 deletions setup.cfg
@@ -29,3 +29,4 @@ install_requires =
test = pytest
sftp = pysftp
google-cloud-storage = google-cloud-storage; google-oauth
azure-blob = azure-storage-blob
8 changes: 7 additions & 1 deletion tests/local_config.py.example
@@ -12,4 +12,10 @@ SFTP_PUBLIC_IDENTITY_FILE = None
GCS_PROJECT_ID = ''

# required for GCS client
GCS_SERVICE_ACCOUNT_FILE = ''
GCS_SERVICE_ACCOUNT_FILE = ''


# the Azure Storage config used to perform the test
AZ_STORAGE_ACCOUNT_NAME = ''
AZ_STORAGE_TYPE = 'blob'
AZ_STORAGE_SAS = None