Skip to content

Commit

Permalink
Address PR reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Dec 18, 2024
1 parent 372c920 commit bbc6908
Showing 1 changed file with 27 additions and 24 deletions.
51 changes: 27 additions & 24 deletions datafusion/catalog/src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
use std::sync::Arc;

use async_trait::async_trait;
use datafusion_common::{
error::{DataFusionError, Result},
HashMap, TableReference,
};
use datafusion_common::{error::Result, not_impl_err, HashMap, TableReference};
use datafusion_execution::config::SessionConfig;

use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};

/// A schema provider that looks up tables in a cache
///
/// This is created by the [`AsyncSchemaProvider::resolve`] method
/// Instances are created by the [`AsyncSchemaProvider::resolve`] method
#[derive(Debug)]
struct ResolvedSchemaProvider {
owner_name: Option<String>,
Expand Down Expand Up @@ -58,14 +55,14 @@ impl SchemaProvider for ResolvedSchemaProvider {
name: String,
_table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(format!(
not_impl_err!(
"Attempt to register table '{name}' with ResolvedSchemaProvider which is not supported"
)))
)
}

#[allow(unused_variables)]
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(format!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported")))
not_impl_err!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported")
}

fn table_exist(&self, name: &str) -> bool {
Expand All @@ -88,6 +85,15 @@ impl ResolvedSchemaProviderBuilder {
}
}

/// Ensures the cache holds an entry for `table_name`.
///
/// If the name is already present in `cached_tables` nothing is fetched. Otherwise the
/// table is looked up through the async provider and the result is stored under
/// `table_name`. Any error returned by the async lookup is propagated to the caller and
/// the cache is left unchanged.
async fn resolve_table(&mut self, table_name: &str) -> Result<()> {
// Already resolved once: avoid a second (potentially expensive) async lookup
if self.cached_tables.contains_key(table_name) {
return Ok(());
}

let lookup = self.async_provider.table(table_name).await?;
self.cached_tables.insert(table_name.to_string(), lookup);
Ok(())
}

fn finish(self) -> Arc<dyn SchemaProvider> {
let cached_tables = self
.cached_tables
Expand All @@ -103,7 +109,7 @@ impl ResolvedSchemaProviderBuilder {

/// A catalog provider that looks up schemas in a cache
///
/// This is created by the [`AsyncCatalogProvider::resolve`] method
/// Instances are created by the [`AsyncCatalogProvider::resolve`] method
#[derive(Debug)]
struct ResolvedCatalogProvider {
cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>,
Expand Down Expand Up @@ -148,7 +154,7 @@ impl ResolvedCatalogProviderBuilder {

/// A catalog provider list that looks up catalogs in a cache
///
/// This is created by the [`AsyncCatalogProviderList::resolve`] method
/// Instances are created by the [`AsyncCatalogProviderList::resolve`] method
#[derive(Debug)]
struct ResolvedCatalogProviderList {
cached_catalogs: HashMap<String, Arc<dyn CatalogProvider>>,
Expand Down Expand Up @@ -205,6 +211,9 @@ pub trait AsyncSchemaProvider: Send + Sync {
/// This method will walk through the references and look them up once, creating a cache of table
/// providers. This cache will be returned as a synchronous SchemaProvider that can be used to plan
/// and execute a query containing the given references.
///
/// This cache is intended to be short-lived for the execution of a single query. There is no mechanism
/// for refresh or eviction of stale entries.
async fn resolve(
&self,
references: &[TableReference],
Expand Down Expand Up @@ -271,6 +280,9 @@ pub trait AsyncCatalogProvider: Send + Sync {
/// providers (each with their own cache of table providers). This cache will be returned as a
/// synchronous CatalogProvider that can be used to plan and execute a query containing the given
/// references.
///
/// This cache is intended to be short-lived for the execution of a single query. There is no mechanism
/// for refresh or eviction of stale entries.
async fn resolve(
&self,
references: &[TableReference],
Expand Down Expand Up @@ -310,13 +322,7 @@ pub trait AsyncCatalogProvider: Send + Sync {
// If we can't find the schema don't bother checking the table
let Some(schema) = schema else { continue };

if !schema.cached_tables.contains_key(reference.table()) {
let resolved_table =
schema.async_provider.table(reference.table()).await?;
schema
.cached_tables
.insert(reference.table().to_string(), resolved_table);
}
schema.resolve_table(reference.table()).await?;
}

let cached_schemas = cached_schemas
Expand Down Expand Up @@ -346,6 +352,9 @@ pub trait AsyncCatalogProviderList: Send + Sync {
/// providers, schema providers, and table providers. This cache will be returned as a
/// synchronous CatalogProviderList that can be used to plan and execute a query containing the given
/// references.
///
/// This cache is intended to be short-lived for the execution of a single query. There is no mechanism
/// for refresh or eviction of stale entries.
async fn resolve(
&self,
references: &[TableReference],
Expand Down Expand Up @@ -405,13 +414,7 @@ pub trait AsyncCatalogProviderList: Send + Sync {
// If we can't find the schema don't bother checking the table
let Some(schema) = schema else { continue };

if !schema.cached_tables.contains_key(reference.table()) {
let resolved_table =
schema.async_provider.table(reference.table()).await?;
schema
.cached_tables
.insert(reference.table().to_string(), resolved_table);
}
schema.resolve_table(reference.table()).await?;
}

// Build the cached catalog provider list
Expand Down

0 comments on commit bbc6908

Please sign in to comment.