Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NADE][Bugfix] Use resolved app warehouse for regionless query #1292

Merged
merged 8 commits into from
Jul 15, 2024
29 changes: 29 additions & 0 deletions src/snowflake/cli/api/sql_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,35 @@ def use_role(self, new_role: str):
if is_different_role:
self._execute_query(f"use role {prev_role}")

@contextmanager
def use_warehouse(self, new_wh: str):
"""
Switches to a different warehouse for a while, then switches back.
This is a no-op if the requested warehouse is already active.
If there is no default warehouse in the account, it will throw an error.
"""

wh_result = self._execute_query(
f"select current_warehouse()", cursor_class=DictCursor
).fetchone()
# If user has an assigned default warehouse, prev_wh will contain a value even if the warehouse is suspended.
try:
prev_wh = wh_result["CURRENT_WAREHOUSE()"]
except:
prev_wh = None

# new_wh is not None, and should already be a valid identifier, no additional check is performed here.
is_different_wh = new_wh != prev_wh
try:
if is_different_wh:
self._log.debug("Using warehouse: %s", new_wh)
self.use(object_type=ObjectType.WAREHOUSE, name=new_wh)
yield
finally:
if prev_wh and is_different_wh:
self._log.debug("Switching back to warehouse: %s", prev_wh)
self.use(object_type=ObjectType.WAREHOUSE, name=prev_wh)

def create_password_secret(
self, name: str, username: str, password: str
) -> SnowflakeCursor:
Expand Down
54 changes: 42 additions & 12 deletions src/snowflake/cli/plugins/nativeapp/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import json
import os
from abc import ABC, abstractmethod
from contextlib import contextmanager
from functools import cached_property
from pathlib import Path
from textwrap import dedent
Expand Down Expand Up @@ -218,10 +219,40 @@ def stage_schema(self) -> Optional[str]:
def package_warehouse(self) -> Optional[str]:
return self.na_project.package_warehouse

@contextmanager
def use_package_warehouse(self):
if self.package_warehouse:
with self.use_warehouse(self.package_warehouse):
yield
else:
raise ClickException(
dedent(
f"""\
Application package warehouse cannot be empty.
Please provide a value for it in your connection information or your project definition file.
"""
)
)

@property
def application_warehouse(self) -> Optional[str]:
return self.na_project.application_warehouse

@contextmanager
def use_application_warehouse(self):
if self.application_warehouse:
with self.use_warehouse(self.application_warehouse):
yield
else:
raise ClickException(
dedent(
f"""\
Application warehouse cannot be empty.
Please provide a value for it in your connection information or your project definition file.
"""
)
)

@property
def project_identifier(self) -> str:
return self.na_project.project_identifier
Expand Down Expand Up @@ -484,7 +515,8 @@ def _application_object_to_str(self, obj: ApplicationOwnedObject) -> str:
def get_snowsight_url(self) -> str:
"""Returns the URL that can be used to visit this app via Snowsight."""
name = identifier_for_url(self.app_name)
return make_snowsight_url(self._conn, f"/#/apps/application/{name}")
with self.use_application_warehouse():
return make_snowsight_url(self._conn, f"/#/apps/application/{name}")

