Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache schema for selected models #4860

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20220316-003847.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Features
body: Add `--cache_selected_only` flag to cache schema object of selected models
only.
time: 2022-03-16T00:38:47.8468296-05:00
custom:
Author: karunpoudel
Issue: "4688"
PR: "4860"
13 changes: 9 additions & 4 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,14 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
# databases
return info_schema_name_map

def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
def _relations_cache_for_schemas(
self, manifest: Manifest, cache_schemas: Set[BaseRelation] = None
) -> None:
"""Populate the relations cache for the given schemas. Returns an
iterable of the schemas populated, as strings.
"""
cache_schemas = self._get_cache_schemas(manifest)
if not cache_schemas:
cache_schemas = self._get_cache_schemas(manifest)
with executor(self.config) as tpe:
futures: List[Future[List[BaseRelation]]] = []
for cache_schema in cache_schemas:
Expand All @@ -367,14 +370,16 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
cache_update.add((relation.database, relation.schema))
self.cache.update_schemas(cache_update)

def set_relations_cache(self, manifest: Manifest, clear: bool = False) -> None:
def set_relations_cache(
self, manifest: Manifest, clear: bool = False, required_schemas: Set[BaseRelation] = None
) -> None:
"""Run a query that gets a populated cache of the relations in the
database and set the cache on this adapter.
"""
with self.cache.lock:
if clear:
self.cache.clear()
self._relations_cache_for_schemas(manifest)
self._relations_cache_for_schemas(manifest, required_schemas)

@available
def cache_added(self, relation: Optional[BaseRelation]) -> str:
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
use_experimental_parser: Optional[bool] = None
static_parser: Optional[bool] = None
indirect_selection: Optional[str] = None
cache_selected_only: Optional[bool] = None


@dataclass
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
LOG_CACHE_EVENTS = None
EVENT_BUFFER_SIZE = 100000
QUIET = None
CACHE_SELECTED_ONLY = None

# Global CLI defaults. These flags are set from three places:
# CLI args, environment variables, and user_config (profiles.yml).
Expand All @@ -57,6 +58,7 @@
"LOG_CACHE_EVENTS": False,
"EVENT_BUFFER_SIZE": 100000,
"QUIET": False,
"CACHE_SELECTED_ONLY": False,
}


Expand Down Expand Up @@ -106,7 +108,7 @@ def set_from_args(args, user_config):
global STRICT_MODE, FULL_REFRESH, WARN_ERROR, USE_EXPERIMENTAL_PARSER, STATIC_PARSER
global WRITE_JSON, PARTIAL_PARSE, USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT
global INDIRECT_SELECTION, VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE, QUIET
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE, QUIET, CACHE_SELECTED_ONLY

STRICT_MODE = False # backwards compatibility
# cli args without user_config or env var option
Expand All @@ -132,6 +134,7 @@ def set_from_args(args, user_config):
LOG_CACHE_EVENTS = get_flag_value("LOG_CACHE_EVENTS", args, user_config)
EVENT_BUFFER_SIZE = get_flag_value("EVENT_BUFFER_SIZE", args, user_config)
QUIET = get_flag_value("QUIET", args, user_config)
CACHE_SELECTED_ONLY = get_flag_value("CACHE_SELECTED_ONLY", args, user_config)


