Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2 streaming read (alternative approach) #653

Merged
merged 20 commits into from
Nov 7, 2022
Merged

V2 streaming read (alternative approach) #653

merged 20 commits into from
Nov 7, 2022

Conversation

pjfanning
Copy link
Collaborator

@pjfanning pjfanning commented Oct 2, 2022

alternative to #651

@pjfanning pjfanning marked this pull request as draft October 2, 2022 20:13
@pjfanning pjfanning changed the title WIP: V2 streaming read (alternative approach) V2 streaming read (alternative approach) Oct 2, 2022
@pjfanning pjfanning self-assigned this Oct 2, 2022
@pjfanning pjfanning marked this pull request as ready for review October 2, 2022 20:45
@pjfanning
Copy link
Collaborator Author

@christianknoepfle this seems to be working now - have a look if you have time

@christianknoepfle
Copy link
Contributor

cool :) I think I can kill my PR ;) (the core idea of the ExcelPartitionReaderFromIterator has made it in your approach but with a cleaner interface, so I am happy ;) ).

The only thing that I didn't really liked is the fact that the CloseableIterator is not really an iterator. It is something that contains an iterator. So the name is a bit misleading (unfortunately I have no better idea for now). So finding another name or as an alternative one could implement it as an iterator
case class CloseableIterator[T] (val private iterator, ...) extends Iterator[T] ...
and just implement the interface forwarding the calls to the iterator. This would change all calls from it.iterator to it itself. Still you need to implement the ++ and maybe one or two other methods to make it work.

In any case: Thanks a lot for your effort. Hope the other folks are fine with your fixes and we see a new version soon (and then hopefully we could use spark excel it in our production code)

workbook match {
case _: StreamingWorkbook => CloseableIterator(rowIter, Seq(workbook))
case _ => {
workbook.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you close the workbook here and do not hand that to CloseableIterator()? Would be more consistent to me

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this workbook doesn't need to be kept open but I suppose it could be changed to work like the streaming workbook

val dfExcel = spark.read
.format("excel")
.option("path", "src/test/resources/v2readwritetest/large_excel/largefile-wide-single-sheet.xlsx")
.option("header", value = false)
Copy link
Contributor

@christianknoepfle christianknoepfle Oct 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we test with inferSchema= true and header = true too to get greater coverage? See src/test/scala/com/crealytics/spark/v2/excel/MaxNumRowsSuite.scala in my PR as an idea.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you liek that, you will need the new xlsx too.. (has headers)

@pjfanning
Copy link
Collaborator Author

The only thing that I didn't really liked is the fact that the CloseableIterator is not really an iterator.

I agree that the name isn't great. I was thinking of renaming it to SheetData or something like and renaming the .iterator function to .rowIterator.

@pjfanning pjfanning mentioned this pull request Oct 3, 2022
@pjfanning
Copy link
Collaborator Author

@christianknoepfle I made the changes you asked for and copied your test changes. One issue is I get dataframe sizes that are 1 less than you - for each of the 3 sub tests. I haven't investigated yet whether my code strips a row that shouldn't be stripped or if your PR overcounts.

val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
paths.tail.foreach(path => {
val newRows = excelHelper.getSheetData(conf, path)
sheetData = SheetData(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of copied code here. I wonder at what point we DRY this and put the code that is shared for the different Spark versions into shared code.

val r = excelHelper.getColumnNames(headerRow)
rows = Iterator(headerRow) ++ rows
r
try {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. I had not seen that there is so much overlap in the code, but we should get rid of it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done some refactoring to reduce code duplication - probably more can be done

.option("path", "src/test/resources/v2readwritetest/large_excel/largefile-wide-single-sheet.xlsx")
.option("header", value = false)
// .option("dataAddress", "'Sheet1'!B7:M16")
.option("maxRowsInMemory", "200")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in the other PR, the integration tests actually cover streaming reads:
https://github.com/crealytics/spark-excel/blob/main/src/test/scala/com/crealytics/spark/excel/IntegrationSuite.scala#L349
If there is anything missing there, I would prefer to extend them instead of adding new tests with large binary files.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been unable to get any of the existing tests to reproduce the issue - the datasets are too small

@nightscape
Copy link
Owner

Overall this Iterator stuff is dangerous territory. It's bitten me more than once, because at some point some code was reading from an Iterator from which another Iterator was derived...
It would be great to encapsulate that in such a way that such errors cannot occur. SheetData in principle could provide such isolation if the Iterator were not exposed, but it would still require some more logic to prevent double consumption.

@pjfanning
Copy link
Collaborator Author

Overall this Iterator stuff is dangerous territory. It's bitten me more than once, because at some point some code was reading from an Iterator from which another Iterator was derived... It would be great to encapsulate that in such a way that such errors cannot occur. SheetData in principle could provide such isolation if the Iterator were not exposed, but it would still require some more logic to prevent double consumption.

For me, test coverage is the main way to manage this. The code already relies on reading off headers and things like so I'm not sure how much can be done to control that the iterators are not consumed at the wrong time.

I covered some of the topics you asked me to look at but there is more I can do.

I haven't yet looked at a way to test the changes without the new file. I'll get back to that later.

@pjfanning pjfanning marked this pull request as draft October 3, 2022 21:57
@christianknoepfle
Copy link
Contributor

@christianknoepfle I made the changes you asked for and copied your test changes. One issue is I get dataframe sizes that are 1 less than you - for each of the 3 sub tests. I haven't investigated yet whether my code strips a row that shouldn't be stripped or if your PR overcounts.

you will need the new excel. I just added a header row to it. Download from here:

@pjfanning
Copy link
Collaborator Author

@christianknoepfle I made the changes you asked for and copied your test changes. One issue is I get dataframe sizes that are 1 less than you - for each of the 3 sub tests. I haven't investigated yet whether my code strips a row that shouldn't be stripped or if your PR overcounts.

you will need the new excel. I just added a header row to it. Download from here:

thanks - I've updated my PR to use your latest xlsx file

@pjfanning pjfanning marked this pull request as ready for review October 6, 2022 11:13
@nightscape
Copy link
Owner

Hey @pjfanning, I saw you picked up my changes regarding spark-testing-base to spark-fast-tests already 👍 👍

I created a branch which applies the integration tests to v2 as well.
It still needs some adaptations so that it doesn't assume that a single file gets written, but my hope is that it uncovers a few bugs (e.g. this one) that are still lurking in v2.

@christianknoepfle
Copy link
Contributor

Is there anything holding off this PR? I can say that it finally fixed one of our production issues and is key to get V2 stable. If you need help pls let me know

@nightscape
Copy link
Owner

If possible, I'd like to expose the bug without adding a huge .xlsx file.
#657 might already expose it, but it also has/exposes other issues which might cover this one.
So I would propose the following approach:

  1. @pjfanning removes the .xlsx file and the corresponding test from this PR.
  2. I squash-merge the PR.
  3. Whoever has some spare time tries to get test: Run integration tests against v2 as well #657 to work.
  4. If that PR does not expose the bug (one probably needs to temporarily break the fix from this PR to check) we add the test with the .xlsx.

WDYT?

@pjfanning
Copy link
Collaborator Author

I've removed the large xlsx and the tests that use it

@nightscape nightscape merged commit 4ceca4f into main Nov 7, 2022
@nightscape nightscape deleted the v2-streaming branch November 7, 2022 22:04
@nightscape
Copy link
Owner

@pjfanning thanks a lot for your efforts and continued support!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants