fix(cubesql): Support new QuickSight meta queries
MazterQyou authored Sep 17, 2024
1 parent df53c51 commit 148e4cf
Showing 15 changed files with 676 additions and 19 deletions.
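
The commit does not include the metadata statements QuickSight actually sends, but the tables it adds suggest probes of the following shape (hypothetical examples, modeled on how other PostgreSQL and Redshift clients discover server limits and external schemas):

-- Hypothetical QuickSight-style probes; not taken from the commit.
SELECT sizing_id, sizing_name, supported_value, comments
FROM information_schema.sql_sizing;

SELECT esoid, eskind, esdbname, esoptions FROM pg_external_schema;

SELECT * FROM stv_slices;

Before this change none of these tables existed in cubesql's emulated catalog; the diffs below register stub providers for all three.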
20 changes: 16 additions & 4 deletions rust/cubesql/cubesql/src/compile/engine/context_postgresql.rs
@@ -15,6 +15,7 @@ use super::information_schema::postgres::{
InfoSchemaRoleColumnGrantsProvider as PostgresInfoSchemaRoleColumnGrantsProvider,
InfoSchemaRoleTableGrantsProvider as PostgresInfoSchemaRoleTableGrantsProvider,
InfoSchemaSqlImplementationInfoProvider as PostgresInfoSchemaSqlImplementationInfoProvider,
InfoSchemaSqlSizingProvider as PostgresInfoSchemaSqlSizingProvider,
InfoSchemaTestingBlockingProvider, InfoSchemaTestingDatasetProvider, PgCatalogAmProvider,
PgCatalogAttrdefProvider, PgCatalogAttributeProvider, PgCatalogClassProvider,
PgCatalogConstraintProvider, PgCatalogDatabaseProvider, PgCatalogDependProvider,
@@ -37,10 +38,10 @@ use crate::{
};

use super::information_schema::redshift::{
RedshiftLateBindingViewUnpackedTableProvider, RedshiftStlDdltextProvider,
RedshiftStlQueryProvider, RedshiftStlQuerytextProvider,
RedshiftSvvExternalSchemasTableProvider, RedshiftSvvTableInfoProvider,
RedshiftSvvTablesTableProvider,
RedshiftLateBindingViewUnpackedTableProvider, RedshiftPgExternalSchemaProvider,
RedshiftStlDdltextProvider, RedshiftStlQueryProvider, RedshiftStlQuerytextProvider,
RedshiftStvSlicesProvider, RedshiftSvvExternalSchemasTableProvider,
RedshiftSvvTableInfoProvider, RedshiftSvvTablesTableProvider,
};

impl DatabaseProtocol {
@@ -75,6 +76,8 @@ impl DatabaseProtocol {
any.downcast_ref::<PostgresInfoSchemaSqlImplementationInfoProvider>()
{
"information_schema.sql_implementation_info".to_string()
} else if let Some(_) = any.downcast_ref::<PostgresInfoSchemaSqlSizingProvider>() {
"information_schema.sql_sizing".to_string()
} else if let Some(_) = any.downcast_ref::<PgCatalogTableProvider>() {
"pg_catalog.pg_tables".to_string()
} else if let Some(_) = any.downcast_ref::<PgCatalogTypeProvider>() {
@@ -133,12 +136,16 @@ impl DatabaseProtocol {
"pg_catalog.pg_views".to_string()
} else if let Some(_) = any.downcast_ref::<PgCatalogStatUserTablesProvider>() {
"pg_catalog.pg_stat_user_tables".to_string()
} else if let Some(_) = any.downcast_ref::<RedshiftPgExternalSchemaProvider>() {
"pg_catalog.pg_external_schema".to_string()
} else if let Some(_) = any.downcast_ref::<RedshiftSvvTablesTableProvider>() {
"public.svv_tables".to_string()
} else if let Some(_) = any.downcast_ref::<RedshiftSvvExternalSchemasTableProvider>() {
"public.svv_external_schemas".to_string()
} else if let Some(_) = any.downcast_ref::<RedshiftSvvTableInfoProvider>() {
"public.svv_table_info".to_string()
} else if let Some(_) = any.downcast_ref::<RedshiftStvSlicesProvider>() {
"public.stv_slices".to_string()
} else if let Some(_) = any.downcast_ref::<RedshiftStlDdltextProvider>() {
"public.stl_ddltext".to_string()
} else if let Some(_) = any.downcast_ref::<RedshiftStlQueryProvider>() {
@@ -235,6 +242,7 @@ impl DatabaseProtocol {
&context.meta.tables,
)))
}
"stv_slices" => return Some(Arc::new(RedshiftStvSlicesProvider::new())),
"stl_ddltext" => return Some(Arc::new(RedshiftStlDdltextProvider::new())),
"stl_query" => return Some(Arc::new(RedshiftStlQueryProvider::new())),
"stl_querytext" => return Some(Arc::new(RedshiftStlQuerytextProvider::new())),
@@ -299,6 +307,7 @@ impl DatabaseProtocol {
PostgresInfoSchemaSqlImplementationInfoProvider::new(),
))
}
"sql_sizing" => return Some(Arc::new(PostgresInfoSchemaSqlSizingProvider::new())),
#[cfg(debug_assertions)]
"testing_dataset" => {
return Some(Arc::new(InfoSchemaTestingDatasetProvider::new(5, 1000)))
@@ -392,6 +401,9 @@ impl DatabaseProtocol {
&context.meta.tables,
)))
}
"pg_external_schema" => {
return Some(Arc::new(RedshiftPgExternalSchemaProvider::new()))
}
_ => return None,
},
_ => return None,
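Note the two-sided registration in context_postgresql.rs above: each provider gets a downcast_ref arm mapping the provider type back to its qualified table name, plus a constructor arm mapping the bare table name to a fresh provider instance. With both arms in place, lookups of the new tables resolve; a minimal sketch of the now-supported queries (hypothetical usage):

SELECT * FROM information_schema.sql_sizing;  -- InfoSchemaSqlSizingProvider
SELECT * FROM pg_external_schema;             -- RedshiftPgExternalSchemaProvider
SELECT * FROM stv_slices;                     -- RedshiftStvSlicesProvider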
2 changes: 2 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/information_schema/postgres/mod.rs
@@ -8,6 +8,7 @@ pub mod key_column_usage;
pub mod referential_constraints;
pub mod schemata;
pub mod sql_implementation_info;
pub mod sql_sizing;
pub mod table_constraints;
pub mod tables;
pub mod views;
@@ -79,5 +80,6 @@ pub use pg_views::*;
pub use role_column_grants::*;
pub use role_table_grants::*;
pub use sql_implementation_info::*;
pub use sql_sizing::*;
pub use testing_blocking::*;
pub use testing_dataset::*;
159 changes: 159 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/information_schema/postgres/sql_sizing.rs
@@ -0,0 +1,159 @@
use std::{any::Any, sync::Arc};

use async_trait::async_trait;
use datafusion::{
arrow::{
array::{Array, ArrayRef, StringBuilder, UInt32Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
},
datasource::{datasource::TableProviderFilterPushDown, TableProvider, TableType},
error::DataFusionError,
logical_plan::Expr,
physical_plan::{memory::MemoryExec, ExecutionPlan},
};

struct InfoSchemaSqlSizingBuilder {
sizing_id: UInt32Builder,
sizing_name: StringBuilder,
supported_value: UInt32Builder,
comments: StringBuilder,
}

impl InfoSchemaSqlSizingBuilder {
fn new(capacity: usize) -> Self {
Self {
sizing_id: UInt32Builder::new(capacity),
sizing_name: StringBuilder::new(capacity),
supported_value: UInt32Builder::new(capacity),
comments: StringBuilder::new(capacity),
}
}

fn add_info(
&mut self,
sizing_id: u32,
sizing_name: impl AsRef<str>,
supported_value: Option<u32>,
comments: Option<&str>,
) {
self.sizing_id.append_value(sizing_id).unwrap();
self.sizing_name.append_value(sizing_name).unwrap();
self.supported_value.append_option(supported_value).unwrap();
self.comments.append_option(comments).unwrap();
}

fn finish(mut self) -> Vec<Arc<dyn Array>> {
let columns: Vec<Arc<dyn Array>> = vec![
Arc::new(self.sizing_id.finish()),
Arc::new(self.sizing_name.finish()),
Arc::new(self.supported_value.finish()),
Arc::new(self.comments.finish()),
];

columns
}
}

pub struct InfoSchemaSqlSizingProvider {
data: Arc<Vec<ArrayRef>>,
}

impl InfoSchemaSqlSizingProvider {
pub fn new() -> Self {
let mut builder = InfoSchemaSqlSizingBuilder::new(11);

builder.add_info(97, "MAXIMUM COLUMNS IN GROUP BY", Some(0), None);
builder.add_info(99, "MAXIMUM COLUMNS IN ORDER BY", Some(0), None);
builder.add_info(100, "MAXIMUM COLUMNS IN SELECT", Some(1664), None);
builder.add_info(101, "MAXIMUM COLUMNS IN TABLE", Some(1600), None);
builder.add_info(
34,
"MAXIMUM CATALOG NAME LENGTH",
Some(63),
Some("Might be less, depending on character set."),
);
builder.add_info(
30,
"MAXIMUM COLUMN NAME LENGTH",
Some(63),
Some("Might be less, depending on character set."),
);
builder.add_info(
31,
"MAXIMUM CURSOR NAME LENGTH",
Some(63),
Some("Might be less, depending on character set."),
);
builder.add_info(
10005,
"MAXIMUM IDENTIFIER LENGTH",
Some(63),
Some("Might be less, depending on character set."),
);
builder.add_info(
32,
"MAXIMUM SCHEMA NAME LENGTH",
Some(63),
Some("Might be less, depending on character set."),
);
builder.add_info(
35,
"MAXIMUM TABLE NAME LENGTH",
Some(63),
Some("Might be less, depending on character set."),
);
builder.add_info(
107,
"MAXIMUM USER NAME LENGTH",
Some(63),
Some("Might be less, depending on character set."),
);

Self {
data: Arc::new(builder.finish()),
}
}
}

#[async_trait]
impl TableProvider for InfoSchemaSqlSizingProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_type(&self) -> TableType {
TableType::View
}

fn schema(&self) -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("sizing_id", DataType::UInt32, false),
Field::new("sizing_name", DataType::Utf8, false),
Field::new("supported_value", DataType::UInt32, true),
Field::new("comments", DataType::Utf8, true),
]))
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let batch = RecordBatch::try_new(self.schema(), self.data.to_vec())?;

Ok(Arc::new(MemoryExec::try_new(
&[vec![batch]],
self.schema(),
projection.clone(),
)?))
}

fn supports_filter_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown, DataFusionError> {
Ok(TableProviderFilterPushDown::Unsupported)
}
}
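
The provider builds its arrays once in new(), so scan() only wraps them in a MemoryExec. The rows a client sees follow directly from the add_info calls above; for example (hypothetical query, results read off the code):

SELECT sizing_id, sizing_name, supported_value
FROM information_schema.sql_sizing
WHERE sizing_id IN (97, 100, 10005);

-- sizing_id | sizing_name                 | supported_value
--        97 | MAXIMUM COLUMNS IN GROUP BY |               0
--       100 | MAXIMUM COLUMNS IN SELECT   |            1664
--     10005 | MAXIMUM IDENTIFIER LENGTH   |              63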
4 changes: 4 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/information_schema/redshift/mod.rs
@@ -1,15 +1,19 @@
pub mod late_binding_view_unpacked;
pub mod pg_external_schema;
pub mod stl_ddltext;
pub mod stl_query;
pub mod stl_querytext;
pub mod stv_slices;
pub mod svv_external_schemas;
pub mod svv_table_info;
pub mod svv_tables;

pub use late_binding_view_unpacked::*;
pub use pg_external_schema::*;
pub use stl_ddltext::*;
pub use stl_query::*;
pub use stl_querytext::*;
pub use stv_slices::*;
pub use svv_external_schemas::*;
pub use svv_table_info::*;
pub use svv_tables::*;
99 changes: 99 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/information_schema/redshift/pg_external_schema.rs
@@ -0,0 +1,99 @@
use std::{any::Any, sync::Arc};

use async_trait::async_trait;
use datafusion::{
arrow::{
array::{Array, ArrayRef, Int32Builder, StringBuilder, UInt32Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
},
datasource::{datasource::TableProviderFilterPushDown, TableProvider, TableType},
error::DataFusionError,
logical_plan::Expr,
physical_plan::{memory::MemoryExec, ExecutionPlan},
};

struct RedshiftPgExternalSchemaBuilder {
esoid: UInt32Builder,
eskind: Int32Builder,
esdbname: StringBuilder,
esoptions: StringBuilder,
}

impl RedshiftPgExternalSchemaBuilder {
fn new(capacity: usize) -> Self {
Self {
esoid: UInt32Builder::new(capacity),
eskind: Int32Builder::new(capacity),
esdbname: StringBuilder::new(capacity),
esoptions: StringBuilder::new(capacity),
}
}

fn finish(mut self) -> Vec<Arc<dyn Array>> {
let columns: Vec<Arc<dyn Array>> = vec![
Arc::new(self.esoid.finish()),
Arc::new(self.eskind.finish()),
Arc::new(self.esdbname.finish()),
Arc::new(self.esoptions.finish()),
];

columns
}
}

pub struct RedshiftPgExternalSchemaProvider {
data: Arc<Vec<ArrayRef>>,
}

impl RedshiftPgExternalSchemaProvider {
pub fn new() -> Self {
let builder = RedshiftPgExternalSchemaBuilder::new(0);

Self {
data: Arc::new(builder.finish()),
}
}
}

#[async_trait]
impl TableProvider for RedshiftPgExternalSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_type(&self) -> TableType {
TableType::View
}

fn schema(&self) -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("esoid", DataType::UInt32, false),
Field::new("eskind", DataType::Int32, false),
Field::new("esdbname", DataType::Utf8, false),
Field::new("esoptions", DataType::Utf8, false),
]))
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let batch = RecordBatch::try_new(self.schema(), self.data.to_vec())?;

Ok(Arc::new(MemoryExec::try_new(
&[vec![batch]],
self.schema(),
projection.clone(),
)?))
}

fn supports_filter_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown, DataFusionError> {
Ok(TableProviderFilterPushDown::Unsupported)
}
}
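
Unlike sql_sizing, this builder is constructed with capacity 0 and no append calls, so pg_external_schema is a typed but empty stub: discovery queries against it parse, plan, and return zero rows instead of erroring. For example (hypothetical probe):

SELECT esoid, esdbname FROM pg_external_schema;
-- (0 rows)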