def get_flag_value(flag, args, user_config):
Expand Down
21 changes: 21 additions & 0 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,27 @@ def parse_args(args, cls=DBTArgumentParser):
""",
)

schema_cache_flag = p.add_mutually_exclusive_group()
schema_cache_flag.add_argument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fairly new to all of the command arguments, would like to hear why you chose to add two arguments that would change the same dest vs the default behavior is not use this feature and a just one flag to enable the feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One could set selected_schema_cache: true in profile.yml as general config for their project but may choose to override that with --no-selected-schema-cache in cli for specific scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right on! The inheritance order for "global" configs is CLI flag > env var (DBT_{config}) > profile config

I would vote for naming this config in a way that's slightly more generic (not every database calls it "schema," not every database caches at the schema level). Perhaps CACHE_SELECTED_ONLY?

"--cache_selected_only",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I just caught: for consistency with other flags, this should probably be --cache-selected-only (hyphens not underscores)

action="store_const",
const=True,
default=None,
dest="cache_selected_only",
help="""
Pre cache database objects relevant to selected resource only.
""",
)
schema_cache_flag.add_argument(
"--no-cache_selected_only",
action="store_const",
const=False,
dest="cache_selected_only",
help="""
Pre cache all database objects related to the project.
""",
)

subs = p.add_subparsers(title="Available sub-commands")

base_subparser = _build_base_subparser()
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,9 @@ def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):

def before_run(self, adapter, selected_uids: AbstractSet[str]):
with adapter.connection_named("master"):
self.create_schemas(adapter, selected_uids)
self.populate_adapter_cache(adapter)
required_schemas = self.get_model_schemas(adapter, selected_uids)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jtcohen6 get_model_schemas would remove all of the node that is not in selected_uids, is there a case we need adapter cache to contain more nodes than selected_nodes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChenyuLInx I'm not sure! That's the critical question animating in this issue/PR. I will say, having this PR makes it a lot easier to test.

  • Will dbt return incorrect results, if it fails to cache an unselected relation? I don't think so, as long as caching happens consistently at the schema level. (That's an open question for us right now on Spark: [CT-302] [Spike] Benchmark perf for show tables, show views dbt-spark#296)
  • How will this work for defer? When --defer is enabled, it's necessary to call adapter.get_relation for all unselected parents of selected models, to figure out whether to use a "dev" version or defer to the "prod" version. In my local testing, this still works! dbt will just record a cache miss, and run the query independently:
08:35:23.500589 [debug] [MainThread]: On "master": cache miss for schema "jerco.dbt_jcohen_schema_a", this is inefficient

It's likely that this will be less efficient. So, I think it's important that the behavior remains configurable. I'm not opposed to moving forward with this, in order to test the waters. We may someday wish to turn it on by default.

self.create_schemas(adapter, required_schemas)
self.populate_adapter_cache(adapter, required_schemas)
self.defer_to_manifest(adapter, selected_uids)
self.safe_run_hooks(adapter, RunHookType.Start, {})

Expand Down
10 changes: 6 additions & 4 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,12 @@ def _mark_dependent_errors(self, node_id, result, cause):
for dep_node_id in self.graph.get_dependent_nodes(node_id):
self._skipped_children[dep_node_id] = cause

def populate_adapter_cache(self, adapter):
def populate_adapter_cache(self, adapter, required_schemas: Set[BaseRelation] = None):
start_populate_cache = time.perf_counter()
adapter.set_relations_cache(self.manifest)
if flags.CACHE_SELECTED_ONLY is True:
adapter.set_relations_cache(self.manifest, required_schemas=required_schemas)
else:
adapter.set_relations_cache(self.manifest)
cache_populate_time = time.perf_counter() - start_populate_cache
if dbt.tracking.active_user is not None:
dbt.tracking.track_runnable_timing(
Expand Down Expand Up @@ -501,8 +504,7 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe

return result

def create_schemas(self, adapter, selected_uids: Iterable[str]):
required_schemas = self.get_model_schemas(adapter, selected_uids)
def create_schemas(self, adapter, required_schemas: Set[BaseRelation]):
# we want the string form of the information schema database
required_databases: Set[BaseRelation] = set()
for required in required_schemas:
Expand Down
4 changes: 2 additions & 2 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ def _link_cached_relations(self, manifest):

self._link_cached_database_relations(schemas)

def _relations_cache_for_schemas(self, manifest):
super()._relations_cache_for_schemas(manifest)
def _relations_cache_for_schemas(self, manifest, cache_schemas=None):
super()._relations_cache_for_schemas(manifest, cache_schemas)
self._link_cached_relations(manifest)

def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
Expand Down
15 changes: 15 additions & 0 deletions test/unit/test_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,18 @@ def test__flags(self):
self.assertEqual(flags.QUIET, True)
# cleanup
self.user_config.quiet = None

# cache_selected_only
self.user_config.cache_selected_only = True
flags.set_from_args(self.args, self.user_config)
self.assertEqual(flags.CACHE_SELECTED_ONLY, True)
os.environ['DBT_CACHE_SELECTED_ONLY'] = 'false'
flags.set_from_args(self.args, self.user_config)
self.assertEqual(flags.CACHE_SELECTED_ONLY, False)
setattr(self.args, 'cache_selected_only', True)
flags.set_from_args(self.args, self.user_config)
self.assertEqual(flags.CACHE_SELECTED_ONLY, True)
# cleanup
os.environ.pop('DBT_CACHE_SELECTED_ONLY')
delattr(self.args, 'cache_selected_only')
self.user_config.cache_selected_only = False