Skip to content

Commit

Permalink
Add example for using a separate threadpool for CPU bound work
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 14, 2024
1 parent e7f7a9b commit de91326
Showing 1 changed file with 153 additions and 0 deletions.
153 changes: 153 additions & 0 deletions datafusion-examples/examples/thread_pools.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::stream::StreamExt;

/// This example shows how to use a separate thread pool (tokio [`Runtime`])) to
/// run DataFusion CPU intensive jobs.
///
/// If you just run a DataFusion plan without setting up a separate `Runtime`,
/// it will execute CPU intensive jobs on the same thread pool as any I/O which
/// can cause the issues described in the [Architecture section] such as
/// bandwidth being throttled by network congestion control and increasing
/// latencies for processing network messages.

/// The `#[tokio::main]` macro actually creates a [`Runtime`] and installs it in
/// a thread local variable as the "current" runtime (on which `async` futures,
/// streams and tasks are run).
///
/// Basic programs do not need to delve into this level of detail, but for this
/// example it is important to understand which [`Runtime`] is the "current" one.
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new()
// enabling URL table means we can select directly from
// paths in SQL queries.
.enable_url_table();
let sql = format!(
"SELECT * FROM '{}/alltypes_plain.parquet'",
datafusion::test_util::parquet_test_data()
);

same_runtime(&ctx, &sql).await?;
different_runtime(ctx, sql).await?;
Ok(())
}

/// Demonstrates how to run queries directly on the current tokio `Runtime`
///
/// This is how examples are shown in DataFusion and works well for development
/// and local query processing.
async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
// Note that calling .sql is an async function as it may also do network
// I/O, for example to contact a remote catalog or do an object store LIST
let df = ctx.sql(sql).await?;

// Many examples call `collect` or `show()` which buffers the results, but
// internally DataFusion generates output a RecordBatch at a time

// Calling `execute_stream` on a DataFrame returns a
// `SendableRecordBatchStream` which can then be incrementally polled
let mut stream = df.execute_stream().await?;

// Calling `next()` drives the plan, producing new `RecordBatch`es using the
// current runtime (and typically also the current thread).
//
// Note that executing the plan like this results in all CPU intensive work
// is done on the same runtime that is used to do IO by default.
while let Some(batch) = stream.next().await {
println!("{}", pretty_format_batches(&[batch?]).unwrap());
}
Ok(())
}

/// Demonstrates how to run queries on a **different** runtime than the current one
///
/// This is typically how you should run DataFusion queries from a network
/// server or when processing data from a remote object store.
async fn different_runtime(ctx: SessionContext, sql: String) -> Result<()> {
// First, we need a new runtime, which we can create with the tokio builder
// however, since we are already in the context of another runtime
// (installed by #[tokio::main]) we create a new thread for the runtime
tokio::task::spawn_blocking(move || {
std::thread::spawn(move || thread_entry(ctx, sql.to_string()))
.join()
.expect("thread did not panic")
})
.await
.expect("task did not panic")
}

/// This is the entry point of thread that we started our second runtime on
fn thread_entry(ctx: SessionContext, sql: String) -> Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
// only enable the time driver (not the I/O driver), meaning this
// runtime will not be able to perform network I/O
.enable_time()
.build()?;

// Now we can run the actual code we want on a different runtime
runtime.block_on(async move { different_runtime_inner(ctx, sql).await })

}

/// this is run on a new, separate runtime
async fn different_runtime_inner(ctx: SessionContext, sql: String) -> Result<()> {
// Setup execution as before
let df = ctx.sql(&sql)
.await?;

let mut stream = df.execute_stream().await?;


//XXX Note at this point, calling next() will be run on our new threadpool. However, this will also spawn the catalog and object store requests on the same threadpool as well!

// While this will mean we don't interfere with handling of other network requests, it will mean tht the network requests that happen as part of query processing will still be running on the same threadpool

// TODO show this working

// To avoid this, all IO access, both catalog and data (e.g. object_store) must be spawned on to their own runtime, like this:
// TODO....


//
// care is required to avoid calling `next()` (aka polling) from the default IO thread (even if planning / execution is run on that other thread)
// Best practice is to do all of DataFusion planning / execution on a separate pool. Note that some care is required for remote catalogs such as iceberg that
// themselves do network IO
// TODO figure out how to cause an erorr due to io thread

while let Some(batch) = stream.next().await {
println!("{}", pretty_format_batches(&[batch?]).unwrap());
}



//
// You can use the DataFusion "DedicatedExecutor" in this case to handle most of this automatically for you.

// Note that special care must be taken when running Datafusion plans that do async io to transfer the work to their own thread pools.

// Using separate Runtime will avoid other requests being messed up but it won't really help requests made from DataSources such as
// reading parquet files from object_store.
//
// Thus this runtime also disables IO so that we are sure there is no IO work being done on it.

Ok(())
}

0 comments on commit de91326

Please sign in to comment.