Consolidate dataframe example (apache#13410)
* Consolidate dataframe example

* update readme

* Update datafusion-examples/examples/dataframe.rs

* Update datafusion-examples/README.md

---------

Co-authored-by: Oleks V <[email protected]>
alamb and comphead authored Nov 14, 2024
1 parent a5d0563 commit e7f7a9b
Showing 3 changed files with 86 additions and 123 deletions.
3 changes: 1 addition & 2 deletions datafusion-examples/README.md
@@ -57,8 +57,7 @@ cargo run --example dataframe
 - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
 - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
-- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
-- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
+- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data
 - [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
146 changes: 85 additions & 61 deletions datafusion-examples/examples/dataframe.rs
@@ -15,90 +15,82 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::error::Result;
 use datafusion::prelude::*;
 use std::fs::File;
 use std::io::Write;
+use std::sync::Arc;
 use tempfile::tempdir;
 
-/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
-/// fetching results, using the DataFrame trait
+/// This example demonstrates using DataFusion's DataFrame API to
+///
+/// * [read_parquet]: execute queries against parquet files
+/// * [read_csv]: execute queries against csv files
+/// * [read_memory]: execute queries against in-memory arrow data
 #[tokio::main]
 async fn main() -> Result<()> {
-    // create local execution context
+    // The SessionContext is the main high level API for interacting with DataFusion
     let ctx = SessionContext::new();
+    read_parquet(&ctx).await?;
+    read_csv(&ctx).await?;
+    read_memory(&ctx).await?;
+    Ok(())
+}
 
+/// Use DataFrame API to
+/// 1. Read parquet files,
+/// 2. Show the schema
+/// 3. Select columns and rows
+async fn read_parquet(ctx: &SessionContext) -> Result<()> {
+    // Find the local path of "alltypes_plain.parquet"
     let testdata = datafusion::test_util::parquet_test_data();
 
     let filename = &format!("{testdata}/alltypes_plain.parquet");
 
-    // define the query using the DataFrame trait
-    let df = ctx
-        .read_parquet(filename, ParquetReadOptions::default())
-        .await?
-        .select_columns(&["id", "bool_col", "timestamp_col"])?
-        .filter(col("id").gt(lit(1)))?;
-
-    // print the results
-    df.show().await?;
-
-    // create a csv file waiting to be written
-    let dir = tempdir()?;
-    let file_path = dir.path().join("example.csv");
-    let file = File::create(&file_path)?;
-    write_csv_file(file);
-
-    // Reading CSV file with inferred schema example
-    let csv_df =
-        example_read_csv_file_with_inferred_schema(file_path.to_str().unwrap()).await;
-    csv_df.show().await?;
-
-    // Reading CSV file with defined schema
-    let csv_df = example_read_csv_file_with_schema(file_path.to_str().unwrap()).await;
-    csv_df.show().await?;
-
-    // Reading PARQUET file and print describe
+    // Read the parquet files and show its schema using 'describe'
     let parquet_df = ctx
         .read_parquet(filename, ParquetReadOptions::default())
        .await?;
-    parquet_df.describe().await.unwrap().show().await?;
 
-    let dyn_ctx = ctx.enable_url_table();
-    let df = dyn_ctx
-        .sql(&format!("SELECT * FROM '{}'", file_path.to_str().unwrap()))
+    // show its schema using 'describe'
+    parquet_df.clone().describe().await?.show().await?;
+
+    // Select three columns and filter the results
+    // so that only rows where id > 1 are returned
+    parquet_df
+        .select_columns(&["id", "bool_col", "timestamp_col"])?
+        .filter(col("id").gt(lit(1)))?
+        .show()
         .await?;
-    df.show().await?;
 
     Ok(())
 }
 
-// Function to create an test CSV file
-fn write_csv_file(mut file: File) {
-    // Create the data to put into the csv file with headers
-    let content = r#"id,time,vote,unixtime,rating
-a1,"10 6, 2013",3,1381017600,5.0
-a2,"08 9, 2013",2,1376006400,4.5"#;
-    // write the data
-    file.write_all(content.as_ref())
-        .expect("Problem with writing file!");
-}
+/// Use the DataFrame API to
+/// 1. Read CSV files
+/// 2. Optionally specify schema
+async fn read_csv(ctx: &SessionContext) -> Result<()> {
+    // create example.csv file in a temporary directory
+    let dir = tempdir()?;
+    let file_path = dir.path().join("example.csv");
+    {
+        let mut file = File::create(&file_path)?;
+        // write CSV data
+        file.write_all(
+            r#"id,time,vote,unixtime,rating
+a1,"10 6, 2013",3,1381017600,5.0
+a2,"08 9, 2013",2,1376006400,4.5"#
+                .as_bytes(),
+        )?;
+    } // scope closes the file
+    let file_path = file_path.to_str().unwrap();
 
-// Example to read data from a csv file with inferred schema
-async fn example_read_csv_file_with_inferred_schema(file_path: &str) -> DataFrame {
-    // Create a session context
-    let ctx = SessionContext::new();
-    // Register a lazy DataFrame using the context
-    ctx.read_csv(file_path, CsvReadOptions::default())
-        .await
-        .unwrap()
-}
+    // You can read a CSV file and DataFusion will infer the schema automatically
+    let csv_df = ctx.read_csv(file_path, CsvReadOptions::default()).await?;
+    csv_df.show().await?;
 
-// Example to read csv file with a defined schema for the csv file
-async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame {
-    // Create a session context
-    let ctx = SessionContext::new();
-    // Define the schema
+    // If you know the types of your data you can specify them explicitly
     let schema = Schema::new(vec![
         Field::new("id", DataType::Utf8, false),
         Field::new("time", DataType::Utf8, false),
@@ -112,6 +104,38 @@ async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame {
         schema: Some(&schema),
         ..Default::default()
     };
-    // Register a lazy DataFrame by using the context and option provider
-    ctx.read_csv(file_path, csv_read_option).await.unwrap()
+    let csv_df = ctx.read_csv(file_path, csv_read_option).await?;
+    csv_df.show().await?;
+
+    // You can also create DataFrames from the result of sql queries
+    // and using the `enable_url_table` refer to local files directly
+    let dyn_ctx = ctx.clone().enable_url_table();
+    let csv_df = dyn_ctx
+        .sql(&format!("SELECT rating, unixtime FROM '{}'", file_path))
+        .await?;
+    csv_df.show().await?;
+
+    Ok(())
+}
+
+/// Use the DataFrame API to:
+/// 1. Read in-memory data.
+async fn read_memory(ctx: &SessionContext) -> Result<()> {
+    // define data in memory
+    let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
+    let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100]));
+    let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
+
+    // declare a table in memory. In Apache Spark API, this corresponds to createDataFrame(...).
+    ctx.register_batch("t", batch)?;
+    let df = ctx.table("t").await?;
+
+    // construct an expression corresponding to "SELECT a, b FROM t WHERE b = 10" in SQL
+    let filter = col("b").eq(lit(10));
+    let df = df.select_columns(&["a", "b"])?.filter(filter)?;
+
+    // print the results
+    df.show().await?;
+
+    Ok(())
 }
60 changes: 0 additions & 60 deletions datafusion-examples/examples/dataframe_in_memory.rs

This file was deleted.
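
Taken together with the README change above, the parquet, CSV, and in-memory walkthroughs now all live in the single dataframe.rs example. Assuming a local checkout of the repository, the command already shown in the README hunk still exercises everything:

    cargo run --example dataframe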
