Skip to content

Commit

Permalink
feat: allow passing http client options
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Apr 10, 2023
1 parent b71b7f0 commit 8dcdb4b
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 11 deletions.
33 changes: 33 additions & 0 deletions examples/object_store.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,39 @@
"dataset_part = ds.dataset(\"/partitioned\", format=\"parquet\", filesystem=store, partitioning=partitioning)\n",
"dataset_part.schema\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from object_store import ObjectStore\n",
"\n",
"store = ObjectStore(\"az://delta-rs\", options={\"account_name\": \"mlfusiondev\", \"use_azure_cli\": \"true\"})\n",
"\n",
"store.list()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pyarrow.fs as pa_fs\n",
"from object_store.arrow import ArrowFileSystemHandler\n",
"from object_store import ClientOptions\n",
"import os\n",
"\n",
"storage_options = {\n",
" \"account_name\": os.environ[\"AZURE_STORAGE_ACCOUNT_NAME\"],\n",
" \"account_key\": os.environ[\"AZURE_STORAGE_ACCOUNT_KEY\"],\n",
"}\n",
"\n",
"filesystem = pa_fs.PyFileSystem(ArrowFileSystemHandler(\"adl://simple\", storage_options, ClientOptions()))\n",
"filesystem.get_file_info([\"part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet\"])"
]
}
],
"metadata": {
Expand Down
1 change: 1 addition & 0 deletions object-store/object_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# NOTE aliasing the imports with 'as' makes them public in the eyes
# of static code checkers. Thus we avoid listing them with __all__ = ...
from ._internal import ClientOptions as ClientOptions
from ._internal import ListResult as ListResult
from ._internal import ObjectMeta as ObjectMeta
from ._internal import ObjectStore as _ObjectStore
Expand Down
99 changes: 97 additions & 2 deletions object-store/object_store/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,103 @@ class ListResult:
def objects(self) -> list[ObjectMeta]:
"""Object metadata for the listing"""

class ClientOptions:
"""HTTP client configuration for remote object stores"""

@property
def user_agent(self) -> str | None:
"""Sets the User-Agent header to be used by this client
Default is based on the version of this crate
"""
@property
def default_content_type(self) -> str | None:
"""Set the default CONTENT_TYPE for uploads"""
@property
def proxy_url(self) -> str | None:
"""Set an HTTP proxy to use for requests"""
@property
def allow_http(self) -> bool:
"""Sets what protocol is allowed.
If `allow_http` is :
* false (default): Only HTTPS ise allowed
* true: HTTP and HTTPS are allowed
"""
@property
def allow_insecure(self) -> bool:
"""Allows connections to invalid SSL certificates
* false (default): Only valid HTTPS certificates are allowed
* true: All HTTPS certificates are allowed
# Warning
You should think very carefully before using this method. If
invalid certificates are trusted, *any* certificate for *any* site
will be trusted for use. This includes expired certificates. This
introduces significant vulnerabilities, and should only be used
as a last resort or for testing.
"""
@property
def timeout(self) -> int:
"""Set a request timeout (seconds)
The timeout is applied from when the request starts connecting until the
response body has finished
"""
@property
def connect_timeout(self) -> int:
"""Set a timeout (seconds) for only the connect phase of a Client"""
@property
def pool_idle_timeout(self) -> int:
"""Set the pool max idle timeout (seconds)
This is the length of time an idle connection will be kept alive
Default is 90 seconds
"""
@property
def pool_max_idle_per_host(self) -> int:
"""Set the maximum number of idle connections per host
Default is no limit"""
@property
def http2_keep_alive_interval(self) -> int:
"""Sets an interval for HTTP2 Ping frames should be sent to keep a connection alive.
Default is disabled
"""
@property
def http2_keep_alive_timeout(self) -> int:
"""Sets a timeout for receiving an acknowledgement of the keep-alive ping.
If the ping is not acknowledged within the timeout, the connection will be closed.
Does nothing if http2_keep_alive_interval is disabled.
Default is disabled
"""
@property
def http2_keep_alive_while_idle(self) -> bool:
"""Enable HTTP2 keep alive pings for idle connections
If disabled, keep-alive pings are only sent while there are open request/response
streams. If enabled, pings are also sent when no streams are active
Default is disabled
"""
@property
def http1_only(self) -> bool:
"""Only use http1 connections"""
@property
def http2_only(self) -> bool:
"""Only use http2 connections"""

class ObjectStore:
"""A uniform API for interacting with object storage services and local files."""

def __init__(self, root: str, options: dict[str, str] | None = None) -> None: ...
def __init__(
self, root: str, options: dict[str, str] | None = None, client_options: ClientOptions | None = None
) -> None: ...
def get(self, location: Path) -> bytes:
"""Return the bytes that are stored at the specified location."""
def get_range(self, location: Path, start: int, length: int) -> bytes:
Expand Down Expand Up @@ -115,7 +208,9 @@ class ObjectOutputStream:
class ArrowFileSystemHandler:
"""Implementation of pyarrow.fs.FileSystemHandler for use with pyarrow.fs.PyFileSystem"""

def __init__(self, root: str, options: dict[str, str] | None = None) -> None: ...
def __init__(
self, root: str, options: dict[str, str] | None = None, client_options: ClientOptions | None = None
) -> None: ...
def copy_file(self, src: str, dst: str) -> None:
"""Copy a file.
Expand Down
13 changes: 10 additions & 3 deletions object-store/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use crate::builder::ObjectStoreBuilder;
use crate::utils::{delete_dir, walk_tree};
use crate::ObjectStoreError;
use crate::{ObjectStoreError, PyClientOptions};

use object_store::path::Path;
use object_store::{DynObjectStore, Error as InnerObjectStoreError, ListResult, MultipartId};
Expand All @@ -25,11 +25,18 @@ pub struct ArrowFileSystemHandler {
#[pymethods]
impl ArrowFileSystemHandler {
#[new]
#[pyo3(signature = (root, options = None))]
fn new(root: String, options: Option<HashMap<String, String>>) -> PyResult<Self> {
#[pyo3(signature = (root, options = None, client_options = None))]
fn new(
root: String,
options: Option<HashMap<String, String>>,
client_options: Option<PyClientOptions>,
) -> PyResult<Self> {
let client_options = client_options.unwrap_or_default();
let inner = ObjectStoreBuilder::new(root.clone())
.with_path_as_prefix(true)
.with_options(options.clone().unwrap_or_default())
.with_client_options(client_options.client_options()?)
.with_retry_config(client_options.retry_config()?)
.build()
.map_err(ObjectStoreError::from)?;
Ok(Self {
Expand Down
Loading

0 comments on commit 8dcdb4b

Please sign in to comment.