Skip to content

Commit

Permalink
Port code to be in datafusion
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 14, 2023
1 parent 7ca1df0 commit c83cbdd
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 111 deletions.
2 changes: 0 additions & 2 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ bigdecimal = "0.4.1"
criterion = { version = "0.5", features = ["async_tokio"] }
csv = "1.1.6"
ctor = "0.2.0"
datafusion-sqllogictest = { path = "../sqllogictest", version = "29.0.0", features = ["postgres"] }
doc-comment = "0.3"
env_logger = "0.10"
half = "2.2.1"
Expand All @@ -111,7 +110,6 @@ postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
regex = "1.5.4"
rstest = "0.18.0"
rust_decimal = { version = "1.27.0", features = ["tokio-pg"] }
sqllogictest = "0.15.0"
test-utils = { path = "../../test-utils" }
thiserror = "1.0.37"
tokio-postgres = "0.7.7"
Expand Down
11 changes: 8 additions & 3 deletions datafusion/sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,25 @@ rust_decimal = {version = "1.27.0"}
log = "^0.4"
sqllogictest = "0.15.0"
sqlparser.workspace = true
tempfile = "3"
thiserror = "1.0.44"
tokio = {version = "1.0"}
bytes = {version = "1.4.0", optional = true}
futures = {version = "0.3.28", optional = true}
futures = {version = "0.3.28"}
chrono = {version = "0.4.26", optional = true}
tokio-postgres = {version = "0.7.7", optional = true}
postgres-types = {version = "0.2.4", optional = true}
postgres-protocol = {version = "0.6.4", optional = true}

[features]
postgres = ["bytes", "futures", "chrono", "tokio-postgres", "postgres-types", "postgres-protocol"]
postgres = ["bytes", "chrono", "tokio-postgres", "postgres-types", "postgres-protocol"]

[dev-dependencies]
env_logger = "0.10"
num_cpus = "1.13.0"


[[test]]
harness = false
name = "sqllogictests"
path = "tests/sqllogictests/src/main.rs"
path = "bin/sqllogictests.rs"
110 changes: 12 additions & 98 deletions datafusion/sqllogictest/bin/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ use std::path::{Path, PathBuf};
#[cfg(target_family = "windows")]
use std::thread;

use datafusion_sqllogictest::{DataFusion, Postgres};
use datafusion_sqllogictest::{DataFusion, TestContext};
use futures::stream::StreamExt;
use log::info;
use sqllogictest::strict_column_validator;
use tempfile::TempDir;

use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{DataFusionError, Result};

mod setup;

const TEST_DIRECTORY: &str = "tests/sqllogictests/test_files/";
const TEST_DIRECTORY: &str = "test_files/";
const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";

