chore: add docs, part of #37 #6453

Merged
merged 2 commits on Oct 1, 2024
3 changes: 2 additions & 1 deletion arrow-flight/examples/flight_sql_server.rs
@@ -19,6 +19,7 @@ use arrow_flight::sql::server::PeekableFlightDataStream;
use arrow_flight::sql::DoPutPreparedStatementResult;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use core::str;
use futures::{stream, Stream, TryStreamExt};
use once_cell::sync::Lazy;
use prost::Message;
@@ -168,7 +169,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
let bytes = BASE64_STANDARD
.decode(base64)
.map_err(|e| status!("authorization not decodable", e))?;
let str = String::from_utf8(bytes).map_err(|e| status!("authorization not parsable", e))?;
let str = str::from_utf8(&bytes).map_err(|e| status!("authorization not parsable", e))?;
let parts: Vec<_> = str.split(':').collect();
let (user, pass) = match parts.as_slice() {
[user, pass] => (user, pass),
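For context on the change above: `String::from_utf8(bytes)` consumes the byte vector and yields an owned `String`, while `str::from_utf8(&bytes)` only validates the bytes and returns a borrowed `&str`, which is all the authorization parsing needs. A minimal standalone sketch (not part of this diff; names are illustrative):

    use core::str;

    fn main() {
        let bytes: Vec<u8> = b"user:pass".to_vec();

        // Owned conversion: consumes the Vec<u8> and yields a String.
        let owned: String = String::from_utf8(bytes.clone()).expect("valid UTF-8");

        // Borrowed conversion: validates in place and returns a &str tied to `bytes`.
        let borrowed: &str = str::from_utf8(&bytes).expect("valid UTF-8");

        assert_eq!(owned, borrowed);
        assert_eq!(borrowed.split(':').collect::<Vec<_>>(), ["user", "pass"]);
    }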
1 change: 1 addition & 0 deletions arrow-flight/src/bin/flight_sql_client.rs
@@ -26,6 +26,7 @@ use arrow_flight::{
};
use arrow_schema::Schema;
use clap::{Parser, Subcommand};
use core::str;
use futures::TryStreamExt;
use tonic::{
metadata::MetadataMap,
7 changes: 6 additions & 1 deletion arrow-flight/src/decode.rs
@@ -388,33 +388,38 @@ struct FlightStreamState {
/// FlightData and the decoded payload (Schema, RecordBatch), if any
#[derive(Debug)]
pub struct DecodedFlightData {
/// The original FlightData message
pub inner: FlightData,
/// The decoded payload
pub payload: DecodedPayload,
}

impl DecodedFlightData {
/// Create a new DecodedFlightData with no payload
pub fn new_none(inner: FlightData) -> Self {
Self {
inner,
payload: DecodedPayload::None,
}
}

/// Create a new DecodedFlightData with a [`Schema`] payload
pub fn new_schema(inner: FlightData, schema: SchemaRef) -> Self {
Self {
inner,
payload: DecodedPayload::Schema(schema),
}
}

/// Create a new [`DecodedFlightData`] with a [`RecordBatch`] payload
pub fn new_record_batch(inner: FlightData, batch: RecordBatch) -> Self {
Self {
inner,
payload: DecodedPayload::RecordBatch(batch),
}
}

/// return the metadata field of the inner flight data
/// Return the metadata field of the inner flight data
pub fn app_metadata(&self) -> Bytes {
self.inner.app_metadata.clone()
}
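For orientation, a short sketch of how the constructors documented in this hunk are typically consumed (assuming `arrow-flight` as a dependency; the match arms follow the three `DecodedPayload` variants shown above):

    use arrow_flight::decode::{DecodedFlightData, DecodedPayload};
    use arrow_flight::FlightData;

    fn describe(decoded: &DecodedFlightData) -> &'static str {
        match &decoded.payload {
            DecodedPayload::None => "no payload",
            DecodedPayload::Schema(_) => "schema message",
            DecodedPayload::RecordBatch(_) => "record batch message",
        }
    }

    fn main() {
        // FlightData is a prost-generated message, so Default produces an empty frame.
        let decoded = DecodedFlightData::new_none(FlightData::default());
        assert_eq!(describe(&decoded), "no payload");
        assert!(decoded.app_metadata().is_empty());
    }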
3 changes: 2 additions & 1 deletion arrow-flight/src/encode.rs
@@ -144,6 +144,7 @@ impl Default for FlightDataEncoderBuilder {
}

impl FlightDataEncoderBuilder {
/// Create a new [`FlightDataEncoderBuilder`].
pub fn new() -> Self {
Self::default()
}
@@ -1403,7 +1404,7 @@ mod tests {
let input_rows = batch.num_rows();

let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes);
let sizes: Vec<_> = split.iter().map(|batch| batch.num_rows()).collect();
let sizes: Vec<_> = split.iter().map(RecordBatch::num_rows).collect();
let output_rows: usize = sizes.iter().sum();

assert_eq!(sizes, expected_sizes, "mismatch for {batch:?}");
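The test tweak above swaps a closure for a method path in `map`; both forms are equivalent whenever the method's receiver matches the iterator item. A tiny self-contained sketch (plain `String`s stand in for `RecordBatch` here):

    fn main() {
        let batches = vec![String::from("do"), String::from("get"), String::from("put")];

        // Closure form and method-path form yield the same result.
        let with_closure: Vec<usize> = batches.iter().map(|b| b.len()).collect();
        let with_path: Vec<usize> = batches.iter().map(String::len).collect();

        assert_eq!(with_closure, with_path);
    }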
2 changes: 2 additions & 0 deletions arrow-flight/src/error.rs
@@ -37,6 +37,7 @@ pub enum FlightError {
}

impl FlightError {
/// Generate a new `FlightError::ProtocolError` variant.
pub fn protocol(message: impl Into<String>) -> Self {
Self::ProtocolError(message.into())
}
@@ -98,6 +99,7 @@ impl From<FlightError> for tonic::Status {
}
}

/// Result type for the Apache Arrow Flight crate
pub type Result<T> = std::result::Result<T, FlightError>;

#[cfg(test)]
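A brief sketch (assuming `arrow-flight` as a dependency) of how the newly documented `protocol` constructor and crate-level `Result` alias fit together:

    use arrow_flight::error::{FlightError, Result};

    // The alias defaults the error type to FlightError.
    fn require_nonempty(ticket: &[u8]) -> Result<&[u8]> {
        if ticket.is_empty() {
            // `protocol` wraps any `Into<String>` message in FlightError::ProtocolError.
            return Err(FlightError::protocol("empty ticket"));
        }
        Ok(ticket)
    }

    fn main() {
        assert!(require_nonempty(b"").is_err());
        assert!(require_nonempty(b"ticket-1").is_ok());
    }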
5 changes: 5 additions & 0 deletions arrow-flight/src/lib.rs
@@ -37,6 +37,7 @@
//!
//! [Flight SQL]: https://arrow.apache.org/docs/format/FlightSql.html
#![allow(rustdoc::invalid_html_tags)]
#![warn(missing_docs)]

use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
use arrow_schema::{ArrowError, Schema};
@@ -52,6 +53,8 @@ type ArrowResult<T> = std::result::Result<T, ArrowError>;

#[allow(clippy::all)]
mod gen {
// Since this file is auto-generated, we suppress all warnings
#![allow(missing_docs)]
include!("arrow.flight.protocol.rs");
}

@@ -125,6 +128,7 @@ use flight_descriptor::DescriptorType;

/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions
pub struct SchemaAsIpc<'a> {
/// Data type representing a schema and its IPC write options
pub pair: (&'a Schema, &'a IpcWriteOptions),
}

@@ -684,6 +688,7 @@ impl PollInfo {
}

impl<'a> SchemaAsIpc<'a> {
/// Create a new `SchemaAsIpc` from a `Schema` and `IpcWriteOptions`
pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
SchemaAsIpc {
pair: (schema, options),
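The crate-level `#![warn(missing_docs)]` added here is what motivates the rest of this PR; a minimal standalone `lib.rs` illustrating how a module-level allow (as used for the generated `gen` module) overrides the crate-level warning:

    #![warn(missing_docs)]

    //! Example lib.rs: crate-level docs satisfy the lint for the crate root.

    /// Public items now need docs, otherwise the compiler emits a warning.
    pub fn documented() {}

    /// Wrapper module mirroring the auto-generated `gen` module in this diff.
    pub mod generated {
        // The inner allow overrides the crate-level warn for everything in this module.
        #![allow(missing_docs)]

        pub fn undocumented() {}
    }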
4 changes: 3 additions & 1 deletion arrow-flight/src/sql/client.rs
@@ -695,9 +695,11 @@ fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
}
}

// A polymorphic structure to natively represent different types of data contained in `FlightData`
/// A polymorphic structure to natively represent different types of data contained in `FlightData`
pub enum ArrowFlightData {
/// A record batch
RecordBatch(RecordBatch),
/// A schema
Schema(Schema),
}

13 changes: 3 additions & 10 deletions arrow-flight/src/sql/metadata/sql_info.rs
@@ -331,7 +331,7 @@ impl SqlInfoUnionBuilder {
///
/// Servers constuct - usually static - [`SqlInfoData`] via the [`SqlInfoDataBuilder`],
/// and build responses using [`CommandGetSqlInfo::into_builder`]
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Default)]
pub struct SqlInfoDataBuilder {
/// Use BTreeMap to ensure the values are sorted by value as
/// to make output consistent
@@ -341,17 +341,10 @@ pub struct SqlInfoDataBuilder {
infos: BTreeMap<u32, SqlInfoValue>,
}

impl Default for SqlInfoDataBuilder {
fn default() -> Self {
Self::new()
}
}

impl SqlInfoDataBuilder {
/// Create a new SQL info builder
pub fn new() -> Self {
Self {
infos: BTreeMap::new(),
}
Self::default()
}

/// register the specific sql metadata item
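The hunk above replaces a hand-written `Default` impl with a derive; that is valid because the builder's only field is a `BTreeMap`, whose `Default` is an empty map, so `new()` can simply delegate. A standalone sketch of the same pattern:

    use std::collections::BTreeMap;

    /// Stand-in for SqlInfoDataBuilder: a single field that already implements Default.
    #[derive(Debug, Default, PartialEq)]
    struct Builder {
        infos: BTreeMap<u32, String>,
    }

    impl Builder {
        /// `new` now just delegates to the derived Default.
        fn new() -> Self {
            Self::default()
        }
    }

    fn main() {
        assert_eq!(Builder::new(), Builder::default());
    }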
39 changes: 29 additions & 10 deletions arrow-flight/src/sql/metadata/xdbc_info.rs
@@ -41,24 +41,43 @@ use crate::sql::{CommandGetXdbcTypeInfo, Nullable, Searchable, XdbcDataType, Xdb
/// Data structure representing type information for xdbc types.
#[derive(Debug, Clone, Default)]
pub struct XdbcTypeInfo {
/// The name of the type
pub type_name: String,
/// The data type of the type
pub data_type: XdbcDataType,
/// The column size of the type
pub column_size: Option<i32>,
/// The prefix of the type
pub literal_prefix: Option<String>,
/// The suffix of the type
pub literal_suffix: Option<String>,
/// The create parameters of the type
pub create_params: Option<Vec<String>>,
/// The nullability of the type
pub nullable: Nullable,
/// Whether the type is case sensitive
pub case_sensitive: bool,
/// Whether the type is searchable
pub searchable: Searchable,
/// Whether the type is unsigned
pub unsigned_attribute: Option<bool>,
/// Whether the type has fixed precision and scale
pub fixed_prec_scale: bool,
/// Whether the type is auto-incrementing
pub auto_increment: Option<bool>,
/// The local type name of the type
pub local_type_name: Option<String>,
/// The minimum scale of the type
pub minimum_scale: Option<i32>,
/// The maximum scale of the type
pub maximum_scale: Option<i32>,
/// The SQL data type of the type
pub sql_data_type: XdbcDataType,
/// The optional datetime subcode of the type
pub datetime_subcode: Option<XdbcDatetimeSubcode>,
/// The number precision radix of the type
pub num_prec_radix: Option<i32>,
/// The interval precision of the type
pub interval_precision: Option<i32>,
}

@@ -93,16 +112,6 @@ impl XdbcTypeInfoData {
}
}

pub struct XdbcTypeInfoDataBuilder {
infos: Vec<XdbcTypeInfo>,
}

impl Default for XdbcTypeInfoDataBuilder {
fn default() -> Self {
Self::new()
}
}

/// A builder for [`XdbcTypeInfoData`] which is used to create [`CommandGetXdbcTypeInfo`] responses.
///
/// # Example
@@ -138,6 +147,16 @@ impl Default for XdbcTypeInfoDataBuilder {
/// // to access the underlying record batch
/// let batch = info_list.record_batch(None);
/// ```
pub struct XdbcTypeInfoDataBuilder {
infos: Vec<XdbcTypeInfo>,
}

impl Default for XdbcTypeInfoDataBuilder {
fn default() -> Self {
Self::new()
}
}

impl XdbcTypeInfoDataBuilder {
/// Create a new instance of [`XdbcTypeInfoDataBuilder`].
pub fn new() -> Self {
11 changes: 9 additions & 2 deletions arrow-flight/src/sql/mod.rs
@@ -43,9 +43,11 @@ use bytes::Bytes;
use paste::paste;
use prost::Message;

#[allow(clippy::all)]
mod gen {
#![allow(clippy::all)]
#![allow(rustdoc::unportable_markdown)]
// Since this file is auto-generated, we suppress all warnings
#![allow(missing_docs)]
include!("arrow.flight.protocol.sql.rs");
}

@@ -163,7 +165,9 @@ macro_rules! prost_message_ext {
/// ```
#[derive(Clone, Debug, PartialEq)]
pub enum Command {
$($name($name),)*
$(
#[doc = concat!(stringify!($name), "variant")]
$name($name),)*

/// Any message that is not any FlightSQL command.
Unknown(Any),
@@ -297,10 +301,12 @@ pub struct Any {
}

impl Any {
/// Checks whether the message is of type `M`
pub fn is<M: ProstMessageExt>(&self) -> bool {
M::type_url() == self.type_url
}

/// Unpacks the contents of the message if it is of type `M`
pub fn unpack<M: ProstMessageExt>(&self) -> Result<Option<M>, ArrowError> {
if !self.is::<M>() {
return Ok(None);
@@ -310,6 +316,7 @@ impl Any {
Ok(Some(m))
}

/// Packs a message into an [`Any`] message
pub fn pack<M: ProstMessageExt>(message: &M) -> Result<Any, ArrowError> {
Ok(message.as_any())
}
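A hedged sketch of the three `Any` helpers documented above (assuming `arrow-flight` with the `flight-sql-experimental` feature plus `arrow-schema` as dependencies), using `CommandGetSqlInfo` as the packed message:

    use arrow_flight::sql::{Any, CommandGetSqlInfo, CommandStatementQuery};
    use arrow_schema::ArrowError;

    fn main() -> Result<(), ArrowError> {
        let cmd = CommandGetSqlInfo { info: vec![] };

        // pack stores the encoded message together with its protobuf type URL.
        let any = Any::pack(&cmd)?;

        // is::<M>() compares the stored type URL against M's type URL.
        assert!(any.is::<CommandGetSqlInfo>());
        assert!(!any.is::<CommandStatementQuery>());

        // unpack returns Some only when the type URL matches.
        let round_trip: Option<CommandGetSqlInfo> = any.unpack()?;
        assert_eq!(round_trip, Some(cmd));

        Ok(())
    }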
7 changes: 5 additions & 2 deletions arrow-flight/src/utils.rs
@@ -160,9 +160,12 @@ pub fn batches_to_flight_data(
dictionaries.extend(encoded_dictionaries.into_iter().map(Into::into));
flight_data.push(encoded_batch.into());
}
let mut stream = vec![schema_flight_data];

let mut stream = Vec::with_capacity(1 + dictionaries.len() + flight_data.len());

stream.push(schema_flight_data);
stream.extend(dictionaries);
stream.extend(flight_data);
let flight_data: Vec<_> = stream.into_iter().collect();
let flight_data = stream;
Ok(flight_data)
}
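The final hunk pre-sizes the output vector and drops a redundant `into_iter().collect()` round trip; a small standalone sketch of why `Vec::with_capacity` helps when the final length is known up front (plain strings stand in for `FlightData` messages):

    fn main() {
        let schema_flight_data = String::from("schema");
        let dictionaries = vec![String::from("dict-0"), String::from("dict-1")];
        let flight_data = vec![String::from("batch-0")];

        // Allocate once: one schema message plus all dictionaries and data bodies.
        let mut stream = Vec::with_capacity(1 + dictionaries.len() + flight_data.len());
        let initial_capacity = stream.capacity();

        stream.push(schema_flight_data);
        stream.extend(dictionaries);
        stream.extend(flight_data);

        // The pre-sized buffer was never outgrown, so no reallocation occurred.
        assert_eq!(stream.capacity(), initial_capacity);
        assert_eq!(stream.len(), 4);
    }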