From 7ad89172576c71bd91c3e9e1e1c2f04a6212f3a9 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Tue, 27 Aug 2024 15:54:18 +0300 Subject: [PATCH] refactor(cubestore): Replace direct dependency to arrow and parquet with datafusion's reexports (#8629) --- rust/cubestore/Cargo.lock | 4 +- rust/cubestore/cubestore/Cargo.toml | 2 - .../cubestore/src/cluster/message.rs | 2 +- rust/cubestore/cubestore/src/cluster/mod.rs | 6 +-- .../cubestore/src/cluster/worker_pool.rs | 2 +- rust/cubestore/cubestore/src/import/mod.rs | 2 +- rust/cubestore/cubestore/src/lib.rs | 6 +-- rust/cubestore/cubestore/src/metastore/mod.rs | 10 ++-- .../cubestore/src/metastore/table.rs | 2 +- .../src/queryplanner/check_memory.rs | 6 +-- .../cubestore/src/queryplanner/coalesce.rs | 4 +- .../src/queryplanner/filter_by_key_range.rs | 8 +-- .../info_schema/info_schema_columns.rs | 4 +- .../info_schema/info_schema_schemata.rs | 4 +- .../info_schema/info_schema_tables.rs | 4 +- .../info_schema/rocksdb_properties.rs | 4 +- .../queryplanner/info_schema/system_cache.rs | 4 +- .../queryplanner/info_schema/system_chunks.rs | 6 ++- .../info_schema/system_indexes.rs | 4 +- .../queryplanner/info_schema/system_jobs.rs | 4 +- .../info_schema/system_partitions.rs | 4 +- .../queryplanner/info_schema/system_queue.rs | 4 +- .../info_schema/system_queue_results.rs | 6 ++- .../info_schema/system_replay_handles.rs | 6 ++- .../info_schema/system_snapshots.rs | 4 +- .../queryplanner/info_schema/system_tables.rs | 6 ++- .../cubestore/src/queryplanner/mod.rs | 18 ++++--- .../cubestore/src/queryplanner/panic.rs | 2 +- .../src/queryplanner/partition_filter.rs | 4 +- .../cubestore/src/queryplanner/planning.rs | 8 +-- .../src/queryplanner/providers/query_cache.rs | 6 +-- .../src/queryplanner/query_executor.rs | 16 +++--- .../src/queryplanner/serialized_plan.rs | 4 +- .../cubestore/src/queryplanner/tail_limit.rs | 10 ++-- .../src/queryplanner/topk/execute.rs | 18 +++---- .../cubestore/src/queryplanner/topk/mod.rs | 2 +- 
.../cubestore/src/queryplanner/topk/plan.rs | 2 +- .../src/queryplanner/trace_data_loaded.rs | 6 +-- .../cubestore/src/queryplanner/udfs.rs | 4 +- rust/cubestore/cubestore/src/sql/mod.rs | 6 +-- .../cubestore/src/store/compaction.rs | 24 +++++---- rust/cubestore/cubestore/src/store/mod.rs | 26 ++++++---- .../cubestore/src/streaming/kafka.rs | 6 +-- .../src/streaming/kafka_post_processing.rs | 8 +-- rust/cubestore/cubestore/src/streaming/mod.rs | 52 +++++++++---------- .../src/streaming/topic_table_provider.rs | 8 +-- rust/cubestore/cubestore/src/table/data.rs | 6 +-- rust/cubestore/cubestore/src/table/mod.rs | 4 +- rust/cubestore/cubestore/src/table/parquet.rs | 22 ++++---- .../cubestore/src/table/redistribute.rs | 2 +- .../cubestore/src/util/batch_memory.rs | 4 +- 51 files changed, 200 insertions(+), 186 deletions(-) diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index adaa08b9504e5..6c1adcd965db7 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1100,7 +1100,6 @@ dependencies = [ "actix-rt", "anyhow", "arc-swap", - "arrow", "async-compression", "async-std", "async-trait", @@ -1147,7 +1146,6 @@ dependencies = [ "msql-srv", "nanoid", "num 0.3.1", - "parquet", "parquet-format", "parse-size", "paste", @@ -1271,7 +1269,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube#823422f3b2bbdcbc0b18e1ee9bf30a377dde010a" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube#11027d53f93c550d86e32ebf75e3a54cef6c8546" dependencies = [ "ahash", "arrow", diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index a49badb8292b0..eb95829a83138 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -28,8 +28,6 @@ cubehll = { path = "../cubehll" } cubezetasketch = { path = "../cubezetasketch" } 
cubedatasketches = { path = "../cubedatasketches" } cuberpc = { path = "../cuberpc" } -parquet = { git = "https://github.com/cube-js/arrow-rs", branch = "cube", features = ["arrow"] } -arrow = { git = "https://github.com/cube-js/arrow-rs", branch = "cube" } datafusion = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube", features = ["default_nulls_last"] } csv = "1.1.3" bytes = "1.6.0" diff --git a/rust/cubestore/cubestore/src/cluster/message.rs b/rust/cubestore/cubestore/src/cluster/message.rs index dd09c1ab52178..19721a366197d 100644 --- a/rust/cubestore/cubestore/src/cluster/message.rs +++ b/rust/cubestore/cubestore/src/cluster/message.rs @@ -2,7 +2,7 @@ use crate::metastore::{MetaStoreRpcMethodCall, MetaStoreRpcMethodResult}; use crate::queryplanner::query_executor::SerializedRecordBatchStream; use crate::queryplanner::serialized_plan::SerializedPlan; use crate::CubeError; -use arrow::datatypes::SchemaRef; +use datafusion::arrow::datatypes::SchemaRef; use serde::{Deserialize, Serialize}; use std::io::ErrorKind; use tokio::io::{AsyncReadExt, AsyncWriteExt}; diff --git a/rust/cubestore/cubestore/src/cluster/mod.rs b/rust/cubestore/cubestore/src/cluster/mod.rs index e455ca62b946d..afe622f875401 100644 --- a/rust/cubestore/cubestore/src/cluster/mod.rs +++ b/rust/cubestore/cubestore/src/cluster/mod.rs @@ -43,10 +43,10 @@ use crate::remotefs::RemoteFs; use crate::store::ChunkDataStore; use crate::telemetry::tracing::TracingHelper; use crate::CubeError; -use arrow::datatypes::SchemaRef; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::ArrowError; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::cube_ext; use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; use flatbuffers::bitflags::_core::pin::Pin; diff --git a/rust/cubestore/cubestore/src/cluster/worker_pool.rs 
b/rust/cubestore/cubestore/src/cluster/worker_pool.rs index a02db68c5ef1b..0ff552f38006f 100644 --- a/rust/cubestore/cubestore/src/cluster/worker_pool.rs +++ b/rust/cubestore/cubestore/src/cluster/worker_pool.rs @@ -450,8 +450,8 @@ mod tests { use std::sync::Arc; use std::time::Duration; - use arrow::datatypes::{DataType, Field, Schema}; use async_trait::async_trait; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::logical_plan::ToDFSchema; use futures_timer::Delay; use serde::{Deserialize, Serialize}; diff --git a/rust/cubestore/cubestore/src/import/mod.rs b/rust/cubestore/cubestore/src/import/mod.rs index 7f9c820a2b888..8d1db1a845f97 100644 --- a/rust/cubestore/cubestore/src/import/mod.rs +++ b/rust/cubestore/cubestore/src/import/mod.rs @@ -5,12 +5,12 @@ use std::path::Path; use std::pin::Pin; use std::sync::Arc; -use arrow::array::{ArrayBuilder, ArrayRef}; use async_compression::tokio::bufread::GzipDecoder; use async_std::io::SeekFrom; use async_std::task::{Context, Poll}; use async_trait::async_trait; use bigdecimal::{BigDecimal, Num}; +use datafusion::arrow::array::{ArrayBuilder, ArrayRef}; use datafusion::cube_ext; use futures::future::join_all; use futures::{Stream, StreamExt}; diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs index 63d5fd55948fa..89ddb44e15599 100644 --- a/rust/cubestore/cubestore/src/lib.rs +++ b/rust/cubestore/cubestore/src/lib.rs @@ -15,13 +15,13 @@ extern crate core; use crate::metastore::TableId; use crate::remotefs::queue::RemoteFsOpResult; -use arrow::error::ArrowError; use cubehll::HllError; use cubezetasketch::ZetaError; +use datafusion::arrow::error::ArrowError; use datafusion::cube_ext::catch_unwind::PanicError; +use datafusion::parquet::errors::ParquetError; use flexbuffers::{DeserializationError, ReaderError}; use log::SetLoggerError; -use parquet::errors::ParquetError; use serde_derive::{Deserialize, Serialize}; use sqlparser::parser::ParserError; use 
std::any::Any; @@ -286,7 +286,7 @@ impl From<CubeError> for datafusion::error::DataFusionError { } } -impl From<arrow::error::ArrowError> for CubeError { +impl From<ArrowError> for CubeError { fn from(v: ArrowError) -> Self { CubeError::from_error(v) } diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 96a7d2237532d..7e115c465a55b 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -54,8 +54,6 @@ use crate::table::{Row, TableValue}; use crate::util::WorkerLoop; use crate::{meta_store_table_impl, CubeError}; -use arrow::datatypes::TimeUnit::Microsecond; -use arrow::datatypes::{DataType, Field}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use chrono::{DateTime, Utc}; use chunks::ChunkRocksTable; @@ -63,14 +61,16 @@ use core::fmt; use cubehll::HllSketch; use cuberockstore::rocksdb::backup::{BackupEngine, BackupEngineOptions}; use cubezetasketch::HyperLogLogPlusPlus; +use datafusion::arrow::datatypes::TimeUnit::Microsecond; +use datafusion::arrow::datatypes::{DataType, Field}; use datafusion::cube_ext; +use datafusion::parquet::basic::{ConvertedType, Repetition}; +use datafusion::parquet::{basic::Type, schema::types}; use futures_timer::Delay; use index::{IndexRocksIndex, IndexRocksTable}; use itertools::Itertools; use log::trace; use multi_index::{MultiIndex, MultiIndexRocksIndex, MultiIndexRocksTable}; -use parquet::basic::{ConvertedType, Repetition}; -use parquet::{basic::Type, schema::types}; use partition::{PartitionRocksIndex, PartitionRocksTable}; use regex::Regex; @@ -477,7 +477,7 @@ impl ColumnType { } } -impl From<&Column> for parquet::schema::types::Type { +impl From<&Column> for types::Type { fn from(column: &Column) -> Self { match column.get_column_type() { ColumnType::String => { diff --git a/rust/cubestore/cubestore/src/metastore/table.rs b/rust/cubestore/cubestore/src/metastore/table.rs index d39c020b47da3..ad0b1709dc5a5 100644 ---
a/rust/cubestore/cubestore/src/metastore/table.rs +++ b/rust/cubestore/cubestore/src/metastore/table.rs @@ -7,10 +7,10 @@ use crate::queryplanner::udfs::aggregate_udf_by_kind; use crate::queryplanner::udfs::CubeAggregateUDFKind; use crate::rocks_table_impl; use crate::{base_rocks_secondary_index, CubeError}; -use arrow::datatypes::Schema as ArrowSchema; use byteorder::{BigEndian, WriteBytesExt}; use chrono::DateTime; use chrono::Utc; +use datafusion::arrow::datatypes::Schema as ArrowSchema; use datafusion::physical_plan::expressions::{Column as FusionColumn, Max, Min, Sum}; use datafusion::physical_plan::{udaf, AggregateExpr, PhysicalExpr}; use itertools::Itertools; diff --git a/rust/cubestore/cubestore/src/queryplanner/check_memory.rs b/rust/cubestore/cubestore/src/queryplanner/check_memory.rs index bf07a1f779ede..9e7879ce18fb6 100644 --- a/rust/cubestore/cubestore/src/queryplanner/check_memory.rs +++ b/rust/cubestore/cubestore/src/queryplanner/check_memory.rs @@ -1,8 +1,8 @@ use crate::util::memory::MemoryHandler; -use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; -use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::Result as ArrowResult; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::DataFusionError; use datafusion::physical_plan::{ ExecutionPlan, OptimizerHints, Partitioning, RecordBatchStream, SendableRecordBatchStream, diff --git a/rust/cubestore/cubestore/src/queryplanner/coalesce.rs b/rust/cubestore/cubestore/src/queryplanner/coalesce.rs index ca10e98c15df7..5bc88a5190645 100644 --- a/rust/cubestore/cubestore/src/queryplanner/coalesce.rs +++ b/rust/cubestore/cubestore/src/queryplanner/coalesce.rs @@ -1,5 +1,5 @@ -use arrow::array::ArrayRef; -use arrow::datatypes::{DataType, IntervalUnit, TimeUnit}; +use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit}; use 
datafusion::cube_match_array; use datafusion::error::DataFusionError; use datafusion::physical_plan::ColumnarValue; diff --git a/rust/cubestore/cubestore/src/queryplanner/filter_by_key_range.rs b/rust/cubestore/cubestore/src/queryplanner/filter_by_key_range.rs index 157621b625f0e..011b281e3011c 100644 --- a/rust/cubestore/cubestore/src/queryplanner/filter_by_key_range.rs +++ b/rust/cubestore/cubestore/src/queryplanner/filter_by_key_range.rs @@ -1,10 +1,10 @@ use crate::queryplanner::serialized_plan::{RowFilter, RowRange}; use crate::table::data::cmp_partition_key; -use arrow::array::ArrayRef; -use arrow::datatypes::SchemaRef; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::ArrowError; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::cube_ext::stream::StreamWithSchema; use datafusion::error::DataFusionError; use datafusion::physical_plan::{ diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs index 1a1547c7b2453..8d265e95d5572 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs @@ -2,9 +2,9 @@ use crate::metastore::table::TablePath; use crate::metastore::Column; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, StringArray}; -use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, StringArray}; +use datafusion::arrow::datatypes::{DataType, Field}; use std::sync::Arc; pub struct ColumnsInfoSchemaTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs 
b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs index 9799f1b98e8c3..f97198700bf9c 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs @@ -1,9 +1,9 @@ use crate::metastore::{IdRow, MetaStoreTable, Schema}; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, StringArray}; -use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, StringArray}; +use datafusion::arrow::datatypes::{DataType, Field}; use std::sync::Arc; pub struct SchemataInfoSchemaTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs index 8085f80cf195d..f401978817a5a 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs @@ -1,9 +1,9 @@ use crate::metastore::table::TablePath; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray}; -use arrow::datatypes::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray}; +use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use std::sync::Arc; pub struct TablesInfoSchemaTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/rocksdb_properties.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/rocksdb_properties.rs index 29ef5ebfba115..22d3b0d8cb855 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/rocksdb_properties.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/rocksdb_properties.rs @@ 
-1,9 +1,9 @@ use crate::metastore::RocksPropertyRow; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, StringArray}; -use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, StringArray}; +use datafusion::arrow::datatypes::{DataType, Field}; use std::sync::Arc; pub struct RocksDBPropertiesTableDef { diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_cache.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_cache.rs index 3e4fe96d65283..309fd7fd7f9ce 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_cache.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_cache.rs @@ -2,9 +2,9 @@ use crate::cachestore::CacheItem; use crate::metastore::IdRow; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray}; -use arrow::datatypes::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray}; +use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use std::sync::Arc; pub struct SystemCacheTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs index f86247984546f..fc56f5306c270 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs @@ -2,9 +2,11 @@ use crate::metastore::chunks::chunk_file_name; use crate::metastore::{Chunk, IdRow, MetaStoreTable}; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array}; -use 
arrow::datatypes::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use datafusion::arrow::array::{ + ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array, +}; +use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use std::sync::Arc; pub struct SystemChunksTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs index c8b021a26a0d1..ec36824ef36f6 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs @@ -1,9 +1,9 @@ use crate::metastore::{IdRow, Index, MetaStoreTable}; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, StringArray, UInt64Array}; -use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, StringArray, UInt64Array}; +use datafusion::arrow::datatypes::{DataType, Field}; use std::sync::Arc; pub struct SystemIndexesTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_jobs.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_jobs.rs index b09b8143cff40..d54fd44c05031 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_jobs.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_jobs.rs @@ -2,9 +2,9 @@ use crate::metastore::job::Job; use crate::metastore::IdRow; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray, UInt64Array}; -use arrow::datatypes::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray, UInt64Array}; +use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use std::sync::Arc; 
pub struct SystemJobsTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_partitions.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_partitions.rs index 2716814e6f9c4..7f603a3d09759 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_partitions.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_partitions.rs @@ -2,9 +2,9 @@ use crate::metastore::partition::partition_file_name; use crate::metastore::{IdRow, MetaStoreTable, Partition}; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, BooleanArray, StringArray, UInt64Array}; -use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, BooleanArray, StringArray, UInt64Array}; +use datafusion::arrow::datatypes::{DataType, Field}; use std::sync::Arc; pub struct SystemPartitionsTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs index 03623a7d10038..4c7ccaeb98b92 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs @@ -1,9 +1,9 @@ use crate::cachestore::QueueAllItem; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, Int64Array, StringArray, TimestampNanosecondArray}; -use arrow::datatypes::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampNanosecondArray}; +use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use std::sync::Arc; pub struct SystemQueueTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue_results.rs 
b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue_results.rs index aa09b83e06075..08f5db63545b3 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue_results.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue_results.rs @@ -2,9 +2,11 @@ use crate::cachestore::QueueResult; use crate::metastore::IdRow; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array}; -use arrow::datatypes::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use datafusion::arrow::array::{ + ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array, +}; +use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use std::sync::Arc; pub struct SystemQueueResultsTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_replay_handles.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_replay_handles.rs index 609a0ae30e1f7..894eaa88d4fc2 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_replay_handles.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_replay_handles.rs @@ -2,9 +2,11 @@ use crate::metastore::replay_handle::{ReplayHandle, SeqPointerForLocation}; use crate::metastore::IdRow; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array}; -use arrow::datatypes::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use datafusion::arrow::array::{ + ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array, +}; +use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use std::sync::Arc; pub struct SystemReplayHandlesTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_snapshots.rs 
b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_snapshots.rs index c1e63844a47e8..7dfe33c29e37a 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_snapshots.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_snapshots.rs @@ -1,9 +1,9 @@ use crate::metastore::snapshot_info::SnapshotInfo; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray}; -use arrow::datatypes::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use datafusion::arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray}; +use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use std::sync::Arc; pub struct SystemSnapshotsTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs index 5cc393a39fd3c..6fb259c8957c2 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs @@ -1,9 +1,11 @@ use crate::metastore::table::TablePath; use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; use crate::CubeError; -use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array}; -use arrow::datatypes::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use datafusion::arrow::array::{ + ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array, +}; +use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use std::sync::Arc; pub struct SystemTablesTableDef; diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index a5ce20ac42f9c..b661ab0393ab2 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ 
b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -51,12 +51,12 @@ use crate::sql::cache::SqlResultCache; use crate::sql::InlineTables; use crate::store::DataFrame; use crate::{app_metrics, metastore, CubeError}; -use arrow::array::ArrayRef; -use arrow::datatypes::Field; -use arrow::record_batch::RecordBatch; -use arrow::{datatypes::Schema, datatypes::SchemaRef}; use async_trait::async_trait; use core::fmt; +use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::datatypes::Field; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::arrow::{datatypes::Schema, datatypes::SchemaRef}; use datafusion::catalog::TableReference; use datafusion::datasource::datasource::{Statistics, TableProviderFilterPushDown}; use datafusion::error::DataFusionError; @@ -480,15 +480,15 @@ macro_rules! base_info_schema_table_def { ($table: ty) => { #[async_trait] impl crate::queryplanner::BaseInfoSchemaTableDef for $table { - fn schema_ref(&self) -> arrow::datatypes::SchemaRef { - Arc::new(arrow::datatypes::Schema::new(self.schema())) + fn schema_ref(&self) -> datafusion::arrow::datatypes::SchemaRef { + Arc::new(datafusion::arrow::datatypes::Schema::new(self.schema())) } async fn scan( &self, ctx: crate::queryplanner::InfoSchemaTableDefContext, limit: Option<usize>, - ) -> Result<arrow::record_batch::RecordBatch, crate::CubeError> { + ) -> Result<datafusion::arrow::record_batch::RecordBatch, crate::CubeError> { let rows = self.rows(ctx, limit).await?; let schema = self.schema_ref(); let columns = self.columns(); @@ -496,7 +496,9 @@ macro_rules! base_info_schema_table_def { .into_iter() .map(|c| c(rows.clone())) .collect::<Vec<_>>(); - Ok(arrow::record_batch::RecordBatch::try_new(schema, columns)?) + Ok(datafusion::arrow::record_batch::RecordBatch::try_new( schema, columns, )?)
} } }; diff --git a/rust/cubestore/cubestore/src/queryplanner/panic.rs b/rust/cubestore/cubestore/src/queryplanner/panic.rs index 52f0c14beb27d..155efe19e3f85 100644 --- a/rust/cubestore/cubestore/src/queryplanner/panic.rs +++ b/rust/cubestore/cubestore/src/queryplanner/panic.rs @@ -1,6 +1,6 @@ use crate::queryplanner::planning::WorkerExec; -use arrow::datatypes::{Schema, SchemaRef}; use async_trait::async_trait; +use datafusion::arrow::datatypes::{Schema, SchemaRef}; use datafusion::error::DataFusionError; use datafusion::logical_plan::{DFSchema, DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode}; use datafusion::physical_plan::{ diff --git a/rust/cubestore/cubestore/src/queryplanner/partition_filter.rs b/rust/cubestore/cubestore/src/queryplanner/partition_filter.rs index 833bfdff34ab8..ea9c43b869bd1 100644 --- a/rust/cubestore/cubestore/src/queryplanner/partition_filter.rs +++ b/rust/cubestore/cubestore/src/queryplanner/partition_filter.rs @@ -1,6 +1,6 @@ use crate::table::{cmp_same_types, TableValue}; use crate::util::decimal::Decimal; -use arrow::datatypes::{DataType, Schema}; +use datafusion::arrow::datatypes::{DataType, Schema}; use datafusion::logical_plan::{Column, Expr, Operator}; use datafusion::scalar::ScalarValue; use std::cmp::Ordering; @@ -561,7 +561,7 @@ impl Builder<'_> { mod tests { use super::*; use crate::sql::parser::{CubeStoreParser, Statement as CubeStatement}; - use arrow::datatypes::Field; + use datafusion::arrow::datatypes::Field; use datafusion::catalog::TableReference; use datafusion::datasource::TableProvider; use datafusion::logical_plan::ToDFSchema; diff --git a/rust/cubestore/cubestore/src/queryplanner/planning.rs b/rust/cubestore/cubestore/src/queryplanner/planning.rs index fbc0c26e1bb61..2efcb66ea60b1 100644 --- a/rust/cubestore/cubestore/src/queryplanner/planning.rs +++ b/rust/cubestore/cubestore/src/queryplanner/planning.rs @@ -20,8 +20,8 @@ use std::collections::hash_map::RandomState; use std::collections::{HashMap, 
HashSet}; use std::sync::Arc; -use arrow::datatypes::{Field, SchemaRef}; use async_trait::async_trait; +use datafusion::arrow::datatypes::{Field, SchemaRef}; use datafusion::error::DataFusionError; use datafusion::execution::context::ExecutionContextState; use datafusion::logical_plan::{DFSchemaRef, Expr, LogicalPlan, Operator, UserDefinedLogicalNode}; @@ -1311,7 +1311,7 @@ fn pick_partitions( Ok(partition_snapshots) } -fn partition_filter_schema(index: &IdRow<Index>) -> arrow::datatypes::Schema { +fn partition_filter_schema(index: &IdRow<Index>) -> datafusion::arrow::datatypes::Schema { let schema_fields: Vec<Field>; schema_fields = index .get_row() @@ -1320,7 +1320,7 @@ fn partition_filter_schema(index: &IdRow<Index>) -> arrow::datatypes::Schema { .map(|c| c.clone().into()) .take(index.get_row().sort_key_size() as usize) .collect(); - arrow::datatypes::Schema::new(schema_fields) + datafusion::arrow::datatypes::Schema::new(schema_fields) } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -1640,8 +1640,8 @@ pub fn get_worker_plan( pub mod tests { use std::sync::Arc; - use arrow::datatypes::Schema as ArrowSchema; use async_trait::async_trait; + use datafusion::arrow::datatypes::Schema as ArrowSchema; use datafusion::datasource::TableProvider; use datafusion::execution::context::ExecutionContext; use datafusion::logical_plan::LogicalPlan; diff --git a/rust/cubestore/cubestore/src/queryplanner/providers/query_cache.rs b/rust/cubestore/cubestore/src/queryplanner/providers/query_cache.rs index fcfd54c8c678e..12ed4ef0cea4c 100644 --- a/rust/cubestore/cubestore/src/queryplanner/providers/query_cache.rs +++ b/rust/cubestore/cubestore/src/queryplanner/providers/query_cache.rs @@ -1,9 +1,9 @@ use crate::queryplanner::project_schema; use crate::sql::cache::{sql_result_cache_sizeof, SqlResultCache}; -use arrow::array::{Array, Int64Builder, StringBuilder}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use
datafusion::arrow::array::{Array, Int64Builder, StringBuilder}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::datasource::Statistics; use datafusion::datasource::TableProvider; use datafusion::error::DataFusionError; diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 20110fe1404dc..c58dc44971468 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -18,7 +18,9 @@ use crate::table::{Row, TableValue, TimestampValue}; use crate::telemetry::suboptimal_query_plan_event; use crate::util::memory::MemoryHandler; use crate::{app_metrics, CubeError}; -use arrow::array::{ +use async_trait::async_trait; +use core::fmt; +use datafusion::arrow::array::{ make_array, Array, ArrayRef, BinaryArray, BooleanArray, Float64Array, Int16Array, Int32Array, Int64Array, Int64Decimal0Array, Int64Decimal10Array, Int64Decimal1Array, Int64Decimal2Array, Int64Decimal3Array, Int64Decimal4Array, Int64Decimal5Array, Int96Array, Int96Decimal0Array, @@ -26,12 +28,10 @@ use arrow::array::{ Int96Decimal4Array, Int96Decimal5Array, MutableArrayData, StringArray, TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, }; -use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; -use arrow::ipc::reader::StreamReader; -use arrow::ipc::writer::MemStreamWriter; -use arrow::record_batch::RecordBatch; -use async_trait::async_trait; -use core::fmt; +use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; +use datafusion::arrow::ipc::reader::StreamReader; +use datafusion::arrow::ipc::writer::MemStreamWriter; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::datasource::{Statistics, TableProviderFilterPushDown}; use 
datafusion::datasource::TableProvider; use datafusion::error::DataFusionError; @@ -1761,7 +1761,7 @@ fn slice_copy(a: &dyn Array, start: usize, len: usize) -> ArrayRef { #[cfg(test)] mod tests { use super::*; - use arrow::datatypes::Field; + use datafusion::arrow::datatypes::Field; #[test] fn test_batch_to_dataframe() -> Result<(), CubeError> { diff --git a/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs b/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs index ec307d102060f..fd7e472943269 100644 --- a/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs +++ b/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs @@ -13,8 +13,8 @@ use crate::queryplanner::udfs::{ use crate::queryplanner::InfoSchemaTableProvider; use crate::table::Row; use crate::CubeError; -use arrow::datatypes::DataType; -use arrow::record_batch::RecordBatch; +use datafusion::arrow::datatypes::DataType; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::cube_ext::alias::LogicalAlias; use datafusion::cube_ext::join::SkewedLeftCrossJoin; use datafusion::cube_ext::joinagg::CrossJoinAgg; diff --git a/rust/cubestore/cubestore/src/queryplanner/tail_limit.rs b/rust/cubestore/cubestore/src/queryplanner/tail_limit.rs index 57995c7fce93b..f93ae6fa879c5 100644 --- a/rust/cubestore/cubestore/src/queryplanner/tail_limit.rs +++ b/rust/cubestore/cubestore/src/queryplanner/tail_limit.rs @@ -1,7 +1,7 @@ -use arrow::datatypes::SchemaRef; -use arrow::error::{ArrowError, Result as ArrowResult}; -use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::{ArrowError, Result as ArrowResult}; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::cube_ext; use datafusion::error::DataFusionError; use datafusion::physical_plan::common::{collect, combine_batches}; @@ -182,8 +182,8 @@ impl RecordBatchStream for TailLimitStream { #[cfg(test)] mod tests { use super::*; - use 
arrow::array::Int64Array; - use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::array::Int64Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::physical_plan::collect as result_collect; use datafusion::physical_plan::memory::MemoryExec; use itertools::Itertools; diff --git a/rust/cubestore/cubestore/src/queryplanner/topk/execute.rs b/rust/cubestore/cubestore/src/queryplanner/topk/execute.rs index c3afbee4646a7..08126dd2c2e43 100644 --- a/rust/cubestore/cubestore/src/queryplanner/topk/execute.rs +++ b/rust/cubestore/cubestore/src/queryplanner/topk/execute.rs @@ -1,11 +1,11 @@ use crate::queryplanner::topk::SortColumn; use crate::queryplanner::udfs::read_sketch; -use arrow::array::ArrayRef; -use arrow::compute::SortOptions; -use arrow::datatypes::SchemaRef; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::compute::SortOptions; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::ArrowError; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::cube_ext; use datafusion::error::DataFusionError; @@ -857,10 +857,10 @@ fn to_empty_sketch(s: &mut ScalarValue) { mod tests { use super::*; use crate::queryplanner::topk::{AggregateTopKExec, SortColumn}; - use arrow::array::{Array, ArrayRef, Int64Array}; - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use arrow::error::ArrowError; - use arrow::record_batch::RecordBatch; + use datafusion::arrow::array::{Array, ArrayRef, Int64Array}; + use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion::arrow::error::ArrowError; + use datafusion::arrow::record_batch::RecordBatch; use datafusion::catalog::catalog::MemoryCatalogList; use datafusion::error::DataFusionError; use datafusion::execution::context::{ExecutionConfig, ExecutionContextState, ExecutionProps}; diff --git 
a/rust/cubestore/cubestore/src/queryplanner/topk/mod.rs b/rust/cubestore/cubestore/src/queryplanner/topk/mod.rs index e62b9f277db62..7ef6017b5081c 100644 --- a/rust/cubestore/cubestore/src/queryplanner/topk/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/topk/mod.rs @@ -6,7 +6,7 @@ pub use plan::materialize_topk; pub use plan::plan_topk; use crate::queryplanner::planning::Snapshots; -use arrow::compute::SortOptions; +use datafusion::arrow::compute::SortOptions; use datafusion::logical_plan::{DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode}; use itertools::Itertools; use serde::Deserialize; diff --git a/rust/cubestore/cubestore/src/queryplanner/topk/plan.rs b/rust/cubestore/cubestore/src/queryplanner/topk/plan.rs index ef2a4c08cd848..ccedf71b8228e 100644 --- a/rust/cubestore/cubestore/src/queryplanner/topk/plan.rs +++ b/rust/cubestore/cubestore/src/queryplanner/topk/plan.rs @@ -5,7 +5,7 @@ use crate::queryplanner::udfs::{ aggregate_kind_by_name, scalar_kind_by_name, scalar_udf_by_kind, CubeAggregateUDFKind, CubeScalarUDFKind, }; -use arrow::datatypes::{DataType, Schema}; +use datafusion::arrow::datatypes::{DataType, Schema}; use datafusion::error::DataFusionError; use datafusion::execution::context::ExecutionContextState; use datafusion::logical_plan::{DFSchema, DFSchemaRef, Expr, LogicalPlan}; diff --git a/rust/cubestore/cubestore/src/queryplanner/trace_data_loaded.rs b/rust/cubestore/cubestore/src/queryplanner/trace_data_loaded.rs index 712b9a3546271..cbd26d9b9bc9e 100644 --- a/rust/cubestore/cubestore/src/queryplanner/trace_data_loaded.rs +++ b/rust/cubestore/cubestore/src/queryplanner/trace_data_loaded.rs @@ -1,8 +1,8 @@ use crate::util::batch_memory::record_batch_buffer_size; -use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; -use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::Result as ArrowResult; +use 
datafusion::arrow::record_batch::RecordBatch; use datafusion::error::DataFusionError; use datafusion::physical_plan::{ ExecutionPlan, OptimizerHints, Partitioning, RecordBatchStream, SendableRecordBatchStream, diff --git a/rust/cubestore/cubestore/src/queryplanner/udfs.rs b/rust/cubestore/cubestore/src/queryplanner/udfs.rs index 017b996c70fd9..f5df60b97c6f6 100644 --- a/rust/cubestore/cubestore/src/queryplanner/udfs.rs +++ b/rust/cubestore/cubestore/src/queryplanner/udfs.rs @@ -1,9 +1,9 @@ use crate::queryplanner::coalesce::{coalesce, SUPPORTED_COALESCE_TYPES}; use crate::queryplanner::hll::{Hll, HllUnion}; use crate::CubeError; -use arrow::array::{Array, BinaryArray, TimestampNanosecondArray, UInt64Builder}; -use arrow::datatypes::{DataType, IntervalUnit, TimeUnit}; use chrono::{TimeZone, Utc}; +use datafusion::arrow::array::{Array, BinaryArray, TimestampNanosecondArray, UInt64Builder}; +use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit}; use datafusion::cube_ext::datetime::{date_addsub_array, date_addsub_scalar}; use datafusion::error::DataFusionError; use datafusion::physical_plan::functions::Signature; diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 9edb40d9518a1..90d382c1b277b 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -4,8 +4,6 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use arrow::array::*; -use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use async_trait::async_trait; use chrono::format::Fixed::Nanosecond3; use chrono::format::Item::{Fixed, Literal, Numeric, Space}; @@ -13,6 +11,8 @@ use chrono::format::Numeric::{Day, Hour, Minute, Month, Second, Year}; use chrono::format::Pad::Zero; use chrono::format::Parsed; use chrono::{ParseResult, TimeZone, Utc}; +use datafusion::arrow::array::*; +use datafusion::arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use 
datafusion::cube_ext; use datafusion::physical_plan::ExecutionPlan; use datafusion::sql::parser::Statement as DFStatement; @@ -4261,7 +4261,7 @@ mod tests { sum(value) value from ( select * from test.test - union all + union all select * from test.test1 ) group by 1, 2 diff --git a/rust/cubestore/cubestore/src/store/compaction.rs b/rust/cubestore/cubestore/src/store/compaction.rs index 95eabcf9d352f..f451fd236c891 100644 --- a/rust/cubestore/cubestore/src/store/compaction.rs +++ b/rust/cubestore/cubestore/src/store/compaction.rs @@ -18,13 +18,14 @@ use crate::table::redistribute::redistribute; use crate::table::{Row, TableValue}; use crate::util::batch_memory::record_batch_buffer_size; use crate::CubeError; -use arrow::array::{ArrayRef, UInt64Array}; -use arrow::compute::{lexsort_to_indices, SortColumn, SortOptions}; -use arrow::datatypes::DataType; -use arrow::record_batch::RecordBatch; use async_trait::async_trait; use chrono::Utc; +use datafusion::arrow::array::{ArrayRef, UInt64Array}; +use datafusion::arrow::compute::{lexsort_to_indices, SortColumn, SortOptions}; +use datafusion::arrow::datatypes::DataType; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::cube_ext; +use datafusion::parquet::arrow::ArrowWriter; use datafusion::physical_plan::common::collect; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::{Column, Count, Literal}; @@ -43,7 +44,6 @@ use futures::StreamExt; use futures_util::future::join_all; use itertools::{EitherOrBoth, Itertools}; use num::integer::div_ceil; -use parquet::arrow::ArrowWriter; use std::cmp::Ordering; use std::fs::File; use std::mem::take; @@ -611,7 +611,7 @@ impl CompactionService for CompactionServiceImpl { // Concat rows from all chunks. 
let mut columns = Vec::with_capacity(num_columns); for i in 0..num_columns { - let v = arrow::compute::concat( + let v = datafusion::arrow::compute::concat( &data.iter().map(|a| a.column(i).as_ref()).collect_vec(), )?; columns.push(v); @@ -630,7 +630,11 @@ impl CompactionService for CompactionServiceImpl { let indices = lexsort_to_indices(&sort_key, None)?; let mut new = Vec::with_capacity(num_columns); for c in columns { - new.push(arrow::compute::take(c.as_ref(), &indices, None)?) + new.push(datafusion::arrow::compute::take( + c.as_ref(), + &indices, + None, + )?) } Ok((store, new)) }) @@ -1395,10 +1399,10 @@ mod tests { use crate::store::MockChunkDataStore; use crate::table::data::rows_to_columns; use crate::table::{cmp_same_types, Row, TableValue}; - use arrow::array::{Int64Array, StringArray}; - use arrow::datatypes::Schema; - use arrow::record_batch::RecordBatch; use cuberockstore::rocksdb::{Options, DB}; + use datafusion::arrow::array::{Int64Array, StringArray}; + use datafusion::arrow::datatypes::Schema; + use datafusion::arrow::record_batch::RecordBatch; use datafusion::physical_plan::collect; use std::fs; use std::path::{Path, PathBuf}; diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 5c203a6c68261..559daa784cbe5 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -1,7 +1,7 @@ pub mod compaction; -use arrow::compute::{lexsort_to_indices, SortColumn, SortOptions}; use async_trait::async_trait; +use datafusion::arrow::compute::{lexsort_to_indices, SortColumn, SortOptions}; use datafusion::physical_plan::collect; use datafusion::physical_plan::common::collect as common_collect; use datafusion::physical_plan::empty::EmptyExec; @@ -24,7 +24,7 @@ use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs}; use crate::table::{Row, TableValue}; use crate::util::batch_memory::columns_vec_buffer_size; use crate::CubeError; -use arrow::datatypes::{Schema, 
SchemaRef}; +use datafusion::arrow::datatypes::{Schema, SchemaRef}; use std::{ fs::File, io::{BufReader, BufWriter, Write}, @@ -39,9 +39,9 @@ use crate::metastore::chunks::chunk_file_name; use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::table::data::cmp_partition_key; use crate::table::parquet::{arrow_schema, ParquetTableStore}; -use arrow::array::{Array, ArrayRef, Int64Builder, StringBuilder, UInt64Array}; -use arrow::record_batch::RecordBatch; use compaction::{merge_chunks, merge_replay_handles}; +use datafusion::arrow::array::{Array, ArrayRef, Int64Builder, StringBuilder, UInt64Array}; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::cube_ext; use datafusion::cube_ext::util::lexcmp_array_rows; use deepsize::DeepSizeOf; @@ -447,7 +447,7 @@ impl ChunkDataStore for ChunkStore { let mut columns = Vec::new(); for i in 0..batches[0].num_columns() { - columns.push(arrow::compute::concat( + columns.push(datafusion::arrow::compute::concat( &batches.iter().map(|b| b.column(i).as_ref()).collect_vec(), )?) } @@ -512,7 +512,7 @@ impl ChunkDataStore for ChunkStore { } let mut columns = Vec::new(); for i in 0..batches[0].num_columns() { - columns.push(arrow::compute::concat( + columns.push(datafusion::arrow::compute::concat( &batches.iter().map(|b| b.column(i).as_ref()).collect_vec(), )?) } @@ -635,7 +635,7 @@ impl ChunkDataStore for ChunkStore { let num_columns = data[0].num_columns(); let mut columns = Vec::with_capacity(num_columns); for i in 0..num_columns { - let v = arrow::compute::concat( + let v = datafusion::arrow::compute::concat( &data.iter().map(|a| a.column(i).as_ref()).collect_vec(), )?; columns.push(v); @@ -654,7 +654,11 @@ impl ChunkDataStore for ChunkStore { let indices = lexsort_to_indices(&sort_key, None)?; let mut new = Vec::with_capacity(num_columns); for c in columns { - new.push(arrow::compute::take(c.as_ref(), &indices, None)?) + new.push(datafusion::arrow::compute::take( + c.as_ref(), + &indices, + None, + )?) 
} Ok(new) }) @@ -798,8 +802,8 @@ mod tests { use crate::remotefs::LocalDirRemoteFs; use crate::table::data::{concat_record_batches, rows_to_columns}; use crate::{metastore::ColumnType, table::TableValue}; - use arrow::array::{Int64Array, StringArray}; use cuberockstore::rocksdb::{Options, DB}; + use datafusion::arrow::array::{Int64Array, StringArray}; use std::fs; use std::path::{Path, PathBuf}; @@ -1210,7 +1214,7 @@ impl ChunkStore { let to_write = UInt64Array::from(to_write); let columns = columns .iter() - .map(|c| arrow::compute::take(c.as_ref(), &to_write, None)) + .map(|c| datafusion::arrow::compute::take(c.as_ref(), &to_write, None)) .collect::<Result<Vec<_>, _>>()?; let columns = self.post_process_columns(index.clone(), columns).await?; @@ -1330,7 +1334,7 @@ impl ChunkStore { } } - /// Processes data into parquet files in the current task and schedules an async file upload. + /// Processes data into parquet files in the current task and schedules an async file upload. /// Join the returned handle to wait for the upload to finish. 
async fn add_chunk_columns( &self, diff --git a/rust/cubestore/cubestore/src/streaming/kafka.rs b/rust/cubestore/cubestore/src/streaming/kafka.rs index 6aef766251620..a6c12a5dfe7e2 100644 --- a/rust/cubestore/cubestore/src/streaming/kafka.rs +++ b/rust/cubestore/cubestore/src/streaming/kafka.rs @@ -7,9 +7,9 @@ use crate::streaming::traffic_sender::TrafficSender; use crate::streaming::{parse_json_payload_and_key, StreamingSource}; use crate::table::{Row, TableValue}; use crate::CubeError; -use arrow::array::ArrayRef; use async_std::stream; use async_trait::async_trait; +use datafusion::arrow::array::ArrayRef; use datafusion::cube_ext; use futures::Stream; use json::object::Object; @@ -405,8 +405,8 @@ mod tests { use crate::queryplanner::query_executor::batches_to_dataframe; use crate::sql::MySqlDialectWithBackTicks; use crate::streaming::topic_table_provider::TopicTableProvider; - use arrow::array::StringArray; - use arrow::record_batch::RecordBatch; + use datafusion::arrow::array::StringArray; + use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::TableProvider; use datafusion::physical_plan::collect; use datafusion::physical_plan::memory::MemoryExec; diff --git a/rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs b/rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs index b247646f40966..ab5034c06287e 100644 --- a/rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs +++ b/rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs @@ -2,9 +2,9 @@ use crate::metastore::Column; use crate::sql::MySqlDialectWithBackTicks; use crate::streaming::topic_table_provider::TopicTableProvider; use crate::CubeError; -use arrow::array::ArrayRef; -use arrow::datatypes::{Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; +use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::datatypes::{Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::logical_plan::{ Column as 
DFColumn, DFField, DFSchema, DFSchemaRef, Expr, LogicalPlan, }; @@ -434,7 +434,7 @@ impl KafkaPostProcessPlanner { Arc::new(schema.join(&DFSchema::new(vec![DFField::new( None, self.seq_column.get_name(), - arrow::datatypes::DataType::Int64, + datafusion::arrow::datatypes::DataType::Int64, true, )])?)?) } else { diff --git a/rust/cubestore/cubestore/src/streaming/mod.rs b/rust/cubestore/cubestore/src/streaming/mod.rs index d02724ffe0e47..46f6db8827fab 100644 --- a/rust/cubestore/cubestore/src/streaming/mod.rs +++ b/rust/cubestore/cubestore/src/streaming/mod.rs @@ -17,11 +17,11 @@ use crate::table::data::{append_row, create_array_builders}; use crate::table::{Row, TableValue, TimestampValue}; use crate::util::decimal::Decimal; use crate::{app_metrics, CubeError}; -use arrow::array::ArrayBuilder; -use arrow::array::ArrayRef; use async_trait::async_trait; use buffered_stream::BufferedStream; use chrono::Utc; +use datafusion::arrow::array::ArrayBuilder; +use datafusion::arrow::array::ArrayRef; use datafusion::cube_ext::ordfloat::OrdF64; use futures::future::join_all; use futures::stream::StreamExt; @@ -1653,14 +1653,14 @@ mod tests { PARSE_TIMESTAMP(\ FORMAT_TIMESTAMP(\ CONVERT_TZ(\ - PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\'), - \\'UTC\\', - \\'UTC\\' - ), - \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:00.000\\' - ), - \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSS\\', - \\'UTC\\' + PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\'), + \\'UTC\\', + \\'UTC\\' + ), + \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:00.000\\' + ), + \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSS\\', + \\'UTC\\' ) minute_timestamp FROM EVENTS_BY_TYPE \ WHERE PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') >= PARSE_TIMESTAMP(\\'1970-01-01T01:00:00.000Z\\', \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') \ @@ -1685,14 +1685,14 @@ mod tests { PARSE_TIMESTAMP(\ FORMAT_TIMESTAMP(\ CONVERT_TZ(\ - PARSE_TIMESTAMP(TIMESTAMP, 
\\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\'), - \\'UTC\\', - \\'UTC\\' - ), - \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:00.000\\' - ), - \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSS\\', - \\'UTC\\' + PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\'), + \\'UTC\\', + \\'UTC\\' + ), + \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:00.000\\' + ), + \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSS\\', + \\'UTC\\' ) minute_timestamp FROM EVENTS_BY_TYPE \ WHERE PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') >= PARSE_TIMESTAMP(\\'1970-01-01T01:00:00.000Z\\', \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') \ @@ -1749,14 +1749,14 @@ mod tests { PARSE_TIMESTAMP(\ FORMAT_TIMESTAMP(\ CONVERT_TZ(\ - PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\'), - \\'UTC\\', - \\'UTC\\' - ), - \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:00.000\\' - ), - \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSS\\', - \\'UTC\\' + PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\'), + \\'UTC\\', + \\'UTC\\' + ), + \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:00.000\\' + ), + \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSS\\', + \\'UTC\\' ) minute_timestamp FROM EVENTS_BY_TYPE \ WHERE PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') >= PARSE_TIMESTAMP(\\'1970-01-01T01:00:00.000Z\\', \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') \ diff --git a/rust/cubestore/cubestore/src/streaming/topic_table_provider.rs b/rust/cubestore/cubestore/src/streaming/topic_table_provider.rs index 531ff1d4d504e..ea89e9a505650 100644 --- a/rust/cubestore/cubestore/src/streaming/topic_table_provider.rs +++ b/rust/cubestore/cubestore/src/streaming/topic_table_provider.rs @@ -1,11 +1,11 @@ use crate::metastore::Column; use crate::CubeError; -use arrow::array::{ - Array, StringArray, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder, -}; -use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; use chrono::{TimeZone, Utc}; use chrono_tz::Tz; +use 
datafusion::arrow::array::{ + Array, StringArray, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder, +}; +use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; use datafusion::catalog::TableReference; use datafusion::datasource::datasource::Statistics; use datafusion::datasource::TableProvider; diff --git a/rust/cubestore/cubestore/src/table/data.rs b/rust/cubestore/cubestore/src/table/data.rs index f096f538dbac9..6ce58333c2c0a 100644 --- a/rust/cubestore/cubestore/src/table/data.rs +++ b/rust/cubestore/cubestore/src/table/data.rs @@ -2,11 +2,11 @@ use crate::metastore::{Column, ColumnType}; use crate::table::{Row, TableValue, TimestampValue}; use crate::util::decimal::{Decimal, Decimal96}; use crate::util::int96::Int96; -use arrow::array::{Array, ArrayBuilder, ArrayRef, StringArray}; -use arrow::record_batch::RecordBatch; use itertools::Itertools; use std::cmp::Ordering; +use datafusion::arrow::array::{Array, ArrayBuilder, ArrayRef, StringArray}; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::cube_ext::ordfloat::OrdF64; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; @@ -136,7 +136,7 @@ pub fn cmp_same_types(l: &TableValueR, r: &TableValueR) -> Ordering { #[macro_export] macro_rules! 
match_column_type { ($t: expr, $matcher: ident) => {{ - use arrow::array::*; + use datafusion::arrow::array::*; let t = $t; match t { ColumnType::String => $matcher!(String, StringBuilder, String), diff --git a/rust/cubestore/cubestore/src/table/mod.rs b/rust/cubestore/cubestore/src/table/mod.rs index f942c260090cf..a71f0df9de5b3 100644 --- a/rust/cubestore/cubestore/src/table/mod.rs +++ b/rust/cubestore/cubestore/src/table/mod.rs @@ -1,14 +1,14 @@ use crate::util::decimal::{Decimal, Decimal96}; use crate::util::int96::Int96; -use arrow::array::{ +use datafusion::arrow::array::{ Array, ArrayRef, BinaryArray, BooleanArray, Float64Array, Int64Array, Int64Decimal0Array, Int64Decimal10Array, Int64Decimal1Array, Int64Decimal2Array, Int64Decimal3Array, Int64Decimal4Array, Int64Decimal5Array, Int96Array, Int96Decimal0Array, Int96Decimal10Array, Int96Decimal1Array, Int96Decimal2Array, Int96Decimal3Array, Int96Decimal4Array, Int96Decimal5Array, StringArray, TimestampMicrosecondArray, }; -use arrow::datatypes::{DataType, TimeUnit}; +use datafusion::arrow::datatypes::{DataType, TimeUnit}; use chrono::{SecondsFormat, TimeZone, Utc}; use datafusion::cube_ext::ordfloat::OrdF64; diff --git a/rust/cubestore/cubestore/src/table/parquet.rs b/rust/cubestore/cubestore/src/table/parquet.rs index fcf0be8396054..62bb1a5d8f2e0 100644 --- a/rust/cubestore/cubestore/src/table/parquet.rs +++ b/rust/cubestore/cubestore/src/table/parquet.rs @@ -1,12 +1,12 @@ use crate::config::injection::DIService; use crate::metastore::Index; use crate::CubeError; -use arrow::array::ArrayRef; -use arrow::datatypes::Schema; -use arrow::record_batch::RecordBatch; +use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::datatypes::Schema; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader}; +use datafusion::parquet::file::properties::{WriterProperties, WriterVersion}; use 
datafusion::physical_plan::parquet::{NoopParquetMetadataCache, ParquetMetadataCache}; -use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader}; -use parquet::file::properties::{WriterProperties, WriterVersion}; use std::fs::File; use std::sync::Arc; @@ -111,16 +111,16 @@ mod tests { use crate::table::parquet::{arrow_schema, ParquetTableStore}; use crate::table::{Row, TableValue}; use crate::util::decimal::Decimal; - use arrow::array::{ + use datafusion::arrow::array::{ ArrayRef, BooleanArray, Float64Array, Int64Array, Int64Decimal4Array, StringArray, TimestampMicrosecondArray, }; - use arrow::record_batch::RecordBatch; + use datafusion::arrow::record_batch::RecordBatch; + use datafusion::parquet::data_type::DataType; + use datafusion::parquet::file::reader::FileReader; + use datafusion::parquet::file::reader::SerializedFileReader; + use datafusion::parquet::file::statistics::{Statistics, TypedStatistics}; use itertools::Itertools; - use parquet::data_type::DataType; - use parquet::file::reader::FileReader; - use parquet::file::reader::SerializedFileReader; - use parquet::file::statistics::{Statistics, TypedStatistics}; use pretty_assertions::assert_eq; use std::sync::Arc; use tempfile::NamedTempFile; diff --git a/rust/cubestore/cubestore/src/table/redistribute.rs b/rust/cubestore/cubestore/src/table/redistribute.rs index b9e055b87768c..974a755a31fa7 100644 --- a/rust/cubestore/cubestore/src/table/redistribute.rs +++ b/rust/cubestore/cubestore/src/table/redistribute.rs @@ -1,6 +1,6 @@ use crate::table::data::concat_record_batches; use crate::CubeError; -use arrow::record_batch::RecordBatch; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::physical_plan::SendableRecordBatchStream; use futures::StreamExt; use itertools::Itertools; diff --git a/rust/cubestore/cubestore/src/util/batch_memory.rs b/rust/cubestore/cubestore/src/util/batch_memory.rs index d80297554cb59..d5829f9e5db9c 100644 --- 
a/rust/cubestore/cubestore/src/util/batch_memory.rs +++ b/rust/cubestore/cubestore/src/util/batch_memory.rs @@ -1,5 +1,5 @@ -use arrow::array::ArrayRef; -use arrow::record_batch::RecordBatch; +use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::record_batch::RecordBatch; pub fn record_batch_buffer_size(batch: &RecordBatch) -> usize { columns_vec_buffer_size(batch.columns())