def create_app_package(self) -> None:
"""
Expand Down Expand Up @@ -557,17 +589,15 @@ def _apply_package_scripts(self) -> None:
raise InvalidPackageScriptError(relpath, e)

# once we're sure all the templates expanded correctly, execute all of them
try:
if self.package_warehouse:
self._execute_query(f"use warehouse {self.package_warehouse}")

for i, queries in enumerate(queued_queries):
cc.step(f"Applying package script: {self.package_scripts[i]}")
self._execute_queries(queries)
except ProgrammingError as err:
generic_sql_error_handler(
err, role=self.package_role, warehouse=self.package_warehouse
)
with self.use_package_warehouse():
try:
for i, queries in enumerate(queued_queries):
cc.step(f"Applying package script: {self.package_scripts[i]}")
self._execute_queries(queries)
except ProgrammingError as err:
generic_sql_error_handler(
err, role=self.package_role, warehouse=self.package_warehouse
)

def deploy(
self,
Expand Down
12 changes: 8 additions & 4 deletions src/snowflake/cli/plugins/nativeapp/project_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,20 @@ def stage_schema(self) -> Optional[str]:
@cached_property
def package_warehouse(self) -> Optional[str]:
if self.definition.package and self.definition.package.warehouse:
return self.definition.package.warehouse
return to_identifier(self.definition.package.warehouse)
else:
return cli_context.connection.warehouse
if cli_context.connection.warehouse:
return to_identifier(cli_context.connection.warehouse)
return None

@cached_property
def application_warehouse(self) -> Optional[str]:
if self.definition.application and self.definition.application.warehouse:
return self.definition.application.warehouse
return to_identifier(self.definition.application.warehouse)
else:
return cli_context.connection.warehouse
if cli_context.connection.warehouse:
return to_identifier(cli_context.connection.warehouse)
return None

@cached_property
def project_identifier(self) -> str:
Expand Down
156 changes: 76 additions & 80 deletions src/snowflake/cli/plugins/nativeapp/run_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,19 @@ def __init__(self, project_definition: NativeApp, project_root: Path):
def _execute_sql_script(self, sql_script_path):
"""
Executing the SQL script in the provided file path after expanding template variables.
"use warehouse" and "use database" will be executed first if they are set in definition file or in the current connection.
This assumes that a relevant warehouse is already active.
Consequently, "use database" will be executed first if it is set in definition file or in the current connection.
"""
with open(sql_script_path) as f:
sql_script = f.read()
try:
if self.application_warehouse:
self._execute_query(f"use warehouse {self.application_warehouse}")
sfc-gh-bgoel marked this conversation as resolved.
Show resolved Hide resolved
if self._conn.database:
self._execute_query(f"use database {self._conn.database}")
sql_script = snowflake_sql_jinja_render(content=sql_script)
self._execute_queries(sql_script)
except ProgrammingError as err:
generic_sql_error_handler(err)

try:
if self._conn.database:
self._execute_query(f"use database {self._conn.database}")
sql_script = snowflake_sql_jinja_render(content=sql_script)
self._execute_queries(sql_script)
except ProgrammingError as err:
generic_sql_error_handler(err)

def _execute_post_deploy_hooks(self):
post_deploy_script_hooks = self.app_post_deploy_hooks
Expand Down Expand Up @@ -267,89 +267,85 @@ def create_or_upgrade_app(
with self.use_role(self.app_role):

# 1. Need to use a warehouse to create an application object
try:
if self.application_warehouse:
self._execute_query(f"use warehouse {self.application_warehouse}")
except ProgrammingError as err:
generic_sql_error_handler(
err=err, role=self.app_role, warehouse=self.application_warehouse
)
with self.use_warehouse(self.application_warehouse):
sfc-gh-cgorrie marked this conversation as resolved.
Show resolved Hide resolved

# 2. Check for an existing application by the same name
show_app_row = self.get_existing_app_info()

# 2. Check for an existing application by the same name
show_app_row = self.get_existing_app_info()
# 3. If existing application is found, perform a few validations and upgrade the application object.
if show_app_row:

# 3. If existing application is found, perform a few validations and upgrade the application object.
if show_app_row:
install_method.ensure_app_usable(self._na_project, show_app_row)

install_method.ensure_app_usable(self._na_project, show_app_row)
# If all the above checks are in order, proceed to upgrade
try:
cc.step(
f"Upgrading existing application object {self.app_name}."
)
using_clause = install_method.using_clause(self._na_project)
self._execute_query(
f"alter application {self.app_name} upgrade {using_clause}"
)

if install_method.is_dev_mode:
# if debug_mode is present (controlled), ensure it is up-to-date
if self.debug_mode is not None:
self._execute_query(
f"alter application {self.app_name} set debug_mode = {self.debug_mode}"
)

# hooks always executed after a create or upgrade
self._execute_post_deploy_hooks()
return

except ProgrammingError as err:
if err.errno not in UPGRADE_RESTRICTION_CODES:
generic_sql_error_handler(err=err)
else: # The existing application object was created from a different process.
cc.warning(err.msg)
self.drop_application_before_upgrade(policy, is_interactive)

# 4. With no (more) existing application objects, create an application object using the release directives
cc.step(f"Creating new application object {self.app_name} in account.")

if self.app_role != self.package_role:
with self.use_role(self.package_role):
self._execute_query(
f"grant install, develop on application package {self.package_name} to role {self.app_role}"
)
self._execute_query(
f"grant usage on schema {self.package_name}.{self.stage_schema} to role {self.app_role}"
)
self._execute_query(
f"grant read on stage {self.stage_fqn} to role {self.app_role}"
)

# If all the above checks are in order, proceed to upgrade
try:
cc.step(f"Upgrading existing application object {self.app_name}.")
# by default, applications are created in debug mode when possible;
# this can be overridden in the project definition
debug_mode_clause = ""
if install_method.is_dev_mode:
initial_debug_mode = (
self.debug_mode if self.debug_mode is not None else True
)
debug_mode_clause = f"debug_mode = {initial_debug_mode}"

using_clause = install_method.using_clause(self._na_project)
self._execute_query(
f"alter application {self.app_name} upgrade {using_clause}"
dedent(
f"""\
create application {self.app_name}
from application package {self.package_name} {using_clause} {debug_mode_clause}
comment = {SPECIAL_COMMENT}
"""
)
)

if install_method.is_dev_mode:
# if debug_mode is present (controlled), ensure it is up-to-date
if self.debug_mode is not None:
self._execute_query(
f"alter application {self.app_name} set debug_mode = {self.debug_mode}"
)

# hooks always executed after a create or upgrade
self._execute_post_deploy_hooks()
return

except ProgrammingError as err:
if err.errno not in UPGRADE_RESTRICTION_CODES:
generic_sql_error_handler(err=err)
else: # The existing application object was created from a different process.
cc.warning(err.msg)
self.drop_application_before_upgrade(policy, is_interactive)

# 4. With no (more) existing application objects, create an application object using the release directives
cc.step(f"Creating new application object {self.app_name} in account.")

if self.app_role != self.package_role:
with self.use_role(self.package_role):
self._execute_query(
f"grant install, develop on application package {self.package_name} to role {self.app_role}"
)
self._execute_query(
f"grant usage on schema {self.package_name}.{self.stage_schema} to role {self.app_role}"
)
self._execute_query(
f"grant read on stage {self.stage_fqn} to role {self.app_role}"
)

try:
# by default, applications are created in debug mode when possible;
# this can be overridden in the project definition
debug_mode_clause = ""
if install_method.is_dev_mode:
initial_debug_mode = (
self.debug_mode if self.debug_mode is not None else True
)
debug_mode_clause = f"debug_mode = {initial_debug_mode}"

using_clause = install_method.using_clause(self._na_project)
self._execute_query(
dedent(
f"""\
create application {self.app_name}
from application package {self.package_name} {using_clause} {debug_mode_clause}
comment = {SPECIAL_COMMENT}
"""
)
)

# hooks always executed after a create or upgrade
self._execute_post_deploy_hooks()

except ProgrammingError as err:
generic_sql_error_handler(err)
generic_sql_error_handler(err)

def process(
self,
Expand Down
Loading
Loading