Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dictionary encoding in structures for FlightDataEncoder, add documentation for arrow_flight::encode::Dictionary #5488

Merged
merged 7 commits into from
Mar 15, 2024

Conversation

thinkharderdev
Copy link
Contributor

…Handling

Which issue does this PR close?

Closes #5487

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate labels Mar 9, 2024
@@ -388,29 +388,39 @@ impl Stream for FlightDataEncoder {
/// Defines how a [`FlightDataEncoder`] encodes [`DictionaryArray`]s
///
/// [`DictionaryArray`]: arrow_array::DictionaryArray
///
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how things work in arrow-rs and I'm just assuming this is how the flight protocol is supposed to work :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update this comment maybe to say that the FlightDataEncoder doesn't really handle sending the same dictionary multiple times?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't really handle as in will send the same dictionary over and over again for evert batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

/// The current implementation does not support "delta" dictionaries so a new dictionary batch will be sent each time the encoder sees a
/// dictionary which is not pointer-equal to the previously observed dictionary for a given `dict_id`.

to clarify that we do not send delta dictionaries but the current implementation will actually skip sending the dictionary if it is pointer-equal to existing tracked dictionary.

@alamb alamb changed the title Add more detailed documentation for arrow_flight::encode::Dicationary… Add more detailed documentation for arrow_flight::encode::Dictionary Mar 9, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @thinkharderdev -- I think there is still a misunderstanding somewhere as this new text doesn't match what I think the code does

///
/// See also:
/// * <https://github.com/apache/arrow-rs/issues/1206>
/// The arrow flight protocol supports delta dictionaries where the sender can send new dictionary values
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matches my understanding too

/// An IPC response, streaming or otherwise, defines its schema up front
/// which defines the mapping from dictionary IDs. It then sends these
/// dictionaries over the wire.
/// This method should be used if all batches over the entire lifetime fo the flight stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this description is accurate. The behavior is what is "supposed" to happen with Flight, but to the best of my knowledge, this behavior isn't implemented -- the gap is tracked in #3389 but I don't think it is yet implemented

I think what Hydrate actually does is to send arrays using the underlying type -- for example, DictionaryArray<Int32, Utf8> would be cast to StringArray and the StringArray would be sent over the network

I believe Resend actually resends a new dictionary for each DictionaryArray, without trying to figure out if it was the same as the previous dictionary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think you're right but the current implementation is actually broken then because it doesn't take into account nested dictionary fields:

fn prepare_schema_for_flight(schema: &Schema, send_dictionaries: bool) -> Schema {
    let fields: Fields = schema
        .fields()
        .iter()
        .map(|field| match field.data_type() {
            DataType::Dictionary(_, value_type) if !send_dictionaries => Field::new(
                field.name(),
                value_type.as_ref().clone(),
                field.is_nullable(),
            )
            .with_metadata(field.metadata().clone()),
            _ => field.as_ref().clone(),
        })
        .collect();

    Schema::new(fields).with_metadata(schema.metadata().clone())
}

Then if you have DictionaryHandling::Hydrate (the default) it will break on dictionary replacement of the nested field:

    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it was the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // If error on replacement perform a logical comparison
                if last.child_data()[0] == *dict_values {
                    // Same dictionary values => no need to emit it again
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }

Copy link
Contributor Author

@thinkharderdev thinkharderdev Mar 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok yeah, I think this is why I was getting so confused. The original description of Hydrate was correct but it just doesn't work correctly for nested dictionary arrays. So I guess we can

  1. Fix the issue with nested fields
  2. Revert back to original description

But I don't quite understand why Hydrate is even an option? It seems strictly worse than Resend? If you wanted to hydrate a dictionary array it seems like you would do that prior to sending over the flight stream instead of doing it on the fly in the protocol.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 It seems strictly worse than Resend?

If you have a sparse dictionary, as you might if you've applied a selective predicate, sending a dictionary for each batch where most of the values are not used could conceivably be worse.

That being said this is also historical, as resend was added later

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it was mostly historical

The last time I checked (maybe 2 years ago) at least one client (maybe the golang one) didn't handle dictionaries (so if the rust server sent DictionaryArray the client couldn't deal with it) which was important for us as well

All in all, sorting out how to handle Dictionaries better / more rationally in arrow-flight would be a very nice improvement I think (aka working on #3389)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb @tustvold Ok so I fixed handling of nested dictionary fields so Hydrate works in those cases now and restored the documentation back to more or less what it originally said (with some additional comments to clarify things a bit

@github-actions github-actions bot added the parquet Changes to the parquet crate label Mar 12, 2024
@github-actions github-actions bot removed the parquet Changes to the parquet crate label Mar 12, 2024
@alamb alamb changed the title Add more detailed documentation for arrow_flight::encode::Dictionary Support dictionary encoding in structures for FlightDataEncoder, add documentation for arrow_flight::encode::Dictionary Mar 13, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @thinkharderdev -- I think this is an improvement over what is on main. I left some small suggestions as well but all in all thank you

send_dictionaries: bool,
) -> Result<ArrayRef> {
let arr = match (array.data_type(), data_type) {
(DataType::Dictionary(_, value), _) if !send_dictionaries => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this code needed? It is strange to special case dictionary here, but just cast to data_type below. Maybe it is covered already by the (tpe, data_type) case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, yeah I think you're right. We would have already changed the data type in the schema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this and also renamed this method to hydrate_dictionaries since we only need to call when we have DictionaryHandling::Hydrate. In the case of Resend I just skip this entirely to avoid the pointless clones


#[tokio::test]
async fn test_dictionary_struct_hydration() {
let struct_fields = vec![Field::new_list(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The amount of code required to build these nested arrays is quite sad (nothing wrong with this PR, I am just commenting in general)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it is painful :)

@@ -388,29 +388,39 @@ impl Stream for FlightDataEncoder {
/// Defines how a [`FlightDataEncoder`] encodes [`DictionaryArray`]s
///
/// [`DictionaryArray`]: arrow_array::DictionaryArray
///
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update this comment maybe to say that the FlightDataEncoder doesn't really handle sending the same dictionary multiple times?

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌 very nice

@tustvold tustvold merged commit 14bd53d into apache:master Mar 15, 2024
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Improve documentation around handling of dictionary arrays in arrow flight
3 participants