-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update setup processor according to new result schema #1448
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,8 @@ | |
|
||
from __future__ import annotations | ||
|
||
import copy | ||
import re | ||
from typing import Dict, Optional | ||
|
||
from snowflake.cli._plugins.nativeapp.bundle_context import BundleContext | ||
|
@@ -34,7 +36,7 @@ | |
) | ||
|
||
SNOWPARK_PROCESSOR = "snowpark" | ||
NA_SETUP_PROCESSOR = "native-app-setup" | ||
NA_SETUP_PROCESSOR = "native app setup" | ||
|
||
_REGISTERED_PROCESSORS_BY_NAME = { | ||
SNOWPARK_PROCESSOR: SnowparkAnnotationProcessor, | ||
|
@@ -110,7 +112,15 @@ def _try_create_processor( | |
# No registered processor with the specified name | ||
return None | ||
|
||
current_processor = processor_factory(self._bundle_ctx) | ||
processor_ctx = copy.copy(self._bundle_ctx) | ||
processor_subdirectory = re.sub(r"[^a-zA-Z0-9_$]", "_", processor_name) | ||
Comment on lines
+115
to
+116
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed the logic so that each processor uses its own subdirectory for its artifacts, enforced by the framework. Right now each processor was responsible for doing this, so I just centralized the same logic to facilitate the implementation of new processors. |
||
processor_ctx.bundle_root = ( | ||
self._bundle_ctx.bundle_root / processor_subdirectory | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if each processor gets its own directory, how do we chain processors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I only made that change for the bundle root (which is the processor's sandbox) and the generated directory, which is similarly processor-specific. The deploy root remains unchanged, as processors should always be reading and writing from/to the deploy root so they can be chained. |
||
) | ||
processor_ctx.generated_root = ( | ||
self._bundle_ctx.generated_root / processor_subdirectory | ||
) | ||
current_processor = processor_factory(processor_ctx) | ||
self.cached_processors[processor_name] = current_processor | ||
|
||
return current_processor | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,12 +15,18 @@ | |
from __future__ import annotations | ||
|
||
import json | ||
import logging | ||
import os.path | ||
from pathlib import Path | ||
from typing import List, Optional | ||
|
||
import yaml | ||
from click import ClickException | ||
from snowflake.cli._plugins.nativeapp.artifacts import BundleMap, find_setup_script_file | ||
from snowflake.cli._plugins.nativeapp.artifacts import ( | ||
BundleMap, | ||
find_manifest_file, | ||
find_setup_script_file, | ||
) | ||
from snowflake.cli._plugins.nativeapp.codegen.artifact_processor import ( | ||
ArtifactProcessor, | ||
is_python_file_artifact, | ||
|
@@ -40,6 +46,32 @@ | |
DEFAULT_TIMEOUT = 30 | ||
DRIVER_PATH = Path(__file__).parent / "setup_driver.py.source" | ||
|
||
log = logging.getLogger(__name__) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wish we had named this |
||
|
||
|
||
def safe_set(d: dict, *keys: str, **kwargs) -> None: | ||
sfc-gh-cgorrie marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Sets a value in a nested dictionary structure, creating intermediate dictionaries as needed. | ||
Sample usage: | ||
|
||
d = {} | ||
safe_set(d, "a", "b", "c", value=42) | ||
|
||
d is now: | ||
{ | ||
"a": { | ||
"b": { | ||
"c": 42 | ||
} | ||
} | ||
} | ||
""" | ||
curr = d | ||
for k in keys[:-1]: | ||
curr = curr.setdefault(k, {}) | ||
|
||
curr[keys[-1]] = kwargs.get("value") | ||
|
||
|
||
class NativeAppSetupProcessor(ArtifactProcessor): | ||
def __init__(self, *args, **kwargs): | ||
|
@@ -73,18 +105,55 @@ def process( | |
) | ||
files_to_process.append(src_file) | ||
|
||
sql_files_mapping = self._execute_in_sandbox(files_to_process) | ||
self._generate_setup_sql(sql_files_mapping) | ||
result = self._execute_in_sandbox(files_to_process) | ||
if not result: | ||
return # nothing to do | ||
|
||
logs = result.get("logs", []) | ||
for msg in logs: | ||
log.debug(msg) | ||
|
||
warnings = result.get("warnings", []) | ||
for msg in warnings: | ||
cc.warning(msg) | ||
|
||
schema_version = result.get("schema_version") | ||
if schema_version != "1": | ||
raise ClickException( | ||
f"Unsupported schema version returned from snowflake-app-python library: {schema_version}" | ||
) | ||
|
||
setup_script_mods = [ | ||
mod | ||
for mod in result.get("modifications", []) | ||
if mod.get("target") == "native_app:setup_script" | ||
] | ||
if setup_script_mods: | ||
self._edit_setup_sql(setup_script_mods) | ||
|
||
manifest_mods = [ | ||
mod | ||
for mod in result.get("modifications", []) | ||
if mod.get("target") == "native_app:manifest" | ||
] | ||
if manifest_mods: | ||
self._edit_manifest(manifest_mods) | ||
|
||
def _execute_in_sandbox(self, py_files: List[Path]) -> dict: | ||
file_count = len(py_files) | ||
cc.step(f"Processing {file_count} setup file{'s' if file_count > 1 else ''}") | ||
|
||
manifest_path = find_manifest_file(deploy_root=self._bundle_ctx.deploy_root) | ||
|
||
generated_root = self._bundle_ctx.generated_root | ||
generated_root.mkdir(exist_ok=True, parents=True) | ||
|
||
env_vars = { | ||
"_SNOWFLAKE_CLI_PROJECT_PATH": str(self._bundle_ctx.project_root), | ||
"_SNOWFLAKE_CLI_SETUP_FILES": os.pathsep.join(map(str, py_files)), | ||
"_SNOWFLAKE_CLI_APP_NAME": str(self._bundle_ctx.package_name), | ||
"_SNOWFLAKE_CLI_SQL_DEST_DIR": str(self.generated_root), | ||
"_SNOWFLAKE_CLI_SQL_DEST_DIR": str(generated_root), | ||
"_SNOWFLAKE_CLI_MANIFEST_PATH": str(manifest_path), | ||
} | ||
|
||
try: | ||
|
@@ -102,56 +171,68 @@ def _execute_in_sandbox(self, py_files: List[Path]) -> dict: | |
) | ||
|
||
if result.returncode == 0: | ||
sql_file_mappings = json.loads(result.stdout) | ||
return sql_file_mappings | ||
return json.loads(result.stdout) | ||
else: | ||
raise ClickException( | ||
f"Failed to execute python setup script logic: {result.stderr}" | ||
) | ||
|
||
def _generate_setup_sql(self, sql_file_mappings: dict) -> None: | ||
if not sql_file_mappings: | ||
# Nothing to generate | ||
return | ||
|
||
generated_root = self.generated_root | ||
generated_root.mkdir(exist_ok=True, parents=True) | ||
|
||
def _edit_setup_sql(self, modifications: List[dict]) -> None: | ||
cc.step("Patching setup script") | ||
setup_file_path = find_setup_script_file( | ||
deploy_root=self._bundle_ctx.deploy_root | ||
) | ||
with self.edit_file(setup_file_path) as f: | ||
new_contents = [f.contents] | ||
|
||
if sql_file_mappings["schemas"]: | ||
schemas_file = generated_root / sql_file_mappings["schemas"] | ||
new_contents.insert( | ||
0, | ||
f"EXECUTE IMMEDIATE FROM '/{to_stage_path(schemas_file.relative_to(self._bundle_ctx.deploy_root))}';", | ||
) | ||
|
||
if sql_file_mappings["compute_pools"]: | ||
compute_pools_file = generated_root / sql_file_mappings["compute_pools"] | ||
new_contents.append( | ||
f"EXECUTE IMMEDIATE FROM '/{to_stage_path(compute_pools_file.relative_to(self._bundle_ctx.deploy_root))}';" | ||
) | ||
|
||
if sql_file_mappings["services"]: | ||
services_file = generated_root / sql_file_mappings["services"] | ||
new_contents.append( | ||
f"EXECUTE IMMEDIATE FROM '/{to_stage_path(services_file.relative_to(self._bundle_ctx.deploy_root))}';" | ||
) | ||
|
||
f.edited_contents = "\n".join(new_contents) | ||
with self.edit_file(setup_file_path) as f: | ||
prepended = [] | ||
appended = [] | ||
|
||
for mod in modifications: | ||
for inst in mod.get("instructions", []): | ||
if inst.get("type") == "insert": | ||
default_loc = inst.get("default_location") | ||
if default_loc == "end": | ||
appended.append(self._setup_mod_instruction_to_sql(inst)) | ||
elif default_loc == "start": | ||
prepended.append(self._setup_mod_instruction_to_sql(inst)) | ||
|
||
if prepended or appended: | ||
f.edited_contents = "\n".join(prepended + [f.contents] + appended) | ||
|
||
def _edit_manifest(self, modifications: List[dict]) -> None: | ||
cc.step("Patching manifest") | ||
manifest_path = find_manifest_file(deploy_root=self._bundle_ctx.deploy_root) | ||
|
||
with self.edit_file(manifest_path) as f: | ||
manifest = yaml.safe_load(f.contents) | ||
|
||
for mod in modifications: | ||
for inst in mod.get("instructions", []): | ||
if inst.get("type") == "set": | ||
payload = inst.get("payload") | ||
if payload: | ||
key = payload.get("key") | ||
value = payload.get("value") | ||
Comment on lines
+210
to
+215
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: too much nesting here, would be nice to factor this out if possible. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, I'll keep this in mind when I rework this logic in the near future. |
||
safe_set(manifest, *key.split("."), value=value) | ||
f.edited_contents = yaml.safe_dump(manifest, sort_keys=False) | ||
|
||
def _setup_mod_instruction_to_sql(self, mod_inst: dict) -> str: | ||
payload = mod_inst.get("payload") | ||
if not payload: | ||
raise ClickException("Unsupported instruction received: no payload found") | ||
|
||
payload_type = payload.get("type") | ||
if payload_type == "execute immediate": | ||
file_path = payload.get("file_path") | ||
if file_path: | ||
sql_file_path = self._bundle_ctx.generated_root / file_path | ||
return f"EXECUTE IMMEDIATE FROM '/{to_stage_path(sql_file_path.relative_to(self._bundle_ctx.deploy_root))}';" | ||
|
||
raise ClickException(f"Unsupported instruction type received: {payload_type}") | ||
|
||
@property | ||
def sandbox_root(self): | ||
return self._bundle_ctx.bundle_root / "setup_py_venv" | ||
|
||
@property | ||
def generated_root(self): | ||
return self._bundle_ctx.generated_root / "setup_py" | ||
return self._bundle_ctx.bundle_root / "venv" | ||
|
||
def _create_or_update_sandbox(self): | ||
sandbox_root = self.sandbox_root | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Somehow I keep forgetting that natural strings like this are more natural in YAML.