Skip to content

Commit

Permalink
Merge pull request #94 from scale-vector/rfix/adds-dlt-config-default…
Browse files Browse the repository at this point in the history
…-literals

adds dlt config default literals
  • Loading branch information
rudolfix authored Nov 24, 2022
2 parents d77e363 + 3f6a72e commit 8931b10
Show file tree
Hide file tree
Showing 35 changed files with 584 additions and 354 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ build-library:
poetry build

publish-library: build-library
poetry publish -u __token__
# provide the token via poetry config pypi-token.pypi your-api-token
poetry publish

build-image-tags:
@echo ${IMG}
Expand Down
1 change: 1 addition & 0 deletions dlt/cli/_dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def init_command_wrapper(pipeline_name: str, destination_name: str, branch: str)
init_command(pipeline_name, destination_name, branch)
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
# TODO: display the stack trace when the debug flag is set


def main() -> None:
Expand Down
7 changes: 3 additions & 4 deletions dlt/cli/config_toml_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import tomlkit
from tomlkit.items import Table as TOMLTable

from dlt.common.configuration.resolve import extract_inner_hint, is_base_configuration_hint
from dlt.common.configuration.specs.base_configuration import BaseConfiguration
from dlt.common.configuration.resolve import extract_inner_hint
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, is_base_configuration_hint
from dlt.common.typing import AnyType, is_final_type, is_optional_type


Expand Down Expand Up @@ -41,9 +41,8 @@ def write_spec(toml_table: TOMLTable, config: BaseConfiguration) -> None:


def write_values(toml: tomlkit.TOMLDocument, values: Iterable[WritableConfigValue]) -> None:
# print(values)
toml_table: TOMLTable = toml # type: ignore
for value in values:
toml_table: TOMLTable = toml # type: ignore
for namespace in value.namespaces:
if namespace not in toml_table:
inner_table = tomlkit.table(True)
Expand Down
97 changes: 37 additions & 60 deletions dlt/cli/init_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@

from dlt.common.git import clone_repo
from dlt.common.configuration.providers.toml import ConfigTomlProvider, SecretsTomlProvider
from dlt.common.configuration.resolve import is_secret_hint
from dlt.common.configuration.specs.base_configuration import is_secret_hint
from dlt.common.configuration.accessors import DLT_SECRETS_VALUE, DLT_CONFIG_VALUE
from dlt.common.exceptions import DltException
from dlt.common.logger import DLT_PKG_NAME
from dlt.common.normalizers.names.snake_case import normalize_schema_name
from dlt.common.destination import DestinationReference
from dlt.common.reflection.utils import set_ast_parents
from dlt.common.schema.exceptions import InvalidSchemaName
from dlt.common.storages.file_storage import FileStorage
from dlt.common.typing import AnyType, is_optional_type
Expand All @@ -30,14 +32,13 @@
from dlt.cli.config_toml_writer import WritableConfigValue, write_values



REQUIREMENTS_TXT = "requirements.txt"
PYPROJECT_TOML = "pyproject.toml"


def _clone_init_repo(branch: str) -> Tuple[FileStorage, List[str], str]:
# return tuple is (file storage for cloned repo, list of template files to copy, the default pipeline template script)
# template_dir = "/tmp/tmptz2omtdf" # tempfile.mkdtemp()
# template_dir = "~/src/python-dlt-init-template"
template_dir = tempfile.mkdtemp()
clone_repo("https://github.com/scale-vector/python-dlt-init-template.git", template_dir, branch=branch)

Expand All @@ -58,7 +59,8 @@ def _clone_init_repo(branch: str) -> Tuple[FileStorage, List[str], str]:
def _parse_init_script(script_source: str, init_script_name: str) -> PipelineScriptVisitor:
# parse the script first
tree = ast.parse(source=script_source)
visitor = PipelineScriptVisitor(script_source, add_parents=True)
set_ast_parents(tree)
visitor = PipelineScriptVisitor(script_source)
visitor.visit(tree)
if len(visitor.mod_aliases) == 0:
raise CliCommandException("init", f"The pipeline script {init_script_name} does not import dlt or has bizarre import structure")
Expand Down Expand Up @@ -94,57 +96,34 @@ def _find_argument_nodes_to_replace(visitor: PipelineScriptVisitor, replace_node
raise CliCommandException("init", f"The pipeline script {init_script_name} is not explicitly passing the '{t_arg_name}' argument to 'pipeline' or 'run' function. In init script the default and configured values are not accepted.")
return transformed_nodes


def _detect_required_configs(visitor: PipelineScriptVisitor, script_module: ModuleType, init_script_name: str) -> Tuple[Dict[str, WritableConfigValue], Dict[str, WritableConfigValue]]:
# all detected secrets with namespaces
required_secrets: Dict[str, WritableConfigValue] = {}
# all detected configs with namespaces
required_config: Dict[str, WritableConfigValue] = {}

# skip sources without spec. those are not imported and most probably are inner functions
known_imported_calls = {name: calls for name, calls in visitor.known_source_calls.items() if name in _SOURCES}
# skip sources without spec. those are not imported and most probably are inner functions. also skip the sources that are not called
# also skip the sources that are called from functions, the parent of call object to the source must be None (no outer function)
known_imported_sources = {name: _SOURCES[name] for name in visitor.known_sources
if name in _SOURCES and name in visitor.known_source_calls and any(call.parent is None for call in visitor.known_source_calls[name])} # type: ignore

for pipeline_name, call_nodes in known_imported_calls.items():
source_config = _SOURCES.get(pipeline_name).SPEC()
for source_name, source_info in known_imported_sources.items():
source_config = source_info.SPEC()
spec_fields = source_config.get_resolvable_fields()
source_sig = inspect.signature(getattr(script_module, pipeline_name))
# bind all calls
for call_node in call_nodes:
try:
bound_args = source_sig.bind(*call_node.args, **{str(kwd.arg):kwd.value for kwd in call_node.keywords})
bound_args.apply_defaults()
except TypeError as ty_ex:
call_info = visitor.source_segment(call_node)
raise CliCommandException("init", f"In {init_script_name} the source/resource {pipeline_name} call {call_info} looks wrong: {ty_ex}")
# find all the arguments that are not sufficiently bound
for arg_name, arg_node in bound_args.arguments.items():
# check if argument is in spec and is not optional. optional arguments won't be added to config/secrets
arg_type = spec_fields.get(arg_name)
if arg_type and not is_optional_type(arg_type):
value_provided = True
from_placeholder = False
from_secrets = is_secret_hint(arg_type)
if isinstance(arg_node, ast.Constant):
value_provided = ast.literal_eval(arg_node) is not None
if isinstance(arg_node, ast.Attribute) and arg_node.attr == "value":
attr_source = visitor.source_segment(arg_node)
if attr_source.endswith("config.value"):
value_provided = False
from_placeholder = True
if from_secrets:
raise CliCommandException("init", f"The pipeline script {init_script_name} calls source/resource {pipeline_name} where argument {arg_name} is a secret but it requests it via {attr_source}")
if attr_source.endswith("secrets.value"):
value_provided = False
from_placeholder = True
from_secrets = True
# was value provided in the call args?
if not value_provided:
# do we have sufficient information if arg_name is config or secret?
if arg_type is AnyType and not from_placeholder:
raise CliCommandException("init", f"The pipeline script {init_script_name} in source/resource '{pipeline_name}' does not provide enough information if argument '{arg_name}' is a secret or a config value. Use 'dlt.config.value' or 'dlt.secret.value' or (strongly suggested) type the source/resource function signature.")
val_store = required_secrets if from_secrets else required_config
# use full namespaces if we have many sources
namespaces = () if len(known_imported_calls) == 1 else ("sources", pipeline_name)
val_store[pipeline_name + ":" + arg_name] = WritableConfigValue(arg_name, arg_type, namespaces)
for field_name, field_type in spec_fields.items():
val_store = None
# all secrets must go to secrets.toml
if is_secret_hint(field_type):
val_store = required_secrets
# all configs that are required and do not have a default value must go to config.toml
elif not is_optional_type(field_type) and getattr(source_config, field_name) is None:
val_store = required_config

if val_store is not None:
# we are sure that all resources come from single file so we can put them in single namespace
# namespaces = () if len(known_imported_sources) == 1 else ("sources", source_name)
val_store[source_name + ":" + field_name] = WritableConfigValue(field_name, field_type, ())

return required_secrets, required_config

Expand Down Expand Up @@ -229,7 +208,7 @@ def init_command(pipeline_name: str, destination_name: str, branch: str) -> None
# find all arguments in all calls to replace
transformed_nodes = _find_argument_nodes_to_replace(
visitor,
[("destination", destination_name), ("pipeline_name", pipeline_name)],
[("destination", destination_name), ("pipeline_name", pipeline_name), ("dataset_name", pipeline_name + "_data")],
init_script_name
)

Expand All @@ -238,8 +217,10 @@ def init_command(pipeline_name: str, destination_name: str, branch: str) -> None

if len(_SOURCES) == 0:
raise CliCommandException("init", f"The pipeline script {init_script_name} is not creating or importing any sources or resources")

for source_q_name, source_config in _SOURCES.items():
if source_q_name not in visitor.known_sources:
print(visitor.known_sources)
raise CliCommandException("init", f"The pipeline script {init_script_name} imports a source/resource {source_config.f.__name__} from module {source_config.module.__name__}. In init scripts you must declare all sources and resources in single file.")

# detect all the required secrets and configs that should go into tomls files
Expand All @@ -251,12 +232,6 @@ def init_command(pipeline_name: str, destination_name: str, branch: str) -> None
# modify the script
dest_script_source = _rewrite_script(visitor.source, transformed_nodes)

# generate tomls with comments
secrets_prov = SecretsTomlProvider()
write_values(secrets_prov._toml, required_secrets.values())
config_prov = ConfigTomlProvider()
write_values(config_prov._toml, required_config.values())

# welcome message
click.echo()
click.echo("Your new pipeline %s is ready to be customized!" % fmt.bold(pipeline_name))
Expand All @@ -281,20 +256,22 @@ def init_command(pipeline_name: str, destination_name: str, branch: str) -> None
if dest_storage.has_file(REQUIREMENTS_TXT):
click.echo("Your python dependencies are kept in %s. Please add the dependency for %s as follows:" % (fmt.bold(REQUIREMENTS_TXT), fmt.bold(DLT_PKG_NAME)))
click.echo(req_dep_line)
click.echo("To install dlt with the %s extra using pip:" % fmt.bold(destination_name))
else:
if click.confirm("%s not found. Should I create one?" % REQUIREMENTS_TXT):
requirements_txt = req_dep_line
click.echo("* %s created. Install it with:\npip3 install -r %s" % (fmt.bold(REQUIREMENTS_TXT), REQUIREMENTS_TXT))
else:
click.echo("Do not forget to install dlt with the %s extra using:")
click.echo(f"pip3 install {DLT_PKG_NAME}[{destination_name}]")
requirements_txt = req_dep_line
click.echo("* %s created. Install it with:\npip3 install -r %s" % (fmt.bold(REQUIREMENTS_TXT), REQUIREMENTS_TXT))

# copy files at the very end
for file_name in TEMPLATE_FILES + toml_files:
shutil.copy(clone_storage.make_full_path(file_name), dest_storage.make_full_path(file_name))

# create script
dest_storage.save(dest_pipeline_script, dest_script_source)
# generate tomls with comments
secrets_prov = SecretsTomlProvider()
write_values(secrets_prov._toml, required_secrets.values())
config_prov = ConfigTomlProvider()
write_values(config_prov._toml, required_config.values())
# write toml files
secrets_prov._write_toml()
config_prov._write_toml()
Expand Down
7 changes: 4 additions & 3 deletions dlt/common/configuration/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

from dlt.common.configuration.providers.provider import ConfigProvider
from dlt.common.configuration.resolve import deserialize_value
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.configuration.specs.base_configuration import is_base_configuration_hint
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
from dlt.common.schema.utils import coerce_value
from dlt.common.typing import AnyType, ConfigValue


DLT_SECRETS_VALUE = "secrets.value"
DLT_CONFIG_VALUE = "config.value"
TConfigAny = TypeVar("TConfigAny", bound=Any)

class _Accessor(abc.ABC):
Expand All @@ -33,7 +34,7 @@ def get(self, field: str, expected_type: Type[TConfigAny] = None) -> TConfigAny:
return None
# cast to required type
if expected_type:
if inspect.isclass(expected_type) and issubclass(expected_type, BaseConfiguration):
if is_base_configuration_hint(expected_type):
c = expected_type()
if isinstance(value, dict):
c.update(value)
Expand Down
61 changes: 6 additions & 55 deletions dlt/common/configuration/inject.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
import re
import inspect
from makefun import wraps
from types import ModuleType
from typing import Callable, Dict, Type, Any, Optional, Tuple, TypeVar, overload
from inspect import Signature, Parameter

from dlt.common.typing import AnyType, DictStrAny, StrAny, TFun, AnyFun
from dlt.common.typing import DictStrAny, StrAny, TFun, AnyFun
from dlt.common.configuration.resolve import resolve_configuration, inject_namespace
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, is_valid_hint, configspec
from dlt.common.configuration.specs.base_configuration import BaseConfiguration
from dlt.common.configuration.specs.config_namespace_context import ConfigNamespacesContext
from dlt.common.utils import get_callable_name
from dlt.common.reflection.spec import spec_from_signature


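# [^.^_]+ matches runs of characters other than ".", "^" and "_", so findall() yields the name fragments between those separators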
_SLEEPING_CAT_SPLIT = re.compile("[^.^_]+")
_LAST_DLT_CONFIG = "_dlt_config"
_ORIGINAL_ARGS = "_dlt_orig_args"
TConfiguration = TypeVar("TConfiguration", bound=BaseConfiguration)
# keep a registry of all the decorated functions
_FUNC_SPECS: Dict[int, Type[BaseConfiguration]] = {}



def get_fun_spec(f: AnyFun) -> Type[BaseConfiguration]:
    """Look up the configuration spec stored in `_FUNC_SPECS` for function `f`.

    The registry is keyed by `id(f)`. `None` is returned when there is no
    entry under that id.
    """
    func_id = id(f)
    return _FUNC_SPECS.get(func_id)

Expand Down Expand Up @@ -50,7 +48,7 @@ def decorator(f: TFun) -> TFun:
namespace_context = ConfigNamespacesContext()

if spec is None:
SPEC = _spec_from_signature(_get_spec_name_from_f(f), inspect.getmodule(f), sig, only_kw)
SPEC = spec_from_signature(f, sig, only_kw)
else:
SPEC = spec

Expand Down Expand Up @@ -129,50 +127,3 @@ def last_config(**kwargs: Any) -> BaseConfiguration:

def get_orig_args(**kwargs: Any) -> Tuple[Tuple[Any], DictStrAny]:
return kwargs[_ORIGINAL_ARGS] # type: ignore


def _get_spec_name_from_f(f: AnyFun) -> str:
    """Build the name of the configuration class derived from function `f`.

    The name obtained via `get_callable_name(f, "__qualname__")` has every
    "<locals>." marker removed and is cut into the fragments matched by
    `_SLEEPING_CAT_SPLIT`. Each fragment gets its first character upper-cased,
    the fragments are concatenated and "Configuration" is appended.
    """
    # func qual name contains position in the module, separated by dots
    qual_name = get_callable_name(f, "__qualname__").replace("<locals>.", "")
    fragments = _SLEEPING_CAT_SPLIT.findall(qual_name)
    capitalized = [fragment[0].upper() + fragment[1:] for fragment in fragments]
    return "".join(capitalized) + "Configuration"


def _spec_from_signature(name: str, module: ModuleType, sig: Signature, kw_only: bool = False) -> Type[BaseConfiguration]:
    """Synthesize a configuration spec class from a function signature.

    Args:
        name: Name of the class to create.
        module: Module that receives the new class as attribute `name`; its
            `__name__` is used as the `__module__` of the class.
        sig: Signature whose parameters become the fields of the spec.
        kw_only: When True, only keyword-only parameters are turned into fields.

    Returns:
        The new `BaseConfiguration` subclass after it was passed through
        `configspec(init=False)`.
    """
    fields: Dict[str, Any] = {}
    annotations: Dict[str, Any] = {}

    for p in sig.parameters.values():
        # skip *args and **kwargs
        if p.kind in (Parameter.VAR_KEYWORD, Parameter.VAR_POSITIONAL):
            continue
        # skip typical method params
        if p.name in ("self", "cls"):
            continue
        # if kw_only flag is set: accept KEYWORD ONLY args
        if kw_only and p.kind != Parameter.KEYWORD_ONLY:
            continue

        # Parameter.empty is a sentinel, so test identity: `==` would invoke a custom __eq__ of the
        # annotation/default object (e.g. array-like defaults) and may not even return a bool
        field_type = AnyType if p.annotation is Parameter.empty else p.annotation
        if not is_valid_hint(field_type):
            continue

        field_default = None if p.default is Parameter.empty else p.default
        # try to get type from default; test against None so falsy defaults (0, "", False) still provide a type
        if field_type is AnyType and field_default is not None:
            field_type = type(field_default)
        # make type optional if explicit None is provided as default
        if p.default is None:
            field_type = Optional[field_type]
        # set annotations
        annotations[p.name] = field_type
        # set field with default value
        fields[p.name] = field_default

    # new type goes to the module where sig was declared
    fields["__module__"] = module.__name__
    # set annotations so they are present in __dict__
    fields["__annotations__"] = annotations
    # synthesize type
    T: Type[BaseConfiguration] = type(name, (BaseConfiguration,), fields)
    # add to the module
    setattr(module, name, T)
    SPEC = configspec(init=False)(T)
    return SPEC
15 changes: 1 addition & 14 deletions dlt/common/configuration/resolve.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import ast
import inspect
from collections.abc import Mapping as C_Mapping
from typing import Any, Dict, ContextManager, List, Optional, Sequence, Tuple, Type, TypeVar, get_origin

Expand All @@ -8,7 +7,7 @@
from dlt.common.typing import AnyType, StrAny, TSecretValue, is_final_type, is_optional_type, extract_inner_type
from dlt.common.schema.utils import coerce_value, py_type_to_sc_type

from dlt.common.configuration.specs.base_configuration import BaseConfiguration, CredentialsConfiguration, ContainerInjectableContext, get_config_if_union
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, CredentialsConfiguration, is_secret_hint, get_config_if_union, is_base_configuration_hint, is_context_hint
from dlt.common.configuration.specs.config_namespace_context import ConfigNamespacesContext
from dlt.common.configuration.container import Container
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
Expand Down Expand Up @@ -68,18 +67,6 @@ def serialize_value(value: Any) -> Any:
return coerce_value("text", value_dt, value)


def is_secret_hint(hint: Type[Any]) -> bool:
    """Tell whether `hint` is `TSecretValue` itself or a class derived from `CredentialsConfiguration`."""
    if hint is TSecretValue:
        return True
    # issubclass raises on non-classes, so make sure we deal with a class first
    return inspect.isclass(hint) and issubclass(hint, CredentialsConfiguration)


def is_base_configuration_hint(hint: Type[Any]) -> bool:
    """Tell whether `hint` is a class derived from `BaseConfiguration`."""
    # issubclass raises on non-classes, so anything that is not a class is rejected up front
    if not inspect.isclass(hint):
        return False
    return issubclass(hint, BaseConfiguration)


def is_context_hint(hint: Type[Any]) -> bool:
    """Tell whether `hint` is a class derived from `ContainerInjectableContext`."""
    # issubclass raises on non-classes, so anything that is not a class is rejected up front
    if not inspect.isclass(hint):
        return False
    return issubclass(hint, ContainerInjectableContext)


def extract_inner_hint(hint: Type[Any]) -> Type[Any]:
# extract hint from Optional / Literal / NewType hints
inner_hint = extract_inner_type(hint)
Expand Down
Loading

0 comments on commit 8931b10

Please sign in to comment.