Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1843926: SPCS service events & metrics #1954

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
* Add ability to list release channels through `snow app release-channel list` command
* Add ability to add and remove accounts from release channels through `snow app release-channel add-accounts` and `snow app release-channel remove-accounts` commands.
* Add ability to add/remove versions to/from release channels through `snow app release-channel add-version` and `snow app release-channel remove-version` commands.
* Add `snow spcs service events` command to retrieve service-specific events:
  * Supports filtering by service name, container name, instance ID, time intervals (`--since`, `--until`), and pagination (`--first`, `--last`).
  * Use `--all` to fetch all columns.
* Add `snow spcs service metrics` command to fetch service metrics:
  * Supports filtering by service name, container name, instance ID, and time intervals (`--since`, `--until`).
  * Use `--all` to fetch all columns.

## Fixes and improvements
* Fixed crashes with older x86_64 Intel CPUs.
Expand Down
129 changes: 129 additions & 0 deletions src/snowflake/cli/_plugins/spcs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

from __future__ import annotations

import json
sfc-gh-ashen marked this conversation as resolved.
Show resolved Hide resolved
import sys
from datetime import datetime
from typing import TextIO

from click import ClickException
Expand All @@ -23,6 +25,22 @@
from snowflake.cli.api.project.util import unquote_identifier
from snowflake.connector.errors import ProgrammingError

# Ordered column names for event/metric rows.
# NOTE(review): this looks like the column set of the Snowflake event table;
# the code that consumes this list is not visible here, so confirm the order
# matches what the query code expects.
EVENT_COLUMN_NAMES = [
    "TIMESTAMP",
    "START_TIMESTAMP",
    "OBSERVED_TIMESTAMP",
    "TRACE",
    "RESOURCE",
    "RESOURCE_ATTRIBUTES",
    "SCOPE",
    "SCOPE_ATTRIBUTES",
    "RECORD_TYPE",
    "RECORD",
    "RECORD_ATTRIBUTES",
    "VALUE",
    "EXEMPLARS",
]

if not sys.stdout.closed and sys.stdout.isatty():
GREEN = "\033[32m"
BLUE = "\033[34m"
Expand Down Expand Up @@ -124,5 +142,116 @@ def new_logs_only(prev_log_records: list[str], new_log_records: list[str]) -> li
return new_log_records_sorted


def build_resource_clause(
    service_name: str, instance_id: str, container_name: str
) -> str:
    """Build the SQL predicate that filters rows by service resource attributes.

    Each non-empty argument contributes one condition on the
    ``resource_attributes`` column; the conditions are joined with ``and``.

    Args:
        service_name: Matched against ``snow.service.name``.
        instance_id: Matched against either ``snow.service.instance`` or
            ``snow.service.container.instance``.
        container_name: Matched against ``snow.service.container.name``.

    Returns:
        The combined predicate, or ``"1=1"`` when every argument is empty so
        the result can always be embedded after ``where``.
    """

    def _quoted(value: str) -> str:
        # The values come from the command line and are embedded in
        # single-quoted SQL string literals. Escape backslashes first (they
        # are escape characters inside such literals), then double any single
        # quote, so a value can never terminate the literal early.
        escaped = f"{value}".replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    resource_filters = []
    if service_name:
        resource_filters.append(
            f"resource_attributes:\"snow.service.name\" = {_quoted(service_name)}"
        )
    if instance_id:
        instance_literal = _quoted(instance_id)
        resource_filters.append(
            f"(resource_attributes:\"snow.service.instance\" = {instance_literal} "
            f"OR resource_attributes:\"snow.service.container.instance\" = {instance_literal})"
        )
    if container_name:
        resource_filters.append(
            f"resource_attributes:\"snow.service.container.name\" = {_quoted(container_name)}"
        )
    return " and ".join(resource_filters) if resource_filters else "1=1"


def build_time_clauses(
since: str | datetime | None, until: str | datetime | None
) -> tuple[str, str]:
since_clause = ""
until_clause = ""

if isinstance(since, datetime):
since_clause = f"and timestamp >= '{since}'"
elif isinstance(since, str) and since:
since_clause = f"and timestamp >= sysdate() - interval '{since}'"

if isinstance(until, datetime):
until_clause = f"and timestamp <= '{until}'"
elif isinstance(until, str) and until:
until_clause = f"and timestamp <= sysdate() - interval '{until}'"

return since_clause, until_clause
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since_clause and until_clause are always used together - how about returning them as a single string (" AND ".join([since_clause, until_clause]))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we want customers to have the flexibility of using these individually, hence the separate clauses.



