From 8dcdb4b6ae5df5c2d44805ef4d4509de308045ed Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 10 Apr 2023 13:02:21 +0200 Subject: [PATCH] feat: allow passing http client options --- examples/object_store.ipynb | 33 +++ object-store/object_store/__init__.py | 1 + object-store/object_store/_internal.pyi | 99 ++++++++- object-store/src/file.rs | 13 +- object-store/src/lib.rs | 267 +++++++++++++++++++++++- poetry.lock | 6 +- 6 files changed, 408 insertions(+), 11 deletions(-) diff --git a/examples/object_store.ipynb b/examples/object_store.ipynb index 1994259..5be4d50 100644 --- a/examples/object_store.ipynb +++ b/examples/object_store.ipynb @@ -139,6 +139,39 @@ "dataset_part = ds.dataset(\"/partitioned\", format=\"parquet\", filesystem=store, partitioning=partitioning)\n", "dataset_part.schema\n" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from object_store import ObjectStore\n", + "\n", + "store = ObjectStore(\"az://delta-rs\", options={\"account_name\": \"mlfusiondev\", \"use_azure_cli\": \"true\"})\n", + "\n", + "store.list()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow.fs as pa_fs\n", + "from object_store.arrow import ArrowFileSystemHandler\n", + "from object_store import ClientOptions\n", + "import os\n", + "\n", + "storage_options = {\n", + " \"account_name\": os.environ[\"AZURE_STORAGE_ACCOUNT_NAME\"],\n", + " \"account_key\": os.environ[\"AZURE_STORAGE_ACCOUNT_KEY\"],\n", + "}\n", + "\n", + "filesystem = pa_fs.PyFileSystem(ArrowFileSystemHandler(\"adl://simple\", storage_options, ClientOptions()))\n", + "filesystem.get_file_info([\"part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet\"])" + ] } ], "metadata": { diff --git a/object-store/object_store/__init__.py b/object-store/object_store/__init__.py index 28697ae..9ac8283 100644 --- a/object-store/object_store/__init__.py +++ b/object-store/object_store/__init__.py @@ -3,6 +3,7 @@ # NOTE aliasing the imports with 'as' makes them public in the eyes # of static code checkers. Thus we avoid listing them with __all__ = ... +from ._internal import ClientOptions as ClientOptions from ._internal import ListResult as ListResult from ._internal import ObjectMeta as ObjectMeta from ._internal import ObjectStore as _ObjectStore diff --git a/object-store/object_store/_internal.pyi b/object-store/object_store/_internal.pyi index 27e97b9..c3d6bd3 100644 --- a/object-store/object_store/_internal.pyi +++ b/object-store/object_store/_internal.pyi @@ -32,10 +32,103 @@ class ListResult: def objects(self) -> list[ObjectMeta]: """Object metadata for the listing""" +class ClientOptions: + """HTTP client configuration for remote object stores""" + + @property + def user_agent(self) -> str | None: + """Sets the User-Agent header to be used by this client + + Default is based on the version of this crate + """ + @property + def default_content_type(self) -> str | None: + """Set the default CONTENT_TYPE for uploads""" + @property + def proxy_url(self) -> str | None: + """Set an HTTP proxy to use for requests""" + @property + def allow_http(self) -> bool: + """Sets what protocol is allowed. + + If `allow_http` is : + * false (default): Only HTTPS ise allowed + * true: HTTP and HTTPS are allowed + """ + @property + def allow_insecure(self) -> bool: + """Allows connections to invalid SSL certificates + * false (default): Only valid HTTPS certificates are allowed + * true: All HTTPS certificates are allowed + + # Warning + + You should think very carefully before using this method. If + invalid certificates are trusted, *any* certificate for *any* site + will be trusted for use. This includes expired certificates. This + introduces significant vulnerabilities, and should only be used + as a last resort or for testing. + """ + @property + def timeout(self) -> int: + """Set a request timeout (seconds) + + The timeout is applied from when the request starts connecting until the + response body has finished + """ + @property + def connect_timeout(self) -> int: + """Set a timeout (seconds) for only the connect phase of a Client""" + @property + def pool_idle_timeout(self) -> int: + """Set the pool max idle timeout (seconds) + + This is the length of time an idle connection will be kept alive + + Default is 90 seconds + """ + @property + def pool_max_idle_per_host(self) -> int: + """Set the maximum number of idle connections per host + + Default is no limit""" + @property + def http2_keep_alive_interval(self) -> int: + """Sets an interval for HTTP2 Ping frames should be sent to keep a connection alive. + + Default is disabled + """ + @property + def http2_keep_alive_timeout(self) -> int: + """Sets a timeout for receiving an acknowledgement of the keep-alive ping. + + If the ping is not acknowledged within the timeout, the connection will be closed. + Does nothing if http2_keep_alive_interval is disabled. + + Default is disabled + """ + @property + def http2_keep_alive_while_idle(self) -> bool: + """Enable HTTP2 keep alive pings for idle connections + + If disabled, keep-alive pings are only sent while there are open request/response + streams. If enabled, pings are also sent when no streams are active + + Default is disabled + """ + @property + def http1_only(self) -> bool: + """Only use http1 connections""" + @property + def http2_only(self) -> bool: + """Only use http2 connections""" + class ObjectStore: """A uniform API for interacting with object storage services and local files.""" - def __init__(self, root: str, options: dict[str, str] | None = None) -> None: ... + def __init__( + self, root: str, options: dict[str, str] | None = None, client_options: ClientOptions | None = None + ) -> None: ... def get(self, location: Path) -> bytes: """Return the bytes that are stored at the specified location.""" def get_range(self, location: Path, start: int, length: int) -> bytes: @@ -115,7 +208,9 @@ class ObjectOutputStream: class ArrowFileSystemHandler: """Implementation of pyarrow.fs.FileSystemHandler for use with pyarrow.fs.PyFileSystem""" - def __init__(self, root: str, options: dict[str, str] | None = None) -> None: ... + def __init__( + self, root: str, options: dict[str, str] | None = None, client_options: ClientOptions | None = None + ) -> None: ... def copy_file(self, src: str, dst: str) -> None: """Copy a file. diff --git a/object-store/src/file.rs b/object-store/src/file.rs index ba09bdf..6cbe104 100644 --- a/object-store/src/file.rs +++ b/object-store/src/file.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use crate::builder::ObjectStoreBuilder; use crate::utils::{delete_dir, walk_tree}; -use crate::ObjectStoreError; +use crate::{ObjectStoreError, PyClientOptions}; use object_store::path::Path; use object_store::{DynObjectStore, Error as InnerObjectStoreError, ListResult, MultipartId}; @@ -25,11 +25,18 @@ pub struct ArrowFileSystemHandler { #[pymethods] impl ArrowFileSystemHandler { #[new] - #[pyo3(signature = (root, options = None))] - fn new(root: String, options: Option>) -> PyResult { + #[pyo3(signature = (root, options = None, client_options = None))] + fn new( + root: String, + options: Option>, + client_options: Option, + ) -> PyResult { + let client_options = client_options.unwrap_or_default(); let inner = ObjectStoreBuilder::new(root.clone()) .with_path_as_prefix(true) .with_options(options.clone().unwrap_or_default()) + .with_client_options(client_options.client_options()?) + .with_retry_config(client_options.retry_config()?) .build() .map_err(ObjectStoreError::from)?; Ok(Self { diff --git a/object-store/src/lib.rs b/object-store/src/lib.rs index 5a14fc8..6ef833a 100644 --- a/object-store/src/lib.rs +++ b/object-store/src/lib.rs @@ -5,12 +5,16 @@ mod utils; use std::collections::HashMap; use std::fmt; use std::sync::Arc; +use std::time::Duration; use crate::file::{ArrowFileSystemHandler, ObjectInputFile, ObjectOutputStream}; use crate::utils::{flatten_list_stream, get_bytes}; use object_store::path::{Error as PathError, Path}; -use object_store::{DynObjectStore, Error as InnerObjectStoreError, ListResult, ObjectMeta}; +use object_store::{ + BackoffConfig, ClientOptions, DynObjectStore, Error as InnerObjectStoreError, ListResult, + ObjectMeta, RetryConfig, +}; use pyo3::exceptions::{ PyException, PyFileExistsError, PyFileNotFoundError, PyNotImplementedError, }; @@ -28,6 +32,7 @@ pub enum ObjectStoreError { IO(std::io::Error), Task(tokio::task::JoinError), Path(PathError), + InputValue(String), } impl fmt::Display for ObjectStoreError { @@ -39,6 +44,7 @@ impl fmt::Display for ObjectStoreError { ObjectStoreError::IO(e) => write!(f, "IOError error {:?}", e), ObjectStoreError::Task(e) => write!(f, "Task error {:?}", e), ObjectStoreError::Common(e) => write!(f, "{}", e), + ObjectStoreError::InputValue(e) => write!(f, "Invalid input value: {}", e), } } } @@ -212,6 +218,253 @@ impl From for PyListResult { } } +#[pyclass(name = "ClientOptions")] +#[derive(Debug, Clone, Default)] +pub struct PyClientOptions { + #[pyo3(get, set)] + user_agent: Option, + #[pyo3(get, set)] + content_type_map: HashMap, + #[pyo3(get, set)] + default_content_type: Option, + // default_headers: Option, + #[pyo3(get, set)] + proxy_url: Option, + #[pyo3(get, set)] + allow_http: bool, + #[pyo3(get, set)] + allow_insecure: bool, + #[pyo3(get, set)] + timeout: Option, + #[pyo3(get, set)] + connect_timeout: Option, + #[pyo3(get, set)] + pool_idle_timeout: Option, + #[pyo3(get, set)] + pool_max_idle_per_host: Option, + #[pyo3(get, set)] + http2_keep_alive_interval: Option, + #[pyo3(get, set)] + http2_keep_alive_timeout: Option, + #[pyo3(get, set)] + http2_keep_alive_while_idle: bool, + #[pyo3(get, set)] + http1_only: bool, + #[pyo3(get, set)] + http2_only: bool, + #[pyo3(get, set)] + retry_init_backoff: Option, + #[pyo3(get, set)] + retry_max_backoff: Option, + #[pyo3(get, set)] + retry_backoff_base: Option, + #[pyo3(get, set)] + retry_max_retries: Option, + #[pyo3(get, set)] + retry_timeout: Option, +} + +impl PyClientOptions { + fn client_options(&self) -> Result { + let mut options = ClientOptions::new() + .with_allow_http(self.allow_http) + .with_allow_invalid_certificates(self.allow_insecure); + if let Some(user_agent) = &self.user_agent { + options = options.with_user_agent( + user_agent + .clone() + .try_into() + .map_err(|_| ObjectStoreError::InputValue(user_agent.into()))?, + ); + } + if let Some(default_content_type) = &self.default_content_type { + options = options.with_default_content_type(default_content_type); + } + if let Some(proxy_url) = &self.proxy_url { + options = options.with_proxy_url(proxy_url); + } + if let Some(timeout) = self.timeout { + options = options.with_timeout(Duration::from_secs(timeout)); + } + if let Some(connect_timeout) = self.connect_timeout { + options = options.with_connect_timeout(Duration::from_secs(connect_timeout)); + } + if let Some(pool_idle_timeout) = self.pool_idle_timeout { + options = options.with_pool_idle_timeout(Duration::from_secs(pool_idle_timeout)); + } + if let Some(pool_max_idle_per_host) = self.pool_max_idle_per_host { + options = options.with_pool_max_idle_per_host(pool_max_idle_per_host); + } + if let Some(http2_keep_alive_interval) = self.http2_keep_alive_interval { + options = options + .with_http2_keep_alive_interval(Duration::from_secs(http2_keep_alive_interval)); + } + if let Some(http2_keep_alive_timeout) = self.http2_keep_alive_timeout { + options = options + .with_http2_keep_alive_timeout(Duration::from_secs(http2_keep_alive_timeout)); + } + if self.http2_keep_alive_while_idle { + options = options.with_http2_keep_alive_while_idle(); + } + if self.http1_only { + options = options.with_http1_only(); + } + if self.http2_only { + options = options.with_http2_only(); + } + Ok(options) + } + + fn retry_config(&self) -> Result { + let mut backoff = BackoffConfig::default(); + if let Some(init_backoff) = self.retry_init_backoff { + backoff.init_backoff = Duration::from_secs(init_backoff); + } + if let Some(max_backoff) = self.retry_max_backoff { + backoff.max_backoff = Duration::from_secs(max_backoff); + } + if let Some(backoff_base) = self.retry_backoff_base { + backoff.base = backoff_base; + } + let mut config = RetryConfig::default(); + config.backoff = backoff; + if let Some(max_retries) = self.retry_max_retries { + config.max_retries = max_retries; + } + if let Some(timeout) = self.retry_timeout { + config.retry_timeout = Duration::from_secs(timeout); + } + Ok(config) + } +} + +impl TryFrom for ClientOptions { + type Error = ObjectStoreError; + + fn try_from(value: PyClientOptions) -> Result { + let mut options = ClientOptions::new() + .with_allow_http(value.allow_http) + .with_allow_invalid_certificates(value.allow_insecure); + if let Some(user_agent) = value.user_agent { + options = options.with_user_agent( + user_agent + .clone() + .try_into() + .map_err(|_| ObjectStoreError::InputValue(user_agent))?, + ); + } + if let Some(default_content_type) = value.default_content_type { + options = options.with_default_content_type(default_content_type); + } + if let Some(proxy_url) = value.proxy_url { + options = options.with_proxy_url(proxy_url); + } + if let Some(timeout) = value.timeout { + options = options.with_timeout(Duration::from_secs(timeout)); + } + if let Some(connect_timeout) = value.connect_timeout { + options = options.with_connect_timeout(Duration::from_secs(connect_timeout)); + } + if let Some(pool_idle_timeout) = value.pool_idle_timeout { + options = options.with_pool_idle_timeout(Duration::from_secs(pool_idle_timeout)); + } + if let Some(pool_max_idle_per_host) = value.pool_max_idle_per_host { + options = options.with_pool_max_idle_per_host(pool_max_idle_per_host); + } + if let Some(http2_keep_alive_interval) = value.http2_keep_alive_interval { + options = options + .with_http2_keep_alive_interval(Duration::from_secs(http2_keep_alive_interval)); + } + if let Some(http2_keep_alive_timeout) = value.http2_keep_alive_timeout { + options = options + .with_http2_keep_alive_timeout(Duration::from_secs(http2_keep_alive_timeout)); + } + if value.http2_keep_alive_while_idle { + options = options.with_http2_keep_alive_while_idle(); + } + if value.http1_only { + options = options.with_http1_only(); + } + if value.http2_only { + options = options.with_http2_only(); + } + Ok(options) + } +} + +#[pymethods] +impl PyClientOptions { + #[new] + #[pyo3(signature = ( + user_agent = None, + content_type_map = None, + default_content_type = None, + proxy_url = None, + allow_http = false, + allow_insecure = false, + timeout = None, + connect_timeout = None, + pool_idle_timeout = None, + pool_max_idle_per_host = None, + http2_keep_alive_interval = None, + http2_keep_alive_timeout = None, + http2_keep_alive_while_idle = false, + http1_only = false, + http2_only = false, + retry_init_backoff = None, + retry_max_backoff = None, + retry_backoff_base = None, + retry_max_retries = None, + retry_timeout = None, + ))] + /// Create a new ObjectStore instance + fn new( + user_agent: Option, + content_type_map: Option>, + default_content_type: Option, + proxy_url: Option, + allow_http: bool, + allow_insecure: bool, + timeout: Option, + connect_timeout: Option, + pool_idle_timeout: Option, + pool_max_idle_per_host: Option, + http2_keep_alive_interval: Option, + http2_keep_alive_timeout: Option, + http2_keep_alive_while_idle: bool, + http1_only: bool, + http2_only: bool, + retry_init_backoff: Option, + retry_max_backoff: Option, + retry_backoff_base: Option, + retry_max_retries: Option, + retry_timeout: Option, + ) -> Self { + Self { + user_agent, + content_type_map: content_type_map.unwrap_or_default(), + default_content_type, + proxy_url, + allow_http, + allow_insecure, + timeout, + connect_timeout, + pool_idle_timeout, + pool_max_idle_per_host, + http2_keep_alive_interval, + http2_keep_alive_timeout, + http2_keep_alive_while_idle, + http1_only, + http2_only, + retry_init_backoff, + retry_max_backoff, + retry_backoff_base, + retry_max_retries, + retry_timeout, + } + } +} + #[pyclass(name = "ObjectStore", subclass)] #[derive(Debug, Clone)] /// A generic object store interface for uniformly interacting with AWS S3, Google Cloud Storage, @@ -226,12 +479,19 @@ struct PyObjectStore { #[pymethods] impl PyObjectStore { #[new] - #[pyo3(signature = (root, options = None))] + #[pyo3(signature = (root, options = None, client_options = None))] /// Create a new ObjectStore instance - fn new(root: String, options: Option>) -> PyResult { + fn new( + root: String, + options: Option>, + client_options: Option, + ) -> PyResult { + let client_options = client_options.unwrap_or_default(); let inner = ObjectStoreBuilder::new(root.clone()) .with_path_as_prefix(true) .with_options(options.clone().unwrap_or_default()) + .with_client_options(client_options.client_options()?) + .with_retry_config(client_options.retry_config()?) .build() .map_err(ObjectStoreError::from)?; Ok(Self { @@ -386,6 +646,7 @@ impl PyObjectStore { #[pymodule] fn _internal(_py: Python, m: &PyModule) -> PyResult<()> { // Register the python classes + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/poetry.lock b/poetry.lock index 56e7e0a..265b5bb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -594,14 +594,14 @@ files = [ [[package]] name = "importlib-metadata" -version = "6.2.1" +version = "6.3.0" description = "Read metadata from Python packages" category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "importlib_metadata-6.2.1-py3-none-any.whl", hash = "sha256:f65e478a7c2177bd19517a3a15dac094d253446d8690c5f3e71e735a04312374"}, - {file = "importlib_metadata-6.2.1.tar.gz", hash = "sha256:5a66966b39ff1c14ef5b2d60c1d842b0141fefff0f4cc6365b4bc9446c652807"}, + {file = "importlib_metadata-6.3.0-py3-none-any.whl", hash = "sha256:8f8bd2af397cf33bd344d35cfe7f489219b7d14fc79a3f854b75b8417e9226b0"}, + {file = "importlib_metadata-6.3.0.tar.gz", hash = "sha256:23c2bcae4762dfb0bbe072d358faec24957901d75b6c4ab11172c0c982532402"}, ] [package.dependencies]