diff --git a/docs/3.0/manage/cloud/manage-users/secure-access-by-ip-address.mdx b/docs/3.0/manage/cloud/manage-users/secure-access-by-ip-address.mdx index 2e864249ea36..38b684afaf5e 100644 --- a/docs/3.0/manage/cloud/manage-users/secure-access-by-ip-address.mdx +++ b/docs/3.0/manage/cloud/manage-users/secure-access-by-ip-address.mdx @@ -3,10 +3,11 @@ title: Secure access by IP address description: Manage network access to Prefect Cloud accounts with IP Allowlists. --- -Prefect Cloud's [Enterprise plan](https://www.prefect.io/pricing) offers IP allowlisting to restrict access to -Prefect Cloud APIs and UI at the network level by giving users the ability to manage an explicit list of allowed IP addresses for an account. +IP allowlisting is an available upgrade to certain Enterprise plans. +IP allowlisting enables account administrators to restrict access to Prefect Cloud APIs and the UI at the network level. +To learn more, please contact your account manager or the Prefect team at sales@prefect.io. -To get started, use the Prefect CLI to add an IP address to the allowlist: +Once the feature has been enabled for your team's Enterprise account, use the Prefect CLI to add an IP address to the allowlist: To help prevent accidental account lockouts, an update to an allowlist requires the requestor's current IP address to be on the list. @@ -39,4 +40,4 @@ For other related commands, see the CLI help documentation with: ```bash prefect cloud ip-allowlist --help -``` \ No newline at end of file +``` diff --git a/src/integrations/prefect-azure/tests/test_credentials.py b/src/integrations/prefect-azure/tests/test_credentials.py index 1d774b344302..5d7dd57f5731 100644 --- a/src/integrations/prefect-azure/tests/test_credentials.py +++ b/src/integrations/prefect-azure/tests/test_credentials.py @@ -1,13 +1,14 @@ from unittest.mock import MagicMock import pytest -from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient +from azure.storage.blob.aio import BlobClient, BlobServiceClient, ContainerClient from conftest import CosmosClientMock from prefect_azure.credentials import ( AzureBlobStorageCredentials, AzureCosmosDbCredentials, AzureMlCredentials, ) +from pydantic import SecretStr from prefect import flow @@ -34,7 +35,7 @@ def test_flow(): client = test_flow() assert isinstance(client, ContainerClient) - client.container_name == "container" + assert client.container_name == "container" def test_get_blob_client(blob_connection_string): @@ -47,8 +48,8 @@ def test_flow(): client = test_flow() assert isinstance(client, BlobClient) - client.container_name == "container" - client.blob_name == "blob" + assert client.container_name == "container" + assert client.blob_name == "blob" def test_get_service_client_either_error(): @@ -73,7 +74,7 @@ def test_get_blob_container_client_no_conn_str(account_url): "container" ) assert isinstance(client, ContainerClient) - client.container_name == "container" + assert client.container_name == "container" def test_get_blob_client_no_conn_str(account_url): @@ -81,8 +82,8 @@ def test_get_blob_client_no_conn_str(account_url): "container", "blob" ) assert isinstance(client, BlobClient) - client.container_name == "container" - client.blob_name == "blob" + assert client.container_name == "container" + assert client.blob_name == "blob" def test_get_cosmos_client(cosmos_connection_string): @@ -126,7 +127,7 @@ def test_flow(): workspace = AzureMlCredentials( tenant_id="tenant_id", service_principal_id="service_principal_id", - 
service_principal_password="service_principal_password", + service_principal_password=SecretStr("service_principal_password"), subscription_id="subscription_id", resource_group="resource_group", workspace_name="workspace_name", diff --git a/src/integrations/prefect-kubernetes/prefect_kubernetes/credentials.py b/src/integrations/prefect-kubernetes/prefect_kubernetes/credentials.py index eab8e3274fde..26353045bb70 100644 --- a/src/integrations/prefect-kubernetes/prefect_kubernetes/credentials.py +++ b/src/integrations/prefect-kubernetes/prefect_kubernetes/credentials.py @@ -181,6 +181,14 @@ async def get_client( context=context, client_configuration=client_configuration, ) + else: + try: + config.load_incluster_config(client_configuration=client_configuration) + except config.ConfigException: + # If in-cluster config fails, load the local kubeconfig + config.load_kube_config( + client_configuration=client_configuration, + ) async with ApiClient(configuration=client_configuration) as api_client: try: yield await self.get_resource_specific_client( diff --git a/src/integrations/prefect-kubernetes/tests/test_credentials.py b/src/integrations/prefect-kubernetes/tests/test_credentials.py index 1f8d1864b7a3..9ba2bba5d3f4 100644 --- a/src/integrations/prefect-kubernetes/tests/test_credentials.py +++ b/src/integrations/prefect-kubernetes/tests/test_credentials.py @@ -2,6 +2,7 @@ import tempfile from pathlib import Path from typing import Dict +from unittest.mock import AsyncMock, MagicMock import pydantic import pytest @@ -13,9 +14,13 @@ CoreV1Api, CustomObjectsApi, ) +from kubernetes_asyncio.config import ConfigException from kubernetes_asyncio.config.kube_config import list_kube_config_contexts from OpenSSL import crypto -from prefect_kubernetes.credentials import KubernetesClusterConfig +from prefect_kubernetes.credentials import ( + KubernetesClusterConfig, + KubernetesCredentials, +) @pytest.fixture @@ -107,6 +112,19 @@ def config_file(tmp_path, config_context) -> Path: return config_file +@pytest.fixture +def mock_cluster_config(monkeypatch): + mock = MagicMock() + # We cannot mock this or the `except` clause will complain + mock.ConfigException.return_value = ConfigException + mock.load_kube_config = AsyncMock() + monkeypatch.setattr("prefect_kubernetes.credentials.config", mock) + monkeypatch.setattr( + "prefect_kubernetes.credentials.config.ConfigException", ConfigException + ) + return mock + + class TestCredentials: @pytest.mark.parametrize( "resource_type,client_type", @@ -130,6 +148,21 @@ async def test_client_bad_resource_type(self, kubernetes_credentials): async with kubernetes_credentials.get_client("shoo-ba-daba-doo"): pass + async def test_incluster_config(self, mock_cluster_config): + kubernetes_credentials = KubernetesCredentials() + mock_cluster_config.load_incluster_config.return_value = None + async with kubernetes_credentials.get_client("batch"): + assert mock_cluster_config.load_incluster_config.called + assert not mock_cluster_config.load_kube_config.called + + async def test_load_kube_config(self, mock_cluster_config): + kubernetes_credentials = KubernetesCredentials() + mock_cluster_config.load_incluster_config.side_effect = ConfigException() + mock_cluster_config.load_kube_config.return_value = None + async with kubernetes_credentials.get_client("batch"): + assert mock_cluster_config.load_incluster_config.called + assert mock_cluster_config.load_kube_config.called + class TestKubernetesClusterConfig: async def test_instantiation_from_file(self, config_file, 
config_context): diff --git a/src/prefect/cli/_prompts.py b/src/prefect/cli/_prompts.py index e4968ca84673..838ea4d28b47 100644 --- a/src/prefect/cli/_prompts.py +++ b/src/prefect/cli/_prompts.py @@ -14,7 +14,6 @@ from rich.progress import Progress, SpinnerColumn, TextColumn from rich.prompt import Confirm, InvalidResponse, Prompt, PromptBase from rich.table import Table -from rich.text import Text from prefect.cli._utilities import exit_with_error from prefect.client.collections import get_collections_metadata_client @@ -96,45 +95,40 @@ def prompt_select_from_table( current_idx = 0 selected_row = None table_kwargs = table_kwargs or {} + visible_rows = min(10, console.height - 4) # Adjust number of visible rows + scroll_offset = 0 + total_options = len(data) + (1 if opt_out_message else 0) def build_table() -> Union[Table, Group]: - """ - Generate a table of options. The `current_idx` will be highlighted. - """ - + nonlocal scroll_offset table = Table(**table_kwargs) table.add_column() for column in columns: table.add_column(column.get("header", "")) - rows = [] - max_length = 250 - for item in data: - rows.append( - tuple( - ( - value[:max_length] + "...\n" - if isinstance(value := item.get(column.get("key")), str) - and len(value) > max_length - else value - ) - for column in columns + # Adjust scroll_offset if necessary + if current_idx < scroll_offset: + scroll_offset = current_idx + elif current_idx >= scroll_offset + visible_rows: + scroll_offset = current_idx - visible_rows + 1 + + for i, item in enumerate(data[scroll_offset : scroll_offset + visible_rows]): + row = [item.get(column.get("key", "")) for column in columns] + if i + scroll_offset == current_idx: + table.add_row( + "[bold][blue]>", *[f"[bold][blue]{cell}[/]" for cell in row] ) - ) - - for i, row in enumerate(rows): - if i == current_idx: - # Use blue for selected options - table.add_row("[bold][blue]>", f"[bold][blue]{row[0]}[/]", *row[1:]) else: table.add_row(" ", *row) if opt_out_message: - prefix = " > " if current_idx == len(data) else " " * 4 - bottom_text = Text(prefix + opt_out_message) + opt_out_row = [""] * (len(columns) - 1) + [opt_out_message] if current_idx == len(data): - bottom_text.stylize("bold blue") - return Group(table, bottom_text) + table.add_row( + "[bold][blue]>", *[f"[bold][blue]{cell}[/]" for cell in opt_out_row] + ) + else: + table.add_row(" ", *opt_out_row) return table @@ -151,24 +145,14 @@ def build_table() -> Union[Table, Group]: key = readchar.readkey() if key == readchar.key.UP: - current_idx = current_idx - 1 - # wrap to bottom if at the top - if opt_out_message and current_idx < 0: - current_idx = len(data) - elif not opt_out_message and current_idx < 0: - current_idx = len(data) - 1 + current_idx = (current_idx - 1) % total_options elif key == readchar.key.DOWN: - current_idx = current_idx + 1 - # wrap to top if at the bottom - if opt_out_message and current_idx >= len(data) + 1: - current_idx = 0 - elif not opt_out_message and current_idx >= len(data): - current_idx = 0 + current_idx = (current_idx + 1) % total_options elif key == readchar.key.CTRL_C: # gracefully exit with no message exit_with_error("") elif key == readchar.key.ENTER or key == readchar.key.CR: - if current_idx >= len(data): + if current_idx == len(data): return opt_out_response else: selected_row = data[current_idx] @@ -181,8 +165,6 @@ def build_table() -> Union[Table, Group]: # Interval schedule prompting utilities - - class IntervalValuePrompt(PromptBase[timedelta]): response_type = timedelta validate_error_message 
= ( @@ -858,6 +840,7 @@ async def prompt_select_blob_storage_credentials( url = urls.url_for(new_block_document) if url: console.print( - "\nView/Edit your new credentials block in the UI:" f"\n[blue]{url}[/]\n" + "\nView/Edit your new credentials block in the UI:" f"\n[blue]{url}[/]\n", + soft_wrap=True, ) return f"{{{{ prefect.blocks.{creds_block_type_slug}.{new_block_document.name} }}}}" diff --git a/src/prefect/flows.py b/src/prefect/flows.py index 8ac4093644e4..f01ed8e97cc7 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -1032,8 +1032,6 @@ def my_flow(name: str = "world"): if not isinstance(storage, LocalStorage): storage.set_base_path(Path(tmpdir)) await storage.pull_code() - storage.set_base_path(Path(tmpdir)) - await storage.pull_code() full_entrypoint = str(storage.destination / entrypoint) flow: Flow = await from_async.wait_for_call_in_new_thread( diff --git a/src/prefect/results.py b/src/prefect/results.py index 667786ddc2f1..7cac1b8e7a7e 100644 --- a/src/prefect/results.py +++ b/src/prefect/results.py @@ -172,9 +172,18 @@ def _format_user_supplied_storage_key(key: str) -> str: class ResultStore(BaseModel): """ A utility to generate `Result` types. + + Attributes: + result_storage: The storage for result records. + metadata_storage: The storage for result record metadata. If not provided, the metadata will be stored alongside the results. + persist_result: Whether to persist results. + cache_result_in_memory: Whether to cache results in memory. + serializer: The serializer to use for results. + storage_key_fn: The function to generate storage keys. """ result_storage: Optional[WritableFileSystem] = Field(default=None) + metadata_storage: Optional[WritableFileSystem] = Field(default=None) persist_result: bool = Field(default_factory=get_default_persist_setting) cache_result_in_memory: bool = Field(default=True) serializer: Serializer = Field(default_factory=get_default_result_serializer) @@ -238,6 +247,56 @@ async def update_for_task(self: Self, task: "Task") -> Self: update["result_storage"] = await get_default_result_storage() return self.model_copy(update=update) + @sync_compatible + async def _exists(self, key: str) -> bool: + """ + Check if a result record exists in storage. + + Args: + key: The key to check for the existence of a result record. + + Returns: + bool: True if the result record exists, False otherwise. + """ + if self.metadata_storage is not None: + # TODO: Add an `exists` method to commonly used storage blocks + # so the entire payload doesn't need to be read + try: + metadata_content = await self.metadata_storage.read_path(key) + return metadata_content is not None + except Exception: + return False + else: + try: + content = await self.result_storage.read_path(key) + return content is not None + except Exception: + return False + + def exists(self, key: str) -> bool: + """ + Check if a result record exists in storage. + + Args: + key: The key to check for the existence of a result record. + + Returns: + bool: True if the result record exists, False otherwise. + """ + return self._exists(key=key, _sync=True) + + async def aexists(self, key: str) -> bool: + """ + Check if a result record exists in storage. + + Args: + key: The key to check for the existence of a result record. + + Returns: + bool: True if the result record exists, False otherwise. 
+ """ + return await self._exists(key=key, _sync=False) + @sync_compatible async def _read(self, key: str) -> "ResultRecord": """ @@ -255,8 +314,19 @@ async def _read(self, key: str) -> "ResultRecord": if self.result_storage is None: self.result_storage = await get_default_result_storage() - content = await self.result_storage.read_path(f"{key}") - return ResultRecord.deserialize(content) + if self.metadata_storage is not None: + metadata_content = await self.metadata_storage.read_path(key) + metadata = ResultRecordMetadata.load_bytes(metadata_content) + assert ( + metadata.storage_key is not None + ), "Did not find storage key in metadata" + result_content = await self.result_storage.read_path(metadata.storage_key) + return ResultRecord.deserialize_from_result_and_metadata( + result=result_content, metadata=metadata_content + ) + else: + content = await self.result_storage.read_path(key) + return ResultRecord.deserialize(content) def read(self, key: str) -> "ResultRecord": """ @@ -300,8 +370,6 @@ async def _write( obj: The object to write to storage. expiration: The expiration time for the result record. """ - if self.result_storage is None: - self.result_storage = await get_default_result_storage() key = key or self.storage_key_fn() record = ResultRecord( @@ -347,9 +415,24 @@ async def _persist_result_record(self, result_record: "ResultRecord"): if self.result_storage is None: self.result_storage = await get_default_result_storage() - await self.result_storage.write_path( - result_record.metadata.storage_key, content=result_record.serialize() - ) + assert ( + result_record.metadata.storage_key is not None + ), "Storage key is required on result record" + # If metadata storage is configured, write result and metadata separately + if self.metadata_storage is not None: + await self.result_storage.write_path( + result_record.metadata.storage_key, + content=result_record.serialize_result(), + ) + await self.metadata_storage.write_path( + result_record.metadata.storage_key, + content=result_record.serialize_metadata(), + ) + # Otherwise, write the result metadata and result together + else: + await self.result_storage.write_path( + result_record.metadata.storage_key, content=result_record.serialize() + ) def persist_result_record(self, result_record: "ResultRecord"): """ diff --git a/tests/cli/test_deploy.py b/tests/cli/test_deploy.py index a315febaf5cb..df335421d2cf 100644 --- a/tests/cli/test_deploy.py +++ b/tests/cli/test_deploy.py @@ -804,7 +804,7 @@ async def test_deploy_with_blob_storage_create_credentials( invoke_and_assert, command=( "deploy ./flows/hello.py:my_flow -n test-name -p" - f" {work_pool.name} --version 1.0.0 -v env=prod -t foo-bar" + f" {work_pool.name} --version 1.0.0 -jv env=prod -t foo-bar" " --interval 60" ), expected_code=0, @@ -6339,7 +6339,7 @@ async def test_uses_job_variables(self, project_dir, work_pool, prefect_client): invoke_and_assert, command=( "deploy ./flows/hello.py:my_flow -n test-name -p test-pool --version" - " 1.0.0 -v env=prod -t foo-bar --variable" + " 1.0.0 -v env=prod -t foo-bar --job-variable" ' \'{"resources":{"limits":{"cpu": 1}}}\'' ), expected_code=0, @@ -6366,7 +6366,7 @@ async def test_rejects_json_strings(self, project_dir, work_pool): invoke_and_assert, command=( "deploy ./flows/hello.py:my_flow -n test-name -p test-pool --version" - " 1.0.0 -v env=prod -t foo-bar --variable 'my-variable'" + " 1.0.0 -v env=prod -t foo-bar --job-variable 'my-variable'" ), expected_code=1, expected_output_contains=[ @@ -6379,7 +6379,7 @@ async def 
test_rejects_json_arrays(self, project_dir, work_pool): invoke_and_assert, command=( "deploy ./flows/hello.py:my_flow -n test-name -p test-pool --version" - " 1.0.0 -v env=prod -t foo-bar --variable ['my-variable']" + " 1.0.0 -v env=prod -t foo-bar --job-variable ['my-variable']" ), expected_code=1, expected_output_contains=[ @@ -6392,7 +6392,7 @@ async def test_rejects_invalid_json(self, project_dir, work_pool): invoke_and_assert, command=( "deploy ./flows/hello.py:my_flow -n test-name -p test-pool --version" - " 1.0.0 -v env=prod -t foo-bar --variable " + " 1.0.0 -v env=prod -t foo-bar --job-variable " ' \'{"resources":{"limits":{"cpu"}\'' ), expected_code=1, diff --git a/tests/results/test_result_store.py b/tests/results/test_result_store.py index cfb6496fcb68..242df77a6f49 100644 --- a/tests/results/test_result_store.py +++ b/tests/results/test_result_store.py @@ -731,3 +731,66 @@ def foo(): result_store = foo() assert result_store.persist_result is persist_result + + +async def test_result_store_read_and_write_with_metadata_storage(tmp_path): + metadata_storage = LocalFileSystem(basepath=tmp_path / "metadata") + result_storage = LocalFileSystem(basepath=tmp_path / "results") + result_store = ResultStore( + metadata_storage=metadata_storage, result_storage=result_storage + ) + + key = "test" + value = "test" + await result_store.awrite(key=key, obj=value) + read_value = await result_store.aread(key=key) + assert read_value.result == value + + # Check that the result is written to the result storage + assert ( + result_store.serializer.loads((tmp_path / "results" / key).read_bytes()) + == value + ) + + # Check that the metadata is written to the metadata storage + assert ( + tmp_path / "metadata" / key + ).read_text() == read_value.metadata.model_dump_json(serialize_as_any=True) + + +async def test_result_store_exists_with_metadata_storage(tmp_path): + metadata_storage = LocalFileSystem(basepath=tmp_path / "metadata") + result_storage = LocalFileSystem(basepath=tmp_path / "results") + result_store = ResultStore( + metadata_storage=metadata_storage, result_storage=result_storage + ) + + key = "test" + value = "test" + await result_store.awrite(key=key, obj=value) + + assert await result_store.aexists(key=key) is True + assert await result_store.aexists(key="nonexistent") is False + assert result_store.exists(key=key) is True + assert result_store.exists(key="nonexistent") is False + + # Remove the metadata file and check that the result is not found + (tmp_path / "metadata" / key).unlink() + assert await result_store.aexists(key=key) is False + assert result_store.exists(key=key) is False + + +async def test_result_store_exists_with_no_metadata_storage(tmp_path): + result_storage = LocalFileSystem(basepath=tmp_path / "results") + result_store = ResultStore(result_storage=result_storage) + + key = "test" + value = "test" + await result_store.awrite(key=key, obj=value) + assert await result_store.aexists(key=key) is True + assert result_store.exists(key=key) is True + + # Remove the result file and check that the result is not found + (tmp_path / "results" / key).unlink() + assert await result_store.aexists(key=key) is False + assert result_store.exists(key=key) is False diff --git a/tests/test_flows.py b/tests/test_flows.py index 8bff75e285f1..a2103bd3a3cf 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -4214,6 +4214,21 @@ class UnsupportedType: def test_load_flow_from_source_on_flow_function(self): assert hasattr(flow, "from_source") + async def 
test_no_pull_for_local_storage(self, monkeypatch): + from prefect.runner.storage import LocalStorage + + storage = LocalStorage(path="/tmp/test") + + mock_load_flow = AsyncMock(return_value=MagicMock(spec=Flow)) + monkeypatch.setattr("prefect.flows.load_flow_from_entrypoint", mock_load_flow) + + pull_code_spy = AsyncMock() + monkeypatch.setattr(LocalStorage, "pull_code", pull_code_spy) + + await Flow.from_source(entrypoint="flows.py:test_flow", source=storage) + + pull_code_spy.assert_not_called() + class TestFlowDeploy: @pytest.fixture diff --git a/ui/package-lock.json b/ui/package-lock.json index 08b6aba50474..520c06388ee4 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -9,7 +9,7 @@ "version": "2.8.0", "dependencies": { "@prefecthq/prefect-design": "2.12.1", - "@prefecthq/prefect-ui-library": "3.7.7", + "@prefecthq/prefect-ui-library": "3.8.0", "@prefecthq/vue-charts": "2.0.4", "@prefecthq/vue-compositions": "1.11.4", "@types/lodash.debounce": "4.0.9", @@ -1112,9 +1112,9 @@ } }, "node_modules/@prefecthq/prefect-ui-library": { - "version": "3.7.7", - "resolved": "https://registry.npmjs.org/@prefecthq/prefect-ui-library/-/prefect-ui-library-3.7.7.tgz", - "integrity": "sha512-vg7kSsjapo4QPV8n8DMdb6MGXUkBvs5SPF4shKUvPfsc8pfIDy2ywtU+htcFteLuWwFKtPxJ4Zgw+F2NaGuSCQ==", + "version": "3.8.0", + "resolved": "https://registry.npmjs.org/@prefecthq/prefect-ui-library/-/prefect-ui-library-3.8.0.tgz", + "integrity": "sha512-6R+M5KMJT9AV2+HfGa/ggRDYvsAaGPizZYDz0LTN4xvM1NjAHGCMzSkpOCfKG9sEWojZetz3salOPsD8hA1zCw==", "dependencies": { "@prefecthq/graphs": "2.4.0", "axios": "1.7.4", @@ -7788,9 +7788,9 @@ } }, "@prefecthq/prefect-ui-library": { - "version": "3.7.7", - "resolved": "https://registry.npmjs.org/@prefecthq/prefect-ui-library/-/prefect-ui-library-3.7.7.tgz", - "integrity": "sha512-vg7kSsjapo4QPV8n8DMdb6MGXUkBvs5SPF4shKUvPfsc8pfIDy2ywtU+htcFteLuWwFKtPxJ4Zgw+F2NaGuSCQ==", + "version": "3.8.0", + "resolved": "https://registry.npmjs.org/@prefecthq/prefect-ui-library/-/prefect-ui-library-3.8.0.tgz", + "integrity": "sha512-6R+M5KMJT9AV2+HfGa/ggRDYvsAaGPizZYDz0LTN4xvM1NjAHGCMzSkpOCfKG9sEWojZetz3salOPsD8hA1zCw==", "requires": { "@prefecthq/graphs": "2.4.0", "axios": "1.7.4", diff --git a/ui/package.json b/ui/package.json index 6a34ab8a0667..5f7e93f545cf 100644 --- a/ui/package.json +++ b/ui/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@prefecthq/prefect-design": "2.12.1", - "@prefecthq/prefect-ui-library": "3.7.7", + "@prefecthq/prefect-ui-library": "3.8.0", "@prefecthq/vue-charts": "2.0.4", "@prefecthq/vue-compositions": "1.11.4", "@types/lodash.debounce": "4.0.9",
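
A note on the new fallback in `prefect_kubernetes/credentials.py`: it follows the usual kubernetes_asyncio pattern of trying the in-cluster service-account configuration first and only falling back to the local kubeconfig when that raises `ConfigException`. A minimal standalone sketch of that pattern (not the Prefect implementation itself; the function name is illustrative) might look like:

```python
from kubernetes_asyncio import config
from kubernetes_asyncio.client import ApiClient, Configuration


async def build_api_client() -> ApiClient:
    """Prefer in-cluster configuration; fall back to the local kubeconfig."""
    client_configuration = Configuration()
    try:
        # Succeeds only when running inside a pod with a service account mounted.
        config.load_incluster_config(client_configuration=client_configuration)
    except config.ConfigException:
        # Outside a cluster, read ~/.kube/config (or $KUBECONFIG) instead.
        # Note: load_kube_config is a coroutine in kubernetes_asyncio.
        await config.load_kube_config(client_configuration=client_configuration)
    return ApiClient(configuration=client_configuration)
```

In `KubernetesCredentials.get_client` the resulting configuration is then handed to `ApiClient` via `async with`, which is what the new `else` branch feeds into.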
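
Similarly, the `metadata_storage` field added to `ResultStore` in `src/prefect/results.py` lets result payloads and their metadata be persisted to separate filesystems under the same storage key. A minimal usage sketch, assuming the API exercised by the new tests (`awrite`/`aread`/`aexists`); the paths and the `demo` key are made up for illustration:

```python
import asyncio

from prefect.filesystems import LocalFileSystem
from prefect.results import ResultStore


async def main() -> None:
    # Hypothetical local paths, purely for illustration.
    result_storage = LocalFileSystem(basepath="/tmp/example/results")
    metadata_storage = LocalFileSystem(basepath="/tmp/example/metadata")

    # With metadata_storage set, the result payload and its metadata are
    # written separately, each keyed by the same storage key.
    store = ResultStore(
        result_storage=result_storage,
        metadata_storage=metadata_storage,
    )

    await store.awrite(key="demo", obj={"answer": 42})

    assert await store.aexists(key="demo")
    record = await store.aread(key="demo")
    assert record.result == {"answer": 42}


if __name__ == "__main__":
    asyncio.run(main())
```

When `metadata_storage` is left unset, the store keeps the previous behavior and serializes the record (result plus metadata) to `result_storage` alone, as the `else` branches in `_read` and `_persist_result_record` show.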