def format_event_row(event_dict: dict) -> dict:
    """Flatten one raw event row into the columns displayed for service events.

    The ``RESOURCE_ATTRIBUTES``, ``RECORD_ATTRIBUTES`` and ``RECORD`` entries
    are JSON-encoded objects; selected keys are pulled out of them and
    combined with the row's ``TIMESTAMP`` and ``VALUE``. Keys that are absent
    fall back to placeholder strings.

    Args:
        event_dict: Mapping of column name to value for one row.

    Returns:
        A dict with the keys ``TIMESTAMP``, ``DATABASE NAME``, ``SCHEMA NAME``,
        ``SERVICE NAME``, ``INSTANCE ID``, ``CONTAINER NAME``, ``SEVERITY``,
        ``EVENT NAME`` and ``EVENT VALUE``.

    Raises:
        RecordProcessingError: If a JSON column cannot be decoded or does not
            decode to a JSON object.
    """

    def _load_object(column: str) -> dict:
        raw = event_dict.get(column)
        # A column that is present but None (e.g. a NULL value) is treated
        # like a missing column instead of letting json.loads raise TypeError.
        return {} if raw is None else json.loads(raw)

    try:
        resource_attributes = _load_object("RESOURCE_ATTRIBUTES")
        record_attributes = _load_object("RECORD_ATTRIBUTES")
        record = _load_object("RECORD")

        return {
            "TIMESTAMP": event_dict.get("TIMESTAMP", "N/A"),
            "DATABASE NAME": resource_attributes.get("snow.database.name", "N/A"),
            "SCHEMA NAME": resource_attributes.get("snow.schema.name", "N/A"),
            "SERVICE NAME": resource_attributes.get("snow.service.name", "N/A"),
            "INSTANCE ID": resource_attributes.get("snow.service.instance", "N/A"),
            "CONTAINER NAME": resource_attributes.get(
                "snow.service.container.name", "N/A"
            ),
            "SEVERITY": record.get("severity_text", "Unknown Severity"),
            "EVENT NAME": record_attributes.get("event.name", "Unknown Event"),
            "EVENT VALUE": event_dict.get("VALUE", "Unknown Value"),
        }
    except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
        # TypeError: a column held a non-string value; AttributeError: the
        # JSON decoded to something other than an object. Chain the cause so
        # the underlying problem stays visible in tracebacks.
        raise RecordProcessingError("Error processing event row.") from e


def format_metric_row(metric_dict: dict) -> dict:
    """Flatten one raw metric row into the columns displayed for service metrics.

    ``RESOURCE_ATTRIBUTES`` and ``RECORD`` are required JSON-encoded objects,
    and ``RECORD`` must contain a ``metric`` object. Individual attributes
    inside them, as well as ``TIMESTAMP`` and ``VALUE``, fall back to
    placeholder strings when absent.

    Args:
        metric_dict: Mapping of column name to value for one row.

    Returns:
        A dict with the keys ``TIMESTAMP``, ``DATABASE NAME``, ``SCHEMA NAME``,
        ``SERVICE NAME``, ``INSTANCE ID``, ``CONTAINER NAME``, ``METRIC NAME``
        and ``METRIC VALUE``.

    Raises:
        RecordProcessingError: If a required column or the ``metric`` entry is
            missing, or a JSON column cannot be decoded into an object.
    """
    try:
        resource_attributes = json.loads(metric_dict["RESOURCE_ATTRIBUTES"])
        metric_info = json.loads(metric_dict["RECORD"])["metric"]

        return {
            "TIMESTAMP": metric_dict.get("TIMESTAMP", "N/A"),
            "DATABASE NAME": resource_attributes.get("snow.database.name", "N/A"),
            "SCHEMA NAME": resource_attributes.get("snow.schema.name", "N/A"),
            "SERVICE NAME": resource_attributes.get("snow.service.name", "N/A"),
            "INSTANCE ID": resource_attributes.get(
                "snow.service.container.instance", "N/A"
            ),
            "CONTAINER NAME": resource_attributes.get(
                "snow.service.container.name", "N/A"
            ),
            "METRIC NAME": metric_info.get("name", "Unknown Metric"),
            "METRIC VALUE": metric_dict.get("VALUE", "Unknown Value"),
        }
    except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
        # TypeError: a column held None or another non-string value;
        # AttributeError: the JSON decoded to something other than an object.
        # Both are invalid records, so report them like the other failures
        # and chain the cause for debugging.
        raise RecordProcessingError("Error processing metric row.") from e


class RecordProcessingError(ClickException):
    """Signals that an event or metric record held invalid data and could not be processed."""


class SPCSEventTableError(ClickException):
    """Signals a problem related to the SPCS event table."""


# NOTE(review): no usage is visible in this part of the file; presumably raised
# when a command that sets properties is invoked without any -- confirm
# against the callers.
class NoPropertiesProvidedError(ClickException):
    """A ``ClickException`` subclass that adds no behavior of its own."""
140 changes: 128 additions & 12 deletions src/snowflake/cli/_plugins/spcs/services/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from snowflake.cli.api.feature_flags import FeatureFlag
from snowflake.cli.api.identifiers import FQN
from snowflake.cli.api.output.types import (
CollectionResult,
CommandResult,
MessageResult,
QueryJsonValueResult,
Expand All @@ -59,6 +60,38 @@
short_help="Manages services.",
)

