Skip to content

Commit

Permalink
spcs service events&metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ashen committed Dec 13, 2024
1 parent daa2e22 commit 0700514
Show file tree
Hide file tree
Showing 2 changed files with 289 additions and 1 deletion.
147 changes: 147 additions & 0 deletions src/snowflake/cli/_plugins/spcs/services/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from snowflake.cli.api.exceptions import IncompatibleParametersError
from snowflake.cli.api.identifiers import FQN
from snowflake.cli.api.output.types import (
CollectionResult,
CommandResult,
MessageResult,
QueryJsonValueResult,
Expand Down Expand Up @@ -288,6 +289,152 @@ def logs(
return StreamResult(cast(Generator[CommandResult, None, None], stream))


@app.command(requires_connection=True)
def events(
name: FQN = ServiceNameArgument,
container_name: str = typer.Option(
...,
"--container-name",
help="Name of the container.",
show_default=False,
),
instance_id: str = typer.Option(
...,
"--instance-id",
help="ID of the service instance, starting with 0.",
show_default=False,
),
since: str = typer.Option(
default="",
help="Fetch events that are newer than this time ago, in Snowflake interval syntax.",
),
until: str = typer.Option(
default="",
help="Fetch events that are older than this time ago, in Snowflake interval syntax.",
),
first: int = typer.Option(
default=-1,
show_default=False,
help="Fetch only the first N events. Cannot be used with --last.",
),
last: int = typer.Option(
default=-1,
show_default=False,
help="Fetch only the last N events. Cannot be used with --first.",
),
**options,
):
"""Fetches events for this app from the event table configured in Snowflake."""
if first >= 0 and last >= 0:
raise IncompatibleParametersError(["--first", "--last"])

manager = ServiceManager()
column_names = [
"TIMESTAMP",
"START_TIMESTAMP",
"OBSERVED_TIMESTAMP",
"TRACE",
"RESOURCE",
"RESOURCE_ATTRIBUTES",
"SCOPE",
"SCOPE_ATTRIBUTES",
"RECORD_TYPE",
"RECORD",
"RECORD_ATTRIBUTES",
"VALUE",
"EXEMPLARS",
]

events = manager.get_events(
service_name=name.identifier,
container_name=container_name,
instance_id=instance_id,
since=since,
until=until,
first=first,
last=last,
)
transformed_events = [dict(zip(column_names, event)) for event in events]

if not transformed_events:
return MessageResult("No events found.")

return CollectionResult(transformed_events)


@app.command(requires_connection=True)
def metrics(
name: FQN = ServiceNameArgument,
container_name: str = typer.Option(
...,
"--container-name",
help="Name of the container.",
show_default=False,
),
instance_id: str = typer.Option(
...,
"--instance-id",
help="ID of the service instance, starting with 0.",
show_default=False,
),
since: str = typer.Option(
default="",
help="Fetch events that are newer than this time ago, in Snowflake interval syntax.",
),
until: str = typer.Option(
default="",
help="Fetch events that are older than this time ago, in Snowflake interval syntax.",
),
first: int = typer.Option(
default=-1,
show_default=False,
help="Fetch only the first N events. Cannot be used with --last.",
),
last: int = typer.Option(
default=-1,
show_default=False,
help="Fetch only the last N events. Cannot be used with --first.",
),
**options,
):
"""Fetches events for this app from the event table configured in Snowflake."""
if first >= 0 and last >= 0:
raise IncompatibleParametersError(["--first", "--last"])

manager = ServiceManager()
column_names = [
"TIMESTAMP",
"START_TIMESTAMP",
"OBSERVED_TIMESTAMP",
"TRACE",
"RESOURCE",
"RESOURCE_ATTRIBUTES",
"SCOPE",
"SCOPE_ATTRIBUTES",
"RECORD_TYPE",
"RECORD",
"RECORD_ATTRIBUTES",
"VALUE",
"EXEMPLARS",
]

metrics = manager.get_metrics(
service_name=name.identifier,
container_name=container_name,
instance_id=instance_id,
since=since,
until=until,
first=first,
last=last,
)
transformed_metrics = [dict(zip(column_names, metrix)) for metrix in metrics]

if not transformed_metrics:
return MessageResult("No metrics found.")

return CollectionResult(transformed_metrics)


@app.command(requires_connection=True)
def upgrade(
name: FQN = ServiceNameArgument,
Expand Down
143 changes: 142 additions & 1 deletion src/snowflake/cli/_plugins/spcs/services/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import json
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional

Expand All @@ -31,7 +32,7 @@
from snowflake.cli.api.constants import DEFAULT_SIZE_LIMIT_MB, ObjectType
from snowflake.cli.api.secure_path import SecurePath
from snowflake.cli.api.sql_execution import SqlExecutionMixin
from snowflake.connector.cursor import SnowflakeCursor
from snowflake.connector.cursor import DictCursor, SnowflakeCursor
from snowflake.connector.errors import ProgrammingError


Expand Down Expand Up @@ -199,6 +200,146 @@ def stream_logs(
except KeyboardInterrupt:
return

def get_account_event_table(self) -> str:
query = "show parameters like 'event_table' in account"
results = self.execute_query(query, cursor_class=DictCursor)
return next((r["value"] for r in results if r["key"] == "EVENT_TABLE"), "")

def get_events(
self,
service_name: str,
instance_id: str,
container_name: str,
since: str | datetime | None = None,
until: str | datetime | None = None,
first: int = -1,
last: int = -1,
):

account_event_table = self.get_account_event_table()
if not account_event_table:
return []

resource_filters = []
if service_name:
resource_filters.append(
f"resource_attributes:\"snow.service.name\" = '{service_name}'"
)
if instance_id:
resource_filters.append(
f"(resource_attributes:\"snow.service.instance\" = '{instance_id}' OR resource_attributes:\"snow.service.container.instance\" = '{instance_id}')"
)
if container_name:
resource_filters.append(
f"resource_attributes:\"snow.service.container.name\" = '{container_name}'"
)

resource_clause = " and ".join(resource_filters) if resource_filters else "1=1"

if isinstance(since, datetime):
since_clause = f"and timestamp >= '{since}'"
elif isinstance(since, str) and since:
since_clause = f"and timestamp >= sysdate() - interval '{since}'"
else:
since_clause = ""

if isinstance(until, datetime):
until_clause = f"and timestamp <= '{until}'"
elif isinstance(until, str) and until:
until_clause = f"and timestamp <= sysdate() - interval '{until}'"
else:
until_clause = ""

first_clause = f"limit {first}" if first >= 0 else ""
last_clause = f"limit {last}" if last >= 0 else ""

query = self.execute_query(
f"""\
select * from (
select *
from {account_event_table}
where (
{resource_clause}
{since_clause}
{until_clause}
)
AND RECORD_TYPE = 'LOG'
AND SCOPE['name'] = 'snow.spcs.platform'
order by timestamp desc
{last_clause}
) order by timestamp asc
{first_clause}
"""
)
return query

def get_metrics(
self,
service_name: str,
instance_id: str,
container_name: str,
since: str | datetime | None = None,
until: str | datetime | None = None,
first: int = -1,
last: int = -1,
):
account_event_table = self.get_account_event_table()
if not account_event_table:
return []

resource_filters = []
if service_name:
resource_filters.append(
f"resource_attributes:\"snow.service.name\" = '{service_name}'"
)
if instance_id:
resource_filters.append(
f"(resource_attributes:\"snow.service.instance\" = '{instance_id}' OR resource_attributes:\"snow.service.container.instance\" = '{instance_id}')"
)
if container_name:
resource_filters.append(
f"resource_attributes:\"snow.service.container.name\" = '{container_name}'"
)

resource_clause = " and ".join(resource_filters) if resource_filters else "1=1"

if isinstance(since, datetime):
since_clause = f"and timestamp >= '{since}'"
elif isinstance(since, str) and since:
since_clause = f"and timestamp >= sysdate() - interval '{since}'"
else:
since_clause = ""

if isinstance(until, datetime):
until_clause = f"and timestamp <= '{until}'"
elif isinstance(until, str) and until:
until_clause = f"and timestamp <= sysdate() - interval '{until}'"
else:
until_clause = ""

first_clause = f"limit {first}" if first >= 0 else ""
last_clause = f"limit {last}" if last >= 0 else ""

query = self.execute_query(
f"""\
select * from (
select *
from {account_event_table}
where (
{resource_clause}
{since_clause}
{until_clause}
)
AND RECORD_TYPE = 'METRIC'
AND SCOPE['name'] = 'snow.spcs.platform'
order by timestamp desc
{last_clause}
) order by timestamp asc
{first_clause}
"""
)
return query

def upgrade_spec(self, service_name: str, spec_path: Path):
spec = self._read_yaml(spec_path)
query = f"alter service {service_name} from specification $$ {spec} $$"
Expand Down

0 comments on commit 0700514

Please sign in to comment.