Parsing a string column containing JSON values into a typed array #6522

Open
scovich opened this issue Oct 7, 2024 · 3 comments
Labels: enhancement (Any new improvement worthy of an entry in the changelog), good first issue (Good for newcomers), help wanted

Comments


scovich commented Oct 7, 2024

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

I have a nullable StringArray column that contains JSON object literals.

I need to JSON parse the column into a StructArray of values following some schema, and NULL input values should become NULL output values.

This can almost be implemented using arrow_json::reader::ReaderBuilder::build_decoder and then feeding in the bytes of each string. But the Decoder has no concept of record separators in the input stream, so invalid inputs such as blank strings (""), truncated records ("{\"a\":1"), or multiple objects in one string ("{\"a\": 1} {\"a\": 2}") confuse the decoding process. If we're lucky, it produces the wrong number of records, but an adversarial input could easily appear to produce the correct number of records even though no single input string was a valid JSON object. So if I want that safety, I'm forced to parse each string as its own RecordBatch (which can then be validated independently) and then concatenate them all. Ugly, error-prone, and inefficient:

use arrow_array::{RecordBatch, StringArray};
use arrow_json::ReaderBuilder;
use arrow_schema::{ArrowError, SchemaRef};
use arrow_select::concat::concat_batches;

pub fn parse_json(
    json_strings: StringArray,
    schema: SchemaRef,
) -> Result<RecordBatch, ArrowError> {
    // Use a batch size of 1 to force one record per string input
    let mut decoder = ReaderBuilder::new(schema.clone())
        .with_batch_size(1)
        .build_decoder()?;

    // Feed a single string into the decoder and flush it to a record batch
    let parse_one = |json_string: Option<&str>| -> Result<RecordBatch, ArrowError> {
        // NOTE: null input becomes an empty object (all fields null)
        let bytes = json_string.unwrap_or("{}").as_bytes();
        let decoded = decoder.decode(bytes)?;
        assert_eq!(decoded, bytes.len());
        Ok(decoder.flush()?.unwrap())
    };

    // Parse every string into its own single-row batch, then concatenate
    let output = json_strings
        .iter()
        .map(parse_one)
        .collect::<Result<Vec<_>, ArrowError>>()?;
    concat_batches(&schema, output.iter())
}

(example code, has panics instead of full error handling)
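To make the record-boundary problem concrete, here is a minimal standalone sketch (my illustration, not part of the original report; the one-column schema and the demo function name are assumptions) showing that a single input string containing two JSON objects decodes into two rows with no error from today's Decoder:

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_json::ReaderBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn demo() -> Result<(), ArrowError> {
    // Hypothetical one-column schema, for illustration only
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;

    // A single "cell" of input that actually contains two JSON objects
    let input = r#"{"a": 1} {"a": 2}"#;
    let consumed = decoder.decode(input.as_bytes())?;
    assert_eq!(consumed, input.len());

    // flush() happily returns a batch with two rows; the caller has no way
    // to tell that both came from one malformed input string
    let batch: RecordBatch = decoder.flush()?.unwrap();
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```

With the proposed num_buffered_records/has_partial_record methods below, the caller could detect this case right after each decode call instead of flushing one batch per string.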

Describe the solution you'd like

Ideally, the JSON Decoder could expose public methods that report how many buffered rows the decoder holds, and whether the decoder is currently at a record boundary. This is essentially a side-effect-free version of the same check that TapeDecoder::finish already performs when Decoder::flush is called:

impl TapeDecoder {
    // ...

    /// The number of buffered rows this decoder has, including any row in
    /// progress if [`has_partial_row`] returns true.
    pub fn num_buffered_rows(&self) -> usize {
        self.cur_row
    }

    /// True if the decoder is partway through decoding a row. If so, calling
    /// [`finish`] would return an error.
    pub fn has_partial_row(&self) -> bool {
        !self.stack.is_empty()
    }
}

and

impl Decoder {
    // ...

    /// The number of records currently buffered in this decoder.
    /// A successful call to [`flush`] would produce this many rows.
    pub fn num_buffered_records(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// True if the decoder is partway through decoding a record.
    /// Calling [`flush`] with a partial record buffered would return an error.
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }
}

That way, the above implementation becomes a bit simpler and a lot more efficient:

use arrow_array::{RecordBatch, StringArray};
use arrow_json::ReaderBuilder;
use arrow_schema::{ArrowError, SchemaRef};

// NOTE: relies on the proposed num_buffered_records()/has_partial_record() methods
pub fn parse_json(
    json_strings: StringArray,
    schema: SchemaRef,
) -> Result<RecordBatch, ArrowError> {
    let mut decoder = ReaderBuilder::new(schema.clone())
        .with_batch_size(json_strings.len())
        .build_decoder()?;

    // Make sure each string produces exactly one record.
    let mut num_records = 0;
    for json_string in json_strings.iter() {
        // NOTE: null input becomes an empty object (all fields null)
        let bytes = json_string.unwrap_or("{}").as_bytes();

        // Feed a single string into the decoder, verifying it was fully
        // consumed and did not leave a partial record behind
        let decoded = decoder.decode(bytes)?;
        assert_eq!(decoded, bytes.len());
        assert!(!decoder.has_partial_record());
        assert_eq!(decoder.num_buffered_records(), num_records + 1);
        num_records += 1;
    }
    assert_eq!(num_records, json_strings.len());

    // NOTE: panics if the input array is empty (flush() returns None)
    decoder.flush().transpose().unwrap()
}

It would be even nicer if this parse_json functionality could become part of either arrow-json or arrow-compute, if parsing strings of JSON into a typed array is deemed a general enough operation to deserve its own API call.
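For illustration, such a kernel might have a signature along these lines (the name parse_json, the crate placement, and the null-handling behavior in the doc comment are assumptions drawn from this issue, not an existing arrow-rs API):

```rust
use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{ArrowError, SchemaRef};

/// Hypothetical public kernel: parse each element of `json_strings` as exactly
/// one JSON object conforming to `schema`; null inputs become all-null rows.
pub fn parse_json(
    json_strings: &StringArray,
    schema: SchemaRef,
) -> Result<RecordBatch, ArrowError> {
    // The body could follow the sketch above: decode each string, verify it
    // produced exactly one complete record, then flush once at the end.
    todo!("proposed API, not yet implemented in arrow-json")
}
```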

Describe alternatives you've considered

Tried feeding each string manually into a single Decoder to produce one RecordBatch, but the safety issues mentioned above made it very brittle (wrong row counts, incorrect values, etc.). Currently using the ugly/slow solution described earlier, which creates and validates one RecordBatch per row before concatenating them all into a single RecordBatch.

scovich added the enhancement label on Oct 7, 2024
scovich added a commit to delta-io/delta-kernel-rs that referenced this issue Oct 7, 2024
It turned out the sync reader was not being exercised by basic read
tests. Enabling it exposed a broken json parsing algo that had already
been fixed in the default reader.

Factor out the json parsing to a shared function that both engines can
use.

While we're at it, factor out sync reader logic that both parquet and
json readers can use.

Update the basic read unit tests to use both readers.

Fixes #372

Relevant upstream feature request:
apache/arrow-rs#6522

---------

Co-authored-by: Nick Lanham <[email protected]>
@tustvold (Contributor) commented:

Sorry this one managed to slip through; adding num_buffered_rows and has_partial_record seems perfectly reasonable to me.

@jatin510 commented:

take

@jatin510 commented:

Hi @scovich @tustvold, I am currently looking into this.

Do we need to duplicate the changes implemented in https://github.com/delta-io/delta-kernel-rs/pull/373/files in arrow-rs, along with the num_buffered_rows and has_partial_record functions?
