[NativeIO/Fix] Add error info of native writer && fix case of aux_sort_cols (lakesoul-io#547)

* add error info of native writer && fix case of aux_sort_cols

Signed-off-by: zenghua <[email protected]>

* fix clippy

Signed-off-by: zenghua <[email protected]>

* do cargo fmt

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
Ceng23333 and zenghua authored Sep 27, 2024
1 parent 281a085 commit 068f935
Showing 9 changed files with 178 additions and 168 deletions.
1 change: 0 additions & 1 deletion rust/lakesoul-io/src/async_writer/mod.rs
@@ -32,7 +32,6 @@ use datafusion::{
use datafusion_common::{DataFusionError, Result};
use parquet::format::FileMetaData;


// The result of a flush operation with format (partition_desc, file_path, file_meta)
pub type WriterFlushResult = Result<Vec<(String, String, FileMetaData)>>;

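For context, here is a minimal sketch (not part of this commit) of how a `WriterFlushResult` with the tuple layout documented above could be consumed, grouping flushed file paths by partition description. The grouping helper is hypothetical, introduced only for illustration.

```rust
// Illustrative only: group the (partition_desc, file_path, file_meta) tuples of a
// WriterFlushResult by partition; this helper is hypothetical, not LakeSoul code.
use std::collections::HashMap;

use datafusion_common::Result;
use parquet::format::FileMetaData;

pub type WriterFlushResult = Result<Vec<(String, String, FileMetaData)>>;

fn files_by_partition(result: WriterFlushResult) -> Result<HashMap<String, Vec<String>>> {
    let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
    for (partition_desc, file_path, _file_meta) in result? {
        grouped.entry(partition_desc).or_default().push(file_path);
    }
    Ok(grouped)
}
```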
9 changes: 5 additions & 4 deletions rust/lakesoul-io/src/async_writer/multipart_writer.rs
@@ -15,10 +15,13 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use url::Url;

use crate::{
constant::TBD_PARTITION_DESC, helpers::get_batch_memory_size, lakesoul_io_config::{create_session_context, LakeSoulIOConfig}, transform::{uniform_record_batch, uniform_schema}
constant::TBD_PARTITION_DESC,
helpers::get_batch_memory_size,
lakesoul_io_config::{create_session_context, LakeSoulIOConfig},
transform::{uniform_record_batch, uniform_schema},
};

use super::{AsyncBatchWriter, WriterFlushResult, InMemBuf};
use super::{AsyncBatchWriter, InMemBuf, WriterFlushResult};

/// An async writer using object_store's multi-part upload feature for cloud storage.
/// This writer uses a `VecDeque<u8>` as `std::io::Write` for arrow-rs's ArrowWriter.
@@ -169,7 +172,6 @@ impl MultiPartAsyncWriter {

#[async_trait::async_trait]
impl AsyncBatchWriter for MultiPartAsyncWriter {

async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<()> {
let batch = uniform_record_batch(batch)?;
self.num_rows += batch.num_rows() as u64;
@@ -213,5 +215,4 @@ impl AsyncBatchWriter for MultiPartAsyncWriter {
fn buffered_size(&self) -> u64 {
self.buffered_size
}

}
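As background for the doc comment above (buffering parquet bytes through an in-memory `std::io::Write` sink before a multipart upload), a minimal standalone sketch of the same pattern follows. It uses a plain `Vec<u8>` instead of LakeSoul's `InMemBuf` and assumes recent `arrow` and `parquet` crates; it is not the writer's actual implementation.

```rust
// A minimal sketch, not LakeSoul's writer: encode a RecordBatch to parquet bytes
// held in memory, the same buffering idea MultiPartAsyncWriter's doc comment
// describes (a Vec<u8> stands in for the VecDeque-backed InMemBuf).
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn encode_batch_in_memory() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    // Vec<u8> implements std::io::Write, so the encoded parquet file stays in memory;
    // the real writer drains such a buffer into object_store multipart uploads.
    let mut buf: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(buf)
}
```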
37 changes: 13 additions & 24 deletions rust/lakesoul-io/src/async_writer/partitioning_writer.rs
@@ -13,29 +13,26 @@ use datafusion::{
PhysicalSortExpr,
},
physical_plan::{
projection::ProjectionExec,
sorts::sort::SortExec,
stream::RecordBatchReceiverStream,
ExecutionPlan, Partitioning, PhysicalExpr,
projection::ProjectionExec, sorts::sort::SortExec, stream::RecordBatchReceiverStream, ExecutionPlan,
Partitioning, PhysicalExpr,
},
};
use datafusion_common::{DataFusionError, Result};

use rand::distributions::DistString;
use tokio::{
sync::mpsc::Sender,
task::JoinHandle,
};
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use tokio_stream::StreamExt;
use tracing::debug;

use crate::{
helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values},
helpers::{
columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values,
},
lakesoul_io_config::{create_session_context, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
repartition::RepartitionByRangeAndHashExec,
};

use super::{AsyncBatchWriter, WriterFlushResult, MultiPartAsyncWriter, ReceiverStreamExec};
use super::{AsyncBatchWriter, MultiPartAsyncWriter, ReceiverStreamExec, WriterFlushResult};

// type PartitionedWriterInfo = Arc<Mutex<HashMap<String, Vec<WriterFlushResult>>>>;

@@ -75,7 +72,7 @@ impl PartitioningAsyncWriter {
task_context.clone(),
config.clone().into(),
Arc::new(config.range_partitions.clone()),
write_id.clone()
write_id.clone(),
));
// // In a separate task, wait for each input to be done
// // (and pass along any errors, including panic!s)
@@ -198,7 +195,6 @@ impl PartitioningAsyncWriter {

let mut err = None;


let mut partitioned_writer = HashMap::<String, Box<MultiPartAsyncWriter>>::new();
let mut flush_join_handle_list = Vec::new();
// let mut partitioned_flush_result_locked = partitioned_flush_result.lock().await;
@@ -230,7 +226,6 @@ impl PartitioningAsyncWriter {
// row_count += batch_excluding_range.num_rows();
async_writer.write_record_batch(batch_excluding_range).await?;
}

}
// received abort signal
Err(e) => {
@@ -256,19 +251,13 @@ impl PartitioningAsyncWriter {
}
Ok(flush_join_handle_list)
} else {

for (partition_desc, writer) in partitioned_writer.into_iter() {

let flush_result = tokio::spawn(async move {
let writer_flush_results =writer.flush_and_close().await?;
Ok(
writer_flush_results.into_iter().map(
|(_, path, file_metadata)|
{
(partition_desc.clone(), path, file_metadata)
}
).collect::<Vec<_>>()
)
let writer_flush_results = writer.flush_and_close().await?;
Ok(writer_flush_results
.into_iter()
.map(|(_, path, file_metadata)| (partition_desc.clone(), path, file_metadata))
.collect::<Vec<_>>())
});
flush_join_handle_list.push(flush_result);
}
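The reformatted flush path above spawns one task per partition writer and tags each flushed file with its partition_desc. The following is a simplified, hypothetical sketch of that fan-out/collect pattern; the types and names are placeholders, not the PartitioningAsyncWriter API.

```rust
// Hypothetical fan-out sketch mirroring the diff above: spawn one tokio task per
// partition, tag every flushed file path with its partition_desc, then collect.
use std::collections::HashMap;

use tokio::task::JoinHandle;

async fn flush_all_partitions(
    // partition_desc -> file paths pending flush (a stand-in for real writers)
    pending: HashMap<String, Vec<String>>,
) -> Result<Vec<(String, String)>, tokio::task::JoinError> {
    let mut handles: Vec<JoinHandle<Vec<(String, String)>>> = Vec::new();
    for (partition_desc, files) in pending {
        handles.push(tokio::spawn(async move {
            files
                .into_iter()
                .map(|path| (partition_desc.clone(), path))
                .collect()
        }));
    }

    let mut flushed = Vec::new();
    for handle in handles {
        flushed.extend(handle.await?); // propagate task panics/cancellations
    }
    Ok(flushed)
}
```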
8 changes: 3 additions & 5 deletions rust/lakesoul-io/src/async_writer/sort_writer.rs
@@ -12,10 +12,8 @@ use datafusion::{
PhysicalSortExpr,
},
physical_plan::{
projection::ProjectionExec,
sorts::sort::SortExec,
stream::RecordBatchReceiverStream,
ExecutionPlan, PhysicalExpr,
projection::ProjectionExec, sorts::sort::SortExec, stream::RecordBatchReceiverStream, ExecutionPlan,
PhysicalExpr,
},
};
use datafusion_common::{DataFusionError, Result};
@@ -24,7 +22,7 @@ use tokio_stream::StreamExt;

use crate::{helpers::get_batch_memory_size, lakesoul_io_config::LakeSoulIOConfig};

use super::{AsyncBatchWriter, WriterFlushResult, MultiPartAsyncWriter, ReceiverStreamExec};
use super::{AsyncBatchWriter, MultiPartAsyncWriter, ReceiverStreamExec, WriterFlushResult};

/// Wrap the above async writer with a SortExec to
/// sort the batches before write to async writer
7 changes: 4 additions & 3 deletions rust/lakesoul-io/src/filter/parser.rs
@@ -229,7 +229,6 @@ impl Parser {
}

pub(crate) fn parse_proto(plan: &Plan, df_schema: &DFSchema) -> Result<Expr> {

let function_extension = plan
.extensions
.iter()
@@ -733,7 +732,10 @@ fn _from_nullability(nullability: Nullability) -> bool {
mod tests {
use std::result::Result;

use datafusion::{logical_expr::{LogicalPlan, TableScan}, prelude::{ParquetReadOptions, SessionContext}};
use datafusion::{
logical_expr::{LogicalPlan, TableScan},
prelude::{ParquetReadOptions, SessionContext},
};
use prost::Message;

use super::*;
@@ -750,7 +752,6 @@ mod tests {

#[tokio::test]
async fn tt() {

let ctx = SessionContext::new();
let options = ParquetReadOptions::default();
let table_path = "/var/folders/_b/qyl87wbn1119cvw8kts6fqtw0000gn/T/lakeSource/type/part-00000-97db3149-f99e-404a-aa9a-2af4ab3f7a44_00000.c000.parquet";
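The truncated test above appears to register a local parquet file with a DataFusion SessionContext and inspect the resulting plan. A hedged, self-contained sketch of that setup follows; the table name and file path are illustrative, not the test's actual fixture.

```rust
// Illustrative sketch of the DataFusion setup the test uses; the table name and
// file path here are placeholders, not the fixture from the repository.
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

async fn inspect_parquet_plan() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_parquet("example", "/tmp/example.parquet", ParquetReadOptions::default())
        .await?;
    let df = ctx.sql("SELECT * FROM example").await?;
    println!("{:?}", df.logical_plan());
    Ok(())
}
```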
37 changes: 20 additions & 17 deletions rust/lakesoul-io/src/helpers.rs
@@ -32,9 +32,9 @@ use url::Url;

use crate::{
constant::{
DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING,
TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT,
TIMESTAMP_SECOND_FORMAT, LAKESOUL_COMMA, LAKESOUL_EQ
DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_COMMA, LAKESOUL_EMPTY_STRING, LAKESOUL_EQ,
LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT,
TIMESTAMP_SECOND_FORMAT,
},
filter::parser::Parser,
lakesoul_io_config::LakeSoulIOConfig,
@@ -169,12 +169,10 @@ pub fn format_scalar_value(v: &ScalarValue) -> String {
}
ScalarValue::Decimal128(Some(s), _, _) => format!("{}", s),
ScalarValue::Decimal256(Some(s), _, _) => format!("{}", s),
ScalarValue::Binary(e)
| ScalarValue::FixedSizeBinary(_, e)
| ScalarValue::LargeBinary(e) => match e {
Some(bytes) => hex::encode(bytes),
None => LAKESOUL_NULL_STRING.to_string(),
}
ScalarValue::Binary(e) | ScalarValue::FixedSizeBinary(_, e) | ScalarValue::LargeBinary(e) => match e {
Some(bytes) => hex::encode(bytes),
None => LAKESOUL_NULL_STRING.to_string(),
},
other => other.to_string(),
}
}
Expand All @@ -192,7 +190,7 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
},
DataType::Decimal128(p, s) => Ok(ScalarValue::Decimal128(None, *p, *s)),
DataType::Decimal256(p, s) => Ok(ScalarValue::Decimal256(None, *p, *s)),
DataType::Binary=> Ok(ScalarValue::Binary(None)),
DataType::Binary => Ok(ScalarValue::Binary(None)),
DataType::FixedSizeBinary(size) => Ok(ScalarValue::FixedSizeBinary(*size, None)),
DataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
_ => Ok(ScalarValue::Null),
@@ -204,7 +202,9 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
if val.eq(LAKESOUL_EMPTY_STRING) {
Ok(ScalarValue::Utf8(Some("".to_string())))
} else {
Ok(ScalarValue::Utf8(Some(val.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ","))))
Ok(ScalarValue::Utf8(Some(
val.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ","),
)))
}
}
DataType::Timestamp(unit, timezone) => match unit {
@@ -264,7 +264,7 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
},
DataType::Decimal128(p, s) => Ok(ScalarValue::Decimal128(Some(val.parse::<i128>().unwrap()), *p, *s)),
DataType::Decimal256(p, s) => Ok(ScalarValue::Decimal256(Some(i256::from_string(val).unwrap()), *p, *s)),
DataType::Binary=> Ok(ScalarValue::Binary(Some(hex::decode(val).unwrap()))),
DataType::Binary => Ok(ScalarValue::Binary(Some(hex::decode(val).unwrap()))),
DataType::FixedSizeBinary(size) => Ok(ScalarValue::FixedSizeBinary(*size, Some(hex::decode(val).unwrap()))),
DataType::LargeBinary => Ok(ScalarValue::LargeBinary(Some(hex::decode(val).unwrap()))),
_ => ScalarValue::try_from_string(val.to_string(), data_type),
@@ -526,7 +526,11 @@ pub fn timestamp_str_to_unix_time(value: &str, fmt: &str) -> Result<Duration> {
Ok(datetime.signed_duration_since(epoch_time.naive_utc()))
}

pub fn column_with_name_and_name2index<'a>(schema: &'a SchemaRef, name: &str, name_to_index: &Option<HashMap<String, usize>>) -> Option<(usize, &'a Field)> {
pub fn column_with_name_and_name2index<'a>(
schema: &'a SchemaRef,
name: &str,
name_to_index: &Option<HashMap<String, usize>>,
) -> Option<(usize, &'a Field)> {
if let Some(name_to_index) = name_to_index {
name_to_index.get(name).map(|index| (*index, schema.field(*index)))
} else {
@@ -535,12 +539,11 @@ pub fn column_with_name_and_name2index<'a>(schema: &'a SchemaRef, name: &str, na
}

pub fn get_batch_memory_size(batch: &RecordBatch) -> Result<usize> {
Ok(
batch.columns()
Ok(batch
.columns()
.iter()
.map(|array| array.to_data().get_slice_memory_size())
.collect::<std::result::Result<Vec<usize>, ArrowError>>()?
.into_iter()
.sum()
)
.sum())
}
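`get_batch_memory_size` (reformatted above) is what the async writers use to track how many bytes they have buffered before rolling a file. A small hypothetical sketch of that kind of threshold check follows; the names `buffered` and `max_buffered_bytes` and the policy itself are assumptions for illustration.

```rust
// Hypothetical usage sketch: accumulate per-batch memory, as get_batch_memory_size
// does, and decide when to flush; names and the threshold policy are illustrative.
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn batch_memory_size(batch: &RecordBatch) -> Result<usize, ArrowError> {
    batch
        .columns()
        .iter()
        .map(|array| array.to_data().get_slice_memory_size())
        .sum()
}

fn should_flush(buffered: usize, batch: &RecordBatch, max_buffered_bytes: usize) -> Result<bool, ArrowError> {
    Ok(buffered + batch_memory_size(batch)? >= max_buffered_bytes)
}
```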
4 changes: 1 addition & 3 deletions rust/lakesoul-io/src/lakesoul_io_config.rs
@@ -47,11 +47,9 @@ pub static OPTION_DEFAULT_VALUE_KEEP_ORDERS: &str = "false";

pub static OPTION_KEY_MEM_LIMIT: &str = "mem_limit";
pub static OPTION_KEY_POOL_SIZE: &str = "pool_size";
pub static OPTION_KEY_HASH_BUCKET_ID : &str = "hash_bucket_id";
pub static OPTION_KEY_HASH_BUCKET_ID: &str = "hash_bucket_id";
pub static OPTION_KEY_MAX_FILE_SIZE: &str = "max_file_size";



#[derive(Debug, Derivative)]
#[derivative(Default, Clone)]
pub struct LakeSoulIOConfig {
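The option keys above are string-valued config knobs. A generic, hypothetical sketch of how such string-keyed options are typically read with a typed default; the helper name and default values are assumptions, not LakeSoul's API.

```rust
// Hypothetical helper, not LakeSoul's API: read a string-keyed option and parse it,
// falling back to a default when the key is absent or unparseable.
use std::collections::HashMap;
use std::str::FromStr;

fn option_or_default<T: FromStr>(options: &HashMap<String, String>, key: &str, default: T) -> T {
    options
        .get(key)
        .and_then(|raw| raw.parse::<T>().ok())
        .unwrap_or(default)
}

fn example(options: &HashMap<String, String>) -> (usize, u64) {
    // Keys mirror OPTION_KEY_POOL_SIZE / OPTION_KEY_MAX_FILE_SIZE; defaults are made up.
    let pool_size = option_or_default(options, "pool_size", 4usize);
    let max_file_size = option_or_default(options, "max_file_size", 128 * 1024 * 1024u64);
    (pool_size, max_file_size)
}
```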
