diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 2763cc0a24..850b40a4f3 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -31,6 +31,12 @@ * Add ability to list release channels through `snow app release-channel list` command * Add ability to add and remove accounts from release channels through `snow app release-channel add-accounts` and snow app release-channel remove-accounts` commands. * Add ability to add/remove versions to/from release channels through `snow app release-channel add-version` and `snow app release-channel remove-version` commands. +* Add `snow spcs service events` command to retrieve service-specific events: + * Supports filtering by service name, container name, instance ID, time intervals (`--since`, `--until`), and pagination (`--first`, `--last`). + * Use `--all` to fetch all columns. +* Add `snow spcs service metrics` command to fetch service metrics: + * Supports filtering by service name, container name, instance ID, and time intervals (`--since`, `--until`). + * Use `--all` to fetch all columns. ## Fixes and improvements * Fixed crashes with older x86_64 Intel CPUs. diff --git a/src/snowflake/cli/_plugins/spcs/common.py b/src/snowflake/cli/_plugins/spcs/common.py index 100bdf6d82..6855a34419 100644 --- a/src/snowflake/cli/_plugins/spcs/common.py +++ b/src/snowflake/cli/_plugins/spcs/common.py @@ -14,7 +14,9 @@ from __future__ import annotations +import json import sys +from datetime import datetime from typing import TextIO from click import ClickException @@ -23,6 +25,22 @@ from snowflake.cli.api.project.util import unquote_identifier from snowflake.connector.errors import ProgrammingError +EVENT_COLUMN_NAMES = [ + "TIMESTAMP", + "START_TIMESTAMP", + "OBSERVED_TIMESTAMP", + "TRACE", + "RESOURCE", + "RESOURCE_ATTRIBUTES", + "SCOPE", + "SCOPE_ATTRIBUTES", + "RECORD_TYPE", + "RECORD", + "RECORD_ATTRIBUTES", + "VALUE", + "EXEMPLARS", +] + if not sys.stdout.closed and sys.stdout.isatty(): GREEN = "\033[32m" BLUE = "\033[34m" @@ -124,5 +142,116 @@ def new_logs_only(prev_log_records: list[str], new_log_records: list[str]) -> li return new_log_records_sorted +def build_resource_clause( + service_name: str, instance_id: str, container_name: str +) -> str: + resource_filters = [] + if service_name: + resource_filters.append( + f"resource_attributes:\"snow.service.name\" = '{service_name}'" + ) + if instance_id: + resource_filters.append( + f"(resource_attributes:\"snow.service.instance\" = '{instance_id}' " + f"OR resource_attributes:\"snow.service.container.instance\" = '{instance_id}')" + ) + if container_name: + resource_filters.append( + f"resource_attributes:\"snow.service.container.name\" = '{container_name}'" + ) + return " and ".join(resource_filters) if resource_filters else "1=1" + + +def build_time_clauses( + since: str | datetime | None, until: str | datetime | None +) -> tuple[str, str]: + since_clause = "" + until_clause = "" + + if isinstance(since, datetime): + since_clause = f"and timestamp >= '{since}'" + elif isinstance(since, str) and since: + since_clause = f"and timestamp >= sysdate() - interval '{since}'" + + if isinstance(until, datetime): + until_clause = f"and timestamp <= '{until}'" + elif isinstance(until, str) and until: + until_clause = f"and timestamp <= sysdate() - interval '{until}'" + + return since_clause, until_clause + + +def format_event_row(event_dict: dict) -> dict: + try: + resource_attributes = json.loads(event_dict.get("RESOURCE_ATTRIBUTES", "{}")) + record_attributes = json.loads(event_dict.get("RECORD_ATTRIBUTES", "{}")) + record = json.loads(event_dict.get("RECORD", "{}")) + + database_name = resource_attributes.get("snow.database.name", "N/A") + schema_name = resource_attributes.get("snow.schema.name", "N/A") + service_name = resource_attributes.get("snow.service.name", "N/A") + instance_name = resource_attributes.get("snow.service.instance", "N/A") + container_name = resource_attributes.get("snow.service.container.name", "N/A") + event_name = record_attributes.get("event.name", "Unknown Event") + event_value = event_dict.get("VALUE", "Unknown Value") + severity = record.get("severity_text", "Unknown Severity") + + return { + "TIMESTAMP": event_dict.get("TIMESTAMP", "N/A"), + "DATABASE NAME": database_name, + "SCHEMA NAME": schema_name, + "SERVICE NAME": service_name, + "INSTANCE ID": instance_name, + "CONTAINER NAME": container_name, + "SEVERITY": severity, + "EVENT NAME": event_name, + "EVENT VALUE": event_value, + } + except (json.JSONDecodeError, KeyError) as e: + raise RecordProcessingError(f"Error processing event row.") + + +def format_metric_row(metric_dict: dict) -> dict: + try: + resource_attributes = json.loads(metric_dict["RESOURCE_ATTRIBUTES"]) + record = json.loads(metric_dict["RECORD"]) + + database_name = resource_attributes.get("snow.database.name", "N/A") + schema_name = resource_attributes.get("snow.schema.name", "N/A") + service_name = resource_attributes.get("snow.service.name", "N/A") + instance_name = resource_attributes.get( + "snow.service.container.instance", "N/A" + ) + container_name = resource_attributes.get("snow.service.container.name", "N/A") + + metric_name = record["metric"].get("name", "Unknown Metric") + metric_value = metric_dict.get("VALUE", "Unknown Value") + + return { + "TIMESTAMP": metric_dict.get("TIMESTAMP", "N/A"), + "DATABASE NAME": database_name, + "SCHEMA NAME": schema_name, + "SERVICE NAME": service_name, + "INSTANCE ID": instance_name, + "CONTAINER NAME": container_name, + "METRIC NAME": metric_name, + "METRIC VALUE": metric_value, + } + except (json.JSONDecodeError, KeyError) as e: + raise RecordProcessingError(f"Error processing metric row.") + + +class RecordProcessingError(ClickException): + """Raised when processing an event or metric record fails due to invalid data.""" + + pass + + +class SPCSEventTableError(ClickException): + """Raised when there is an issue related to the SPCS event table.""" + + pass + + class NoPropertiesProvidedError(ClickException): pass diff --git a/src/snowflake/cli/_plugins/spcs/services/commands.py b/src/snowflake/cli/_plugins/spcs/services/commands.py index a5d9fdcba0..b8ede471d5 100644 --- a/src/snowflake/cli/_plugins/spcs/services/commands.py +++ b/src/snowflake/cli/_plugins/spcs/services/commands.py @@ -44,6 +44,7 @@ from snowflake.cli.api.feature_flags import FeatureFlag from snowflake.cli.api.identifiers import FQN from snowflake.cli.api.output.types import ( + CollectionResult, CommandResult, MessageResult, QueryJsonValueResult, @@ -59,6 +60,38 @@ short_help="Manages services.", ) +# Define common options +container_name_option = typer.Option( + ..., + "--container-name", + help="Name of the container.", + show_default=False, +) + +instance_id_option = typer.Option( + ..., + "--instance-id", + help="ID of the service instance, starting with 0.", + show_default=False, +) + +since_option = typer.Option( + default="", + help="Fetch events that are newer than this time ago, in Snowflake interval syntax.", +) + +until_option = typer.Option( + default="", + help="Fetch events that are older than this time ago, in Snowflake interval syntax.", +) + +show_all_columns_option = typer.Option( + False, + "--all", + is_flag=True, + help="Fetch all columns.", +) + def _service_name_callback(name: FQN) -> FQN: if not is_valid_object_name(name.identifier, max_depth=2, allow_quoted=False): @@ -213,18 +246,8 @@ def status(name: FQN = ServiceNameArgument, **options) -> CommandResult: @app.command(requires_connection=True) def logs( name: FQN = ServiceNameArgument, - container_name: str = typer.Option( - ..., - "--container-name", - help="Name of the container.", - show_default=False, - ), - instance_id: str = typer.Option( - ..., - "--instance-id", - help="ID of the service instance, starting with 0.", - show_default=False, - ), + container_name: str = container_name_option, + instance_id: str = instance_id_option, num_lines: int = typer.Option( DEFAULT_NUM_LINES, "--num-lines", help="Number of lines to retrieve." ), @@ -297,6 +320,99 @@ def logs( return StreamResult(cast(Generator[CommandResult, None, None], stream)) +@app.command(requires_connection=True) +def events( + name: FQN = ServiceNameArgument, + container_name: str = container_name_option, + instance_id: str = instance_id_option, + since: str = since_option, + until: str = until_option, + first: int = typer.Option( + default=None, + show_default=False, + help="Fetch only the first N events. Cannot be used with --last.", + ), + last: int = typer.Option( + default=None, + show_default=False, + help="Fetch only the last N events. Cannot be used with --first.", + ), + show_all_columns: bool = show_all_columns_option, + **options, +): + """ + Retrieve platform events for a service container. + """ + if FeatureFlag.ENABLE_SPCS_SERVICE_EVENTS.is_disabled(): + raise FeatureNotEnabledError( + "ENABLE_SPCS_SERVICE_EVENTS", + "Service events collection from SPCS event table is disabled.", + ) + + if first is not None and last is not None: + raise IncompatibleParametersError(["--first", "--last"]) + + manager = ServiceManager() + events = manager.get_events( + service_name=name.identifier, + container_name=container_name, + instance_id=instance_id, + since=since, + until=until, + first=first, + last=last, + show_all_columns=show_all_columns, + ) + + if not events: + return MessageResult("No events found.") + + return CollectionResult(events) + + +@app.command(requires_connection=True) +def metrics( + name: FQN = ServiceNameArgument, + container_name: str = container_name_option, + instance_id: str = instance_id_option, + since: str = since_option, + until: str = until_option, + show_all_columns: bool = show_all_columns_option, + **options, +): + """ + Retrieve platform metrics for a service container. + """ + if FeatureFlag.ENABLE_SPCS_SERVICE_METRICS.is_disabled(): + raise FeatureNotEnabledError( + "ENABLE_SPCS_SERVICE_METRICS", + "Service metrics collection from SPCS event table is disabled.", + ) + + manager = ServiceManager() + if since or until: + metrics = manager.get_all_metrics( + service_name=name.identifier, + container_name=container_name, + instance_id=instance_id, + since=since, + until=until, + show_all_columns=show_all_columns, + ) + else: + metrics = manager.get_latest_metrics( + service_name=name.identifier, + container_name=container_name, + instance_id=instance_id, + show_all_columns=show_all_columns, + ) + + if not metrics: + return MessageResult("No metrics found.") + + return CollectionResult(metrics) + + @app.command(requires_connection=True) def upgrade( name: FQN = ServiceNameArgument, diff --git a/src/snowflake/cli/_plugins/spcs/services/manager.py b/src/snowflake/cli/_plugins/spcs/services/manager.py index 504fc4cc81..86d8dad9a5 100644 --- a/src/snowflake/cli/_plugins/spcs/services/manager.py +++ b/src/snowflake/cli/_plugins/spcs/services/manager.py @@ -16,14 +16,21 @@ import json import time +from datetime import datetime from pathlib import Path from typing import List, Optional import yaml from snowflake.cli._plugins.object.common import Tag from snowflake.cli._plugins.spcs.common import ( + EVENT_COLUMN_NAMES, NoPropertiesProvidedError, + SPCSEventTableError, + build_resource_clause, + build_time_clauses, filter_log_timestamp, + format_event_row, + format_metric_row, handle_object_already_exists, new_logs_only, strip_empty_lines, @@ -31,7 +38,7 @@ from snowflake.cli.api.constants import DEFAULT_SIZE_LIMIT_MB, ObjectType from snowflake.cli.api.secure_path import SecurePath from snowflake.cli.api.sql_execution import SqlExecutionMixin -from snowflake.connector.cursor import SnowflakeCursor +from snowflake.connector.cursor import DictCursor, SnowflakeCursor from snowflake.connector.errors import ProgrammingError @@ -199,6 +206,167 @@ def stream_logs( except KeyboardInterrupt: return + def get_account_event_table(self): + query = "show parameters like 'event_table' in account" + results = self.execute_query(query, cursor_class=DictCursor) + event_table = next( + (r["value"] for r in results if r["key"] == "EVENT_TABLE"), "" + ) + if not event_table: + raise SPCSEventTableError("No SPCS event table configured in the account.") + return event_table + + def get_events( + self, + service_name: str, + instance_id: str, + container_name: str, + since: str | datetime | None = None, + until: str | datetime | None = None, + first: Optional[int] = None, + last: Optional[int] = None, + show_all_columns: bool = False, + ): + + account_event_table = self.get_account_event_table() + resource_clause = build_resource_clause( + service_name, instance_id, container_name + ) + since_clause, until_clause = build_time_clauses(since, until) + + first_clause = f"limit {first}" if first is not None else "" + last_clause = f"limit {last}" if last is not None else "" + + query = f"""\ + select * + from ( + select * + from {account_event_table} + where ( + {resource_clause} + {since_clause} + {until_clause} + ) + and record_type = 'LOG' + and scope['name'] = 'snow.spcs.platform' + order by timestamp desc + {last_clause} + ) + order by timestamp asc + {first_clause} + """ + + cursor = self.execute_query(query) + raw_events = cursor.fetchall() + if not raw_events: + return [] + + if show_all_columns: + return [dict(zip(EVENT_COLUMN_NAMES, event)) for event in raw_events] + + formatted_events = [] + for raw_event in raw_events: + event_dict = dict(zip(EVENT_COLUMN_NAMES, raw_event)) + formatted = format_event_row(event_dict) + formatted_events.append(formatted) + + return formatted_events + + def get_all_metrics( + self, + service_name: str, + instance_id: str, + container_name: str, + since: str | datetime | None = None, + until: str | datetime | None = None, + show_all_columns: bool = False, + ): + + account_event_table = self.get_account_event_table() + resource_clause = build_resource_clause( + service_name, instance_id, container_name + ) + since_clause, until_clause = build_time_clauses(since, until) + + query = f"""\ + select * + from {account_event_table} + where ( + {resource_clause} + {since_clause} + {until_clause} + ) + and record_type = 'METRIC' + and scope['name'] = 'snow.spcs.platform' + order by timestamp desc + """ + + cursor = self.execute_query(query) + raw_metrics = cursor.fetchall() + if not raw_metrics: + return [] + + if show_all_columns: + return [dict(zip(EVENT_COLUMN_NAMES, metric)) for metric in raw_metrics] + + formatted_metrics = [] + for raw_metric in raw_metrics: + metric_dict = dict(zip(EVENT_COLUMN_NAMES, raw_metric)) + formatted = format_metric_row(metric_dict) + formatted_metrics.append(formatted) + + return formatted_metrics + + def get_latest_metrics( + self, + service_name: str, + instance_id: str, + container_name: str, + show_all_columns: bool = False, + ): + + account_event_table = self.get_account_event_table() + resource_clause = build_resource_clause( + service_name, instance_id, container_name + ) + + query = f""" + with rankedmetrics as ( + select + *, + row_number() over ( + partition by record['metric']['name'] + order by timestamp desc + ) as rank + from {account_event_table} + where + record_type = 'METRIC' + and scope['name'] = 'snow.spcs.platform' + and {resource_clause} + and timestamp > dateadd('hour', -1, current_timestamp) + ) + select * + from rankedmetrics + where rank = 1 + order by timestamp desc; + """ + + cursor = self.execute_query(query) + raw_metrics = cursor.fetchall() + if not raw_metrics: + return [] + + if show_all_columns: + return [dict(zip(EVENT_COLUMN_NAMES, metric)) for metric in raw_metrics] + + formatted_metrics = [] + for raw_metric in raw_metrics: + metric_dict = dict(zip(EVENT_COLUMN_NAMES, raw_metric)) + formatted = format_metric_row(metric_dict) + formatted_metrics.append(formatted) + + return formatted_metrics + def upgrade_spec(self, service_name: str, spec_path: Path): spec = self._read_yaml(spec_path) query = f"alter service {service_name} from specification $$ {spec} $$" diff --git a/src/snowflake/cli/api/feature_flags.py b/src/snowflake/cli/api/feature_flags.py index 2a56458083..2aefe4893e 100644 --- a/src/snowflake/cli/api/feature_flags.py +++ b/src/snowflake/cli/api/feature_flags.py @@ -64,3 +64,5 @@ class FeatureFlag(FeatureFlagMixin): "ENABLE_STREAMLIT_VERSIONED_STAGE", False ) ENABLE_SPCS_LOG_STREAMING = BooleanFlag("ENABLE_SPCS_LOG_STREAMING", False) + ENABLE_SPCS_SERVICE_EVENTS = BooleanFlag("ENABLE_SPCS_SERVICE_EVENTS", False) + ENABLE_SPCS_SERVICE_METRICS = BooleanFlag("ENABLE_SPCS_SERVICE_METRICS", False) diff --git a/tests/__snapshots__/test_help_messages.ambr b/tests/__snapshots__/test_help_messages.ambr index 9e8d9b808d..c47c99e1cc 100644 --- a/tests/__snapshots__/test_help_messages.ambr +++ b/tests/__snapshots__/test_help_messages.ambr @@ -7625,6 +7625,111 @@ +------------------------------------------------------------------------------+ + ''' +# --- +# name: test_help_messages[spcs.service.events] + ''' + + Usage: default spcs service events [OPTIONS] NAME + + Retrieve platform events for a service container. + + +- Arguments ------------------------------------------------------------------+ + | * name TEXT Identifier of the service; for example: my_service | + | [required] | + +------------------------------------------------------------------------------+ + +- Options --------------------------------------------------------------------+ + | * --container-name TEXT Name of the container. | + | [required] | + | * --instance-id TEXT ID of the service instance, starting | + | with 0. | + | [required] | + | --since TEXT Fetch events that are newer than this | + | time ago, in Snowflake interval | + | syntax. | + | --until TEXT Fetch events that are older than this | + | time ago, in Snowflake interval | + | syntax. | + | --first INTEGER Fetch only the first N events. Cannot | + | be used with --last. | + | --last INTEGER Fetch only the last N events. Cannot | + | be used with --first. | + | --all Fetch all columns. | + | --help -h Show this message and exit. | + +------------------------------------------------------------------------------+ + +- Connection configuration ---------------------------------------------------+ + | --connection,--environment -c TEXT Name of the connection, as | + | defined in your config.toml | + | file. Default: default. | + | --host TEXT Host address for the | + | connection. Overrides the | + | value specified for the | + | connection. | + | --port INTEGER Port for the connection. | + | Overrides the value | + | specified for the | + | connection. | + | --account,--accountname TEXT Name assigned to your | + | Snowflake account. Overrides | + | the value specified for the | + | connection. | + | --user,--username TEXT Username to connect to | + | Snowflake. Overrides the | + | value specified for the | + | connection. | + | --password TEXT Snowflake password. | + | Overrides the value | + | specified for the | + | connection. | + | --authenticator TEXT Snowflake authenticator. | + | Overrides the value | + | specified for the | + | connection. | + | --private-key-file,--privateā€¦ TEXT Snowflake private key file | + | path. Overrides the value | + | specified for the | + | connection. | + | --token-file-path TEXT Path to file with an OAuth | + | token that should be used | + | when connecting to Snowflake | + | --database,--dbname TEXT Database to use. Overrides | + | the value specified for the | + | connection. | + | --schema,--schemaname TEXT Database schema to use. | + | Overrides the value | + | specified for the | + | connection. | + | --role,--rolename TEXT Role to use. Overrides the | + | value specified for the | + | connection. | + | --warehouse TEXT Warehouse to use. Overrides | + | the value specified for the | + | connection. | + | --temporary-connection -x Uses connection defined with | + | command line parameters, | + | instead of one defined in | + | config | + | --mfa-passcode TEXT Token to use for | + | multi-factor authentication | + | (MFA) | + | --enable-diag Run Python connector | + | diagnostic test | + | --diag-log-path TEXT Diagnostic report path | + | --diag-allowlist-path TEXT Diagnostic report path to | + | optional allowlist | + +------------------------------------------------------------------------------+ + +- Global configuration -------------------------------------------------------+ + | --format [TABLE|JSON] Specifies the output format. | + | [default: TABLE] | + | --verbose -v Displays log entries for log levels info | + | and higher. | + | --debug Displays log entries for log levels debug | + | and higher; debug logs contain additional | + | information. | + | --silent Turns off intermediate output to console. | + +------------------------------------------------------------------------------+ + + ''' # --- # name: test_help_messages[spcs.service.execute-job] @@ -8287,6 +8392,105 @@ +------------------------------------------------------------------------------+ + ''' +# --- +# name: test_help_messages[spcs.service.metrics] + ''' + + Usage: default spcs service metrics [OPTIONS] NAME + + Retrieve platform metrics for a service container. + + +- Arguments ------------------------------------------------------------------+ + | * name TEXT Identifier of the service; for example: my_service | + | [required] | + +------------------------------------------------------------------------------+ + +- Options --------------------------------------------------------------------+ + | * --container-name TEXT Name of the container. | + | [required] | + | * --instance-id TEXT ID of the service instance, starting with | + | 0. | + | [required] | + | --since TEXT Fetch events that are newer than this | + | time ago, in Snowflake interval syntax. | + | --until TEXT Fetch events that are older than this | + | time ago, in Snowflake interval syntax. | + | --all Fetch all columns. | + | --help -h Show this message and exit. | + +------------------------------------------------------------------------------+ + +- Connection configuration ---------------------------------------------------+ + | --connection,--environment -c TEXT Name of the connection, as | + | defined in your config.toml | + | file. Default: default. | + | --host TEXT Host address for the | + | connection. Overrides the | + | value specified for the | + | connection. | + | --port INTEGER Port for the connection. | + | Overrides the value | + | specified for the | + | connection. | + | --account,--accountname TEXT Name assigned to your | + | Snowflake account. Overrides | + | the value specified for the | + | connection. | + | --user,--username TEXT Username to connect to | + | Snowflake. Overrides the | + | value specified for the | + | connection. | + | --password TEXT Snowflake password. | + | Overrides the value | + | specified for the | + | connection. | + | --authenticator TEXT Snowflake authenticator. | + | Overrides the value | + | specified for the | + | connection. | + | --private-key-file,--privateā€¦ TEXT Snowflake private key file | + | path. Overrides the value | + | specified for the | + | connection. | + | --token-file-path TEXT Path to file with an OAuth | + | token that should be used | + | when connecting to Snowflake | + | --database,--dbname TEXT Database to use. Overrides | + | the value specified for the | + | connection. | + | --schema,--schemaname TEXT Database schema to use. | + | Overrides the value | + | specified for the | + | connection. | + | --role,--rolename TEXT Role to use. Overrides the | + | value specified for the | + | connection. | + | --warehouse TEXT Warehouse to use. Overrides | + | the value specified for the | + | connection. | + | --temporary-connection -x Uses connection defined with | + | command line parameters, | + | instead of one defined in | + | config | + | --mfa-passcode TEXT Token to use for | + | multi-factor authentication | + | (MFA) | + | --enable-diag Run Python connector | + | diagnostic test | + | --diag-log-path TEXT Diagnostic report path | + | --diag-allowlist-path TEXT Diagnostic report path to | + | optional allowlist | + +------------------------------------------------------------------------------+ + +- Global configuration -------------------------------------------------------+ + | --format [TABLE|JSON] Specifies the output format. | + | [default: TABLE] | + | --verbose -v Displays log entries for log levels info | + | and higher. | + | --debug Displays log entries for log levels debug | + | and higher; debug logs contain additional | + | information. | + | --silent Turns off intermediate output to console. | + +------------------------------------------------------------------------------+ + + ''' # --- # name: test_help_messages[spcs.service.resume] @@ -8898,6 +9102,8 @@ | schema. | | describe Provides description of service. | | drop Drops service with given name. | + | events Retrieve platform events for a service | + | container. | | execute-job Creates and executes a job service in the | | current schema. | | list Lists all available services. | @@ -8907,6 +9113,8 @@ | list-roles Lists all service roles in a service. | | logs Retrieves local logs from a service | | container. | + | metrics Retrieve platform metrics for a service | + | container. | | resume Resumes the service from a SUSPENDED state. | | set Sets one or more properties for the | | service. | @@ -11001,6 +11209,8 @@ | schema. | | describe Provides description of service. | | drop Drops service with given name. | + | events Retrieve platform events for a service | + | container. | | execute-job Creates and executes a job service in the | | current schema. | | list Lists all available services. | @@ -11010,6 +11220,8 @@ | list-roles Lists all service roles in a service. | | logs Retrieves local logs from a service | | container. | + | metrics Retrieve platform metrics for a service | + | container. | | resume Resumes the service from a SUSPENDED state. | | set Sets one or more properties for the | | service. | diff --git a/tests/spcs/test_services.py b/tests/spcs/test_services.py index b48dedd1ce..d06fbb86d9 100644 --- a/tests/spcs/test_services.py +++ b/tests/spcs/test_services.py @@ -13,6 +13,7 @@ # limitations under the License. import itertools import json +import re from datetime import datetime from pathlib import Path from textwrap import dedent @@ -699,6 +700,405 @@ def test_logs_streaming_disabled(mock_is_disabled, runner): ), f"Expected formatted output not found: {result.output}" +@patch( + "snowflake.cli.api.feature_flags.FeatureFlag.ENABLE_SPCS_SERVICE_EVENTS.is_disabled" +) +def test_service_events_disabled(mock_is_disabled, runner): + mock_is_disabled.return_value = True + result = runner.invoke( + [ + "spcs", + "service", + "events", + "LOG_EVENT", + "--container-name", + "log-printer", + "--instance-id", + "0", + "--since", + "1 minute", + ] + ) + assert ( + result.exit_code != 0 + ), "Expected a non-zero exit code due to feature flag being disabled" + + expected_output = ( + "+- Error ----------------------------------------------------------------------+\n" + "| Service events collection from SPCS event table is disabled. To enable it, |\n" + "| add 'ENABLE_SPCS_SERVICE_EVENTS = true' to '[cli.features]' section of your |\n" + "| configuration file. |\n" + "+------------------------------------------------------------------------------+\n" + ) + assert ( + result.output == expected_output + ), f"Expected formatted output not found: {result.output}" + + +@patch( + "snowflake.cli.api.feature_flags.FeatureFlag.ENABLE_SPCS_SERVICE_EVENTS.is_disabled" +) +@patch("snowflake.cli._plugins.spcs.services.manager.ServiceManager.execute_query") +def test_events_all_filters(mock_execute_query, mock_is_disabled, runner): + mock_is_disabled.return_value = False + mock_execute_query.side_effect = [ + [ + { + "key": "EVENT_TABLE", + "value": "event_table_db.data_schema.snowservices_logs", + } + ], + Mock( + fetchall=lambda: [ + ( + "2024-12-14 22:27:25.420", + None, + "2024-12-14 22:27:25.420", + None, + None, + json.dumps( + { + "snow.compute_pool.id": 230, + "snow.compute_pool.name": "MY_POOL", + "snow.database.id": 5, + "snow.database.name": "TESTDB", + "snow.schema.id": 5, + "snow.schema.name": "PUBLIC", + "snow.service.container.name": "log-printer", + "snow.service.id": 1568, + "snow.service.instance": "0", + "snow.service.name": "LOG_EVENT", + "snow.service.type": "SERVICE", + } + ), + json.dumps({"name": "snow.spcs.platform"}), + None, + "LOG", + json.dumps({"severity_text": "INFO"}), + json.dumps({"event.name": "CONTAINER.STATUS_CHANGE"}), + json.dumps({"message": "Running", "status": "READY"}), + None, + ) + ] + ), + ] + + result = runner.invoke( + [ + "spcs", + "service", + "events", + "LOG_EVENT", + "--container-name", + "log-printer", + "--instance-id", + "0", + "--since", + "2 hours", + "--until", + "1 hour", + "--last", + "10", + "--warehouse", + "XSMALL", + "--role", + "sysadmin", + ] + ) + + assert result.exit_code == 0, f"Command failed with output: {result.output}" + + call_0 = mock_execute_query.mock_calls[0].args[0] + assert ( + call_0 == "show parameters like 'event_table' in account" + ), f"Unexpected query in Call 0: {call_0}" + + actual_query = mock_execute_query.mock_calls[1].args[0] + expected_query = ( + " select *\n" + " from (\n" + " select *\n" + " from event_table_db.data_schema.snowservices_logs\n" + " where (\n" + " resource_attributes:\"snow.service.name\" = 'LOG_EVENT' and (resource_attributes:\"snow.service.instance\" = '0' OR resource_attributes:\"snow.service.container.instance\" = '0') and resource_attributes:\"snow.service.container.name\" = 'log-printer'\n" + " and timestamp >= sysdate() - interval '2 hours'\n" + " and timestamp <= sysdate() - interval '1 hour'\n" + " )\n" + " and record_type = 'LOG'\n" + " and scope['name'] = 'snow.spcs.platform'\n" + " order by timestamp desc\n" + " limit 10\n" + " )\n" + " order by timestamp asc\n" + " \n" + " " + ) + + assert ( + actual_query == expected_query + ), f"Generated query does not match expected query.\n\nActual:\n{actual_query}\n\nExpected:\n{expected_query}" + + +@patch( + "snowflake.cli.api.feature_flags.FeatureFlag.ENABLE_SPCS_SERVICE_EVENTS.is_disabled" +) +def test_events_first_last_incompatibility(mock_is_disabled, runner): + mock_is_disabled.return_value = False + result = runner.invoke( + [ + "spcs", + "service", + "events", + "LOG_EVENT", + "--container-name", + "log-printer", + "--instance-id", + "0", + "--first", + "10", + "--last", + "5", + "--warehouse", + "XSMALL", + "--role", + "sysadmin", + ] + ) + + assert result.exit_code != 0, result.output + + expected_error = "Parameters '--first' and '--last' are incompatible" + assert expected_error in result.output + + +@patch( + "snowflake.cli.api.feature_flags.FeatureFlag.ENABLE_SPCS_SERVICE_METRICS.is_disabled" +) +def test_service_metrics_disabled(mock_is_disabled, runner): + mock_is_disabled.return_value = True + result = runner.invoke( + [ + "spcs", + "service", + "metrics", + "LOG_EVENT", + "--container-name", + "log-printer", + "--instance-id", + "0", + "--since", + "1 minute", + ] + ) + assert ( + result.exit_code != 0 + ), "Expected a non-zero exit code due to feature flag being disabled" + + expected_output = ( + "+- Error ----------------------------------------------------------------------+\n" + "| Service metrics collection from SPCS event table is disabled. To enable it, |\n" + "| add 'ENABLE_SPCS_SERVICE_METRICS = true' to '[cli.features]' section of your |\n" + "| configuration file. |\n" + "+------------------------------------------------------------------------------+\n" + ) + assert ( + result.output == expected_output + ), f"Expected formatted output not found: {result.output}" + + +@patch( + "snowflake.cli.api.feature_flags.FeatureFlag.ENABLE_SPCS_SERVICE_METRICS.is_disabled" +) +@patch("snowflake.cli._plugins.spcs.services.manager.ServiceManager.execute_query") +def test_latest_metrics(mock_execute_query, mock_is_disabled, runner): + mock_is_disabled.return_value = False + mock_execute_query.side_effect = [ + [ + { + "key": "EVENT_TABLE", + "value": "event_table_db.data_schema.snowservices_logs", + } + ], + Mock( + fetchall=lambda: [ + ( + datetime(2024, 12, 10, 18, 53, 21, 809000), + datetime(2024, 12, 10, 18, 52, 51, 809000), + None, + None, + None, + json.dumps( + { + "snow.account.name": "XACCOUNTTEST1", + "snow.compute_pool.id": 20641, + "snow.compute_pool.name": "MY_POOL", + "snow.service.container.name": "log-printer", + "snow.service.name": "LOG_EVENT", + } + ), + json.dumps({"name": "snow.spcs.platform"}), + None, + "METRIC", + json.dumps( + {"metric": {"name": "container.cpu.usage", "unit": "cpu"}} + ), + None, + "0.0005007168666666691", + None, + ) + ] + ), + ] + + result = runner.invoke( + [ + "spcs", + "service", + "metrics", + "LOG_EVENT", + "--container-name", + "log-printer", + "--instance-id", + "0", + "--warehouse", + "XSMALL", + "--role", + "sysadmin", + ] + ) + + assert result.exit_code == 0, f"Command failed with output: {result.output}" + + call_0 = mock_execute_query.mock_calls[0].args[0] + assert ( + call_0 == "show parameters like 'event_table' in account" + ), f"Unexpected query in Call 0: {call_0}" + + actual_query = mock_execute_query.mock_calls[1].args[0] + expected_query = ( + "\n" + " with rankedmetrics as (\n" + " select \n" + " *,\n" + " row_number() over (\n" + " partition by record['metric']['name'] \n" + " order by timestamp desc\n" + " ) as rank\n" + " from event_table_db.data_schema.snowservices_logs\n" + " where \n" + " record_type = 'METRIC'\n" + " and scope['name'] = 'snow.spcs.platform'\n" + " and resource_attributes:\"snow.service.name\" = 'LOG_EVENT' and (resource_attributes:\"snow.service.instance\" = '0' OR resource_attributes:\"snow.service.container.instance\" = '0') and resource_attributes:\"snow.service.container.name\" = 'log-printer' \n" + " and timestamp > dateadd('hour', -1, current_timestamp) \n" + " )\n" + " select *\n" + " from rankedmetrics\n" + " where rank = 1\n" + " order by timestamp desc;\n" + " " + ) + + actual_normalized = normalize_query(actual_query) + expected_normalized = normalize_query(expected_query) + + assert actual_normalized == expected_normalized, ( + f"Generated query does not match expected query.\n\n" + f"Actual:\n{actual_query}\n\nExpected:\n{expected_query}" + ) + + +@patch( + "snowflake.cli.api.feature_flags.FeatureFlag.ENABLE_SPCS_SERVICE_METRICS.is_disabled" +) +@patch("snowflake.cli._plugins.spcs.services.manager.ServiceManager.execute_query") +def test_metrics_all_filters(mock_execute_query, mock_is_disabled, runner): + mock_is_disabled.return_value = False + mock_execute_query.side_effect = [ + [ + { + "key": "EVENT_TABLE", + "value": "event_table_db.data_schema.snowservices_logs", + } + ], + Mock( + fetchall=lambda: [ + ( + datetime(2024, 12, 10, 18, 53, 21, 809000), + datetime(2024, 12, 10, 18, 52, 51, 809000), + None, + None, + None, + json.dumps( + { + "snow.account.name": "XACCOUNTTEST1", + "snow.compute_pool.id": 20641, + "snow.compute_pool.name": "MY_POOL", + "snow.service.container.name": "log-printer", + "snow.service.name": "LOG_EVENT", + } + ), + json.dumps({"name": "snow.spcs.platform"}), + None, + "METRIC", + json.dumps( + {"metric": {"name": "container.cpu.usage", "unit": "cpu"}} + ), + None, + "0.0005007168666666691", + None, + ) + ] + ), + ] + + result = runner.invoke( + [ + "spcs", + "service", + "metrics", + "LOG_EVENT", + "--container-name", + "log-printer", + "--instance-id", + "0", + "--since", + "2 hour", + "--until", + "1 hour", + "--warehouse", + "XSMALL", + "--role", + "sysadmin", + ] + ) + + assert result.exit_code == 0, f"Command failed with output: {result.output}" + + call_0 = mock_execute_query.mock_calls[0].args[0] + assert ( + call_0 == "show parameters like 'event_table' in account" + ), f"Unexpected query in Call 0: {call_0}" + + actual_query = mock_execute_query.mock_calls[1].args[0] + expected_query = ( + " select *\n" + " from event_table_db.data_schema.snowservices_logs\n" + " where (\n" + " resource_attributes:\"snow.service.name\" = 'LOG_EVENT' and (resource_attributes:\"snow.service.instance\" = '0' OR resource_attributes:\"snow.service.container.instance\" = '0') and resource_attributes:\"snow.service.container.name\" = 'log-printer'\n" + " and timestamp >= sysdate() - interval '2 hour'\n" + " and timestamp <= sysdate() - interval '1 hour'\n" + " )\n" + " and record_type = 'METRIC'\n" + " and scope['name'] = 'snow.spcs.platform'\n" + " order by timestamp desc\n" + " " + ) + + assert ( + actual_query == expected_query + ), f"Generated query does not match expected query.\n\nActual:\n{actual_query}\n\nExpected:\n{expected_query}" + + def test_read_yaml(other_directory): tmp_dir = Path(other_directory) spec_path = tmp_dir / "spec.yml" @@ -1152,3 +1552,8 @@ def test_command_aliases(mock_connector, runner, mock_ctx, command, parameters): queries = ctx.get_queries() assert queries[0] == queries[1] + + +def normalize_query(query): + """Normalize SQL query by stripping extra whitespace and formatting.""" + return re.sub(r"\s+", " ", query.strip())