
feat: add AsyncCatalogProvider helpers for asynchronous catalogs #13800

Merged: 6 commits into apache:main on Jan 9, 2025

Conversation

westonpace (Member)

Which issue does this PR close?

Closes #10339.

Rationale for this change

As discussed in #13582, we do not actually want to make the schema providers asynchronous (the downstream changes are significant). Instead, a cache-then-plan approach was outlined in #13714. This PR adds helpers that make it easier for users to follow the cache-then-plan approach.

This is hopefully just a first step. Eventually I would like to integrate these into SessionContext itself so that we can have methods like register_async_catalog_list; SessionContext would keep track of a list of asynchronous providers and take care of calling the resolve method for the user. The whole process could then be hidden from the user.

What changes are included in this PR?

Adds helpers that are exposed in datafusion_catalog but are not yet integrated into SessionContext. Users can apply them following the example outlined in #13714.
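
To make the flow concrete, here is a minimal sketch of the cache-then-plan pattern these helpers enable. It is illustrative only: the `table` method name, the exact `resolve` signature, and the `RemoteSchema` type are assumptions based on this PR's description and discussion, not the verbatim merged API.

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::{AsyncSchemaProvider, TableProvider};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

/// A schema backed by a remote, async-only catalog service (hypothetical).
struct RemoteSchema {}

#[async_trait]
impl AsyncSchemaProvider for RemoteSchema {
    /// The slow, asynchronous lookup of a single table definition.
    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        todo!("issue a request to the remote catalog for table {name}")
    }
}

async fn plan_with_remote_catalog(ctx: &SessionContext, sql: &str) -> Result<()> {
    let state = ctx.state();
    let statement = state.sql_to_statement(sql, "Generic")?;
    // Gather every table reference in the query up front...
    let references = state.resolve_table_references(&statement)?;
    // ...and perform all of the async lookups once, before planning. `resolve`
    // returns an ordinary synchronous SchemaProvider backed by a per-query
    // cache containing only the referenced tables.
    let schema = RemoteSchema {}
        .resolve(&references, state.config_options(), "datafusion", "remote_schema")
        .await?;
    ctx.catalog("datafusion")
        .unwrap()
        .register_schema("remote_schema", schema)?;
    // Planning is now fully synchronous; every lookup hits the cached copy.
    let _plan = state.statement_to_plan(statement).await?;
    Ok(())
}
```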

Are these changes tested?

Yes.

Are there any user-facing changes?

New APIs only. No breaking changes or modifications to existing APIs.

findepi (Member) commented Dec 17, 2024

> Instead, a cache-then-plan approach was outlined in #13714.

What's the cache-then-plan approach? (The linked page doesn't include "cache".)
How did we solve the cold-cache problem?


```rust
/// A schema provider that looks up tables in a cache
///
/// This is created by the [`AsyncSchemaProvider::resolve`] method
```

Contributor: Does that mean the code is auto-generated?

westonpace (Member Author): No. I have changed the comment to `Instances are created by...`. Is this clearer?

```rust
Err(DataFusionError::Execution(format!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported")))
}

fn table_exist(&self, name: &str) -> bool {
```
Contributor, suggested change:

```diff
-fn table_exist(&self, name: &str) -> bool {
+fn table_exists(&self, name: &str) -> bool {
```

westonpace (Member Author): This method name is defined by the SchemaProvider trait. Renaming it would be a breaking change and I don't think it is justified.

Contributor: Maybe we can consider a rename in a future separate PR, but I agree we shouldn't mess with the SchemaProvider trait in this PR.

```rust
let Some(schema) = schema else { continue };

if !schema.cached_tables.contains_key(reference.table()) {
    let resolved_table =
```

Contributor: Could this part be factored out into a separate helper method?

westonpace (Member Author)

> What's the cache-then-plan approach? (The linked page doesn't include "cache".)
> How did we solve the cold-cache problem?

@findepi

Perhaps I should avoid using the word cache. This is not a long-lived multi-query cache. This is a single-query cache meant to be thrown away after the query has completed. It is a very short-lived cache designed to avoid repeated lookups during multiple planning passes. Every query is still a "cold" query. It would be possible to create another, longer-lived caching layer on top of this, but I am not trying to solve that problem at the moment.

> I'm thinking, if we use the cached tables, should we have tests for that? I mean that cached tables should reflect the most recent catalog state; if a table is added/modified/dropped it should be reflected in the caches.

@comphead

There is no concern about cache eviction / staleness here because this cache should not be kept longer than a single query. There is some possibility for a catalog change to happen in between reference lookup (resolve) and query execution. However, this will always be possible when using a remote catalog. The query execution should return an error from the remote endpoint saying "no database/schema/table found" or "query does not match schema". I'm not sure we can avoid this without some kind of synchronization mechanism with the remote catalog, and I don't think there has been much work in that regard (but I admittedly haven't examined the APIs in great depth).

findepi (Member) commented Dec 18, 2024

> Perhaps I should avoid using the word cache. This is not a long-lived multi-query cache. This is a single-query cache meant to be thrown away after the query has completed.

@westonpace
thanks for explaining. I think the use of "cache" is justified in this context and easier to understand than e.g. "working set". I agree it is important to have a notion of query-level information, for two reasons. Performance is the obvious one: we should not repeatedly compute info we already knew. Second is correctness (consistency). If a query e.g. self-joins an Iceberg table T, the table may need to be read twice, but the reads should come from the same snapshot of T.

So we agree on the need for this.
The question is who's responsible for providing this consistency. Is it the catalog or table provider (e.g., it should self-wrap in ResolvedCatalogProvider), or is it the engine itself (and then, how exactly is this implemented)?

```rust
Ok(self.cached_tables.get(name).cloned())
}

#[allow(unused_variables)]
```

findepi (Member): please avoid #[allow] attributes (and if one is really needed, add a code comment explaining why).

westonpace (Member Author)

> The question is who's responsible for providing this consistency. Is it the catalog or table provider (e.g., it should self-wrap in ResolvedCatalogProvider), or is it the engine itself (and then, how exactly is this implemented)?

I'm not sure I understand what you mean by "it should self-wrap in ResolvedCatalogProvider".

I would personally expect a planner to cache lookups in the same way I expect a compiler to optimize away repeated calls to a constant method. Though I understand this is not how the synchronous planner works today.

This is an optimization that benefits all engines and should work equally for all so it seems useful for the resolve method to provide it. Is there some advantage of having every engine reimplement this pattern? Is there some functionality, customization or capability we are taking away from engines by doing this here?

findepi (Member) commented Dec 19, 2024

> I would personally expect a planner to cache lookups in the same way I expect a compiler to optimize away repeated calls to a constant method.

Agreed

> Is there some functionality, customization or capability we are taking away from engines by doing this here?

IIUC, this PR adds new code only, so it's not taking away any capability, and it's also not adding any new behavior. It adds building blocks for a desired behavior to be implemented later.

> Is there some advantage of having every engine reimplement this pattern?

When I said "engine" I meant datafusion core. I would want the core to do what you described as "expect a planner to cache lookups".
You mean "engines" in plural. How is this new code going to be used? In datafusion and/or elsewhere?

alamb changed the title from "feat: add helpers for users with asynchornous catalogs" to "feat: add AsyncCatalogProvider wrappers to permit asynchronous catalogs" on Dec 19, 2024
alamb (Contributor) commented Dec 19, 2024

> What's the cache-then-plan approach? (The linked page doesn't include "cache".)
> How did we solve the cold-cache problem?

@findepi

> Perhaps I should avoid using the word cache. This is not a long-lived multi-query cache. This is a single-query cache meant to be thrown away after the query has completed.

My understanding is that this PR adds a kind of "basic implementation of a remote catalog" that will almost certainly not be used by all systems (due to the varying needs of caching, as @findepi mentions, among others).

So perhaps we can update the documentation on the traits to make it clear that they provide a basic implementation for implementing a remote catalog that must be accessed asynchronously, and that for more complex use cases, such as more sophisticated caching, users can build their own implementation using the same CatalogProvider APIs?

alamb (Contributor) left a comment

Thank you @westonpace -- I think this PR looks great. 🏆 🏅

Also thank you @comphead and @findepi for the reviews

I spent a few minutes trying to adapt #13722 to use this API, and while I did not finish, it was going quite well.

All I think this PR needs is:

  1. Update the remote_catalog.rs example to use these new helpers
  2. Add a link in the docs of AsyncCatalog, etc to the remote_catalog.rs example

```rust
///
/// If a table reference's catalog name does not match this name then the reference will be ignored
/// when calculating the cached set of tables (this allows other providers to supply the table)
fn catalog_name(&self) -> &str;
```
alamb (Contributor): When trying this API out I didn't fully understand this API (or what I should return) -- maybe if it is optional / has an advanced use case we could provide a default implementation.

Suggested change:

```diff
-fn catalog_name(&self) -> &str;
+fn catalog_name(&self) -> Option<&str> { return None }
```

westonpace (Member Author): Yeah, I was a bit torn on this anyway. The problem arises when the user wants to use an AsyncSchemaProvider or AsyncCatalogProvider as the top-level catalog. In these cases it isn't clear what we should do with full / partial table references.

For example, if the user adds an AsyncSchemaProvider and then tries to resolve the query "SELECT * FROM weston.public.my_table", what should we do?

  • We could just assume that all table references are intended for us. This works as long as this schema provider is the only provider registered. If there are multiple providers registered then we need to know which to use for a given table reference somehow.
  • We could assume we don't match the table reference and we will only match bare references.
  • Or we can require schema providers to supply their own name and the catalog name so that we can filter the references that apply (this is what I do; sketched below).

The main problem with the current approach is that users whose top level is an AsyncCatalogProviderList have to implement these methods even though they are meaningless (we will do the filtering in the higher level resolve function).

We should probably do whatever the synchronous planner does in this case, but I just didn't know. If I register a schema provider with a SessionContext and then call sql with a full (not bare) table reference, does it apply the provider I registered or not?
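
For illustration, the third option amounts to a filter roughly like the following (a hypothetical helper, mirroring the refers_to_schema check used in the existing example; the variant matching on `TableReference` is shown only for concreteness):

```rust
use datafusion::common::TableReference;

/// Keep a reference if it is bare, or if its qualifiers match this provider.
/// Hypothetical sketch of the filtering described above, not the PR's code.
fn applies_to(reference: &TableReference, catalog: &str, schema: &str) -> bool {
    match reference {
        // A bare reference ("my_table") could apply to any provider.
        TableReference::Bare { .. } => true,
        // A partial reference ("my_schema.my_table") must match the schema.
        TableReference::Partial { schema: s, .. } => s.as_ref() == schema,
        // A full reference must match both the catalog and the schema.
        TableReference::Full { catalog: c, schema: s, .. } => {
            c.as_ref() == catalog && s.as_ref() == schema
        }
    }
}
```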

datafusion/catalog/src/async.rs (review thread resolved)
alamb changed the title from "feat: add AsyncCatalogProvider wrappers to permit asynchronous catalogs" to "feat: add AsyncCatalogProvider helpers for asynchronous catalogs" on Dec 19, 2024
westonpace (Member Author)

> When I said "engine" I meant datafusion core. I would want the core to do what you described as "expect a planner to cache lookups".

@findepi Ok, I understand what you mean by engine now. I agree that we could maybe move this kind of caching into the planner itself. If we did so, I think we could keep the traits and just deprecate the resolve method until it eventually goes away.

> You mean "engines" in plural. How is this new code going to be used? In datafusion and/or elsewhere?

I'm currently using a copy of these traits / structs in some LanceDB stuff. Our enterprise / cloud product has a simple catalog. We are not generally stressed too much about queries-per-second, so my main goal has been adapting our (asynchronous) catalog into datafusion.

There's some movement to add a (more polished) catalog to our OSS stuff as well. When it comes to SQL queries, LanceDB is pretty much a frontend for datafusion, so whatever we use as a catalog will need to be integrated.

alamb (Contributor) commented Dec 30, 2024

Just checking in on this PR -- as I understand it, the remaining item is to update the async catalog example to use the new structures.

@westonpace do you plan to do so?

westonpace (Member Author)

I do, but it won't happen before Thursday, apologies (finishing up winter break).

alamb (Contributor) commented Dec 31, 2024

> I do, but it won't happen before Thursday, apologies (finishing up winter break).

Great! Thanks -- no worries. I was just trying to make sure the PRs didn't get stuck

westonpace force-pushed the feat/async-catalog-helpers branch from e090deb to 29e8976 on January 3, 2025 01:30
westonpace (Member Author) commented Jan 3, 2025

@alamb I've rebased and updated the example. I think the only remaining issue is your comment here:

> When trying this API out I didn't fully understand this API (or what I should return) -- maybe if it is optional / has an advanced use case we could provide a default implementation.

The basic problem, I think, is this: we are evaluating the table references directly ourselves, and so we have to figure out which table references make sense for the schema provider. In the sync case this is not necessary because you do this:

```rust
ctx.catalog("my_catalog")
    .unwrap()
    .register_schema("my_schema", Arc::clone(&remote_schema))?;
```

As a result, by the time the query planner is even using your table provider, it already has done the lookup into my_catalog and my_schema. We, however, can't rely on that, because we're working with the table providers themselves. Or, to put it another way, I am replacing this part of your example:

```rust
let table_names = references.iter().filter_map(|r| {
    if refers_to_schema("datafusion", "remote_schema", r) {
        Some(r.table())
    } else {
        None
    }
});
```

I need to filter down the references to figure out which ones apply to the schema provider before I send the request out to the remote catalog (to allow the possibility that other requests are handled elsewhere).

The reason I don't exactly love my current solution is that I think these methods can eventually go away. If we add register_async_catalog / register_async_schema methods to the SessionContext and move the caching inside there, then we can rely on the same lookup mechanism that already exists there.

Still, I don't think these methods are onerous for the implementer and would rather just make progress. NVM, figured it out (below)

westonpace (Member Author)

Ok, I just needed a good night's sleep. I converted the schema_name / catalog_name methods into parameters on the resolve function. Now each of the new traits has only a single method.
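
For readers following along, the resulting shape is roughly this (a sketch of the one-method-per-trait design described above; signatures are approximated from the discussion, not copied from the merged code):

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::TableProvider;
use datafusion::common::Result;

// Each trait now has a single required async lookup method. The `resolve`
// helpers are provided methods, and the catalog/schema names that used to be
// trait methods are now plain parameters to `resolve`.
#[async_trait]
pub trait AsyncSchemaProvider: Send + Sync {
    /// Asynchronously look up a single table by name.
    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>>;
}

#[async_trait]
pub trait AsyncCatalogProvider: Send + Sync {
    /// Asynchronously look up a single schema by name.
    async fn schema(&self, name: &str) -> Result<Option<Arc<dyn AsyncSchemaProvider>>>;
}

#[async_trait]
pub trait AsyncCatalogProviderList: Send + Sync {
    /// Asynchronously look up a single catalog by name.
    async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn AsyncCatalogProvider>>>;
}
```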

This is ready for another review.

westonpace requested a review from alamb on January 3, 2025 13:57
alamb (Contributor) left a comment

Thank you @westonpace (and @comphead and @findepi) -- I think this looks really nice and will make implementing remote catalogs much easier ❤️

I really like how it has abstracted away the "resolve references" dance that was previously required.

Let's leave this PR open for another day or so to get feedback from anyone else who may wish to comment

```rust
// Now continue planning the query after having fetched the remote table and
// it can run as normal
let plan = state.statement_to_plan(statement).await?;
// This resolved catalog only makes sense for this query and so we create a clone
```
Contributor: this is a nice API 👍

```rust
///
/// After implementing this trait you can call the [`AsyncSchemaProvider::resolve`] method to get an
/// `Arc<dyn SchemaProvider>` that contains a cached copy of the referenced tables. The `resolve`
/// method can be slow and asynchronous as it is only called once, before planning.
```
alamb (Contributor): maybe we can add a reference to the example, like this (can be done as a follow-on PR as well)

Suggested change:

```diff
 /// method can be slow and asynchronous as it is only called once, before planning.
+///
+/// See the [remote_catalog.rs] for a full end to end example
+///
+/// [remote_catalog.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/remote_catalog.rs
```

```rust
/// and execute a query containing the given references.
///
/// This cache is intended to be short-lived for the execution of a single query. There is no mechanism
/// for refresh or eviction of stale entries.
```
alamb (Contributor): maybe we can add a reference to the example, like this (can be done as a follow-on PR as well)

Suggested change:

```diff
 /// for refresh or eviction of stale entries.
+///
+/// See the [remote_catalog.rs] for a full end to end example
+///
+/// [remote_catalog.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/remote_catalog.rs
```

alamb (Contributor) commented Jan 9, 2025

🚀

alamb (Contributor) commented Jan 9, 2025

Thanks again @westonpace , @comphead and @findepi

alamb merged commit f379719 into apache:main on Jan 9, 2025. 27 checks passed.

Labels: catalog (Related to the catalog crate)

Merging this pull request may close: Make all SchemaProvider trait APIs async (#10339)