Skip to content

Commit

Permalink
remove rustfmt.toml
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Jan 22, 2024
1 parent 28564cd commit c10de07
Show file tree
Hide file tree
Showing 50 changed files with 579 additions and 1,169 deletions.
23 changes: 6 additions & 17 deletions ballista-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ impl Command {
Self::Quit => Err(BallistaError::Internal(
"Unexpected quit, this should be handled outside".to_string(),
)),
Self::ListFunctions => {
display_all_functions().map_err(BallistaError::DataFusionError)
}
Self::ListFunctions => display_all_functions().map_err(BallistaError::DataFusionError),
Self::SearchFunctions(function) => {
if let Ok(func) = function.parse::<Function>() {
let details = func.function_details()?;
Expand All @@ -105,8 +103,7 @@ impl Command {
}
}
Self::OutputFormat(_) => Err(BallistaError::Internal(
"Unexpected change output format, this should be handled outside"
.to_string(),
"Unexpected change output format, this should be handled outside".to_string(),
)),
}
}
Expand All @@ -120,9 +117,7 @@ impl Command {
Self::ListFunctions => ("\\h", "function list"),
Self::SearchFunctions(_) => ("\\h function", "search function"),
Self::QuietMode(_) => ("\\quiet (true|false)?", "print or set quiet mode"),
Self::OutputFormat(_) => {
("\\pset [NAME [VALUE]]", "set table output option\n(format)")
}
Self::OutputFormat(_) => ("\\pset [NAME [VALUE]]", "set table output option\n(format)"),
}
}
}
Expand Down Expand Up @@ -173,16 +168,10 @@ impl FromStr for Command {
("?", None) => Self::Help,
("h", None) => Self::ListFunctions,
("h", Some(function)) => Self::SearchFunctions(function.into()),
("quiet", Some("true" | "t" | "yes" | "y" | "on")) => {
Self::QuietMode(Some(true))
}
("quiet", Some("false" | "f" | "no" | "n" | "off")) => {
Self::QuietMode(Some(false))
}
("quiet", Some("true" | "t" | "yes" | "y" | "on")) => Self::QuietMode(Some(true)),
("quiet", Some("false" | "f" | "no" | "n" | "off")) => Self::QuietMode(Some(false)),
("quiet", None) => Self::QuietMode(None),
("pset", Some(subcommand)) => {
Self::OutputFormat(Some(subcommand.to_string()))
}
("pset", Some(subcommand)) => Self::OutputFormat(Some(subcommand.to_string())),
("pset", None) => Self::OutputFormat(None),
_ => return Err(()),
})
Expand Down
9 changes: 2 additions & 7 deletions ballista-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,11 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
Command::OutputFormat(subcommand) => {
if let Some(subcommand) = subcommand {
if let Ok(command) = subcommand.parse::<OutputFormat>() {
if let Err(e) =
command.execute(&mut print_options).await
{
if let Err(e) = command.execute(&mut print_options).await {
eprintln!("{e}")
}
} else {
eprintln!(
"'\\{}' is not a valid command",
&line[1..]
);
eprintln!("'\\{}' is not a valid command", &line[1..]);
}
} else {
println!("Output format is {:?}.", print_options.format);
Expand Down
52 changes: 18 additions & 34 deletions ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ use datafusion::catalog::TableReference;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::{source_as_provider, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{
CreateExternalTable, DdlStatement, LogicalPlan, TableScan,
};
use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan, TableScan};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext,
};
Expand All @@ -58,11 +56,7 @@ struct BallistaContextState {
}

impl BallistaContextState {
pub fn new(
scheduler_host: String,
scheduler_port: u16,
config: &BallistaConfig,
) -> Self {
pub fn new(scheduler_host: String, scheduler_port: u16, config: &BallistaConfig) -> Self {
Self {
config: config.clone(),
scheduler_host,
Expand Down Expand Up @@ -90,8 +84,7 @@ impl BallistaContext {
) -> ballista_core::error::Result<Self> {
let state = BallistaContextState::new(host.to_owned(), port, config);

let scheduler_url =
format!("http://{}:{}", &state.scheduler_host, state.scheduler_port);
let scheduler_url = format!("http://{}:{}", &state.scheduler_host, state.scheduler_port);
info!(
"Connecting to Ballista scheduler at {}",
scheduler_url.clone()
Expand Down Expand Up @@ -169,11 +162,7 @@ impl BallistaContext {
}

/// Register a DataFrame as a table that can be referenced from a SQL query
pub fn register_table(
&self,
name: &str,
table: Arc<dyn TableProvider>,
) -> Result<()> {
pub fn register_table(&self, name: &str, table: Arc<dyn TableProvider>) -> Result<()> {
let mut state = self.state.lock();
state.tables.insert(name.to_owned(), table);
Ok(())
Expand All @@ -188,9 +177,7 @@ impl BallistaContext {
let plan = self
.read_csv(path, options)
.await
.map_err(|e| {
DataFusionError::Context(format!("Can't read CSV: {path}"), Box::new(e))
})?
.map_err(|e| DataFusionError::Context(format!("Can't read CSV: {path}"), Box::new(e)))?
.into_optimized_plan()?;
match plan {
LogicalPlan::TableScan(TableScan { source, .. }) => {
Expand Down Expand Up @@ -275,9 +262,8 @@ impl BallistaContext {
if is_show {
let state = self.state.lock();
ctx = Arc::new(SessionContext::new_with_config(
SessionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
),
SessionConfig::new()
.with_information_schema(state.config.default_with_information_schema()),
));
}

Expand All @@ -303,19 +289,17 @@ impl BallistaContext {
let plan = ctx.state().create_logical_plan(sql).await?;

match plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
ref delimiter,
ref table_partition_cols,
ref if_not_exists,
..
},
)) => {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
ref delimiter,
ref table_partition_cols,
ref if_not_exists,
..
})) => {
let table_exists = ctx.table_exist(name)?;
let schema: SchemaRef = Arc::new(schema.as_ref().to_owned().into());
let table_partition_cols = table_partition_cols
Expand Down
5 changes: 2 additions & 3 deletions ballista/client/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ pub use ballista_core::{
config::{
BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING,
BALLISTA_PLUGIN_DIR, BALLISTA_REPARTITION_AGGREGATIONS,
BALLISTA_REPARTITION_JOINS, BALLISTA_REPARTITION_WINDOWS,
BALLISTA_WITH_INFORMATION_SCHEMA,
BALLISTA_PLUGIN_DIR, BALLISTA_REPARTITION_AGGREGATIONS, BALLISTA_REPARTITION_JOINS,
BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA,
},
error::{BallistaError, Result},
};
Expand Down
28 changes: 9 additions & 19 deletions ballista/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ impl BallistaClient {
pub async fn try_new(host: &str, port: u16) -> Result<Self> {
let addr = format!("http://{host}:{port}");
debug!("BallistaClient connecting to {}", addr);
let connection =
create_grpc_client_connection(addr.clone())
.await
.map_err(|e| {
BallistaError::GrpcConnectionError(format!(
let connection = create_grpc_client_connection(addr.clone())
.await
.map_err(|e| {
BallistaError::GrpcConnectionError(format!(
"Error connecting to Ballista scheduler or executor at {addr}: {e:?}"
))
})?;
})?;
let flight_client = FlightServiceClient::new(connection);
debug!("BallistaClient connected OK");

Expand Down Expand Up @@ -109,10 +108,7 @@ impl BallistaClient {
}

/// Execute an action and retrieve the results
pub async fn execute_action(
&mut self,
action: &Action,
) -> Result<SendableRecordBatchStream> {
pub async fn execute_action(&mut self, action: &Action) -> Result<SendableRecordBatchStream> {
let serialized_action: protobuf::Action = action.to_owned().try_into()?;

let mut buf: Vec<u8> = Vec::with_capacity(serialized_action.encoded_len());
Expand All @@ -127,10 +123,7 @@ impl BallistaClient {
"Remote shuffle read fail, retry {} times, sleep {} ms.",
i, IO_RETRY_WAIT_TIME_MS
);
tokio::time::sleep(std::time::Duration::from_millis(
IO_RETRY_WAIT_TIME_MS,
))
.await;
tokio::time::sleep(std::time::Duration::from_millis(IO_RETRY_WAIT_TIME_MS)).await;
}

let request = tonic::Request::new(Ticket {
Expand Down Expand Up @@ -173,11 +166,8 @@ impl BallistaClient {
}
Err(e) => {
if i == IO_RETRIES_TIMES - 1 || e.code() != Code::Unknown {
return BallistaError::GrpcActionError(format!(
"{:?}",
e.to_string()
))
.into();
return BallistaError::GrpcActionError(format!("{:?}", e.to_string()))
.into();
}
continue;
}
Expand Down
27 changes: 8 additions & 19 deletions ballista/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,10 @@ impl From<String> for BallistaError {
impl From<ArrowError> for BallistaError {
fn from(e: ArrowError) -> Self {
match e {
ArrowError::ExternalError(e)
if e.downcast_ref::<BallistaError>().is_some() =>
{
ArrowError::ExternalError(e) if e.downcast_ref::<BallistaError>().is_some() => {
*e.downcast::<BallistaError>().unwrap()
}
ArrowError::ExternalError(e)
if e.downcast_ref::<DataFusionError>().is_some() =>
{
ArrowError::ExternalError(e) if e.downcast_ref::<DataFusionError>().is_some() => {
BallistaError::DataFusionError(*e.downcast::<DataFusionError>().unwrap())
}
other => BallistaError::ArrowError(other),
Expand Down Expand Up @@ -228,24 +224,17 @@ impl Display for BallistaError {
impl From<BallistaError> for FailedTask {
fn from(e: BallistaError) -> Self {
match e {
BallistaError::FetchFailed(
executor_id,
map_stage_id,
map_partition_id,
desc,
) => {
BallistaError::FetchFailed(executor_id, map_stage_id, map_partition_id, desc) => {
FailedTask {
error: desc,
// fetch partition error is considered to be non-retryable
retryable: false,
count_to_failures: false,
failed_reason: Some(FailedReason::FetchPartitionError(
FetchPartitionError {
executor_id,
map_stage_id: map_stage_id as u32,
map_partition_id: map_partition_id as u32,
},
)),
failed_reason: Some(FailedReason::FetchPartitionError(FetchPartitionError {
executor_id,
map_stage_id: map_stage_id as u32,
map_partition_id: map_partition_id as u32,
})),
}
}
BallistaError::IoError(io) => {
Expand Down
14 changes: 6 additions & 8 deletions ballista/core/src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@ pub struct EventLoop<E> {
}

impl<E: Send + 'static> EventLoop<E> {
pub fn new(
name: String,
buffer_size: usize,
action: Arc<dyn EventAction<E>>,
) -> Self {
pub fn new(name: String, buffer_size: usize, action: Arc<dyn EventAction<E>>) -> Self {
Self {
name,
buffer_size,
Expand Down Expand Up @@ -116,9 +112,11 @@ impl<E: Send + 'static> EventLoop<E> {

pub fn get_sender(&self) -> Result<EventSender<E>> {
Ok(EventSender {
tx_event: self.tx_event.as_ref().cloned().ok_or_else(|| {
BallistaError::General("Event sender not exist!!!".to_string())
})?,
tx_event: self
.tx_event
.as_ref()
.cloned()
.ok_or_else(|| BallistaError::General("Event sender not exist!!!".to_string()))?,
})
}
}
Expand Down
40 changes: 14 additions & 26 deletions ballista/core/src/execution_plans/distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
}

impl<T: 'static + AsLogicalPlan> DisplayAs for DistributedQueryExec<T> {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
Expand Down Expand Up @@ -181,23 +177,18 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
assert_eq!(0, partition);

let mut buf: Vec<u8> = vec![];
let plan_message = T::try_from_logical_plan(
&self.plan,
self.extension_codec.as_ref(),
)
.map_err(|e| {
DataFusionError::Internal(format!("failed to serialize logical plan: {e:?}"))
})?;
let plan_message = T::try_from_logical_plan(&self.plan, self.extension_codec.as_ref())
.map_err(|e| {
DataFusionError::Internal(format!("failed to serialize logical plan: {e:?}"))
})?;
plan_message.try_encode(&mut buf).map_err(|e| {
DataFusionError::Execution(format!("failed to encode logical plan: {e:?}"))
})?;

let query = ExecuteQueryParams {
query: Some(Query::LogicalPlan(buf)),
settings: vec![],
optional_session_id: Some(OptionalSessionId::SessionId(
self.session_id.clone(),
)),
optional_session_id: Some(OptionalSessionId::SessionId(self.session_id.clone())),
};

let stream = futures::stream::once(
Expand Down Expand Up @@ -296,8 +287,7 @@ async fn execute_query(
}
Some(job_status::Status::Successful(successful)) => {
let streams = successful.partition_location.into_iter().map(|p| {
let f = fetch_partition(p)
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
let f = fetch_partition(p).map_err(|e| ArrowError::ExternalError(Box::new(e)));

futures::stream::once(f).try_flatten()
});
Expand All @@ -308,15 +298,13 @@ async fn execute_query(
}
}

async fn fetch_partition(
location: PartitionLocation,
) -> Result<SendableRecordBatchStream> {
let metadata = location.executor_meta.ok_or_else(|| {
DataFusionError::Internal("Received empty executor metadata".to_owned())
})?;
let partition_id = location.partition_id.ok_or_else(|| {
DataFusionError::Internal("Received empty partition id".to_owned())
})?;
async fn fetch_partition(location: PartitionLocation) -> Result<SendableRecordBatchStream> {
let metadata = location
.executor_meta
.ok_or_else(|| DataFusionError::Internal("Received empty executor metadata".to_owned()))?;
let partition_id = location
.partition_id
.ok_or_else(|| DataFusionError::Internal("Received empty partition id".to_owned()))?;
let host = metadata.host.as_str();
let port = metadata.port as u16;
let mut ballista_client = BallistaClient::try_new(host, port)
Expand Down
Loading

0 comments on commit c10de07

Please sign in to comment.