# Shared Typer option definitions. They are declared once at module level and
# reused as parameter defaults by the `logs`, `events` and `metrics` commands
# below, so the flags and help texts stay identical across commands.

# `...` (Ellipsis) as the default marks the option as required in Typer.
container_name_option = typer.Option(
    ...,
    "--container-name",
    help="Name of the container.",
    show_default=False,
)

# Also required (`...` default).
instance_id_option = typer.Option(
    ...,
    "--instance-id",
    help="ID of the service instance, starting with 0.",
    show_default=False,
)

# No explicit flag name is given here, so the flag is derived from the name of
# the command parameter it is attached to (`since`). The empty-string default
# means "no lower bound".
# NOTE(review): the help text says "events" but this option is also used by
# the `metrics` command.
since_option = typer.Option(
    default="",
    help="Fetch events that are newer than this time ago, in Snowflake interval syntax.",
)

# Same conventions as `since_option`; the empty-string default means "no upper
# bound".
until_option = typer.Option(
    default="",
    help="Fetch events that are older than this time ago, in Snowflake interval syntax.",
)

# Boolean switch exposed as `--all`; off by default.
show_all_columns_option = typer.Option(
    False,
    "--all",
    is_flag=True,
    help="Fetch all columns.",
)


def _service_name_callback(name: FQN) -> FQN:
if not is_valid_object_name(name.identifier, max_depth=2, allow_quoted=False):
Expand Down Expand Up @@ -213,18 +246,8 @@ def status(name: FQN = ServiceNameArgument, **options) -> CommandResult:
@app.command(requires_connection=True)
def logs(
name: FQN = ServiceNameArgument,
container_name: str = typer.Option(
...,
"--container-name",
help="Name of the container.",
show_default=False,
),
instance_id: str = typer.Option(
...,
"--instance-id",
help="ID of the service instance, starting with 0.",
show_default=False,
),
container_name: str = container_name_option,
instance_id: str = instance_id_option,
num_lines: int = typer.Option(
DEFAULT_NUM_LINES, "--num-lines", help="Number of lines to retrieve."
),
Expand Down Expand Up @@ -297,6 +320,99 @@ def logs(
return StreamResult(cast(Generator[CommandResult, None, None], stream))


@app.command(requires_connection=True)
def events(
    name: FQN = ServiceNameArgument,
    container_name: str = container_name_option,
    instance_id: str = instance_id_option,
    since: str = since_option,
    until: str = until_option,
    first: int = typer.Option(
        default=None,
        show_default=False,
        help="Fetch only the first N events. Cannot be used with --last.",
    ),
    last: int = typer.Option(
        default=None,
        show_default=False,
        help="Fetch only the last N events. Cannot be used with --first.",
    ),
    show_all_columns: bool = show_all_columns_option,
    **options,
):
    """
    Retrieve platform events for a service container.
    """
    # The command is only available when its feature flag is enabled.
    if FeatureFlag.ENABLE_SPCS_SERVICE_EVENTS.is_disabled():
        raise FeatureNotEnabledError(
            "ENABLE_SPCS_SERVICE_EVENTS",
            "Service events collection from SPCS event table is disabled.",
        )

    # --first and --last are mutually exclusive.
    if not (first is None or last is None):
        raise IncompatibleParametersError(["--first", "--last"])

    event_rows = ServiceManager().get_events(
        service_name=name.identifier,
        container_name=container_name,
        instance_id=instance_id,
        since=since,
        until=until,
        first=first,
        last=last,
        show_all_columns=show_all_columns,
    )

    if event_rows:
        return CollectionResult(event_rows)
    return MessageResult("No events found.")


@app.command(requires_connection=True)
def metrics(
    name: FQN = ServiceNameArgument,
    container_name: str = container_name_option,
    instance_id: str = instance_id_option,
    since: str = since_option,
    until: str = until_option,
    show_all_columns: bool = show_all_columns_option,
    **options,
):
    """
    Retrieve platform metrics for a service container.
    """
    # The command is only available when its feature flag is enabled.
    if FeatureFlag.ENABLE_SPCS_SERVICE_METRICS.is_disabled():
        raise FeatureNotEnabledError(
            "ENABLE_SPCS_SERVICE_METRICS",
            "Service metrics collection from SPCS event table is disabled.",
        )

    manager = ServiceManager()
    # Arguments shared by both manager calls.
    selection = {
        "service_name": name.identifier,
        "container_name": container_name,
        "instance_id": instance_id,
        "show_all_columns": show_all_columns,
    }

    # With a time bound, fetch every metric in the window; without one, fetch
    # only the latest metrics.
    if since or until:
        metric_rows = manager.get_all_metrics(since=since, until=until, **selection)
    else:
        metric_rows = manager.get_latest_metrics(**selection)

    if metric_rows:
        return CollectionResult(metric_rows)
    return MessageResult("No metrics found.")


@app.command(requires_connection=True)
def upgrade(
name: FQN = ServiceNameArgument,
Expand Down
Loading