
Commit 88c305c

[WIP cube_match_array! hygiene] refactor(cubestore): Replace direct dependency on arrow and parquet with datafusion's re-exports

mcheshkov committed Aug 24, 2024
1 parent 861f13e commit 88c305c

Showing 51 changed files with 201 additions and 185 deletions.
2 changes: 0 additions & 2 deletions rust/cubestore/Cargo.lock

Some generated files are not rendered by default.

2 changes: 0 additions & 2 deletions rust/cubestore/cubestore/Cargo.toml
@@ -28,8 +28,6 @@ cubehll = { path = "../cubehll" }
 cubezetasketch = { path = "../cubezetasketch" }
 cubedatasketches = { path = "../cubedatasketches" }
 cuberpc = { path = "../cuberpc" }
-parquet = { git = "https://github.com/cube-js/arrow-rs", branch = "cube", features = ["arrow"] }
-arrow = { git = "https://github.com/cube-js/arrow-rs", branch = "cube" }
 datafusion = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube", features = ["default_nulls_last"] }
 csv = "1.1.3"
 bytes = "1.6.0"
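
With the direct parquet and arrow git dependencies dropped from Cargo.toml, the rest of the commit applies one mechanical rewrite: every use arrow::... and use parquet::... import becomes use datafusion::arrow::... or use datafusion::parquet::..., so the whole crate compiles against the single arrow/parquet build that datafusion pins. A minimal sketch of the resulting call-site style (the describe function is illustrative, not part of the commit):

    // After this change, arrow and parquet types are reached only through
    // datafusion's re-exports, for example:
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::errors::ParquetError;

    // Illustrative helper (not in the commit): RecordBatch here is the exact type
    // datafusion's own operators produce, so no mismatch between two compiled
    // copies of arrow can occur.
    fn describe(batch: &RecordBatch) -> Result<String, ParquetError> {
        Ok(format!("{} rows x {} columns", batch.num_rows(), batch.num_columns()))
    }
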
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/cluster/message.rs
@@ -2,7 +2,7 @@ use crate::metastore::{MetaStoreRpcMethodCall, MetaStoreRpcMethodResult};
 use crate::queryplanner::query_executor::SerializedRecordBatchStream;
 use crate::queryplanner::serialized_plan::SerializedPlan;
 use crate::CubeError;
-use arrow::datatypes::SchemaRef;
+use datafusion::arrow::datatypes::SchemaRef;
 use serde::{Deserialize, Serialize};
 use std::io::ErrorKind;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
6 changes: 3 additions & 3 deletions rust/cubestore/cubestore/src/cluster/mod.rs
@@ -43,10 +43,10 @@ use crate::remotefs::RemoteFs;
 use crate::store::ChunkDataStore;
 use crate::telemetry::tracing::TracingHelper;
 use crate::CubeError;
-use arrow::datatypes::SchemaRef;
-use arrow::error::ArrowError;
-use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::ArrowError;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::cube_ext;
 use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use flatbuffers::bitflags::_core::pin::Pin;
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/cluster/worker_pool.rs
@@ -450,8 +450,8 @@ mod tests {
     use std::sync::Arc;
     use std::time::Duration;

-    use arrow::datatypes::{DataType, Field, Schema};
     use async_trait::async_trait;
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::logical_plan::ToDFSchema;
     use futures_timer::Delay;
     use serde::{Deserialize, Serialize};
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/import/mod.rs
@@ -5,12 +5,12 @@ use std::path::Path;
 use std::pin::Pin;
 use std::sync::Arc;

-use arrow::array::{ArrayBuilder, ArrayRef};
 use async_compression::tokio::bufread::GzipDecoder;
 use async_std::io::SeekFrom;
 use async_std::task::{Context, Poll};
 use async_trait::async_trait;
 use bigdecimal::{BigDecimal, Num};
+use datafusion::arrow::array::{ArrayBuilder, ArrayRef};
 use datafusion::cube_ext;
 use futures::future::join_all;
 use futures::{Stream, StreamExt};
6 changes: 3 additions & 3 deletions rust/cubestore/cubestore/src/lib.rs
@@ -15,13 +15,13 @@ extern crate core;

 use crate::metastore::TableId;
 use crate::remotefs::queue::RemoteFsOpResult;
-use arrow::error::ArrowError;
 use cubehll::HllError;
 use cubezetasketch::ZetaError;
+use datafusion::arrow::error::ArrowError;
 use datafusion::cube_ext::catch_unwind::PanicError;
+use datafusion::parquet::errors::ParquetError;
 use flexbuffers::{DeserializationError, ReaderError};
 use log::SetLoggerError;
-use parquet::errors::ParquetError;
 use serde_derive::{Deserialize, Serialize};
 use sqlparser::parser::ParserError;
 use std::any::Any;
@@ -286,7 +286,7 @@ impl From<CubeError> for datafusion::error::DataFusionError {
     }
 }

-impl From<arrow::error::ArrowError> for CubeError {
+impl From<ArrowError> for CubeError {
     fn from(v: ArrowError) -> Self {
         CubeError::from_error(v)
     }
10 changes: 5 additions & 5 deletions rust/cubestore/cubestore/src/metastore/mod.rs
@@ -54,23 +54,23 @@ use crate::table::{Row, TableValue};

 use crate::util::WorkerLoop;
 use crate::{meta_store_table_impl, CubeError};
-use arrow::datatypes::TimeUnit::Microsecond;
-use arrow::datatypes::{DataType, Field};
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use chrono::{DateTime, Utc};
 use chunks::ChunkRocksTable;
 use core::fmt;
 use cubehll::HllSketch;
 use cuberockstore::rocksdb::backup::{BackupEngine, BackupEngineOptions};
 use cubezetasketch::HyperLogLogPlusPlus;
+use datafusion::arrow::datatypes::TimeUnit::Microsecond;
+use datafusion::arrow::datatypes::{DataType, Field};
 use datafusion::cube_ext;
+use datafusion::parquet::basic::{ConvertedType, Repetition};
+use datafusion::parquet::{basic::Type, schema::types};
 use futures_timer::Delay;
 use index::{IndexRocksIndex, IndexRocksTable};
 use itertools::Itertools;
 use log::trace;
 use multi_index::{MultiIndex, MultiIndexRocksIndex, MultiIndexRocksTable};
-use parquet::basic::{ConvertedType, Repetition};
-use parquet::{basic::Type, schema::types};
 use partition::{PartitionRocksIndex, PartitionRocksTable};
 use regex::Regex;

@@ -477,7 +477,7 @@ impl ColumnType {
     }
 }

-impl From<&Column> for parquet::schema::types::Type {
+impl From<&Column> for types::Type {
     fn from(column: &Column) -> Self {
         match column.get_column_type() {
             ColumnType::String => {
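
The impl target above is unchanged; only its spelling is. Because the bare parquet:: crate path no longer resolves without the direct dependency, the header now goes through the types module imported from datafusion::parquet. A small sketch of the equivalence, assuming only datafusion's parquet re-export:

    use datafusion::parquet::schema::types;

    // Both aliases name the same parquet type; the commit merely switches the impl
    // header from the fully qualified path to the imported module path.
    type FullyQualified = datafusion::parquet::schema::types::Type;
    type ViaImport = types::Type;
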
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/metastore/table.rs
@@ -7,10 +7,10 @@ use crate::queryplanner::udfs::aggregate_udf_by_kind;
 use crate::queryplanner::udfs::CubeAggregateUDFKind;
 use crate::rocks_table_impl;
 use crate::{base_rocks_secondary_index, CubeError};
-use arrow::datatypes::Schema as ArrowSchema;
 use byteorder::{BigEndian, WriteBytesExt};
 use chrono::DateTime;
 use chrono::Utc;
+use datafusion::arrow::datatypes::Schema as ArrowSchema;
 use datafusion::physical_plan::expressions::{Column as FusionColumn, Max, Min, Sum};
 use datafusion::physical_plan::{udaf, AggregateExpr, PhysicalExpr};
 use itertools::Itertools;
6 changes: 3 additions & 3 deletions rust/cubestore/cubestore/src/queryplanner/check_memory.rs
@@ -1,8 +1,8 @@
 use crate::util::memory::MemoryHandler;
-use arrow::datatypes::SchemaRef;
-use arrow::error::Result as ArrowResult;
-use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::{
     ExecutionPlan, OptimizerHints, Partitioning, RecordBatchStream, SendableRecordBatchStream,
6 changes: 4 additions & 2 deletions rust/cubestore/cubestore/src/queryplanner/coalesce.rs
@@ -1,5 +1,5 @@
-use arrow::array::ArrayRef;
-use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
+use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
 use datafusion::cube_match_array;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::ColumnarValue;
@@ -147,5 +147,7 @@ fn do_coalesce(start: &ArrayRef, rest: &[ColumnarValue]) -> Result<ArrayRef, Dat
             Ok(Arc::new(b.finish()))
         }};
     }
+    //TODO improve cube_match_array! hygiene instead
+    use datafusion::arrow;
     cube_match_array!(start, apply_coalesce)
 }
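
The local use datafusion::arrow; is the workaround the WIP title refers to: cube_match_array! presumably expands to paths spelled arrow::..., and since macro_rules! paths resolve at the call site, an arrow name must be in scope there now that the crate no longer depends on arrow directly. A toy sketch of the issue (the is_string_array macro is hypothetical, not datafusion's cube_match_array!):

    // Hypothetical macro whose body names a bare arrow:: path; because
    // macro_rules! paths resolve at the call site, callers must have `arrow`
    // in scope for the expansion to compile.
    macro_rules! is_string_array {
        ($array:expr) => {
            $array.as_any().is::<arrow::array::StringArray>()
        };
    }

    fn is_string(col: &datafusion::arrow::array::ArrayRef) -> bool {
        // Without this alias the expansion above could not resolve `arrow`,
        // which is exactly why do_coalesce adds `use datafusion::arrow;`.
        use datafusion::arrow;
        is_string_array!(col)
    }

    // A hygienic fix would have the macro spell $crate::arrow::array::StringArray
    // internally, removing the need for the call-site alias.
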
@@ -1,10 +1,10 @@
 use crate::queryplanner::serialized_plan::{RowFilter, RowRange};
 use crate::table::data::cmp_partition_key;
-use arrow::array::ArrayRef;
-use arrow::datatypes::SchemaRef;
-use arrow::error::ArrowError;
-use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::ArrowError;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::cube_ext::stream::StreamWithSchema;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::{
@@ -2,9 +2,9 @@ use crate::metastore::table::TablePath;
 use crate::metastore::Column;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, StringArray};
-use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field};
 use std::sync::Arc;

 pub struct ColumnsInfoSchemaTableDef;
@@ -1,9 +1,9 @@
 use crate::metastore::{IdRow, MetaStoreTable, Schema};
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, StringArray};
-use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field};
 use std::sync::Arc;

 pub struct SchemataInfoSchemaTableDef;
@@ -1,9 +1,9 @@
 use crate::metastore::table::TablePath;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray};
-use arrow::datatypes::{DataType, Field, TimeUnit};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use std::sync::Arc;

 pub struct TablesInfoSchemaTableDef;
@@ -1,9 +1,9 @@
 use crate::metastore::RocksPropertyRow;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, StringArray};
-use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field};
 use std::sync::Arc;

 pub struct RocksDBPropertiesTableDef {
@@ -2,9 +2,9 @@ use crate::cachestore::CacheItem;
 use crate::metastore::IdRow;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray};
-use arrow::datatypes::{DataType, Field, TimeUnit};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use std::sync::Arc;

 pub struct SystemCacheTableDef;
@@ -2,9 +2,11 @@ use crate::metastore::chunks::chunk_file_name;
 use crate::metastore::{Chunk, IdRow, MetaStoreTable};
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array};
-use arrow::datatypes::{DataType, Field, TimeUnit};
 use async_trait::async_trait;
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array,
+};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use std::sync::Arc;

 pub struct SystemChunksTableDef;
@@ -1,9 +1,9 @@
 use crate::metastore::{IdRow, Index, MetaStoreTable};
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, StringArray, UInt64Array};
-use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, StringArray, UInt64Array};
+use datafusion::arrow::datatypes::{DataType, Field};
 use std::sync::Arc;

 pub struct SystemIndexesTableDef;
@@ -2,9 +2,9 @@ use crate::metastore::job::Job;
 use crate::metastore::IdRow;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray, UInt64Array};
-use arrow::datatypes::{DataType, Field, TimeUnit};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, StringArray, TimestampNanosecondArray, UInt64Array};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use std::sync::Arc;

 pub struct SystemJobsTableDef;
@@ -2,9 +2,9 @@ use crate::metastore::partition::partition_file_name;
 use crate::metastore::{IdRow, MetaStoreTable, Partition};
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, BooleanArray, StringArray, UInt64Array};
-use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, BooleanArray, StringArray, UInt64Array};
+use datafusion::arrow::datatypes::{DataType, Field};
 use std::sync::Arc;

 pub struct SystemPartitionsTableDef;
@@ -1,9 +1,9 @@
 use crate::cachestore::QueueAllItem;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, Int64Array, StringArray, TimestampNanosecondArray};
-use arrow::datatypes::{DataType, Field, TimeUnit};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampNanosecondArray};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use std::sync::Arc;

 pub struct SystemQueueTableDef;
@@ -2,9 +2,11 @@ use crate::cachestore::QueueResult;
 use crate::metastore::IdRow;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array};
-use arrow::datatypes::{DataType, Field, TimeUnit};
 use async_trait::async_trait;
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array,
+};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use std::sync::Arc;

 pub struct SystemQueueResultsTableDef;
@@ -2,9 +2,11 @@ use crate::metastore::replay_handle::{ReplayHandle, SeqPointerForLocation};
 use crate::metastore::IdRow;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array};
-use arrow::datatypes::{DataType, Field, TimeUnit};
 use async_trait::async_trait;
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array,
+};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use std::sync::Arc;

 pub struct SystemReplayHandlesTableDef;
@@ -1,9 +1,9 @@
 use crate::metastore::snapshot_info::SnapshotInfo;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray};
-use arrow::datatypes::{DataType, Field, TimeUnit};
 use async_trait::async_trait;
+use datafusion::arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use std::sync::Arc;

 pub struct SystemSnapshotsTableDef;
@@ -1,9 +1,11 @@
 use crate::metastore::table::TablePath;
 use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
 use crate::CubeError;
-use arrow::array::{ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array};
-use arrow::datatypes::{DataType, Field, TimeUnit};
 use async_trait::async_trait;
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, StringArray, TimestampNanosecondArray, UInt64Array,
+};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use std::sync::Arc;

 pub struct SystemTablesTableDef;