#[cfg(target_family = "windows")]
Expand Down Expand Up @@ -121,7 +117,7 @@ async fn run_test_file(test_file: TestFile) -> Result<()> {
relative_path,
} = test_file;
info!("Running with DataFusion runner: {}", path.display());
let Some(test_ctx) = context_for_test_file(&relative_path).await else {
let Some(test_ctx) = TestContext::try_new_for_test_file(&relative_path).await else {
info!("Skipping: {}", path.display());
return Ok(());
};
Expand All @@ -138,7 +134,9 @@ async fn run_test_file(test_file: TestFile) -> Result<()> {
.map_err(|e| DataFusionError::External(Box::new(e)))
}

#[cfg(feature = "postgres")]
async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> {
use datafusion_sqllogictest::Postgres;
let TestFile {
path,
relative_path,
Expand All @@ -154,6 +152,12 @@ async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> {
Ok(())
}

#[cfg(not(feature = "postgres"))]
async fn run_test_file_with_postgres(_test_file: TestFile) -> Result<()> {
use datafusion_common::plan_err;
plan_err!("Can not run with postgres as postgres feature is not enabled")
}

async fn run_complete_file(test_file: TestFile) -> Result<()> {
let TestFile {
path,
Expand All @@ -163,7 +167,7 @@ async fn run_complete_file(test_file: TestFile) -> Result<()> {

info!("Using complete mode to complete: {}", path.display());

let Some(test_ctx) = context_for_test_file(&relative_path).await else {
let Some(test_ctx) = TestContext::try_new_for_test_file(&relative_path).await else {
info!("Skipping: {}", path.display());
return Ok(());
};
Expand Down Expand Up @@ -250,96 +254,6 @@ fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = PathBu
)
}

/// Create a SessionContext, configured for the specific test, if
/// possible.
///
/// If `None` is returned (e.g. because some needed feature is not
/// enabled), the file should be skipped
async fn context_for_test_file(relative_path: &Path) -> Option<TestContext> {
let config = SessionConfig::new()
// hardcode target partitions so plans are deterministic
.with_target_partitions(4);

let test_ctx = TestContext::new(SessionContext::with_config(config));

let file_name = relative_path.file_name().unwrap().to_str().unwrap();
match file_name {
"scalar.slt" => {
info!("Registering scalar tables");
setup::register_scalar_tables(test_ctx.session_ctx()).await;
}
"information_schema_table_types.slt" => {
info!("Registering local temporary table");
setup::register_temp_table(test_ctx.session_ctx()).await;
}
"information_schema_columns.slt" => {
info!("Registering table with many types");
setup::register_table_with_many_types(test_ctx.session_ctx()).await;
}
"avro.slt" => {
#[cfg(feature = "avro")]
{
let mut test_ctx = test_ctx;
info!("Registering avro tables");
setup::register_avro_tables(&mut test_ctx).await;
return Some(test_ctx);
}
#[cfg(not(feature = "avro"))]
{
info!("Skipping {file_name} because avro feature is not enabled");
return None;
}
}
"joins.slt" => {
info!("Registering partition table tables");

let mut test_ctx = test_ctx;
setup::register_partition_table(&mut test_ctx).await;
return Some(test_ctx);
}
_ => {
info!("Using default SessionContext");
}
};
Some(test_ctx)
}

/// Context for running tests
pub struct TestContext {
/// Context for running queries
ctx: SessionContext,
/// Temporary directory created and cleared at the end of the test
test_dir: Option<TempDir>,
}

impl TestContext {
pub fn new(ctx: SessionContext) -> Self {
Self {
ctx,
test_dir: None,
}
}

/// Enables the test directory feature. If not enabled,
/// calling `testdir_path` will result in a panic.
pub fn enable_testdir(&mut self) {
if self.test_dir.is_none() {
self.test_dir = Some(TempDir::new().expect("failed to create testdir"));
}
}

/// Returns the path to the test directory. Panics if the test
/// directory feature is not enabled via `enable_testdir`.
pub fn testdir_path(&self) -> &Path {
self.test_dir.as_ref().expect("testdir not enabled").path()
}

/// Returns a reference to the internal SessionContext
fn session_ctx(&self) -> &SessionContext {
&self.ctx
}
}

/// Parsed command line options
struct Options {
// regex like
Expand Down
4 changes: 0 additions & 4 deletions datafusion/sqllogictest/src/engines/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ pub(crate) fn i128_to_str(value: i128, precision: &u8, scale: &i8) -> String {
)
}

pub(crate) fn decimal_to_str(value: Decimal) -> String {
big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap())
}

pub(crate) fn big_decimal_to_str(value: BigDecimal) -> String {
value.round(12).normalized().to_string()
}
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
// specific language governing permissions and limitations
// under the License.

///! DataFusion sqllogictest driver
mod engines;

pub use engines::DataFusion;

#[cfg(feature = "postgres")]
pub use engines::Postgres;

mod test_context;
pub use test_context::TestContext;
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,121 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::TimeUnit;
use async_trait::async_trait;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionConfig;
use datafusion::{
arrow::{
array::{
BinaryArray, Float64Array, Int32Array, LargeBinaryArray, LargeStringArray,
StringArray, TimestampNanosecondArray,
},
datatypes::{DataType, Field, Schema, SchemaRef},
datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
record_batch::RecordBatch,
},
catalog::{schema::MemorySchemaProvider, CatalogProvider, MemoryCatalogProvider},
datasource::{MemTable, TableProvider, TableType},
prelude::{CsvReadOptions, SessionContext},
};
use datafusion_common::DataFusionError;
use datafusion_expr::Expr;
use log::info;
use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use tempfile::TempDir;

use crate::TestContext;
/// Context for running tests
pub struct TestContext {
/// Context for running queries
ctx: SessionContext,
/// Temporary directory created and cleared at the end of the test
test_dir: Option<TempDir>,
}

impl TestContext {
pub fn new(ctx: SessionContext) -> Self {
Self {
ctx,
test_dir: None,
}
}

/// Create a SessionContext, configured for the specific test, if
/// possible.
///
/// If `None` is returned (e.g. because some needed feature is not
/// enabled), the file should be skipped
pub async fn try_new_for_test_file(relative_path: &Path) -> Option<Self> {
let config = SessionConfig::new()
// hardcode target partitions so plans are deterministic
.with_target_partitions(4);

let test_ctx = TestContext::new(SessionContext::with_config(config));

let file_name = relative_path.file_name().unwrap().to_str().unwrap();
match file_name {
"scalar.slt" => {
info!("Registering scalar tables");
register_scalar_tables(test_ctx.session_ctx()).await;
}
"information_schema_table_types.slt" => {
info!("Registering local temporary table");
register_temp_table(test_ctx.session_ctx()).await;
}
"information_schema_columns.slt" => {
info!("Registering table with many types");
register_table_with_many_types(test_ctx.session_ctx()).await;
}
"avro.slt" => {
#[cfg(feature = "avro")]
{
let mut test_ctx = test_ctx;
info!("Registering avro tables");
register_avro_tables(&mut test_ctx).await;
return Some(test_ctx);
}
#[cfg(not(feature = "avro"))]
{
info!("Skipping {file_name} because avro feature is not enabled");
return None;
}
}
"joins.slt" => {
info!("Registering partition table tables");

let mut test_ctx = test_ctx;
register_partition_table(&mut test_ctx).await;
return Some(test_ctx);
}
_ => {
info!("Using default SessionContext");
}
};
Some(test_ctx)
}

/// Enables the test directory feature. If not enabled,
/// calling `testdir_path` will result in a panic.
pub fn enable_testdir(&mut self) {
if self.test_dir.is_none() {
self.test_dir = Some(TempDir::new().expect("failed to create testdir"));
}
}

/// Returns the path to the test directory. Panics if the test
/// directory feature is not enabled via `enable_testdir`.
pub fn testdir_path(&self) -> &Path {
self.test_dir.as_ref().expect("testdir not enabled").path()
}

/// Returns a reference to the internal SessionContext
pub fn session_ctx(&self) -> &SessionContext {
&self.ctx
}
}

#[cfg(feature = "avro")]
pub async fn register_avro_tables(ctx: &mut crate::TestContext) {
Expand Down

0 comments on commit c83cbdd

Please sign in to comment.