diff --git a/src/prefect/cli/__init__.py b/src/prefect/cli/__init__.py index d328400854a2..5b967b4aca14 100644 --- a/src/prefect/cli/__init__.py +++ b/src/prefect/cli/__init__.py @@ -8,6 +8,7 @@ import prefect.cli.artifact import prefect.cli.block import prefect.cli.cloud +import prefect.cli.cloud.ip_allowlist import prefect.cli.cloud.webhook import prefect.cli.shell import prefect.cli.concurrency_limit diff --git a/src/prefect/cli/cloud/ip_allowlist.py b/src/prefect/cli/cloud/ip_allowlist.py new file mode 100644 index 000000000000..05ebea7ed526 --- /dev/null +++ b/src/prefect/cli/cloud/ip_allowlist.py @@ -0,0 +1,256 @@ +import asyncio +from typing import Annotated, Optional + +import typer +from pydantic import BaseModel, IPvAnyNetwork +from rich.panel import Panel +from rich.table import Table + +from prefect.cli._types import PrefectTyper +from prefect.cli._utilities import exit_with_error, exit_with_success +from prefect.cli.cloud import cloud_app, confirm_logged_in +from prefect.cli.root import app +from prefect.client.cloud import get_cloud_client +from prefect.client.schemas.objects import IPAllowlist, IPAllowlistEntry +from prefect.exceptions import PrefectHTTPStatusError +from prefect.logging.loggers import get_logger + +ip_allowlist_app = PrefectTyper( + name="ip-allowlist", help="Manage Prefect Cloud IP Allowlists" +) +cloud_app.add_typer(ip_allowlist_app, aliases=["ip-allowlists"]) + +logger = get_logger(__name__) + + +@ip_allowlist_app.callback() +def require_access_to_ip_allowlisting(ctx: typer.Context): + """Enforce access to IP allowlisting for all subcommands.""" + asyncio.run(_require_access_to_ip_allowlisting(ctx)) + + +async def _require_access_to_ip_allowlisting(ctx: typer.Context): + """Check if the account has access to IP allowlisting. + + Exits with an error if the account does not have access to IP allowlisting. + + On success, sets Typer context meta["enforce_ip_allowlist"] to + True if the account has IP allowlist enforcement enabled, False otherwise. + """ + confirm_logged_in() + + async with get_cloud_client(infer_cloud_url=True) as client: + account_settings = await client.read_account_settings() + + if "enforce_ip_allowlist" not in account_settings: + return exit_with_error("IP allowlisting is not available for this account.") + + enforce_ip_allowlist = account_settings.get("enforce_ip_allowlist", False) + ctx.meta["enforce_ip_allowlist"] = enforce_ip_allowlist + + +@ip_allowlist_app.command() +async def enable(ctx: typer.Context): + """Enable the IP allowlist for your account. When enabled, if the allowlist is non-empty, then access to your Prefect Cloud account will be restricted to only those IP addresses on the allowlist.""" + enforcing_ip_allowlist = ctx.meta["enforce_ip_allowlist"] + if enforcing_ip_allowlist: + exit_with_success("IP allowlist is already enabled.") + + async with get_cloud_client(infer_cloud_url=True) as client: + my_access_if_enabled = await client.check_ip_allowlist_access() + if not my_access_if_enabled.allowed: + exit_with_error( + f"Error enabling IP allowlist: {my_access_if_enabled.detail}" + ) + + logger.debug(my_access_if_enabled.detail) + + if not typer.confirm( + "Enabling the IP allowlist will restrict Prefect Cloud API and UI access to only the IP addresses on the list. " + "Continue?" + ): + exit_with_error("Aborted.") + await client.update_account_settings({"enforce_ip_allowlist": True}) + + exit_with_success("IP allowlist enabled.") + + +@ip_allowlist_app.command() +async def disable(): + """Disable the IP allowlist for your account. When disabled, all IP addresses will be allowed to access your Prefect Cloud account.""" + async with get_cloud_client(infer_cloud_url=True) as client: + await client.update_account_settings({"enforce_ip_allowlist": False}) + + exit_with_success("IP allowlist disabled.") + + +@ip_allowlist_app.command() +async def ls(ctx: typer.Context): + """Fetch and list all IP allowlist entries in your account.""" + async with get_cloud_client(infer_cloud_url=True) as client: + ip_allowlist = await client.read_account_ip_allowlist() + + _print_ip_allowlist_table( + ip_allowlist, enabled=ctx.meta["enforce_ip_allowlist"] + ) + + +class IPNetworkArg(BaseModel): + raw: str + parsed: IPvAnyNetwork + + +def parse_ip_network_argument(val: str) -> IPNetworkArg: + return IPNetworkArg( + raw=val, + parsed=val, # type: ignore + ) + + +IP_ARGUMENT = Annotated[ + IPNetworkArg, + typer.Argument( + parser=parse_ip_network_argument, + help="An IP address or range in CIDR notation. E.g. 192.168.1.0 or 192.168.1.0/24", + metavar="IP address or range", + ), +] + + +@ip_allowlist_app.command() +async def add( + ctx: typer.Context, + ip_address_or_range: IP_ARGUMENT, + description: Optional[str] = typer.Option( + None, + "--description", + "-d", + help="A short description to annotate the entry with.", + ), +): + """Add a new IP entry to your account IP allowlist.""" + new_entry = IPAllowlistEntry( + ip_network=ip_address_or_range.parsed, description=description, enabled=True + ) + + async with get_cloud_client(infer_cloud_url=True) as client: + ip_allowlist = await client.read_account_ip_allowlist() + + existing_entry_with_same_ip = None + for entry in ip_allowlist.entries: + if entry.ip_network == ip_address_or_range.parsed: + existing_entry_with_same_ip = entry + break + + if existing_entry_with_same_ip: + if not typer.confirm( + f"There's already an entry for this IP ({ip_address_or_range.raw}). Do you want to overwrite it?" + ): + exit_with_error("Aborted.") + ip_allowlist.entries.remove(existing_entry_with_same_ip) + + ip_allowlist.entries.append(new_entry) + + try: + await client.update_account_ip_allowlist(ip_allowlist) + except PrefectHTTPStatusError as exc: + _handle_update_error(exc) + + updated_ip_allowlist = await client.read_account_ip_allowlist() + _print_ip_allowlist_table( + updated_ip_allowlist, enabled=ctx.meta["enforce_ip_allowlist"] + ) + + +@ip_allowlist_app.command() +async def remove(ctx: typer.Context, ip_address_or_range: IP_ARGUMENT): + """Remove an IP entry from your account IP allowlist.""" + async with get_cloud_client(infer_cloud_url=True) as client: + ip_allowlist = await client.read_account_ip_allowlist() + ip_allowlist.entries = [ + entry + for entry in ip_allowlist.entries + if entry.ip_network != ip_address_or_range.parsed + ] + + try: + await client.update_account_ip_allowlist(ip_allowlist) + except PrefectHTTPStatusError as exc: + _handle_update_error(exc) + + updated_ip_allowlist = await client.read_account_ip_allowlist() + _print_ip_allowlist_table( + updated_ip_allowlist, enabled=ctx.meta["enforce_ip_allowlist"] + ) + + +@ip_allowlist_app.command() +async def toggle(ctx: typer.Context, ip_address_or_range: IP_ARGUMENT): + """Toggle the enabled status of an individual IP entry in your account IP allowlist.""" + async with get_cloud_client(infer_cloud_url=True) as client: + ip_allowlist = await client.read_account_ip_allowlist() + + found_matching_entry = False + for entry in ip_allowlist.entries: + if entry.ip_network == ip_address_or_range.parsed: + entry.enabled = not entry.enabled + found_matching_entry = True + break + + if not found_matching_entry: + exit_with_error( + f"No entry found with IP address `{ip_address_or_range.raw}`." + ) + + try: + await client.update_account_ip_allowlist(ip_allowlist) + except PrefectHTTPStatusError as exc: + _handle_update_error(exc) + + updated_ip_allowlist = await client.read_account_ip_allowlist() + _print_ip_allowlist_table( + updated_ip_allowlist, enabled=ctx.meta["enforce_ip_allowlist"] + ) + + +def _print_ip_allowlist_table(ip_allowlist: IPAllowlist, enabled: bool): + if not ip_allowlist.entries: + app.console.print( + Panel( + "IP allowlist is empty. Add an entry to secure access to your Prefect Cloud account.", + expand=False, + ) + ) + return + + red_asterisk_if_not_enabled = "[red]*[/red]" if enabled is False else "" + + table = Table( + title="IP Allowlist " + red_asterisk_if_not_enabled, + caption=f"{red_asterisk_if_not_enabled} Enforcement is " + f"[bold]{'ENABLED' if enabled else '[red]DISABLED[/red]'}[/bold].", + caption_style="not dim", + ) + + table.add_column("IP Address", style="cyan", no_wrap=True) + table.add_column("Description", style="blue", no_wrap=False) + table.add_column("Enabled", style="green", justify="right", no_wrap=True) + table.add_column("Last Seen", style="magenta", justify="right", no_wrap=True) + + for entry in ip_allowlist.entries: + table.add_row( + str(entry.ip_network), + entry.description, + str(entry.enabled), + entry.last_seen or "Never", + style="dim" if not entry.enabled else None, + ) + + app.console.print(table) + + +def _handle_update_error(error: PrefectHTTPStatusError): + if error.response.status_code == 422 and ( + details := error.response.json().get("detail") + ): + exit_with_error(f"Error updating allowlist: {details}") diff --git a/src/prefect/client/cloud.py b/src/prefect/client/cloud.py index 1676d418f333..101a7b3dc98c 100644 --- a/src/prefect/client/cloud.py +++ b/src/prefect/client/cloud.py @@ -1,5 +1,5 @@ import re -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, cast import anyio import httpx @@ -9,7 +9,11 @@ import prefect.context import prefect.settings from prefect.client.base import PrefectHttpxAsyncClient -from prefect.client.schemas.objects import Workspace +from prefect.client.schemas.objects import ( + IPAllowlist, + IPAllowlistMyAccessResponse, + Workspace, +) from prefect.exceptions import ObjectNotFound, PrefectException from prefect.settings import ( PREFECT_API_KEY, @@ -69,6 +73,26 @@ def __init__( **httpx_settings, enable_csrf_support=False ) + if match := ( + re.search(PARSE_API_URL_REGEX, host) + or re.search(PARSE_API_URL_REGEX, prefect.settings.PREFECT_API_URL.value()) + ): + self.account_id, self.workspace_id = match.groups() + + @property + def account_base_url(self) -> str: + if not self.account_id: + raise ValueError("Account ID not set") + + return f"accounts/{self.account_id}" + + @property + def workspace_base_url(self) -> str: + if not self.workspace_id: + raise ValueError("Workspace ID not set") + + return f"{self.account_base_url}/workspaces/{self.workspace_id}" + async def api_healthcheck(self): """ Attempts to connect to the Cloud API and raises the encountered exception if not @@ -86,11 +110,36 @@ async def read_workspaces(self) -> List[Workspace]: return workspaces async def read_worker_metadata(self) -> Dict[str, Any]: - configured_url = prefect.settings.PREFECT_API_URL.value() - account_id, workspace_id = re.findall(PARSE_API_URL_REGEX, configured_url)[0] - return await self.get( - f"accounts/{account_id}/workspaces/{workspace_id}/collections/work_pool_types" + response = await self.get( + f"{self.workspace_base_url}/collections/work_pool_types" ) + return cast(Dict[str, Any], response) + + async def read_account_settings(self) -> Dict[str, Any]: + response = await self.get(f"{self.account_base_url}/settings") + return cast(Dict[str, Any], response) + + async def update_account_settings(self, settings: Dict[str, Any]): + await self.request( + "PATCH", + f"{self.account_base_url}/settings", + json=settings, + ) + + async def read_account_ip_allowlist(self) -> IPAllowlist: + response = await self.get(f"{self.account_base_url}/ip_allowlist") + return IPAllowlist.model_validate(response) + + async def update_account_ip_allowlist(self, updated_allowlist: IPAllowlist): + await self.request( + "PUT", + f"{self.account_base_url}/ip_allowlist", + json=updated_allowlist.model_dump(mode="json"), + ) + + async def check_ip_allowlist_access(self) -> IPAllowlistMyAccessResponse: + response = await self.get(f"{self.account_base_url}/ip_allowlist/my_access") + return IPAllowlistMyAccessResponse.model_validate(response) async def __aenter__(self): await self._client.__aenter__() @@ -120,7 +169,7 @@ async def request(self, method, route, **kwargs): status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, ): - raise CloudUnauthorizedError + raise CloudUnauthorizedError(str(exc)) from exc elif exc.response.status_code == status.HTTP_404_NOT_FOUND: raise ObjectNotFound(http_exc=exc) from exc else: diff --git a/src/prefect/client/collections.py b/src/prefect/client/collections.py index bd380fbcadb9..12285d50a3d1 100644 --- a/src/prefect/client/collections.py +++ b/src/prefect/client/collections.py @@ -29,6 +29,6 @@ def get_collections_metadata_client( """ orchestration_client = get_client(httpx_settings=httpx_settings) if orchestration_client.server_type == ServerType.CLOUD: - return get_cloud_client(httpx_settings=httpx_settings) + return get_cloud_client(httpx_settings=httpx_settings, infer_cloud_url=True) else: return orchestration_client diff --git a/src/prefect/client/schemas/objects.py b/src/prefect/client/schemas/objects.py index 2d439713bfc9..476785cdf497 100644 --- a/src/prefect/client/schemas/objects.py +++ b/src/prefect/client/schemas/objects.py @@ -19,6 +19,7 @@ ConfigDict, Field, HttpUrl, + IPvAnyNetwork, SerializationInfo, field_validator, model_serializer, @@ -854,6 +855,35 @@ def __hash__(self): return hash(self.handle) +class IPAllowlistEntry(PrefectBaseModel): + ip_network: IPvAnyNetwork + enabled: bool + description: Optional[str] = Field( + default=None, description="A description of the IP entry." + ) + last_seen: Optional[str] = Field( + default=None, + description="The last time this IP was seen accessing Prefect Cloud.", + ) + + +class IPAllowlist(PrefectBaseModel): + """ + A Prefect Cloud IP allowlist. + + Expected payload for an IP allowlist from the Prefect Cloud API. + """ + + entries: List[IPAllowlistEntry] + + +class IPAllowlistMyAccessResponse(PrefectBaseModel): + """Expected payload for an IP allowlist access response from the Prefect Cloud API.""" + + allowed: bool + detail: str + + class BlockType(ObjectBaseModel): """An ORM representation of a block type""" diff --git a/tests/cli/cloud/__init__.py b/tests/cli/cloud/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/cli/test_cloud.py b/tests/cli/cloud/test_cloud.py similarity index 71% rename from tests/cli/test_cloud.py rename to tests/cli/cloud/test_cloud.py index 6574e795de59..e0f1952226b9 100644 --- a/tests/cli/test_cloud.py +++ b/tests/cli/cloud/test_cloud.py @@ -1121,481 +1121,3 @@ def test_set_workspace_with_less_than_10_workspaces(respx_mock): PREFECT_API_URL: bar_workspace.api_url(), PREFECT_API_KEY: "fake-key", } - - -def test_cannot_get_webhook_if_you_are_not_logged_in(): - cloud_profile = "cloud-foo" - save_profiles( - ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) - ) - - with use_profile(cloud_profile): - invoke_and_assert( - ["cloud", "webhook", "get", str(uuid.uuid4())], - expected_code=1, - expected_output=( - f"Currently not authenticated in profile {cloud_profile!r}. " - "Please log in with `prefect cloud login`." - ), - ) - - -def test_get_webhook_by_id(respx_mock): - foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") - save_profiles( - ProfilesCollection( - [ - Profile( - name="logged-in-profile", - settings={ - PREFECT_API_URL: foo_workspace.api_url(), - PREFECT_API_KEY: "foo", - }, - ) - ], - active=None, - ) - ) - - webhook_id = str(uuid.uuid4()) - webhook = { - "id": webhook_id, - "name": "foobar", - "enabled": True, - "template": ( - '{ "event": "your.event.name", "resource": { "prefect.resource.id":' - ' "your.resource.id" } }' - ), - "slug": "your-webhook-slug", - } - - respx_mock.get(f"{foo_workspace.api_url()}/webhooks/{webhook_id}").mock( - return_value=httpx.Response( - status.HTTP_200_OK, - json=webhook, - ) - ) - - with use_profile("logged-in-profile"): - invoke_and_assert( - ["cloud", "webhook", "get", webhook_id], - expected_code=0, - expected_output_contains=[webhook["name"]], - ) - - -def test_cannot_list_webhooks_if_you_are_not_logged_in(): - cloud_profile = "cloud-foo" - save_profiles( - ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) - ) - - with use_profile(cloud_profile): - invoke_and_assert( - ["cloud", "webhook", "ls"], - expected_code=1, - expected_output=( - f"Currently not authenticated in profile {cloud_profile!r}. " - "Please log in with `prefect cloud login`." - ), - ) - - -def test_list_webhooks(respx_mock): - foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") - save_profiles( - ProfilesCollection( - [ - Profile( - name="logged-in-profile", - settings={ - PREFECT_API_URL: foo_workspace.api_url(), - PREFECT_API_KEY: "foo", - }, - ) - ], - active=None, - ) - ) - - webhook1 = { - "id": str(uuid.uuid4()), - "name": "foobar", - "enabled": True, - "template": ( - '{ "event": "your.event.name", "resource": { "prefect.resource.id":' - ' "your.resource.id" } }' - ), - "slug": "your-webhook-slug", - } - webhook2 = { - "id": str(uuid.uuid4()), - "name": "bazzbuzz", - "enabled": True, - "template": ( - '{ "event": "your.event2.name", "resource": { "prefect.resource.id":' - ' "your.resource.id" } }' - ), - "slug": "your-webhook2-slug", - } - - respx_mock.post(f"{foo_workspace.api_url()}/webhooks/filter").mock( - return_value=httpx.Response( - status.HTTP_200_OK, - json=[webhook1, webhook2], - ) - ) - - with use_profile("logged-in-profile"): - invoke_and_assert( - ["cloud", "webhook", "ls"], - expected_code=0, - expected_output_contains=[webhook1["name"], webhook2["name"]], - ) - - -def test_cannot_create_webhook_if_you_are_not_logged_in(): - cloud_profile = "cloud-foo" - save_profiles( - ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) - ) - - with use_profile(cloud_profile): - invoke_and_assert( - ["cloud", "webhook", "create", "foobar-webhook", "-t", "some-template"], - expected_code=1, - expected_output=( - f"Currently not authenticated in profile {cloud_profile!r}. " - "Please log in with `prefect cloud login`." - ), - ) - - -def test_cannot_create_webhook_without_template(): - foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") - save_profiles( - ProfilesCollection( - [ - Profile( - name="logged-in-profile", - settings={ - PREFECT_API_URL: foo_workspace.api_url(), - PREFECT_API_KEY: "foo", - }, - ) - ], - active=None, - ) - ) - - with use_profile("logged-in-profile"): - invoke_and_assert( - ["cloud", "webhook", "create", "foobar-webhook"], - expected_code=1, - expected_output_contains=( - "Please provide a Jinja2 template expression in the --template flag" - ), - ) - - -def test_create_webhook(respx_mock): - foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") - save_profiles( - ProfilesCollection( - [ - Profile( - name="logged-in-profile", - settings={ - PREFECT_API_URL: foo_workspace.api_url(), - PREFECT_API_KEY: "foo", - }, - ) - ], - active=None, - ) - ) - - with use_profile("logged-in-profile"): - webhook_to_create = { - "name": "whoopity-whoop-webhook", - "description": "we be webhookin'", - "template": "{}", - } - respx_mock.post( - f"{foo_workspace.api_url()}/webhooks/", json=webhook_to_create - ).mock( - return_value=httpx.Response( - status.HTTP_201_CREATED, - json=webhook_to_create, - ) - ) - invoke_and_assert( - [ - "cloud", - "webhook", - "create", - webhook_to_create["name"], - "-t", - webhook_to_create["template"], - "-d", - webhook_to_create["description"], - ], - expected_code=0, - expected_output=f"Successfully created webhook {webhook_to_create['name']}", - ) - - -def test_cannot_rotate_webhook_if_you_are_not_logged_in(): - cloud_profile = "cloud-foo" - save_profiles( - ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) - ) - - with use_profile(cloud_profile): - invoke_and_assert( - ["cloud", "webhook", "rotate", str(uuid.uuid4())], - expected_code=1, - expected_output=( - f"Currently not authenticated in profile {cloud_profile!r}. " - "Please log in with `prefect cloud login`." - ), - ) - - -def test_rotate_webhook(respx_mock): - foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") - save_profiles( - ProfilesCollection( - [ - Profile( - name="logged-in-profile", - settings={ - PREFECT_API_URL: foo_workspace.api_url(), - PREFECT_API_KEY: "foo", - }, - ) - ], - active=None, - ) - ) - webhook_id = str(uuid.uuid4()) - webhook_slug = "webhook-slug-1234" - - respx_mock.post(f"{foo_workspace.api_url()}/webhooks/{webhook_id}/rotate").mock( - return_value=httpx.Response( - status.HTTP_200_OK, - json={"slug": webhook_slug}, - ) - ) - - with use_profile("logged-in-profile"): - invoke_and_assert( - ["cloud", "webhook", "rotate", webhook_id], - expected_code=0, - user_input="y" + readchar.key.ENTER, - expected_output_contains=( - f"Successfully rotated webhook URL to {webhook_slug}" - ), - ) - - -def test_cannot_toggle_webhook_if_you_are_not_logged_in(): - cloud_profile = "cloud-foo" - save_profiles( - ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) - ) - - with use_profile(cloud_profile): - invoke_and_assert( - ["cloud", "webhook", "toggle", str(uuid.uuid4())], - expected_code=1, - expected_output=( - f"Currently not authenticated in profile {cloud_profile!r}. " - "Please log in with `prefect cloud login`." - ), - ) - - -def test_toggle_webhook(respx_mock): - foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") - save_profiles( - ProfilesCollection( - [ - Profile( - name="logged-in-profile", - settings={ - PREFECT_API_URL: foo_workspace.api_url(), - PREFECT_API_KEY: "foo", - }, - ) - ], - active=None, - ) - ) - webhook_id = str(uuid.uuid4()) - - respx_mock.get(f"{foo_workspace.api_url()}/webhooks/{webhook_id}").mock( - return_value=httpx.Response( - status.HTTP_200_OK, - json={"enabled": True}, - ) - ) - - respx_mock.patch( - f"{foo_workspace.api_url()}/webhooks/{webhook_id}", json={"enabled": False} - ).mock( - return_value=httpx.Response( - status.HTTP_204_NO_CONTENT, - ) - ) - - with use_profile("logged-in-profile"): - invoke_and_assert( - ["cloud", "webhook", "toggle", webhook_id], - expected_code=0, - expected_output_contains="Webhook is now disabled", - ) - - -def test_cannot_update_webhook_if_you_are_not_logged_in(): - cloud_profile = "cloud-foo" - save_profiles( - ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) - ) - - with use_profile(cloud_profile): - invoke_and_assert( - ["cloud", "webhook", "update", str(uuid.uuid4())], - expected_code=1, - expected_output=( - f"Currently not authenticated in profile {cloud_profile!r}. " - "Please log in with `prefect cloud login`." - ), - ) - - -def test_update_webhook(respx_mock): - foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") - save_profiles( - ProfilesCollection( - [ - Profile( - name="logged-in-profile", - settings={ - PREFECT_API_URL: foo_workspace.api_url(), - PREFECT_API_KEY: "foo", - }, - ) - ], - active=None, - ) - ) - - webhook_id = str(uuid.uuid4()) - new_webhook_name = "wowza-webhooks" - existing_webhook = { - "name": "this will change", - "description": "this won't change", - "template": "neither will this", - } - respx_mock.get(f"{foo_workspace.api_url()}/webhooks/{webhook_id}").mock( - return_value=httpx.Response( - status.HTTP_200_OK, - json=existing_webhook, - ) - ) - - request_body = { - **existing_webhook, - "name": new_webhook_name, - } - respx_mock.put( - f"{foo_workspace.api_url()}/webhooks/{webhook_id}", json=request_body - ).mock( - return_value=httpx.Response( - status.HTTP_204_NO_CONTENT, - ) - ) - - with use_profile("logged-in-profile"): - invoke_and_assert( - ["cloud", "webhook", "update", webhook_id, "--name", new_webhook_name], - expected_code=0, - expected_output=f"Successfully updated webhook {webhook_id}", - ) - - -def test_cannot_delete_webhook_if_you_are_not_logged_in(): - cloud_profile = "cloud-foo" - save_profiles( - ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) - ) - - with use_profile(cloud_profile): - invoke_and_assert( - ["cloud", "webhook", "delete", str(uuid.uuid4())], - expected_code=1, - expected_output=( - f"Currently not authenticated in profile {cloud_profile!r}. " - "Please log in with `prefect cloud login`." - ), - ) - - -def test_delete_webhook(respx_mock): - foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") - save_profiles( - ProfilesCollection( - [ - Profile( - name="logged-in-profile", - settings={ - PREFECT_API_URL: foo_workspace.api_url(), - PREFECT_API_KEY: "foo", - }, - ) - ], - active=None, - ) - ) - webhook_id = str(uuid.uuid4()) - - respx_mock.delete(f"{foo_workspace.api_url()}/webhooks/{webhook_id}").mock( - return_value=httpx.Response( - status.HTTP_204_NO_CONTENT, - ) - ) - - with use_profile("logged-in-profile"): - invoke_and_assert( - ["cloud", "webhook", "delete", webhook_id], - expected_code=0, - user_input="y" + readchar.key.ENTER, - expected_output_contains=f"Successfully deleted webhook {webhook_id}", - ) - - -def test_webhook_methods_with_invalid_uuid(): - foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") - save_profiles( - ProfilesCollection( - [ - Profile( - name="logged-in-profile", - settings={ - PREFECT_API_URL: foo_workspace.api_url(), - PREFECT_API_KEY: "foo", - }, - ) - ], - active=None, - ) - ) - bad_webhook_id = "invalid_uuid" - - with use_profile("logged-in-profile"): - for cmd in ["delete", "toggle", "update", "rotate", "get"]: - invoke_and_assert( - ["cloud", "webhook", cmd, bad_webhook_id], - expected_code=2, - ) diff --git a/tests/cli/cloud/test_ip_allowlist.py b/tests/cli/cloud/test_ip_allowlist.py new file mode 100644 index 000000000000..56c29ee375b3 --- /dev/null +++ b/tests/cli/cloud/test_ip_allowlist.py @@ -0,0 +1,555 @@ +import ipaddress +from datetime import datetime +from typing import Optional + +import httpx +import pytest +from starlette import status +from tests.cli.cloud.test_cloud import gen_test_workspace + +from prefect.client.schemas.objects import IPAllowlist, IPAllowlistEntry +from prefect.context import use_profile +from prefect.settings import ( + PREFECT_API_KEY, + PREFECT_API_URL, + PREFECT_CLOUD_API_URL, + Profile, + ProfilesCollection, + save_profiles, +) +from prefect.testing.cli import invoke_and_assert + +SAMPLE_ALLOWLIST = IPAllowlist( + entries=[ + IPAllowlistEntry( + ip_network="192.168.1.1", # type: ignore + enabled=True, + description="Perimeter81 Gateway", + last_seen=str(datetime(2024, 8, 13, 16)), + ), + IPAllowlistEntry( + ip_network="192.168.1.0/24", # type: ignore + enabled=True, + description="CIDR block for 192.168.0.0 to 192.168.1.255", + ), + IPAllowlistEntry( + ip_network="2001:0db8:85a3:0000:0000:8a2e:0370:7334", # type: ignore + enabled=False, + description="An IPv6 address", + ), + IPAllowlistEntry( + ip_network="2001:0db8:85a3::/64", # type: ignore + enabled=True, + description="An IPv6 CIDR block", + ), + ] +) + + +@pytest.fixture +def workspace_with_logged_in_profile(): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + profile_name = "logged-in-profile" + save_profiles( + ProfilesCollection( + [ + Profile( + name=profile_name, + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + + return foo_workspace, profile_name + + +@pytest.fixture +def account_with_ip_allowlisting_enabled(respx_mock, workspace_with_logged_in_profile): + workspace, profile = workspace_with_logged_in_profile + respx_mock.get( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/settings" + ).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + # presence of the setting key means account has access to it. enable/disable toggles the value + json={"enforce_ip_allowlist": True}, + ) + ) + + return workspace, profile + + +def test_ip_allowlist_requires_access_to_ip_allowlisting( + respx_mock, workspace_with_logged_in_profile +): + workspace, profile = workspace_with_logged_in_profile + respx_mock.get( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/settings" + ).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + # absence of the setting key means account does not have access to it + json={}, + ) + ) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "enable"], + expected_code=1, + expected_output_contains="IP allowlisting is not available for this account", + ) + + +def test_ip_allowlist_enable(respx_mock, workspace_with_logged_in_profile): + workspace, profile = workspace_with_logged_in_profile + respx_mock.get( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/settings" + ).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json={"enforce_ip_allowlist": False}, + ) + ) + respx_mock.get( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist/my_access" + ).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json={"allowed": True, "detail": "You're in."}, + ) + ) + + respx_mock.patch( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/settings", + json={"enforce_ip_allowlist": True}, + ).mock( + return_value=httpx.Response( + status.HTTP_204_NO_CONTENT, + ) + ) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "enable"], + expected_code=0, + user_input="y", + expected_output_contains="IP allowlist enabled", + ) + + +def test_ip_allowlist_enable_already_enabled( + respx_mock, workspace_with_logged_in_profile +): + workspace, profile = workspace_with_logged_in_profile + respx_mock.get( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/settings" + ).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json={"enforce_ip_allowlist": True}, + ) + ) + + # should not make the PATCH request to enable it + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "enable"], + expected_code=0, + expected_output_contains="IP allowlist is already enabled", + ) + + +def test_ip_allowlist_enable_aborts_if_would_block_current_user( + respx_mock, workspace_with_logged_in_profile +): + workspace, profile = workspace_with_logged_in_profile + respx_mock.get( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/settings" + ).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json={"enforce_ip_allowlist": False}, + ) + ) + respx_mock.get( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist/my_access" + ).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json={"allowed": False, "detail": "You're not in."}, + ) + ) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "enable"], + expected_code=1, + expected_output_contains="Error enabling IP allowlist: You're not in.", + ) + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_disable(respx_mock, workspace_with_logged_in_profile): + workspace, profile = workspace_with_logged_in_profile + respx_mock.patch( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/settings", + json={"enforce_ip_allowlist": False}, + ).mock( + return_value=httpx.Response( + status.HTTP_204_NO_CONTENT, + ) + ) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "disable"], + expected_code=0, + expected_output_contains="IP allowlist disabled", + ) + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_ls(respx_mock, workspace_with_logged_in_profile): + workspace, profile = workspace_with_logged_in_profile + url = ( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist" + ) + respx_mock.get(url).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=SAMPLE_ALLOWLIST.model_dump(mode="json"), + ) + ) + + every_expected_entry_ip = [ + str(entry.ip_network) for entry in SAMPLE_ALLOWLIST.entries + ] + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "ls"], + expected_code=0, + expected_output_contains=every_expected_entry_ip, + ) + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_ls_empty_list(respx_mock, workspace_with_logged_in_profile): + workspace, profile = workspace_with_logged_in_profile + url = ( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist" + ) + respx_mock.get(url).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json={"entries": []}, + ) + ) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "ls"], + expected_code=0, + expected_output_contains="IP allowlist is empty. Add an entry", + ) + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +@pytest.mark.parametrize("description", [None, "some short description"]) +def test_ip_allowlist_add( + description: Optional[str], workspace_with_logged_in_profile, respx_mock +): + workspace, profile = workspace_with_logged_in_profile + url = ( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist" + ) + respx_mock.get(url).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=SAMPLE_ALLOWLIST.model_dump(mode="json"), + ) + ) + expected_update_request = IPAllowlist( + entries=SAMPLE_ALLOWLIST.entries + + [ + IPAllowlistEntry( + ip_network="192.188.1.5", + enabled=True, + description=description, # type: ignore + ) + ] + ) + mocked_put_request = respx_mock.put( + url, json=expected_update_request.model_dump(mode="json") + ).mock(return_value=httpx.Response(status.HTTP_204_NO_CONTENT)) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "add", "192.188.1.5"] + + (["-d", description] if description else []), + expected_code=0, + ) + + assert mocked_put_request.called + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_add_invalid_ip(workspace_with_logged_in_profile, respx_mock): + _, profile = workspace_with_logged_in_profile + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "add", "258.235.432.234"], + expected_code=2, + expected_output_contains="Invalid value for 'IP address or range'", + ) + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_add_existing_ip_entry( + workspace_with_logged_in_profile, respx_mock +): + allowlist = IPAllowlist( + entries=[ + IPAllowlistEntry( + ip_network="192.168.1.1", # type: ignore + enabled=True, + description="original description", + ) + ] + ) + workspace, profile = workspace_with_logged_in_profile + url = ( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist" + ) + respx_mock.get(url).mock( + return_value=httpx.Response( + status.HTTP_200_OK, json=allowlist.model_dump(mode="json") + ) + ) + expected_update_request = allowlist.model_dump(mode="json") + expected_update_request["entries"][0]["description"] = "updated description" + mocked_put_request = respx_mock.put(url, json=expected_update_request).mock( + return_value=httpx.Response(status.HTTP_204_NO_CONTENT) + ) + + with use_profile(profile): + invoke_and_assert( + [ + "cloud", + "ip-allowlist", + "add", + str(SAMPLE_ALLOWLIST.entries[0].ip_network), + "-d", + "updated description", + ], + user_input="y", + expected_code=0, + ) + + assert mocked_put_request.called + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_remove(workspace_with_logged_in_profile, respx_mock): + workspace, profile = workspace_with_logged_in_profile + url = ( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist" + ) + respx_mock.get(url).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=SAMPLE_ALLOWLIST.model_dump(mode="json"), + ) + ) + + entry_to_remove = SAMPLE_ALLOWLIST.entries[0] + expected_update_request = IPAllowlist( + entries=[ + entry + for entry in SAMPLE_ALLOWLIST.entries + if entry.ip_network != entry_to_remove.ip_network + ] + ) + mocked_put_request = respx_mock.put( + url, json=expected_update_request.model_dump(mode="json") + ).mock(return_value=httpx.Response(status.HTTP_204_NO_CONTENT)) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "remove", str(entry_to_remove.ip_network)], + expected_code=0, + ) + + assert mocked_put_request.called + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_handles_422_on_removal_of_own_ip_address( + workspace_with_logged_in_profile, respx_mock +): + workspace, profile = workspace_with_logged_in_profile + url = ( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist" + ) + my_ip_entry = IPAllowlistEntry( + ip_network="127.0.0.1", # type: ignore + enabled=True, + description="My IP - cannot remove", + ) + allowlist = IPAllowlist( + entries=[ + my_ip_entry, + IPAllowlistEntry( + ip_network="192.168.1.0/24", # type: ignore + enabled=True, + description="CIDR block for 192.168.0.0 to 192.168.1.255", + ), + ] + ) + respx_mock.get(url).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=allowlist.model_dump(mode="json"), + ) + ) + + allowlist.entries.remove(my_ip_entry) + expected_server_error = "Your current IP address (127.0.0.1) must be included in the IP allowlist to prevent account lockout." + respx_mock.put(url, json=allowlist.model_dump(mode="json")).mock( + return_value=httpx.Response( + status.HTTP_422_UNPROCESSABLE_ENTITY, + json={"detail": expected_server_error}, + ) + ) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "remove", "127.0.0.1"], + expected_code=1, + expected_output_contains=expected_server_error, + expected_line_count=1, + ) + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_toggle(workspace_with_logged_in_profile, respx_mock): + workspace, profile = workspace_with_logged_in_profile + url = ( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist" + ) + respx_mock.get(url).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=SAMPLE_ALLOWLIST.model_dump(mode="json"), + ) + ) + + entry_to_toggle = SAMPLE_ALLOWLIST.entries[0] + expected_update_request = IPAllowlist( + entries=[ + IPAllowlistEntry( + ip_network=entry.ip_network, + enabled=not entry.enabled, + description=entry.description, + last_seen=entry.last_seen, + ) + if entry.ip_network == entry_to_toggle.ip_network + else entry + for entry in SAMPLE_ALLOWLIST.entries + ] + ) + mocked_put_request = respx_mock.put( + url, json=expected_update_request.model_dump(mode="json") + ).mock(return_value=httpx.Response(status.HTTP_204_NO_CONTENT)) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "toggle", str(entry_to_toggle.ip_network)], + expected_code=0, + ) + + assert mocked_put_request.called + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_toggle_nonexistent_entry( + workspace_with_logged_in_profile, respx_mock +): + workspace, profile = workspace_with_logged_in_profile + url = ( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist" + ) + respx_mock.get(url).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=SAMPLE_ALLOWLIST.model_dump(mode="json"), + ) + ) + + nonexistent_ip = "192.168.0.1" + assert not any( + entry.ip_network == ipaddress.ip_network(nonexistent_ip) + for entry in SAMPLE_ALLOWLIST.entries + ) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "toggle", nonexistent_ip], + expected_code=1, + expected_output_contains=f"No entry found with IP address `{nonexistent_ip}`", + ) + + +@pytest.mark.usefixtures("account_with_ip_allowlisting_enabled") +def test_ip_allowlist_toggle_handles_422_on_disable_of_own_ip_address( + workspace_with_logged_in_profile, respx_mock +): + workspace, profile = workspace_with_logged_in_profile + url = ( + f"{PREFECT_CLOUD_API_URL.value()}/accounts/{workspace.account_id}/ip_allowlist" + ) + my_ip_entry = IPAllowlistEntry( + ip_network="127.0.0.1", # type: ignore + enabled=True, + description="My IP - cannot remove", + ) + allowlist = IPAllowlist( + entries=[ + my_ip_entry, + IPAllowlistEntry( + ip_network="192.168.1.0/24", # type: ignore + enabled=True, + description="CIDR block for 192.168.0.0 to 192.168.1.255", + ), + ] + ) + respx_mock.get(url).mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=allowlist.model_dump(mode="json"), + ) + ) + + my_ip_entry.enabled = False + expected_server_error = "Your current IP address (127.0.0.1) must be included in the IP allowlist to prevent account lockout." + respx_mock.put(url, json=allowlist.model_dump(mode="json")).mock( + return_value=httpx.Response( + status.HTTP_422_UNPROCESSABLE_ENTITY, + json={"detail": expected_server_error}, + ) + ) + + with use_profile(profile): + invoke_and_assert( + ["cloud", "ip-allowlist", "toggle", "127.0.0.1"], + expected_code=1, + expected_output_contains=expected_server_error, + expected_line_count=1, + ) diff --git a/tests/cli/cloud/test_webhook.py b/tests/cli/cloud/test_webhook.py new file mode 100644 index 000000000000..818e19b1b942 --- /dev/null +++ b/tests/cli/cloud/test_webhook.py @@ -0,0 +1,494 @@ +import uuid + +import httpx +import readchar +from starlette import status +from tests.cli.cloud.test_cloud import gen_test_workspace + +from prefect.context import use_profile +from prefect.settings import ( + PREFECT_API_KEY, + PREFECT_API_URL, + Profile, + ProfilesCollection, + save_profiles, +) +from prefect.testing.cli import invoke_and_assert + + +def test_cannot_get_webhook_if_you_are_not_logged_in(): + cloud_profile = "cloud-foo" + save_profiles( + ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) + ) + + with use_profile(cloud_profile): + invoke_and_assert( + ["cloud", "webhook", "get", str(uuid.uuid4())], + expected_code=1, + expected_output=( + f"Currently not authenticated in profile {cloud_profile!r}. " + "Please log in with `prefect cloud login`." + ), + ) + + +def test_get_webhook_by_id(respx_mock): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + save_profiles( + ProfilesCollection( + [ + Profile( + name="logged-in-profile", + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + + webhook_id = str(uuid.uuid4()) + webhook = { + "id": webhook_id, + "name": "foobar", + "enabled": True, + "template": ( + '{ "event": "your.event.name", "resource": { "prefect.resource.id":' + ' "your.resource.id" } }' + ), + "slug": "your-webhook-slug", + } + + respx_mock.get(f"{foo_workspace.api_url()}/webhooks/{webhook_id}").mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=webhook, + ) + ) + + with use_profile("logged-in-profile"): + invoke_and_assert( + ["cloud", "webhook", "get", webhook_id], + expected_code=0, + expected_output_contains=[webhook["name"]], + ) + + +def test_cannot_list_webhooks_if_you_are_not_logged_in(): + cloud_profile = "cloud-foo" + save_profiles( + ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) + ) + + with use_profile(cloud_profile): + invoke_and_assert( + ["cloud", "webhook", "ls"], + expected_code=1, + expected_output=( + f"Currently not authenticated in profile {cloud_profile!r}. " + "Please log in with `prefect cloud login`." + ), + ) + + +def test_list_webhooks(respx_mock): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + save_profiles( + ProfilesCollection( + [ + Profile( + name="logged-in-profile", + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + + webhook1 = { + "id": str(uuid.uuid4()), + "name": "foobar", + "enabled": True, + "template": ( + '{ "event": "your.event.name", "resource": { "prefect.resource.id":' + ' "your.resource.id" } }' + ), + "slug": "your-webhook-slug", + } + webhook2 = { + "id": str(uuid.uuid4()), + "name": "bazzbuzz", + "enabled": True, + "template": ( + '{ "event": "your.event2.name", "resource": { "prefect.resource.id":' + ' "your.resource.id" } }' + ), + "slug": "your-webhook2-slug", + } + + respx_mock.post(f"{foo_workspace.api_url()}/webhooks/filter").mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=[webhook1, webhook2], + ) + ) + + with use_profile("logged-in-profile"): + invoke_and_assert( + ["cloud", "webhook", "ls"], + expected_code=0, + expected_output_contains=[webhook1["name"], webhook2["name"]], + ) + + +def test_cannot_create_webhook_if_you_are_not_logged_in(): + cloud_profile = "cloud-foo" + save_profiles( + ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) + ) + + with use_profile(cloud_profile): + invoke_and_assert( + ["cloud", "webhook", "create", "foobar-webhook", "-t", "some-template"], + expected_code=1, + expected_output=( + f"Currently not authenticated in profile {cloud_profile!r}. " + "Please log in with `prefect cloud login`." + ), + ) + + +def test_cannot_create_webhook_without_template(): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + save_profiles( + ProfilesCollection( + [ + Profile( + name="logged-in-profile", + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + + with use_profile("logged-in-profile"): + invoke_and_assert( + ["cloud", "webhook", "create", "foobar-webhook"], + expected_code=1, + expected_output_contains=( + "Please provide a Jinja2 template expression in the --template flag" + ), + ) + + +def test_create_webhook(respx_mock): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + save_profiles( + ProfilesCollection( + [ + Profile( + name="logged-in-profile", + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + + with use_profile("logged-in-profile"): + webhook_to_create = { + "name": "whoopity-whoop-webhook", + "description": "we be webhookin'", + "template": "{}", + } + respx_mock.post( + f"{foo_workspace.api_url()}/webhooks/", json=webhook_to_create + ).mock( + return_value=httpx.Response( + status.HTTP_201_CREATED, + json=webhook_to_create, + ) + ) + invoke_and_assert( + [ + "cloud", + "webhook", + "create", + webhook_to_create["name"], + "-t", + webhook_to_create["template"], + "-d", + webhook_to_create["description"], + ], + expected_code=0, + expected_output=f"Successfully created webhook {webhook_to_create['name']}", + ) + + +def test_cannot_rotate_webhook_if_you_are_not_logged_in(): + cloud_profile = "cloud-foo" + save_profiles( + ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) + ) + + with use_profile(cloud_profile): + invoke_and_assert( + ["cloud", "webhook", "rotate", str(uuid.uuid4())], + expected_code=1, + expected_output=( + f"Currently not authenticated in profile {cloud_profile!r}. " + "Please log in with `prefect cloud login`." + ), + ) + + +def test_rotate_webhook(respx_mock): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + save_profiles( + ProfilesCollection( + [ + Profile( + name="logged-in-profile", + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + webhook_id = str(uuid.uuid4()) + webhook_slug = "webhook-slug-1234" + + respx_mock.post(f"{foo_workspace.api_url()}/webhooks/{webhook_id}/rotate").mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json={"slug": webhook_slug}, + ) + ) + + with use_profile("logged-in-profile"): + invoke_and_assert( + ["cloud", "webhook", "rotate", webhook_id], + expected_code=0, + user_input="y" + readchar.key.ENTER, + expected_output_contains=( + f"Successfully rotated webhook URL to {webhook_slug}" + ), + ) + + +def test_cannot_toggle_webhook_if_you_are_not_logged_in(): + cloud_profile = "cloud-foo" + save_profiles( + ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) + ) + + with use_profile(cloud_profile): + invoke_and_assert( + ["cloud", "webhook", "toggle", str(uuid.uuid4())], + expected_code=1, + expected_output=( + f"Currently not authenticated in profile {cloud_profile!r}. " + "Please log in with `prefect cloud login`." + ), + ) + + +def test_toggle_webhook(respx_mock): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + save_profiles( + ProfilesCollection( + [ + Profile( + name="logged-in-profile", + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + webhook_id = str(uuid.uuid4()) + + respx_mock.get(f"{foo_workspace.api_url()}/webhooks/{webhook_id}").mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json={"enabled": True}, + ) + ) + + respx_mock.patch( + f"{foo_workspace.api_url()}/webhooks/{webhook_id}", json={"enabled": False} + ).mock( + return_value=httpx.Response( + status.HTTP_204_NO_CONTENT, + ) + ) + + with use_profile("logged-in-profile"): + invoke_and_assert( + ["cloud", "webhook", "toggle", webhook_id], + expected_code=0, + expected_output_contains="Webhook is now disabled", + ) + + +def test_cannot_update_webhook_if_you_are_not_logged_in(): + cloud_profile = "cloud-foo" + save_profiles( + ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) + ) + + with use_profile(cloud_profile): + invoke_and_assert( + ["cloud", "webhook", "update", str(uuid.uuid4())], + expected_code=1, + expected_output=( + f"Currently not authenticated in profile {cloud_profile!r}. " + "Please log in with `prefect cloud login`." + ), + ) + + +def test_update_webhook(respx_mock): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + save_profiles( + ProfilesCollection( + [ + Profile( + name="logged-in-profile", + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + + webhook_id = str(uuid.uuid4()) + new_webhook_name = "wowza-webhooks" + existing_webhook = { + "name": "this will change", + "description": "this won't change", + "template": "neither will this", + } + respx_mock.get(f"{foo_workspace.api_url()}/webhooks/{webhook_id}").mock( + return_value=httpx.Response( + status.HTTP_200_OK, + json=existing_webhook, + ) + ) + + request_body = { + **existing_webhook, + "name": new_webhook_name, + } + respx_mock.put( + f"{foo_workspace.api_url()}/webhooks/{webhook_id}", json=request_body + ).mock( + return_value=httpx.Response( + status.HTTP_204_NO_CONTENT, + ) + ) + + with use_profile("logged-in-profile"): + invoke_and_assert( + ["cloud", "webhook", "update", webhook_id, "--name", new_webhook_name], + expected_code=0, + expected_output=f"Successfully updated webhook {webhook_id}", + ) + + +def test_cannot_delete_webhook_if_you_are_not_logged_in(): + cloud_profile = "cloud-foo" + save_profiles( + ProfilesCollection([Profile(name=cloud_profile, settings={})], active=None) + ) + + with use_profile(cloud_profile): + invoke_and_assert( + ["cloud", "webhook", "delete", str(uuid.uuid4())], + expected_code=1, + expected_output=( + f"Currently not authenticated in profile {cloud_profile!r}. " + "Please log in with `prefect cloud login`." + ), + ) + + +def test_delete_webhook(respx_mock): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + save_profiles( + ProfilesCollection( + [ + Profile( + name="logged-in-profile", + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + webhook_id = str(uuid.uuid4()) + + respx_mock.delete(f"{foo_workspace.api_url()}/webhooks/{webhook_id}").mock( + return_value=httpx.Response( + status.HTTP_204_NO_CONTENT, + ) + ) + + with use_profile("logged-in-profile"): + invoke_and_assert( + ["cloud", "webhook", "delete", webhook_id], + expected_code=0, + user_input="y" + readchar.key.ENTER, + expected_output_contains=f"Successfully deleted webhook {webhook_id}", + ) + + +def test_webhook_methods_with_invalid_uuid(): + foo_workspace = gen_test_workspace(account_handle="test", workspace_handle="foo") + save_profiles( + ProfilesCollection( + [ + Profile( + name="logged-in-profile", + settings={ + PREFECT_API_URL: foo_workspace.api_url(), + PREFECT_API_KEY: "foo", + }, + ) + ], + active=None, + ) + ) + bad_webhook_id = "invalid_uuid" + + with use_profile("logged-in-profile"): + for cmd in ["delete", "toggle", "update", "rotate", "get"]: + invoke_and_assert( + ["cloud", "webhook", cmd, bad_webhook_id], + expected_code=2, + )