Skip to content

Commit

Permalink
fix ci & delete ExecutionEngine trait
Browse files (browse the repository at this point in the history)
  • Loading branch information
lewiszlw committed Jan 2, 2024
1 parent d7afcf7 commit 3465972
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 59 deletions.
37 changes: 0 additions & 37 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ jobs:
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
cargo test
cd examples
cargo run --example standalone_sql --features=ballista/standalone
env:
CARGO_HOME: "/github/home/.cargo"
CARGO_TARGET_DIR: "/github/home/target"
Expand Down Expand Up @@ -336,41 +334,6 @@ jobs:
CARGO_HOME: "/github/home/.cargo"
CARGO_TARGET_DIR: "/github/home/target"

docker:
name: Docker
needs: [linux-build-lib, react-build]
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v3
- name: Restore rust artifacts
uses: actions/download-artifact@v2
with:
name: rust-artifacts
path: /home/runner/work/arrow-ballista/arrow-ballista/target/release
- name: Restore react artifacts
uses: actions/download-artifact@v2
with:
name: react-artifacts
path: /home/runner/work/arrow-ballista/arrow-ballista/ballista/scheduler/ui/build
- name: Build and push Docker image
run: |
echo "github user is $DOCKER_USER"
docker build -t arrow-ballista-standalone:latest -f dev/docker/ballista-standalone.Dockerfile .
export DOCKER_TAG="$(git describe --exact-match --tags $(git log -n1 --pretty='%h') || echo '')"
if [[ $DOCKER_TAG =~ ^[0-9\.]+-rc[0-9]+$ ]]
then
echo "publishing docker tag $DOCKER_TAG"
docker tag arrow-ballista-standalone:latest ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
docker login ghcr.io -u $DOCKER_USER -p "$DOCKER_PASS"
docker push ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
fi
env:
DOCKER_USER: ${{ github.actor }}
DOCKER_PASS: ${{ secrets.GITHUB_TOKEN }}

clippy:
name: Clippy
needs: [linux-build-lib]
Expand Down
18 changes: 3 additions & 15 deletions ballista/executor/src/execution_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,6 @@ use datafusion::physical_plan::ExecutionPlan;
use std::fmt::Debug;
use std::sync::Arc;

/// Execution engine extension point
pub trait ExecutionEngine: Sync + Send {
fn create_query_stage_exec(
&self,
job_id: String,
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
work_dir: &str,
) -> Result<Arc<dyn QueryStageExecutor>>;
}

/// QueryStageExecutor executes a section of a query plan that has consistent partitioning and
/// can be executed as one unit with each partition being executed in parallel. The output of each
/// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
Expand All @@ -53,10 +41,10 @@ pub trait QueryStageExecutor: Sync + Send + Debug {
fn collect_plan_metrics(&self) -> Vec<MetricsSet>;
}

pub struct DefaultExecutionEngine {}
pub struct DatafusionExecutionEngine {}

impl ExecutionEngine for DefaultExecutionEngine {
fn create_query_stage_exec(
impl DatafusionExecutionEngine {
pub fn create_query_stage_exec(
&self,
job_id: String,
stage_id: usize,
Expand Down
9 changes: 4 additions & 5 deletions ballista/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

//! Ballista executor logic
use crate::execution_engine::DefaultExecutionEngine;
use crate::execution_engine::ExecutionEngine;
use crate::execution_engine::DatafusionExecutionEngine;
use crate::execution_engine::QueryStageExecutor;
use crate::metrics::ExecutorMetricsCollector;
use ballista_core::error::BallistaError;
Expand Down Expand Up @@ -91,7 +90,7 @@ pub struct Executor {

/// Execution engine that the executor will delegate to
/// for executing query stages
pub(crate) execution_engine: Arc<dyn ExecutionEngine>,
pub(crate) execution_engine: Arc<DatafusionExecutionEngine>,
}

impl Executor {
Expand All @@ -103,7 +102,7 @@ impl Executor {
runtime_with_data_cache: Option<Arc<RuntimeEnv>>,
metrics_collector: Arc<dyn ExecutorMetricsCollector>,
concurrent_tasks: usize,
execution_engine: Option<Arc<dyn ExecutionEngine>>,
execution_engine: Option<Arc<DatafusionExecutionEngine>>,
) -> Self {
Self {
metadata,
Expand All @@ -118,7 +117,7 @@ impl Executor {
concurrent_tasks,
abort_handles: Default::default(),
execution_engine: execution_engine
.unwrap_or_else(|| Arc::new(DefaultExecutionEngine {})),
.unwrap_or_else(|| Arc::new(DatafusionExecutionEngine {})),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use ballista_core::utils::{
};
use ballista_core::BALLISTA_VERSION;

use crate::execution_engine::ExecutionEngine;
use crate::execution_engine::DatafusionExecutionEngine;
use crate::executor::{Executor, TasksDrainedFuture};
use crate::executor_server;
use crate::executor_server::TERMINATING;
Expand Down Expand Up @@ -91,7 +91,7 @@ pub struct ExecutorProcessConfig {
pub executor_heartbeat_interval_seconds: u64,
/// Optional execution engine to use to execute physical plans, will default to
/// DataFusion if none is provided.
pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
pub execution_engine: Option<Arc<DatafusionExecutionEngine>>,
}

pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
Expand Down

0 comments on commit 3465972

Please sign in to comment.