Skip to content

Commit

Permalink
Fixed warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Nov 14, 2024
1 parent 037bbe3 commit eec4f91
Show file tree
Hide file tree
Showing 23 changed files with 38 additions and 43 deletions.
5 changes: 3 additions & 2 deletions crates/arroyo-connectors/src/filesystem/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion::{
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
scalar::ScalarValue,
};
use datafusion::execution::SessionStateBuilder;
use futures::{stream::FuturesUnordered, Future};
use futures::{stream::StreamExt, TryStreamExt};
use object_store::{multipart::PartId, path::Path, MultipartId};
Expand Down Expand Up @@ -210,8 +211,8 @@ fn partition_string_for_fields_and_time(

fn compile_expression(expr: &Expr, schema: ArroyoSchemaRef) -> Result<Arc<dyn PhysicalExpr>> {
let physical_planner = DefaultPhysicalPlanner::default();
let session_state =
SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()));
let session_state = SessionStateBuilder::new().build();

let plan = physical_planner.create_physical_expr(
expr,
&(schema.schema.as_ref().clone()).try_into()?,
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/kinesis/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arroyo_operator::operator::ArrowOperator;
use arroyo_rpc::retry;
use arroyo_types::CheckpointBarrier;
use async_trait::async_trait;
use aws_config::{from_env, Region};
use aws_config::{BehaviorVersion, Region};
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::PutRecordsRequestEntry;
use aws_sdk_kinesis::Client as KinesisClient;
Expand All @@ -33,7 +33,7 @@ impl ArrowOperator for KinesisSinkFunc {
}

async fn on_start(&mut self, _ctx: &mut ArrowContext) {
let mut loader = from_env();
let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28());
if let Some(region) = &self.aws_region {
loader = loader.region(Region::new(region.clone()));
}
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/kinesis/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use arroyo_state::global_table_config;
use arroyo_state::tables::global_keyed_map::GlobalKeyedView;
use arroyo_types::{from_nanos, UserError};
use async_trait::async_trait;
use aws_config::{from_env, Region};
use aws_config::{BehaviorVersion, Region};
use aws_sdk_kinesis::error::SdkError;
use aws_sdk_kinesis::operation::get_records::GetRecordsOutput;
use aws_sdk_kinesis::operation::get_shard_iterator::builders::GetShardIteratorFluentBuilder;
Expand Down Expand Up @@ -337,7 +337,7 @@ impl KinesisSourceFunc {
}

async fn init_client(&mut self) {
let mut loader = from_env();
let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28());
if let Some(region) = &self.aws_region {
loader = loader.region(Region::new(region.clone()));
}
Expand Down
8 changes: 4 additions & 4 deletions crates/arroyo-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,10 +628,10 @@ impl ControllerServer {
}

pub async fn start(self, guard: ShutdownGuard) -> anyhow::Result<u16> {
let reflection = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(arroyo_rpc::grpc::API_FILE_DESCRIPTOR_SET)
.build_v1()
.unwrap();
// let reflection = tonic_reflection::server::Builder::configure()
// .register_encoded_file_descriptor_set(arroyo_rpc::grpc::API_FILE_DESCRIPTOR_SET)
// .build_v1()
// .unwrap();

let addr = SocketAddr::new(
config().controller.bind_address,
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-planner/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl<'a> Planner<'a> {
// need to convert to ExecutionPlan to get the partial schema.
let partial_aggregation_exec_plan = partial_aggregation_plan.try_into_physical_plan(
self.schema_provider,
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&codec,
)?;

Expand Down
1 change: 0 additions & 1 deletion crates/arroyo-planner/src/extension/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
Expand Down
1 change: 0 additions & 1 deletion crates/arroyo-planner/src/extension/updating_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use super::{ArroyoExtension, IsRetractExtension, NodeWithIncomingEdges};
use crate::functions::multi_hash;
use arroyo_rpc::config::config;
use prost::Message;
use crate::multifield_partial_ord;

pub(crate) const UPDATING_AGGREGATE_EXTENSION_NAME: &str = "UpdatingAggregateExtension";

Expand Down
8 changes: 3 additions & 5 deletions crates/arroyo-planner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ mod test;
mod utils;

use anyhow::bail;
use arrow::array::ArrayRef;
use arrow::datatypes::{self, DataType};
use arrow_schema::{Field, FieldRef, Schema};
use arroyo_datastream::WindowType;
Expand All @@ -27,14 +26,14 @@ use datafusion::common::{not_impl_err, plan_err, Column, DFSchema, Result, Scala
use datafusion::datasource::DefaultTableSource;
#[allow(deprecated)]

use datafusion::prelude::{create_udf, SessionConfig};
use datafusion::prelude::SessionConfig;

use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion::sql::sqlparser::parser::{Parser, ParserError};
use datafusion::sql::{planner::ContextProvider, sqlparser, TableReference};

use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{create_udaf, Expr, Extension, LogicalPlan, ReturnTypeFunction, ScalarUDF, ScalarUDFImpl, Signature, Volatility, WindowUDF};
use datafusion::logical_expr::{create_udaf, Expr, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, Signature, Volatility, WindowUDF};

use datafusion::logical_expr::{AggregateUDF, TableSource};
use logical::LogicalBatchInput;
Expand All @@ -49,7 +48,7 @@ use arroyo_datastream::logical::{DylibUdfConfig, ProgramConfig, PythonUdfConfig}
use arroyo_rpc::api_types::connections::ConnectionProfile;
use datafusion::common::DataFusionError;
use std::collections::HashSet;
use std::fmt::{write, Debug, Formatter};
use std::fmt::{Debug, Formatter};

use crate::functions::{is_json_union, serialize_outgoing_json};
use crate::rewriters::{SourceMetadataVisitor, TimeWindowUdfChecker, UnnestRewriter};
Expand All @@ -72,7 +71,6 @@ use datafusion::sql::sqlparser::ast::{OneOrManyWithParens, Statement};
use std::time::{Duration, SystemTime};
use std::{collections::HashMap, sync::Arc};
use std::any::Any;
use datafusion_functions::core::planner::CoreFunctionPlanner;
use syn::Item;
use tracing::{debug, info, warn};
use unicase::UniCase;
Expand Down
5 changes: 3 additions & 2 deletions crates/arroyo-planner/src/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use arroyo_rpc::{
updating_meta_field, updating_meta_fields, TIMESTAMP_FIELD, UPDATING_META_FIELD,
};
use datafusion::logical_expr::{
ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, ScalarUDFImpl,
ColumnarValue, ScalarUDFImpl,
Signature, TypeSignature, Volatility,
};
use datafusion::physical_expr::EquivalenceProperties;
Expand Down Expand Up @@ -921,12 +921,13 @@ impl DebeziumUnrollingStream {

let mut columns = unrolled_array.as_struct().columns().to_vec();

let hash = MultiHashFunction::default().invoke(
let hash = MultiHashFunction::default().invoke_batch(
&self
.primary_keys
.iter()
.map(|i| ColumnarValue::Array(columns[*i].clone()))
.collect::<Vec<_>>(),
num_rows
)?;

let ids = hash.into_array(num_rows)?;
Expand Down
1 change: 0 additions & 1 deletion crates/arroyo-planner/src/plan/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use arroyo_rpc::{TIMESTAMP_FIELD, UPDATING_META_FIELD};
use datafusion::common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion::common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
use datafusion::logical_expr;
use datafusion::logical_expr::expr::AggregateFunction;
use datafusion::logical_expr::{Aggregate, Expr, Extension, LogicalPlan};
use std::sync::Arc;
use datafusion::functions_aggregate::expr_fn::max;
Expand Down
1 change: 0 additions & 1 deletion crates/arroyo-planner/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use arroyo_types::ArroyoExtensionType;
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::common::{config::ConfigOptions, DFSchema, Result, ScalarValue};
use datafusion::common::{plan_err, Column, DataFusionError};
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::{
CreateMemoryTable, CreateView, DdlStatement, DmlStatement, Expr, Extension, LogicalPlan,
WriteOp,
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-rpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::path::PathBuf;
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.compile(&["proto/rpc.proto"], &["proto/"])?;
.compile_protos(&["proto/rpc.proto"], &["proto/"])?;

let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());

Expand All @@ -13,7 +13,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.file_descriptor_set_path(out_dir.join("api_descriptor.bin"))
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
.type_attribute(".", "#[serde(rename_all = \"camelCase\")]")
.compile(&["proto/api.proto"], &["proto/"])?;
.compile_protos(&["proto/api.proto"], &["proto/"])?;

Ok(())
}
4 changes: 2 additions & 2 deletions crates/arroyo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ pub mod df;

pub mod grpc {
pub mod rpc {
#![allow(clippy::derive_partial_eq_without_eq)]
#![allow(clippy::derive_partial_eq_without_eq, deprecated)]
tonic::include_proto!("arroyo_rpc");
}

pub mod api {
#![allow(clippy::derive_partial_eq_without_eq)]
#![allow(clippy::derive_partial_eq_without_eq, deprecated)]
tonic::include_proto!("api");
}

Expand Down
1 change: 0 additions & 1 deletion crates/arroyo-server-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::net::TcpListener;
use tonic::{Status};
use tonic::transport::Server;
use tower::layer::util::Stack;
use tower::{Layer, Service};
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-udf/arroyo-udf-host/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ fn test_udf() {
let udf = test_udf_1::__local().config;
let sync_udf: SyncUdfDylib = (&udf).try_into().unwrap();
let result = sync_udf
.invoke(&[
.invoke_batch(&[
ColumnarValue::Array(Arc::new(Int32Array::from(vec![1, 10, 20]))),
ColumnarValue::Array(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
ColumnarValue::Array(Arc::new(BinaryArray::from(vec![
b"x".as_ref(),
b"y".as_ref(),
b"z".as_ref(),
]))),
])
], 3)
.unwrap();

let ColumnarValue::Array(a) = result else {
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-worker/src/arrow/instant_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ impl OperatorConstructor for InstantJoinConstructor {
};
let join_exec = join_physical_plan_node.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new())?,
&RuntimeEnv::try_new(RuntimeConfig::new())?,
&codec,
)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-worker/src/arrow/join_with_expiration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl OperatorConstructor for JoinWithExpirationConstructor {
let join_physical_plan_node = PhysicalPlanNode::decode(&mut config.join_plan.as_slice())?;
let join_execution_plan = join_physical_plan_node.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new())?,
&RuntimeEnv::try_new(RuntimeConfig::new())?,
&codec,
)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-worker/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl StatelessPhysicalExecutor {
};

let plan =
plan.try_into_physical_plan(registry, &RuntimeEnv::new(RuntimeConfig::new())?, &codec)?;
plan.try_into_physical_plan(registry, &RuntimeEnv::try_new(RuntimeConfig::new())?, &codec)?;

Ok(Self {
batch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ impl OperatorConstructor for SessionAggregatingWindowConstructor {
let final_plan = PhysicalPlanNode::decode(&mut config.final_aggregation_plan.as_slice())?;
let final_execution_plan = final_plan.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&codec,
)?;

Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-worker/src/arrow/sliding_aggregating_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl OperatorConstructor for SlidingAggregatingWindowConstructor {

let partial_aggregation_plan = partial_aggregation_plan.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&codec,
)?;

Expand All @@ -494,14 +494,14 @@ impl OperatorConstructor for SlidingAggregatingWindowConstructor {
};
let finish_execution_plan = finish_plan.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&final_codec,
)?;

let final_projection = PhysicalPlanNode::decode(&mut config.final_projection.as_slice())?;
let final_projection = final_projection.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&final_codec,
)?;

Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl OperatorConstructor for TumblingAggregateWindowConstructor {
// for each bin.
let partial_aggregation_plan = partial_aggregation_plan.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&codec,
)?;

Expand All @@ -161,7 +161,7 @@ impl OperatorConstructor for TumblingAggregateWindowConstructor {
// deserialize the finish plan to read directly from a Vec<RecordBatch> behind a RWLock.
let finish_execution_plan = finish_plan.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&final_codec,
)?;
let finish_projection = config
Expand All @@ -173,7 +173,7 @@ impl OperatorConstructor for TumblingAggregateWindowConstructor {
.map(|finish_projection| {
finish_projection.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&final_codec,
)
})
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-worker/src/arrow/updating_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ impl OperatorConstructor for UpdatingAggregatingConstructor {
// deserialize partial aggregation into execution plan with an UnboundedBatchStream source.
let partial_aggregation_plan = partial_aggregation_plan.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&codec,
)?;

Expand All @@ -388,15 +388,15 @@ impl OperatorConstructor for UpdatingAggregatingConstructor {
let combine_plan = PhysicalPlanNode::decode(&mut config.combine_plan.as_slice())?;
let combine_execution_plan = combine_plan.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&codec,
)?;

let finish_plan = PhysicalPlanNode::decode(&mut config.final_aggregation_plan.as_slice())?;

let finish_execution_plan = finish_plan.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new()).unwrap(),
&RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(),
&codec,
)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-worker/src/arrow/window_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl OperatorConstructor for WindowFunctionConstructor {
};
let window_exec = window_exec.try_into_physical_plan(
registry.as_ref(),
&RuntimeEnv::new(RuntimeConfig::new())?,
&RuntimeEnv::try_new(RuntimeConfig::new())?,
&codec,
)?;
let input_schema_unkeyed = Arc::new(ArroyoSchema::from_schema_unkeyed(
Expand Down

0 comments on commit eec4f91

Please sign in to comment.