Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache schema for selected models #4860

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20220316-003847.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Features
body: Add `--selected-schema-cache` flag to cache schema object of selected models
only.
time: 2022-03-16T00:38:47.8468296-05:00
custom:
Author: karunpoudel
Issue: "4688"
PR: "4860"
10 changes: 6 additions & 4 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,12 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
# databases
return info_schema_name_map

def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
def _relations_cache_for_schemas(self, manifest: Manifest, cache_schemas: Set[BaseRelation] = None) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are here, let's just remove this function and move all of the logic into set_relations_cache

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because these methods are defined in the adapters module, these are potentially breaking changes to the adapter interface. E.g. the Postgres plugin reimplements this method (currently with two arguments), and would require updating:

def _relations_cache_for_schemas(self, manifest):
super()._relations_cache_for_schemas(manifest)
self._link_cached_relations(manifest)

"""Populate the relations cache for the given schemas. Returns an
iterable of the schemas populated, as strings.
"""
cache_schemas = self._get_cache_schemas(manifest)
if not cache_schemas:
cache_schemas = self._get_cache_schemas(manifest)
with executor(self.config) as tpe:
futures: List[Future[List[BaseRelation]]] = []
for cache_schema in cache_schemas:
Expand All @@ -367,14 +368,15 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
cache_update.add((relation.database, relation.schema))
self.cache.update_schemas(cache_update)

def set_relations_cache(self, manifest: Manifest, clear: bool = False) -> None:
def set_relations_cache(self, manifest: Manifest, clear: bool = False,
required_schemas: Set[BaseRelation] = None) -> None:
"""Run a query that gets a populated cache of the relations in the
database and set the cache on this adapter.
"""
with self.cache.lock:
if clear:
self.cache.clear()
self._relations_cache_for_schemas(manifest)
self._relations_cache_for_schemas(manifest, required_schemas)

@available
def cache_added(self, relation: Optional[BaseRelation]) -> str:
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
use_experimental_parser: Optional[bool] = None
static_parser: Optional[bool] = None
indirect_selection: Optional[str] = None
selected_schema_cache: Optional[bool] = None


@dataclass
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
LOG_CACHE_EVENTS = None
EVENT_BUFFER_SIZE = 100000
QUIET = None
SELECTED_SCHEMA_CACHE = None

# Global CLI defaults. These flags are set from three places:
# CLI args, environment variables, and user_config (profiles.yml).
Expand All @@ -57,6 +58,7 @@
"LOG_CACHE_EVENTS": False,
"EVENT_BUFFER_SIZE": 100000,
"QUIET": False,
"SELECTED_SCHEMA_CACHE": False,
}


Expand Down Expand Up @@ -106,7 +108,7 @@ def set_from_args(args, user_config):
global STRICT_MODE, FULL_REFRESH, WARN_ERROR, USE_EXPERIMENTAL_PARSER, STATIC_PARSER
global WRITE_JSON, PARTIAL_PARSE, USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT
global INDIRECT_SELECTION, VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE, QUIET
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE, QUIET, SELECTED_SCHEMA_CACHE

STRICT_MODE = False # backwards compatibility
# cli args without user_config or env var option
Expand All @@ -132,6 +134,7 @@ def set_from_args(args, user_config):
LOG_CACHE_EVENTS = get_flag_value("LOG_CACHE_EVENTS", args, user_config)
EVENT_BUFFER_SIZE = get_flag_value("EVENT_BUFFER_SIZE", args, user_config)
QUIET = get_flag_value("QUIET", args, user_config)
SELECTED_SCHEMA_CACHE = get_flag_value('SELECTED_SCHEMA_CACHE', args, user_config)


