From e0e0a4bcbbef519a3e2a92ad54a592386a71ecb3 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 14 Nov 2024 15:25:52 -0800 Subject: [PATCH] fmt --- crates/arroyo-api/src/cloud.rs | 4 +- crates/arroyo-api/src/lib.rs | 10 +- crates/arroyo-api/src/rest_utils.rs | 4 +- .../src/filesystem/sink/mod.rs | 4 +- .../arroyo-connectors/src/kinesis/source.rs | 2 +- .../src/schedulers/kubernetes/mod.rs | 2 +- .../arroyo-planner/src/extension/aggregate.rs | 11 +- .../arroyo-planner/src/extension/debezium.rs | 7 +- .../src/extension/key_calculation.rs | 7 +- crates/arroyo-planner/src/extension/mod.rs | 13 +- .../src/extension/remote_table.rs | 6 +- crates/arroyo-planner/src/extension/sink.rs | 6 +- .../src/extension/table_source.rs | 7 +- .../src/extension/watermark_node.rs | 10 +- crates/arroyo-planner/src/functions.rs | 6 +- crates/arroyo-planner/src/lib.rs | 125 ++++++++++-------- crates/arroyo-planner/src/logical.rs | 6 +- crates/arroyo-planner/src/physical.rs | 27 ++-- crates/arroyo-planner/src/plan/aggregate.rs | 8 +- crates/arroyo-planner/src/plan/window_fn.rs | 13 +- crates/arroyo-planner/src/rewriters.rs | 13 +- crates/arroyo-planner/src/tables.rs | 38 ++++-- crates/arroyo-server-common/src/lib.rs | 3 +- .../src/tables/expiring_time_key_map.rs | 6 +- crates/arroyo-udf/arroyo-udf-host/src/test.rs | 21 +-- crates/arroyo-worker/src/arrow/mod.rs | 7 +- crates/arroyo/src/main.rs | 4 +- crates/arroyo/src/run.rs | 4 +- 28 files changed, 221 insertions(+), 153 deletions(-) diff --git a/crates/arroyo-api/src/cloud.rs b/crates/arroyo-api/src/cloud.rs index bf47144a2..e91f192f8 100644 --- a/crates/arroyo-api/src/cloud.rs +++ b/crates/arroyo-api/src/cloud.rs @@ -1,7 +1,7 @@ -use axum_extra::headers::Authorization; +use crate::{rest_utils::ErrorResp, AuthData, OrgMetadata}; use axum_extra::headers::authorization::Bearer; +use axum_extra::headers::Authorization; use axum_extra::TypedHeader; -use crate::{rest_utils::ErrorResp, AuthData, OrgMetadata}; use cornucopia_async::Database; pub(crate) async fn authenticate( diff --git a/crates/arroyo-api/src/lib.rs b/crates/arroyo-api/src/lib.rs index 54c2b25d2..ea65d0625 100644 --- a/crates/arroyo-api/src/lib.rs +++ b/crates/arroyo-api/src/lib.rs @@ -3,7 +3,7 @@ use axum::Json; use cornucopia_async::DatabaseSource; use http::StatusCode; use serde::{Deserialize, Serialize}; -use std::net::{SocketAddr}; +use std::net::SocketAddr; use time::OffsetDateTime; use tokio::net::TcpListener; use tonic::transport::Channel; @@ -138,11 +138,9 @@ pub async fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> any ); info!("Starting API server on {:?}", local_addr); - guard.into_spawn_task(wrap_start( - "api", - local_addr, - async { axum::serve(listener, app.into_make_service()).await }, - )); + guard.into_spawn_task(wrap_start("api", local_addr, async { + axum::serve(listener, app.into_make_service()).await + })); Ok(local_addr.port()) } diff --git a/crates/arroyo-api/src/rest_utils.rs b/crates/arroyo-api/src/rest_utils.rs index 274044385..539471a6c 100644 --- a/crates/arroyo-api/src/rest_utils.rs +++ b/crates/arroyo-api/src/rest_utils.rs @@ -3,9 +3,9 @@ use arroyo_server_common::log_event; use axum::extract::rejection::JsonRejection; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; -use axum::{Json}; -use axum_extra::headers::Authorization; +use axum::Json; use axum_extra::headers::authorization::Bearer; +use axum_extra::headers::Authorization; use axum_extra::TypedHeader; use serde_json::json; use tracing::{error, warn}; diff --git a/crates/arroyo-connectors/src/filesystem/sink/mod.rs b/crates/arroyo-connectors/src/filesystem/sink/mod.rs index 2b55f5f0f..5df14b831 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/mod.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/mod.rs @@ -21,6 +21,7 @@ use arroyo_storage::StorageProvider; use async_trait::async_trait; use bincode::{Decode, Encode}; use chrono::{DateTime, Utc}; +use datafusion::execution::SessionStateBuilder; use datafusion::prelude::concat; use datafusion::{ common::{Column, Result as DFResult}, @@ -35,7 +36,6 @@ use datafusion::{ physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}, scalar::ScalarValue, }; -use datafusion::execution::SessionStateBuilder; use futures::{stream::FuturesUnordered, Future}; use futures::{stream::StreamExt, TryStreamExt}; use object_store::{multipart::PartId, path::Path, MultipartId}; @@ -212,7 +212,7 @@ fn partition_string_for_fields_and_time( fn compile_expression(expr: &Expr, schema: ArroyoSchemaRef) -> Result> { let physical_planner = DefaultPhysicalPlanner::default(); let session_state = SessionStateBuilder::new().build(); - + let plan = physical_planner.create_physical_expr( expr, &(schema.schema.as_ref().clone()).try_into()?, diff --git a/crates/arroyo-connectors/src/kinesis/source.rs b/crates/arroyo-connectors/src/kinesis/source.rs index f38a9bd2c..b7565b85d 100644 --- a/crates/arroyo-connectors/src/kinesis/source.rs +++ b/crates/arroyo-connectors/src/kinesis/source.rs @@ -337,7 +337,7 @@ impl KinesisSourceFunc { } async fn init_client(&mut self) { - let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28()); + let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28()); if let Some(region) = &self.aws_region { loader = loader.region(Region::new(region.clone())); } diff --git a/crates/arroyo-controller/src/schedulers/kubernetes/mod.rs b/crates/arroyo-controller/src/schedulers/kubernetes/mod.rs index 226bbdc19..06e88f537 100644 --- a/crates/arroyo-controller/src/schedulers/kubernetes/mod.rs +++ b/crates/arroyo-controller/src/schedulers/kubernetes/mod.rs @@ -8,11 +8,11 @@ use arroyo_rpc::grpc::rpc::{HeartbeatNodeReq, RegisterNodeReq, WorkerFinishedReq use arroyo_types::{WorkerId, JOB_ID_ENV, RUN_ID_ENV}; use async_trait::async_trait; use k8s_openapi::api::core::v1::Pod; +use k8s_openapi::apimachinery::pkg::api::resource::Quantity; use kube::api::{DeleteParams, ListParams}; use kube::{Api, Client}; use serde_json::json; use std::time::Duration; -use k8s_openapi::apimachinery::pkg::api::resource::Quantity; use tonic::Status; use tracing::{info, warn}; diff --git a/crates/arroyo-planner/src/extension/aggregate.rs b/crates/arroyo-planner/src/extension/aggregate.rs index 33f539411..8840d8ded 100644 --- a/crates/arroyo-planner/src/extension/aggregate.rs +++ b/crates/arroyo-planner/src/extension/aggregate.rs @@ -28,9 +28,14 @@ use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; use datafusion_proto::{physical_plan::AsExecutionPlan, protobuf::PhysicalPlanNode}; use prost::Message; -use crate::{builder::{NamedNode, Planner, SplitPlanOutput}, fields_with_qualifiers, multifield_partial_ord, physical::ArroyoPhysicalExtensionCodec, schema_from_df_fields, schema_from_df_fields_with_metadata, DFField, WindowBehavior}; -use crate::physical::window; use super::{ArroyoExtension, NodeWithIncomingEdges, TimestampAppendExtension}; +use crate::physical::window; +use crate::{ + builder::{NamedNode, Planner, SplitPlanOutput}, + fields_with_qualifiers, multifield_partial_ord, + physical::ArroyoPhysicalExtensionCodec, + schema_from_df_fields, schema_from_df_fields_with_metadata, DFField, WindowBehavior, +}; pub(crate) const AGGREGATE_EXTENSION_NAME: &str = "AggregateExtension"; @@ -331,7 +336,7 @@ impl AggregateExtension { let timestamp_column = Column::new(timestamp_field.qualifier().cloned(), timestamp_field.name()); aggregate_fields.insert(window_index, window_field.clone()); - + let window_expression = Expr::ScalarFunction(ScalarFunction { func: window(), args: vec![ diff --git a/crates/arroyo-planner/src/extension/debezium.rs b/crates/arroyo-planner/src/extension/debezium.rs index 557a0a42a..561c1d15b 100644 --- a/crates/arroyo-planner/src/extension/debezium.rs +++ b/crates/arroyo-planner/src/extension/debezium.rs @@ -25,7 +25,12 @@ pub struct DebeziumUnrollingExtension { primary_key_names: Arc>, } -multifield_partial_ord!(DebeziumUnrollingExtension, input, primary_keys, primary_key_names); +multifield_partial_ord!( + DebeziumUnrollingExtension, + input, + primary_keys, + primary_key_names +); impl DebeziumUnrollingExtension { pub(crate) fn as_debezium_schema( diff --git a/crates/arroyo-planner/src/extension/key_calculation.rs b/crates/arroyo-planner/src/extension/key_calculation.rs index 263b8e501..dda3e4dd5 100644 --- a/crates/arroyo-planner/src/extension/key_calculation.rs +++ b/crates/arroyo-planner/src/extension/key_calculation.rs @@ -11,7 +11,12 @@ use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_proto::{physical_plan::AsExecutionPlan, protobuf::PhysicalPlanNode}; use prost::Message; -use crate::{builder::{NamedNode, Planner}, fields_with_qualifiers, multifield_partial_ord, physical::ArroyoPhysicalExtensionCodec, schema_from_df_fields_with_metadata}; +use crate::{ + builder::{NamedNode, Planner}, + fields_with_qualifiers, multifield_partial_ord, + physical::ArroyoPhysicalExtensionCodec, + schema_from_df_fields_with_metadata, +}; use super::{ArroyoExtension, NodeWithIncomingEdges}; diff --git a/crates/arroyo-planner/src/extension/mod.rs b/crates/arroyo-planner/src/extension/mod.rs index ff92af289..cd2a27ca7 100644 --- a/crates/arroyo-planner/src/extension/mod.rs +++ b/crates/arroyo-planner/src/extension/mod.rs @@ -114,7 +114,6 @@ macro_rules! multifield_partial_ord { } } - #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub(crate) struct TimestampAppendExtension { pub(crate) input: LogicalPlan, @@ -187,7 +186,17 @@ pub(crate) struct AsyncUDFExtension { pub(crate) final_schema: DFSchemaRef, } -multifield_partial_ord!(AsyncUDFExtension, input, name, udf, arg_exprs, final_exprs, ordered, max_concurrency, timeout); +multifield_partial_ord!( + AsyncUDFExtension, + input, + name, + udf, + arg_exprs, + final_exprs, + ordered, + max_concurrency, + timeout +); impl ArroyoExtension for AsyncUDFExtension { fn node_name(&self) -> Option { diff --git a/crates/arroyo-planner/src/extension/remote_table.rs b/crates/arroyo-planner/src/extension/remote_table.rs index 8103bea9a..97d817a76 100644 --- a/crates/arroyo-planner/src/extension/remote_table.rs +++ b/crates/arroyo-planner/src/extension/remote_table.rs @@ -11,7 +11,11 @@ use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_proto::{physical_plan::AsExecutionPlan, protobuf::PhysicalPlanNode}; use prost::Message; -use crate::{builder::{NamedNode, Planner}, multifield_partial_ord, physical::ArroyoPhysicalExtensionCodec}; +use crate::{ + builder::{NamedNode, Planner}, + multifield_partial_ord, + physical::ArroyoPhysicalExtensionCodec, +}; use super::{ArroyoExtension, NodeWithIncomingEdges}; diff --git a/crates/arroyo-planner/src/extension/sink.rs b/crates/arroyo-planner/src/extension/sink.rs index 685f52d0f..602dd2bf7 100644 --- a/crates/arroyo-planner/src/extension/sink.rs +++ b/crates/arroyo-planner/src/extension/sink.rs @@ -11,7 +11,11 @@ use datafusion::logical_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalN use prost::Message; -use crate::{builder::{NamedNode, Planner}, multifield_partial_ord, tables::Table}; +use crate::{ + builder::{NamedNode, Planner}, + multifield_partial_ord, + tables::Table, +}; use super::{ debezium::ToDebeziumExtension, remote_table::RemoteTableExtension, ArroyoExtension, diff --git a/crates/arroyo-planner/src/extension/table_source.rs b/crates/arroyo-planner/src/extension/table_source.rs index 1802d816f..ef8dd62d9 100644 --- a/crates/arroyo-planner/src/extension/table_source.rs +++ b/crates/arroyo-planner/src/extension/table_source.rs @@ -10,7 +10,12 @@ use prost::Message; use super::{ArroyoExtension, DebeziumUnrollingExtension, NodeWithIncomingEdges}; use crate::tables::FieldSpec; -use crate::{builder::{NamedNode, Planner}, multifield_partial_ord, schema_from_df_fields, schemas::add_timestamp_field, tables::ConnectorTable}; +use crate::{ + builder::{NamedNode, Planner}, + multifield_partial_ord, schema_from_df_fields, + schemas::add_timestamp_field, + tables::ConnectorTable, +}; pub(crate) const TABLE_SOURCE_NAME: &str = "TableSourceExtension"; #[derive(Debug, Clone, PartialEq, Eq, Hash)] diff --git a/crates/arroyo-planner/src/extension/watermark_node.rs b/crates/arroyo-planner/src/extension/watermark_node.rs index e0f9a1690..aa44acfa8 100644 --- a/crates/arroyo-planner/src/extension/watermark_node.rs +++ b/crates/arroyo-planner/src/extension/watermark_node.rs @@ -1,5 +1,6 @@ use crate::builder::{NamedNode, Planner}; use crate::extension::{ArroyoExtension, NodeWithIncomingEdges}; +use crate::multifield_partial_ord; use crate::schemas::add_timestamp_field; use arroyo_datastream::logical::{LogicalEdge, LogicalEdgeType, LogicalNode, OperatorName}; use arroyo_rpc::df::{ArroyoSchema, ArroyoSchemaRef}; @@ -12,7 +13,6 @@ use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; use prost::Message; use std::fmt::Formatter; use std::sync::Arc; -use crate::multifield_partial_ord; pub(crate) const WATERMARK_NODE_NAME: &str = "WatermarkNode"; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -24,7 +24,13 @@ pub struct WatermarkNode { timestamp_index: usize, } -multifield_partial_ord!(WatermarkNode, input, qualifier, watermark_expression, timestamp_index); +multifield_partial_ord!( + WatermarkNode, + input, + qualifier, + watermark_expression, + timestamp_index +); impl UserDefinedLogicalNodeCore for WatermarkNode { fn name(&self) -> &str { diff --git a/crates/arroyo-planner/src/functions.rs b/crates/arroyo-planner/src/functions.rs index 8b3ac0e84..b7ba16cd7 100644 --- a/crates/arroyo-planner/src/functions.rs +++ b/crates/arroyo-planner/src/functions.rs @@ -66,11 +66,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) { .register_udf(Arc::new(create_udf( "extract_json", vec![DataType::Utf8, DataType::Utf8], - DataType::List(Arc::new(Field::new( - "item", - DataType::Utf8, - true, - ))), + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), Volatility::Immutable, Arc::new(extract_json), ))) diff --git a/crates/arroyo-planner/src/lib.rs b/crates/arroyo-planner/src/lib.rs index acb884374..9705234bf 100644 --- a/crates/arroyo-planner/src/lib.rs +++ b/crates/arroyo-planner/src/lib.rs @@ -25,7 +25,6 @@ use arroyo_datastream::WindowType; use datafusion::common::{not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue}; use datafusion::datasource::DefaultTableSource; #[allow(deprecated)] - use datafusion::prelude::SessionConfig; use datafusion::sql::sqlparser::dialect::PostgreSqlDialect; @@ -33,7 +32,10 @@ use datafusion::sql::sqlparser::parser::{Parser, ParserError}; use datafusion::sql::{planner::ContextProvider, sqlparser, TableReference}; use datafusion::logical_expr::expr::ScalarFunction; -use datafusion::logical_expr::{create_udaf, Expr, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, Signature, Volatility, WindowUDF}; +use datafusion::logical_expr::{ + create_udaf, Expr, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, Signature, Volatility, + WindowUDF, +}; use datafusion::logical_expr::{AggregateUDF, TableSource}; use logical::LogicalBatchInput; @@ -68,9 +70,9 @@ use datafusion::logical_expr::expr_rewriter::FunctionRewrite; use datafusion::logical_expr::planner::ExprPlanner; use datafusion::optimizer::Analyzer; use datafusion::sql::sqlparser::ast::{OneOrManyWithParens, Statement}; +use std::any::Any; use std::time::{Duration, SystemTime}; use std::{collections::HashMap, sync::Arc}; -use std::any::Any; use syn::Item; use tracing::{debug, info, warn}; use unicase::UniCase; @@ -130,7 +132,7 @@ pub fn register_functions(registry: &mut dyn FunctionRegistry) { for p in SessionStateDefaults::default_window_functions() { registry.register_udwf(p).unwrap(); } - + for p in SessionStateDefaults::default_expr_planners() { registry.register_expr_planner(p).unwrap(); } @@ -144,7 +146,7 @@ struct PlaceholderUdf { return_type: Arc Result + Send + Sync + 'static>, } -impl Debug for PlaceholderUdf{ +impl Debug for PlaceholderUdf { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "PlaceholderUDF<{}>", self.name) } @@ -168,8 +170,12 @@ impl ScalarUDFImpl for PlaceholderUdf { } } -impl PlaceholderUdf { - pub fn with_return(name: impl Into, args: Vec, ret: DataType) -> Arc { +impl PlaceholderUdf { + pub fn with_return( + name: impl Into, + args: Vec, + ret: DataType, + ) -> Arc { Arc::new(ScalarUDF::new_from_impl(PlaceholderUdf { name: name.into(), signature: Signature::exact(args, Volatility::Volatile), @@ -184,54 +190,58 @@ impl ArroyoSchemaProvider { ..Default::default() }; - registry.register_udf(PlaceholderUdf::with_return( - "hop", - vec![ - DataType::Interval(datatypes::IntervalUnit::MonthDayNano), - DataType::Interval(datatypes::IntervalUnit::MonthDayNano), - ], - window_arrow_struct(), - )).unwrap(); - - registry.register_udf(PlaceholderUdf::with_return( - "tumble", - vec![ - DataType::Interval(datatypes::IntervalUnit::MonthDayNano), - ], - window_arrow_struct(), - )).unwrap(); - - registry.register_udf(PlaceholderUdf::with_return( - "session", - vec![ - DataType::Interval(datatypes::IntervalUnit::MonthDayNano), - ], - window_arrow_struct(), - )).unwrap(); - - registry.register_udf(Arc::new(ScalarUDF::new_from_impl(PlaceholderUdf { - name: "unnest".to_string(), - signature: Signature::any(1, Volatility::Volatile), - return_type: Arc::new(|args| { - match args.first().ok_or_else(|| { - DataFusionError::Plan("unnest takes one argument".to_string()) - })? { - DataType::List(t) => Ok(t.data_type().clone()), - _ => Err(DataFusionError::Plan( - "unnest may only be called on arrays".to_string(), - )), - } - }) - }))).unwrap(); + registry + .register_udf(PlaceholderUdf::with_return( + "hop", + vec![ + DataType::Interval(datatypes::IntervalUnit::MonthDayNano), + DataType::Interval(datatypes::IntervalUnit::MonthDayNano), + ], + window_arrow_struct(), + )) + .unwrap(); + + registry + .register_udf(PlaceholderUdf::with_return( + "tumble", + vec![DataType::Interval(datatypes::IntervalUnit::MonthDayNano)], + window_arrow_struct(), + )) + .unwrap(); + + registry + .register_udf(PlaceholderUdf::with_return( + "session", + vec![DataType::Interval(datatypes::IntervalUnit::MonthDayNano)], + window_arrow_struct(), + )) + .unwrap(); - registry.register_udf(PlaceholderUdf::with_return( - "metadata", - vec![ + registry + .register_udf(Arc::new(ScalarUDF::new_from_impl(PlaceholderUdf { + name: "unnest".to_string(), + signature: Signature::any(1, Volatility::Volatile), + return_type: Arc::new(|args| { + match args.first().ok_or_else(|| { + DataFusionError::Plan("unnest takes one argument".to_string()) + })? { + DataType::List(t) => Ok(t.data_type().clone()), + _ => Err(DataFusionError::Plan( + "unnest may only be called on arrays".to_string(), + )), + } + }), + }))) + .unwrap(); + + registry + .register_udf(PlaceholderUdf::with_return( + "metadata", + vec![DataType::Utf8], DataType::Utf8, - ], - DataType::Utf8, - )).unwrap(); - + )) + .unwrap(); + register_functions(&mut registry); registry @@ -323,7 +333,8 @@ impl ArroyoSchemaProvider { .map(|t| t.data_type.clone()) .collect(), parsed.udf.ret_type.data_type.clone(), - ))?.is_some() + ))? + .is_some() }; if replaced { @@ -690,7 +701,7 @@ pub async fn parse_and_get_arrow_program( let session_state = SessionStateBuilder::new() .with_config(config) .with_default_features() - .with_physical_optimizer_rules(vec![]) + .with_physical_optimizer_rules(vec![]) .build(); let mut inserts = vec![]; @@ -699,9 +710,7 @@ pub async fn parse_and_get_arrow_program( continue; } - if let Some(table) = - Table::try_from_statement(&statement, &schema_provider)? - { + if let Some(table) = Table::try_from_statement(&statement, &schema_provider)? { schema_provider.insert_table(table); } else { inserts.push(Insert::try_from_statement( diff --git a/crates/arroyo-planner/src/logical.rs b/crates/arroyo-planner/src/logical.rs index 39690043e..8ab122e6e 100644 --- a/crates/arroyo-planner/src/logical.rs +++ b/crates/arroyo-planner/src/logical.rs @@ -1,12 +1,10 @@ use std::{any::Any, sync::Arc}; use arrow_schema::SchemaRef; +use datafusion::catalog::Session; use datafusion::common::Result as DFResult; use datafusion::logical_expr::{Expr, TableType}; -use datafusion::{ - datasource::TableProvider, physical_plan::ExecutionPlan, -}; -use datafusion::catalog::Session; +use datafusion::{datasource::TableProvider, physical_plan::ExecutionPlan}; use serde::{Deserialize, Serialize}; use crate::physical::ArroyoMemExec; diff --git a/crates/arroyo-planner/src/physical.rs b/crates/arroyo-planner/src/physical.rs index fccca258a..0ec39b5d9 100644 --- a/crates/arroyo-planner/src/physical.rs +++ b/crates/arroyo-planner/src/physical.rs @@ -27,8 +27,9 @@ use std::{ }; use crate::functions::MultiHashFunction; -use crate::{make_udf_function, register_functions}; use crate::rewriters::UNNESTED_COL; +use crate::schemas::window_arrow_struct; +use crate::{make_udf_function, register_functions}; use arrow_array::types::{TimestampNanosecondType, UInt64Type}; use arroyo_operator::operator::Registry; use arroyo_rpc::grpc::api::{ @@ -39,8 +40,7 @@ use arroyo_rpc::{ updating_meta_field, updating_meta_fields, TIMESTAMP_FIELD, UPDATING_META_FIELD, }; use datafusion::logical_expr::{ - ColumnarValue, ScalarUDFImpl, - Signature, TypeSignature, Volatility, + ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec}; @@ -54,7 +54,6 @@ use prost::Message; use std::fmt::Debug; use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream; -use crate::schemas::window_arrow_struct; #[derive(Debug)] pub struct WindowFunctionUdf { @@ -70,7 +69,7 @@ impl Default for WindowFunctionUdf { DataType::Timestamp(TimeUnit::Nanosecond, None), ]), Volatility::Immutable, - ) + ), } } } @@ -99,15 +98,15 @@ impl ScalarUDFImpl for WindowFunctionUdf { // check both columns are of the correct type if columns[0].data_type() != DataType::Timestamp(TimeUnit::Nanosecond, None) { return plan_err!( - "window function expected first argument to be a timestamp, got {:?}", - columns[0].data_type() - ); + "window function expected first argument to be a timestamp, got {:?}", + columns[0].data_type() + ); } if columns[1].data_type() != DataType::Timestamp(TimeUnit::Nanosecond, None) { return plan_err!( - "window function expected second argument to be a timestamp, got {:?}", - columns[1].data_type() - ); + "window function expected second argument to be a timestamp, got {:?}", + columns[1].data_type() + ); } let fields = vec![ Arc::new(arrow::datatypes::Field::new( @@ -121,7 +120,7 @@ impl ScalarUDFImpl for WindowFunctionUdf { false, )), ] - .into(); + .into(); match (&columns[0], &columns[1]) { (ColumnarValue::Array(start), ColumnarValue::Array(end)) => { @@ -152,7 +151,7 @@ impl ScalarUDFImpl for WindowFunctionUdf { StructArray::new(fields, vec![start.to_array()?, end.to_array()?], None).into(), ))) } - } + } } } @@ -927,7 +926,7 @@ impl DebeziumUnrollingStream { .iter() .map(|i| ColumnarValue::Array(columns[*i].clone())) .collect::>(), - num_rows + num_rows, )?; let ids = hash.into_array(num_rows)?; diff --git a/crates/arroyo-planner/src/plan/aggregate.rs b/crates/arroyo-planner/src/plan/aggregate.rs index e10bfd7cc..eae10375f 100644 --- a/crates/arroyo-planner/src/plan/aggregate.rs +++ b/crates/arroyo-planner/src/plan/aggregate.rs @@ -9,11 +9,11 @@ use crate::{ use arroyo_rpc::{TIMESTAMP_FIELD, UPDATING_META_FIELD}; use datafusion::common::tree_node::{Transformed, TreeNodeRewriter}; use datafusion::common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result}; +use datafusion::functions_aggregate::expr_fn::max; use datafusion::logical_expr; use datafusion::logical_expr::{Aggregate, Expr, Extension, LogicalPlan}; -use std::sync::Arc; -use datafusion::functions_aggregate::expr_fn::max; use datafusion::prelude::col; +use std::sync::Arc; use tracing::debug; pub struct AggregateRewriter<'a> { @@ -80,11 +80,11 @@ impl<'a> AggregateRewriter<'a> { else { return plan_err!("no timestamp field found in schema"); }; - + let timestamp_field: DFField = timestamp_field.into(); let column = timestamp_field.qualified_column(); aggr_expr.push(max(col(column.clone()))); - + let mut output_schema_fields = fields_with_qualifiers(&schema); output_schema_fields.push(timestamp_field.clone()); let output_schema = Arc::new(schema_from_df_fields_with_metadata( diff --git a/crates/arroyo-planner/src/plan/window_fn.rs b/crates/arroyo-planner/src/plan/window_fn.rs index b336a4c5f..955f211d0 100644 --- a/crates/arroyo-planner/src/plan/window_fn.rs +++ b/crates/arroyo-planner/src/plan/window_fn.rs @@ -112,8 +112,7 @@ impl TreeNodeRewriter for WindowFunctionRewriter { // don't need to shuffle or partition by the window. additional_keys.remove(index); let key_count = additional_keys.len(); - - + let new_window_func = WindowFunction { fun, args, @@ -160,12 +159,10 @@ impl TreeNodeRewriter for WindowFunctionRewriter { let mut sort_expressions: Vec<_> = additional_keys .iter() - .map(|partition| { - logical_expr::expr::Sort { - expr: partition.clone(), - asc: true, - nulls_first: false, - } + .map(|partition| logical_expr::expr::Sort { + expr: partition.clone(), + asc: true, + nulls_first: false, }) .collect(); sort_expressions.extend(new_window_func.order_by.clone()); diff --git a/crates/arroyo-planner/src/rewriters.rs b/crates/arroyo-planner/src/rewriters.rs index 7efbc87ef..64da2fdef 100644 --- a/crates/arroyo-planner/src/rewriters.rs +++ b/crates/arroyo-planner/src/rewriters.rs @@ -26,7 +26,9 @@ use datafusion::common::{ }; use datafusion::logical_expr; use datafusion::logical_expr::expr::ScalarFunction; -use datafusion::logical_expr::{BinaryExpr, ColumnUnnestList, Expr, Extension, LogicalPlan, Projection, TableScan, Unnest}; +use datafusion::logical_expr::{ + BinaryExpr, ColumnUnnestList, Expr, Extension, LogicalPlan, Projection, TableScan, Unnest, +}; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -422,12 +424,13 @@ impl TreeNodeRewriter for UnnestRewriter { ) .qualified_column()], input: produce_list, - list_type_columns: vec![ - (unnest_idx, ColumnUnnestList { + list_type_columns: vec![( + unnest_idx, + ColumnUnnestList { output_column: Column::new_unqualified(UNNESTED_COL), depth: 1, - }) - ], + }, + )], struct_type_columns: vec![], dependency_indices: vec![], schema: Arc::new(schema_from_df_fields(&unnest_fields).unwrap()), diff --git a/crates/arroyo-planner/src/tables.rs b/crates/arroyo-planner/src/tables.rs index dc8284e9b..dd8f7141f 100644 --- a/crates/arroyo-planner/src/tables.rs +++ b/crates/arroyo-planner/src/tables.rs @@ -7,7 +7,10 @@ use arroyo_connectors::connector_for_type; use crate::extension::remote_table::RemoteTableExtension; use crate::types::convert_data_type; -use crate::{external::{ProcessingMode, SqlSource}, fields_with_qualifiers, multifield_partial_ord, parse_sql, ArroyoSchemaProvider, DFField}; +use crate::{ + external::{ProcessingMode, SqlSource}, + fields_with_qualifiers, multifield_partial_ord, parse_sql, ArroyoSchemaProvider, DFField, +}; use crate::{rewrite_plan, DEFAULT_IDLE_TIME}; use arroyo_datastream::default_sink; use arroyo_operator::connector::Connection; @@ -73,8 +76,20 @@ pub struct ConnectorTable { pub inferred_fields: Option>, } -multifield_partial_ord!(ConnectorTable, id, connector, name, connection_type, config, - description, format, event_time_field, watermark_field, idle_time, primary_keys); +multifield_partial_ord!( + ConnectorTable, + id, + connector, + name, + connection_type, + config, + description, + format, + event_time_field, + watermark_field, + idle_time, + primary_keys +); #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum FieldSpec { @@ -667,12 +682,15 @@ impl Table { let SqlOption::KeyValue { key, value } = &option else { return plan_err!("Invalid with option: {:?}", option); }; - + let sqlparser::ast::Expr::Value(value) = value else { - return plan_err!("Expected a string literal in with clause, but found {}", value); + return plan_err!( + "Expected a string literal in with clause, but found {}", + value + ); }; - - with_map.insert(key.value.to_string(), value_to_inner_string(value)?); + + with_map.insert(key.value.to_string(), value_to_inner_string(value)?); } let connector = with_map.remove("connector"); @@ -862,10 +880,8 @@ fn infer_sink_schema( table_name: String, schema_provider: &mut ArroyoSchemaProvider, ) -> Result<()> { - let plan = produce_optimized_plan( - &Statement::Query(Box::new(source.clone())), - schema_provider, - )?; + let plan = + produce_optimized_plan(&Statement::Query(Box::new(source.clone())), schema_provider)?; let table = schema_provider .get_table_mut(&table_name) .ok_or_else(|| DataFusionError::Plan(format!("table {} not found", table_name)))?; diff --git a/crates/arroyo-server-common/src/lib.rs b/crates/arroyo-server-common/src/lib.rs index a1d3b0770..215de71f6 100644 --- a/crates/arroyo-server-common/src/lib.rs +++ b/crates/arroyo-server-common/src/lib.rs @@ -5,7 +5,7 @@ pub mod shutdown; use anyhow::anyhow; use arroyo_types::POSTHOG_KEY; -use axum::body::{Bytes}; +use axum::body::Bytes; use axum::extract::State; use axum::http::StatusCode; use axum::response::IntoResponse; @@ -337,7 +337,6 @@ where type Response = S::Response; type Error = S::Error; type Future = BoxFuture<'static, Result>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) diff --git a/crates/arroyo-state/src/tables/expiring_time_key_map.rs b/crates/arroyo-state/src/tables/expiring_time_key_map.rs index 0113138a6..70476de66 100644 --- a/crates/arroyo-state/src/tables/expiring_time_key_map.rs +++ b/crates/arroyo-state/src/tables/expiring_time_key_map.rs @@ -14,7 +14,6 @@ use arrow_array::{ BooleanArray, PrimitiveArray, RecordBatch, TimestampNanosecondArray, UInt64Array, }; use arrow_ord::{partition::partition, sort::sort_to_indices}; -use datafusion::parquet::arrow::async_reader::ParquetObjectReader; use arroyo_rpc::{ df::server_for_hash_array, grpc::rpc::{ @@ -27,15 +26,16 @@ use arroyo_storage::StorageProviderRef; use arroyo_types::{ from_micros, from_nanos, print_time, server_for_hash, to_micros, to_nanos, TaskInfoRef, }; +use datafusion::parquet::arrow::async_reader::ParquetObjectReader; use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; +use parquet::arrow::AsyncArrowWriter; use parquet::{ - arrow::{ParquetRecordBatchStreamBuilder}, + arrow::ParquetRecordBatchStreamBuilder, basic::{Compression, ZstdLevel}, file::properties::WriterProperties, }; -use parquet::arrow::AsyncArrowWriter; use tokio::sync::mpsc::Sender; use crate::{ diff --git a/crates/arroyo-udf/arroyo-udf-host/src/test.rs b/crates/arroyo-udf/arroyo-udf-host/src/test.rs index d0aab7870..3b995a88e 100644 --- a/crates/arroyo-udf/arroyo-udf-host/src/test.rs +++ b/crates/arroyo-udf/arroyo-udf-host/src/test.rs @@ -47,15 +47,18 @@ fn test_udf() { let udf = test_udf_1::__local().config; let sync_udf: SyncUdfDylib = (&udf).try_into().unwrap(); let result = sync_udf - .invoke_batch(&[ - ColumnarValue::Array(Arc::new(Int32Array::from(vec![1, 10, 20]))), - ColumnarValue::Array(Arc::new(StringArray::from(vec!["a", "b", "c"]))), - ColumnarValue::Array(Arc::new(BinaryArray::from(vec![ - b"x".as_ref(), - b"y".as_ref(), - b"z".as_ref(), - ]))), - ], 3) + .invoke_batch( + &[ + ColumnarValue::Array(Arc::new(Int32Array::from(vec![1, 10, 20]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["a", "b", "c"]))), + ColumnarValue::Array(Arc::new(BinaryArray::from(vec![ + b"x".as_ref(), + b"y".as_ref(), + b"z".as_ref(), + ]))), + ], + 3, + ) .unwrap(); let ColumnarValue::Array(a) = result else { diff --git a/crates/arroyo-worker/src/arrow/mod.rs b/crates/arroyo-worker/src/arrow/mod.rs index 52cfcf8ca..381bf5f45 100644 --- a/crates/arroyo-worker/src/arrow/mod.rs +++ b/crates/arroyo-worker/src/arrow/mod.rs @@ -222,8 +222,11 @@ impl StatelessPhysicalExecutor { context: DecodingContext::SingleLockedBatch(batch.clone()), }; - let plan = - plan.try_into_physical_plan(registry, &RuntimeEnv::try_new(RuntimeConfig::new())?, &codec)?; + let plan = plan.try_into_physical_plan( + registry, + &RuntimeEnv::try_new(RuntimeConfig::new())?, + &codec, + )?; Ok(Self { batch, diff --git a/crates/arroyo/src/main.rs b/crates/arroyo/src/main.rs index 86a1fbf99..03c0f4ca0 100644 --- a/crates/arroyo/src/main.rs +++ b/crates/arroyo/src/main.rs @@ -415,7 +415,9 @@ async fn start_control_plane(service: CPService) { shutdown.spawn_task("admin", start_admin_server(service.name())); if service == CPService::Api || service == CPService::All { - arroyo_api::start_server(db.clone(), shutdown.guard("api")).await.unwrap(); + arroyo_api::start_server(db.clone(), shutdown.guard("api")) + .await + .unwrap(); } if service == CPService::Compiler || service == CPService::All { diff --git a/crates/arroyo/src/run.rs b/crates/arroyo/src/run.rs index c00e17aae..0e3b8df2b 100644 --- a/crates/arroyo/src/run.rs +++ b/crates/arroyo/src/run.rs @@ -414,7 +414,9 @@ pub async fn run(args: RunArgs) { config::update(|c| c.controller.rpc_port = controller_port); - let http_port = arroyo_api::start_server(db.clone(), shutdown.guard("api")).await.unwrap(); + let http_port = arroyo_api::start_server(db.clone(), shutdown.guard("api")) + .await + .unwrap(); let client = Arc::new(Client::new_with_client( &format!("http://localhost:{http_port}/api",),