DM-41879: Implement URL signing for client/server #920

Merged · 4 commits · Dec 8, 2023
15 changes: 14 additions & 1 deletion python/lsst/daf/butler/datastores/fileDatastore.py
@@ -1998,10 +1998,23 @@ def get(
def prepare_get_for_external_client(self, ref: DatasetRef) -> FileDatastoreGetPayload:
# Docstring inherited

# 1 hour. Chosen somewhat arbitrarily -- this is long enough that the
# client should have time to download a large file with retries if
# needed, but short enough that it will become obvious quickly that
# these URLs expire.
# From a strictly technical standpoint there is no reason this
# shouldn't be a day or more, but there seems to be a political issue
# where people think there is a risk of end users posting presigned
# URLs for people without access rights to download.
url_expiration_time_seconds = 1 * 60 * 60

def to_file_info_payload(info: DatasetLocationInformation) -> FileDatastoreGetPayloadFileInfo:
location, file_info = info
return FileDatastoreGetPayloadFileInfo(
url=location.uri.geturl(), datastoreRecords=file_info.to_simple()
url=location.uri.generate_presigned_get_url(
expiration_time_seconds=url_expiration_time_seconds
),
datastoreRecords=file_info.to_simple(),
)

return FileDatastoreGetPayload(
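
For context: a presigned GET URL is a time-limited, self-authenticating link to an otherwise private object, so the client needs no AWS credentials of its own. The generate_presigned_get_url method comes from lsst.resources; below is a minimal sketch of the same idea written directly against boto3 (bucket and key names are illustrative, not taken from this PR):

    # Illustrative sketch only -- not the lsst.resources implementation.
    import boto3

    def presigned_get_url(bucket: str, key: str, expiration_time_seconds: int = 3600) -> str:
        """Return a URL granting temporary read access to s3://<bucket>/<key>."""
        s3 = boto3.client("s3")
        return s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expiration_time_seconds,  # requests after this window are rejected
        )

    # e.g. presigned_get_url("my-bucket", "datasets/metric.json", 1 * 60 * 60)
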
16 changes: 8 additions & 8 deletions python/lsst/daf/butler/datastores/fileDatastoreClient.py
@@ -12,18 +12,18 @@
generate_datastore_get_information,
get_dataset_as_python_object_from_get_info,
)
from pydantic import AnyHttpUrl


class FileDatastoreGetPayloadFileInfo(_BaseModelCompat):
"""Information required to read a single file stored in `FileDatastore`"""

# TODO DM-41879: Allowing arbitrary URLs here is a severe security issue,
# since it allows the server to trick the client into fetching data from
# any file on its local filesystem or from remote storage using credentials
# laying around in the environment. This should be restricted to only
# HTTP, but we don't yet have a means of mocking out HTTP gets in tests.
url: str
"""An absolute URL that can be used to read the file"""
# This is intentionally restricted to HTTP for security reasons. Allowing
# arbitrary URLs here would allow the server to trick the client into
# fetching data from any file on its local filesystem or from remote
# storage using credentials lying around in the environment.
url: AnyHttpUrl
"""An HTTP URL that can be used to read the file"""

datastoreRecords: SerializedStoredFileInfo
"""`FileDatastore` metadata records for this file"""
@@ -76,7 +76,7 @@ def get_dataset_as_python_object(
The retrieved artifact, converted to a Python object
"""
fileLocations: list[DatasetLocationInformation] = [
(Location(None, file_info.url), StoredFileInfo.from_simple(file_info.datastoreRecords))
(Location(None, str(file_info.url)), StoredFileInfo.from_simple(file_info.datastoreRecords))
for file_info in payload.file_info
]

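
A minimal sketch of the validation this buys (assuming pydantic-style behavior; the real model derives from the project's _BaseModelCompat shim, for which BaseModel stands in here):

    from pydantic import AnyHttpUrl, BaseModel, ValidationError

    class _Example(BaseModel):  # hypothetical stand-in for FileDatastoreGetPayloadFileInfo
        url: AnyHttpUrl

    _Example(url="https://example.com/dataset.fits")  # accepted
    try:
        _Example(url="file:///etc/passwd")  # what a malicious server might send
    except ValidationError:
        print("non-HTTP URL rejected at deserialization time")
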
19 changes: 11 additions & 8 deletions python/lsst/daf/butler/datastores/file_datastore/get.py
@@ -207,13 +207,14 @@ def _read_artifact_into_memory(
# Do not do this if the size is negative since that indicates
# we do not know.
recorded_size = getInfo.info.file_size
resource_size = uri.size()
if recorded_size >= 0 and resource_size != recorded_size:
raise RuntimeError(
"Integrity failure in Datastore. "
f"Size of file {uri} ({resource_size}) "
f"does not match size recorded in registry of {recorded_size}"
)

def check_resource_size(resource_size: int) -> None:
if recorded_size >= 0 and resource_size != recorded_size:
raise RuntimeError(
"Integrity failure in Datastore. "
f"Size of file {uri} ({resource_size}) "
f"does not match size recorded in registry of {recorded_size}"
)

# For the general case we have choices for how to proceed.
# 1. Always use a local file (downloading the remote resource to a
@@ -225,7 +226,7 @@

formatter = getInfo.formatter
nbytes_max = 10_000_000 # Arbitrary number that we can tune
if resource_size <= nbytes_max and formatter.can_read_bytes():
if recorded_size >= 0 and recorded_size <= nbytes_max and formatter.can_read_bytes():
Review comment (Member): Good catch. The size of -1 for "unknown" was added long after this check was written.

with cache_manager.find_in_cache(cache_ref, uri.getExtension()) as cached_file:
if cached_file is not None:
desired_uri = cached_file
@@ -235,6 +236,7 @@
msg = ""
with time_this(log, msg="Reading bytes from %s%s", args=(desired_uri, msg)):
serializedDataset = desired_uri.read()
check_resource_size(len(serializedDataset))
log.debug(
"Deserializing %s from %d bytes from location %s with formatter %s",
f"component {getInfo.component}" if isComponent else "",
@@ -271,6 +273,7 @@
location_updated = True

with uri.as_local() as local_uri:
check_resource_size(local_uri.size())
can_be_cached = False
if uri != local_uri:
# URI was remote and file was downloaded
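
The refactoring above turns an eager integrity check into a deferred one: instead of stat-ing the remote resource up front with uri.size(), the expected size is captured in a closure and applied to whatever was actually transferred. A plausible motivation (my inference, not stated in the diff) is that a separate size probe against a presigned HTTP URL would cost an extra round trip, while len(serializedDataset) and local_uri.size() are free once the bytes are in hand. A condensed sketch of the pattern, with illustrative values:

    # Sketch of the deferred-check pattern; not the code from this diff.
    def make_size_check(recorded_size: int, uri: str):
        def check(actual_size: int) -> None:
            # A recorded_size < 0 means "unknown", so skip the comparison.
            if recorded_size >= 0 and actual_size != recorded_size:
                raise RuntimeError(
                    f"Integrity failure: {uri} is {actual_size} bytes, expected {recorded_size}"
                )
        return check

    check = make_size_check(recorded_size=1024, uri="s3://bucket/key")
    payload = b"..."  # bytes read over the (possibly presigned) URL
    check(len(payload))
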
22 changes: 17 additions & 5 deletions python/lsst/daf/butler/tests/utils.py
@@ -39,7 +39,7 @@
import astropy
from astropy.table import Table as AstropyTable

from .. import Butler, Config, StorageClassFactory, Timespan
from .. import Butler, Config, DatasetRef, StorageClassFactory, Timespan
from ..registry import CollectionType
from ..tests import MetricsExample, addDatasetType

@@ -221,6 +221,10 @@ class MetricTestRepo:
The location of the repository, to pass to ``Butler.makeRepo``.
configFile : `str`
The path to the config file, to pass to ``Butler.makeRepo``.
forceConfigRoot : `bool`, optional
If `False`, any values present in the supplied ``config`` that
would normally be reset are not overridden and will appear
directly in the output config. Passed to ``Butler.makeRepo``.
"""

METRICS_EXAMPLE_SUMMARY = {"AM1": 5.2, "AM2": 30.6}
@@ -237,9 +241,9 @@ def _makeExampleMetrics() -> MetricsExample:
[563, 234, 456.7, 752, 8, 9, 27],
)

def __init__(self, root: str, configFile: str) -> None:
def __init__(self, root: str, configFile: str, forceConfigRoot: bool = True) -> None:
self.root = root
Butler.makeRepo(self.root, config=Config(configFile))
Butler.makeRepo(self.root, config=Config(configFile), forceConfigRoot=forceConfigRoot)
butlerConfigFile = os.path.join(self.root, "butler.yaml")
self.storageClassFactory = StorageClassFactory()
self.storageClassFactory.addFromConfig(butlerConfigFile)
@@ -291,7 +295,7 @@ def __init__(self, root: str, configFile: str) -> None:

def addDataset(
self, dataId: dict[str, Any], run: str | None = None, datasetType: DatasetType | None = None
) -> None:
) -> DatasetRef:
"""Create a new example metric and add it to the named run with the
given dataId.

@@ -309,8 +313,16 @@
datasetType : ``DatasetType``, optional
The dataset type of the added dataset. If `None`, will use the
default dataset type.

Returns
-------
datasetRef : `DatasetRef`
A reference to the added dataset.

"""
if run:
self.butler.registry.registerCollection(run, type=CollectionType.RUN)
metric = self._makeExampleMetrics()
self.butler.put(metric, self.datasetType if datasetType is None else datasetType, dataId, run=run)
return self.butler.put(
Review comment (Member): Please add a Returns section to the docstring.

metric, self.datasetType if datasetType is None else datasetType, dataId, run=run
)
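
A usage sketch of the two API changes above, with illustrative paths and dataId values (not part of this diff): forceConfigRoot=False preserves datastore settings such as an S3 root from the supplied config, and addDataset now hands back the DatasetRef it created.

    repo = MetricTestRepo(
        root="/tmp/test-repo",
        configFile="config/basic/butler-s3store.yaml",
        forceConfigRoot=False,  # keep the S3 datastore root from the config file
    )
    ref = repo.addDataset({"instrument": "DummyCamComp", "visit": 423}, run="test-run")
    uris = repo.butler.getURIs(ref)  # the returned DatasetRef lets the test find the artifact
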
18 changes: 3 additions & 15 deletions tests/test_butler.py
@@ -48,7 +48,7 @@
try:
import boto3
import botocore
from lsst.resources.s3utils import setAwsEnvCredentials, unsetAwsEnvCredentials
from lsst.resources.s3utils import clean_test_environment_for_s3
from moto import mock_s3 # type: ignore[import]
except ImportError:
boto3 = None
@@ -114,13 +114,7 @@ def mock_s3(*args: Any, **kwargs: Any) -> Any: # type: ignore[no-untyped-def]

def clean_environment() -> None:
"""Remove external environment variables that affect the tests."""
for k in (
"DAF_BUTLER_REPOSITORY_INDEX",
"S3_ENDPOINT_URL",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_SHARED_CREDENTIALS_FILE",
):
for k in ("DAF_BUTLER_REPOSITORY_INDEX",):
os.environ.pop(k, None)


@@ -1993,11 +1987,9 @@ def setUp(self) -> None:
self.bucketName = uri.netloc

# Enable S3 mocking of tests.
self.enterContext(clean_test_environment_for_s3())
self.mock_s3.start()

# set up some fake credentials if they do not exist
self.usingDummyCredentials = setAwsEnvCredentials()

if self.useTempRoot:
self.root = self.genRoot()
rooturi = f"s3://{self.bucketName}/{self.root}"
@@ -2035,10 +2027,6 @@ def tearDown(self) -> None:
# Stop the S3 mock.
self.mock_s3.stop()

# unset any potentially set dummy credentials
if self.usingDummyCredentials:
unsetAwsEnvCredentials()

if self.reg_dir is not None and os.path.exists(self.reg_dir):
shutil.rmtree(self.reg_dir, ignore_errors=True)

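
The hand-rolled environment scrubbing and dummy-credential handling are replaced by clean_test_environment_for_s3() from lsst.resources. A sketch of its assumed behavior, for illustration only (the actual implementation lives in lsst.resources.s3utils and may differ):

    import contextlib
    import os
    from collections.abc import Iterator

    @contextlib.contextmanager
    def clean_test_environment_for_s3_sketch() -> Iterator[None]:
        # Assumed behavior: remove AWS/S3 configuration for the duration of
        # the test and restore it afterwards, so moto sees a clean slate.
        keys = (
            "S3_ENDPOINT_URL",
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_SHARED_CREDENTIALS_FILE",
        )
        saved = {k: os.environ.pop(k, None) for k in keys}
        try:
            yield
        finally:
            for k, v in saved.items():
                if v is not None:
                    os.environ[k] = v
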
33 changes: 32 additions & 1 deletion tests/test_server.py
@@ -35,6 +35,8 @@
from lsst.daf.butler.remote_butler import RemoteButler
from lsst.daf.butler.remote_butler.server import Factory, app
from lsst.daf.butler.remote_butler.server._dependencies import factory_dependency
from lsst.resources.s3utils import clean_test_environment_for_s3, getS3Client
from moto import mock_s3
except ImportError:
TestClient = None
app = None
@@ -68,11 +70,28 @@ class ButlerClientServerTestCase(unittest.TestCase):

@classmethod
def setUpClass(cls):
# Set up a mock S3 environment using Moto. Moto also monkeypatches the
# `requests` library so that any HTTP requests to presigned S3 URLs get
# redirected to the mocked S3.
# Note that all files are stored in memory.
cls.enterClassContext(clean_test_environment_for_s3())
cls.enterClassContext(mock_s3())
bucket_name = "anybucketname" # matches s3Datastore.yaml
getS3Client().create_bucket(Bucket=bucket_name)

cls.storageClassFactory = StorageClassFactory()

# First create a butler and populate it.
cls.root = makeTestTempDir(TESTDIR)
cls.repo = MetricTestRepo(root=cls.root, configFile=os.path.join(TESTDIR, "config/basic/butler.yaml"))
cls.repo = MetricTestRepo(
root=cls.root,
configFile=os.path.join(TESTDIR, "config/basic/butler-s3store.yaml"),
forceConfigRoot=False,
)

# Add a file with corrupted data for testing error conditions
cls.dataset_with_corrupted_data = _create_corrupted_dataset(cls.repo)

# Override the server's Butler initialization to point at our test repo
server_butler = Butler.from_config(cls.root, writeable=True)

@@ -212,6 +231,9 @@ def test_get(self):
with self.assertRaises(LookupError):
self.butler.get(invalid_ref)

with self.assertRaises(RuntimeError):
self.butler.get(self.dataset_with_corrupted_data)

# Test storage class override
new_sc = self.storageClassFactory.getStorageClass("MetricsConversion")
converted = self.butler.get(ref, storageClass=new_sc)
@@ -220,5 +242,14 @@
self.assertEqual(metric, converted)


def _create_corrupted_dataset(repo: MetricTestRepo) -> DatasetRef:
run = "corrupted-run"
ref = repo.addDataset({"instrument": "DummyCamComp", "visit": 423}, run=run)
uris = repo.butler.getURIs(ref)
oneOfTheComponents = list(uris.componentURIs.values())[0]
oneOfTheComponents.write("corrupted data")
return ref


if __name__ == "__main__":
unittest.main()
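
For reference, a standalone sketch of the moto pattern the server tests rely on. Per the comment in setUpClass above, moto also intercepts requests-library calls to presigned S3 URLs, so the full sign-then-download round trip runs entirely in memory (bucket and key names here are illustrative):

    import boto3
    import requests
    from moto import mock_s3

    with mock_s3():
        s3 = boto3.client("s3", region_name="us-east-1")
        s3.create_bucket(Bucket="anybucketname")
        s3.put_object(Bucket="anybucketname", Key="data.json", Body=b"{}")
        url = s3.generate_presigned_url(
            "get_object", Params={"Bucket": "anybucketname", "Key": "data.json"}
        )
        assert requests.get(url).content == b"{}"  # served by moto, no real network I/O
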