def get_flag_value(flag, args, user_config):
Expand Down
21 changes: 21 additions & 0 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,27 @@ def parse_args(args, cls=DBTArgumentParser):
""",
)

schema_cache_flag = p.add_mutually_exclusive_group()
schema_cache_flag.add_argument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fairly new to all of the command arguments, would like to hear why you chose to add two arguments that would change the same dest vs the default behavior is not use this feature and a just one flag to enable the feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One could set selected_schema_cache: true in profile.yml as general config for their project but may choose to override that with --no-selected-schema-cache in cli for specific scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right on! The inheritance order for "global" configs is CLI flag > env var (DBT_{config}) > profile config

I would vote for naming this config in a way that's slightly more generic (not every database calls it "schema," not every database caches at the schema level). Perhaps CACHE_SELECTED_ONLY?

'--selected-schema-cache',
action='store_const',
const=True,
default=None,
dest='selected_schema_cache',
help='''
Pre cache objects of schema relevant to selected resource only.
'''
)
schema_cache_flag.add_argument(
'--no-selected-schema-cache',
action='store_const',
const=False,
dest='selected_schema_cache',
help='''
Pre cache objects of all schema.
'''
)

subs = p.add_subparsers(title="Available sub-commands")

base_subparser = _build_base_subparser()
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,9 @@ def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):

def before_run(self, adapter, selected_uids: AbstractSet[str]):
with adapter.connection_named("master"):
self.create_schemas(adapter, selected_uids)
self.populate_adapter_cache(adapter)
required_schemas = self.get_model_schemas(adapter, selected_uids)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jtcohen6 get_model_schemas would remove all of the node that is not in selected_uids, is there a case we need adapter cache to contain more nodes than selected_nodes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChenyuLInx I'm not sure! That's the critical question animating in this issue/PR. I will say, having this PR makes it a lot easier to test.

  • Will dbt return incorrect results, if it fails to cache an unselected relation? I don't think so, as long as caching happens consistently at the schema level. (That's an open question for us right now on Spark: [CT-302] [Spike] Benchmark perf for show tables, show views dbt-spark#296)
  • How will this work for defer? When --defer is enabled, it's necessary to call adapter.get_relation for all unselected parents of selected models, to figure out whether to use a "dev" version or defer to the "prod" version. In my local testing, this still works! dbt will just record a cache miss, and run the query independently:
08:35:23.500589 [debug] [MainThread]: On "master": cache miss for schema "jerco.dbt_jcohen_schema_a", this is inefficient

It's likely that this will be less efficient. So, I think it's important that the behavior remains configurable. I'm not opposed to moving forward with this, in order to test the waters. We may someday wish to turn it on by default.

self.create_schemas(adapter, required_schemas)
self.populate_adapter_cache(adapter, required_schemas)
self.defer_to_manifest(adapter, selected_uids)
self.safe_run_hooks(adapter, RunHookType.Start, {})

Expand Down
10 changes: 6 additions & 4 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,11 @@ def _mark_dependent_errors(self, node_id, result, cause):
for dep_node_id in self.graph.get_dependent_nodes(node_id):
self._skipped_children[dep_node_id] = cause

def populate_adapter_cache(self, adapter):
adapter.set_relations_cache(self.manifest)
def populate_adapter_cache(self, adapter, required_schemas: Set[BaseRelation] = None):
if flags.SELECTED_SCHEMA_CACHE is True:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like we can just remove this function and call adapter.set_relations_cache in places that call populate_adapter_cache

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

populate_adapter_cache here in GraphRunnableTask has been inherited by RunTask where it is mostly used. Moving this logic to before_run function would duplicate the same logic in before_run function in GraphRunnableTask and RunTask class.
If there is room to reorganize stuffs around, might make sense to do by someone more experienced with these code then me. This is the first time I am touching dbt-core :-)

adapter.set_relations_cache(self.manifest, required_schemas=required_schemas)
else:
adapter.set_relations_cache(self.manifest)

def before_hooks(self, adapter):
pass
Expand Down Expand Up @@ -489,8 +492,7 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe

return result

def create_schemas(self, adapter, selected_uids: Iterable[str]):
required_schemas = self.get_model_schemas(adapter, selected_uids)
def create_schemas(self, adapter, required_schemas: Set[BaseRelation]):
# we want the string form of the information schema database
required_databases: Set[BaseRelation] = set()
for required in required_schemas:
Expand Down