Skip to content

Commit

Permalink
feat: V2 streaming read (#653)
Browse files Browse the repository at this point in the history
* v1 streaming read test

* Update MaxNumRowsSuite.scala

* Update MaxNumRowsSuite.scala

* Update MaxRowsReadSuite.scala

* fix issue with v2 data source when streaming excel

* wip

* fix spark 2.4 build

* Update ExcelTable.scala

* fix for TODO issue

* review items

* Update MaxRowsReadSuite.scala

* Update src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala

Co-authored-by: Martin Mauch <[email protected]>

* some review comments

* try to centralise some code

* Update ExcelTable.scala

* continue refactor

* license

* update tests

* compile issues

* delete large file test

Co-authored-by: Martin Mauch <[email protected]>
  • Loading branch information
pjfanning and nightscape authored Nov 7, 2022
1 parent 7c96184 commit 4ceca4f
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,36 +256,16 @@ class ExcelDataSourceReader(
sample = if (sample < 1) 1 else sample
inputPaths.take(sample).map(_.getPath.toUri)
}
var rows = excelHelper.getRows(conf, paths.head)

if (rows.isEmpty) { /* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Prepare field names */
val colNames =
if (options.header) { /* Get column name from the first row */
val r = excelHelper.getColumnNames(rows.next)
rows = rows.drop(options.ignoreAfterHeader)
r
} else { /* Peek first row, then return back */
val headerRow = rows.next
val r = excelHelper.getColumnNames(headerRow)
rows = Iterator(headerRow) ++ rows
r
}

/* Other files also be utilized (lazily) for field types, reuse field name
from the first file */
val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
rows = paths.tail.foldLeft(rows) { case (rs, path) =>
rs ++ excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
try {
if (sheetData.rowIterator.isEmpty) {
StructType(Seq.empty)
} else {
/* Ready to infer schema */
ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
}

/* Limit numer of rows to be used for schema infering */
rows = options.excerptSize.foldLeft(rows)(_ take _)

/* Ready to infer schema */
ExcelInferSchema(options).infer(rows, colNames)
} finally {
sheetData.close()
}
}

Expand Down Expand Up @@ -327,9 +307,9 @@ class ExcelInputPartitionReader(
private val headerChecker =
new ExcelHeaderChecker(dataSchema, options, source = s"Excel file: ${path}")
private val excelHelper = ExcelHelper(options)
private val rows = excelHelper.getRows(new Configuration(), path)
private val sheetData = excelHelper.getSheetData(new Configuration(), path)

val reader = ExcelParser.parseIterator(rows, parser, headerChecker, dataSchema)
private val reader = ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, dataSchema)

private val fullSchema = requiredSchema
.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) ++
Expand All @@ -349,7 +329,9 @@ class ExcelInputPartitionReader(

override def next: Boolean = combinedReader.hasNext
override def get: InternalRow = combinedReader.next
override def close(): Unit = {}
override def close(): Unit = {
sheetData.close()
}
}

class ExcelDataSourceWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,36 +71,17 @@ case class ExcelTable(
sample = if (sample < 1) 1 else sample
inputPaths.take(sample).map(_.getPath.toUri)
}
var rows = excelHelper.getRows(conf, paths.head)

if (rows.isEmpty) { /* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Prepare field names */
val colNames =
if (options.header) { /* Get column name from the first row */
val r = excelHelper.getColumnNames(rows.next)
rows = rows.drop(options.ignoreAfterHeader)
r
} else { /* Peek first row, then return back */
val headerRow = rows.next
val r = excelHelper.getColumnNames(headerRow)
rows = Iterator(headerRow) ++ rows
r
}

/* Other files also be utilized (lazily) for field types, reuse field name
from the first file */
val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
paths.tail.foreach(path => {
rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
})

/* Limit numer of rows to be used for schema infering */
rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows

/* Ready to infer schema */
ExcelInferSchema(options).infer(rows, colNames)
val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
try {
if (sheetData.rowIterator.isEmpty) {
/* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Ready to infer schema */
ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
}
} finally {
sheetData.close()
}
}
}
41 changes: 11 additions & 30 deletions src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,17 @@ case class ExcelTable(
sample = if (sample < 1) 1 else sample
inputPaths.take(sample).map(_.getPath.toUri)
}
var rows = excelHelper.getRows(conf, paths.head)

if (rows.isEmpty) { /* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Prepare field names */
val colNames =
if (options.header) { /* Get column name from the first row */
val r = excelHelper.getColumnNames(rows.next())
rows = rows.drop(options.ignoreAfterHeader)
r
} else { /* Peek first row, then return back */
val headerRow = rows.next()
val r = excelHelper.getColumnNames(headerRow)
rows = Iterator(headerRow) ++ rows
r
}

/* Other files also be utilized (lazily) for field types, reuse field name
from the first file */
val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
paths.tail.foreach(path => {
rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
})

/* Limit numer of rows to be used for schema infering */
rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows

/* Ready to infer schema */
ExcelInferSchema(options).infer(rows, colNames)
val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
try {
if (sheetData.rowIterator.isEmpty) {
/* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Ready to infer schema */
ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
}
} finally {
sheetData.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

import java.net.URI
import scala.util.control.NonFatal

/** A factory used to create Excel readers.
*
Expand Down Expand Up @@ -64,8 +65,8 @@ case class ExcelPartitionReaderFactory(
val headerChecker =
new ExcelHeaderChecker(actualReadDataSchema, parsedOptions, source = s"Excel file: ${file.filePath}")
val iter = readFile(conf, file, parser, headerChecker, readDataSchema)
val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
new PartitionReaderWithPartitionValues(fileReader, readDataSchema, partitionSchema, file.partitionValues)
val partitionReader = new SparkExcelPartitionReaderFromIterator(iter)
new PartitionReaderWithPartitionValues(partitionReader, readDataSchema, partitionSchema, file.partitionValues)
}

private def readFile(
Expand All @@ -74,10 +75,28 @@ case class ExcelPartitionReaderFactory(
parser: ExcelParser,
headerChecker: ExcelHeaderChecker,
requiredSchema: StructType
): Iterator[InternalRow] = {
): SheetData[InternalRow] = {
val excelHelper = ExcelHelper(parsedOptions)
val rows = excelHelper.getRows(conf, URI.create(file.filePath))
ExcelParser.parseIterator(rows, parser, headerChecker, requiredSchema)
val sheetData = excelHelper.getSheetData(conf, URI.create(file.filePath))
try {
SheetData(
ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, requiredSchema),
sheetData.resourcesToClose
)
} catch {
case NonFatal(t) => {
sheetData.close()
throw t
}
}
}

}

/** A PartitionReaderFromIterator that also owns the resources backing the row
  * iterator: closing this reader closes the underlying [[SheetData]] (and with it
  * the open workbook handles — see `ExcelHelper.getSheetData`, which transfers
  * workbook ownership into the SheetData).
  *
  * @param sheetData parsed rows plus the Closeable resources that back them
  */
private class SparkExcelPartitionReaderFromIterator(sheetData: SheetData[InternalRow])
  extends PartitionReaderFromIterator[InternalRow](sheetData.rowIterator) {
  override def close(): Unit = {
    // Let the parent reader finish first, then release the file/workbook handles.
    super.close()
    sheetData.close()
  }
}
72 changes: 68 additions & 4 deletions src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import java.net.URI
import java.text.{FieldPosition, Format, ParsePosition}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.Try
import scala.util.control.NonFatal

/** A format that formats a double as a plain string without rounding and scientific notation. All other operations are
* unsupported.
Expand Down Expand Up @@ -130,13 +131,76 @@ class ExcelHelper private (options: ExcelOptions) {
* @param uri
* to the file, this can be on any support file system back end
* @return
* cell-row iterator
* Sheet Data with row iterator (must be closed after use)
*/
def getRows(conf: Configuration, uri: URI): Iterator[Vector[Cell]] = {
def getSheetData(conf: Configuration, uri: URI): SheetData[Vector[Cell]] = {
  val workbook = getWorkbook(conf, uri)
  val locator = DataLocator(options)
  // Attempt to read the sheet; if this fails the workbook must not leak,
  // so capture the outcome and close the workbook on the failure path.
  val readAttempt =
    try Right(locator.readFrom(workbook))
    catch { case NonFatal(t) => Left(t) }
  readAttempt match {
    case Right(rows) =>
      // Ownership of the open workbook transfers to the returned SheetData;
      // the caller is responsible for closing it.
      SheetData(rows, Seq(workbook))
    case Left(error) =>
      workbook.close()
      throw error
  }
}

/** Get cell-row iterator for excel file in given URIs
*
* @param conf
* Hadoop configuration
* @param uris
* a seq of files, this can be on any support file system back end
* @return
* A tuple of Sheet Data with row iterator (must be closed after use) and Vector of column names
*/
/** Open all given Excel files and stitch their rows into one combined iterator,
  * deriving the column names from the first file.
  *
  * @param conf Hadoop configuration used to open each file
  * @param uris files to read; the first one determines the column names
  * @return the combined SheetData (must be closed by the caller) together with the
  *         column names; the names are empty when the first file has no rows
  */
def parseSheetData(conf: Configuration, uris: Seq[URI]): (SheetData[Vector[Cell]], Vector[String]) = {
  // sheetData is rebuilt step by step as rows are peeled off / appended, hence the var.
  var sheetData = getSheetData(conf, uris.head)

  val colNames = if (sheetData.rowIterator.isEmpty) {
    /* If the first file is empty, not checking further: caller gets an empty schema */
    Vector.empty
  } else {
    try {
      /* Prepare field names */
      val colNames =
        if (options.header) {
          /* Get column names from the first row (consumed), then skip the
             configured number of rows that follow the header */
          val r = getColumnNames(sheetData.rowIterator.next)
          sheetData = sheetData.modifyIterator(_.drop(options.ignoreAfterHeader))
          r
        } else {
          /* No header: peek the first row for names, then push it back in front
             of the (already advanced) iterator */
          val headerRow = sheetData.rowIterator.next
          val r = getColumnNames(headerRow)
          sheetData = sheetData.modifyIterator(iter => Iterator(headerRow) ++ iter)
          r
        }

      /* Other files also contribute rows for field-type inference, reusing the
         field names from the first file; their own header rows (if any) are
         skipped. NOTE: each workbook is opened here eagerly, though its rows
         are only consumed lazily. */
      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
      sheetData = uris.tail.foldLeft(sheetData) { case (rs, path) =>
        val newRows = getSheetData(conf, path).modifyIterator(_.drop(numberOfRowToIgnore))
        rs.append(newRows)
      }

      /* Limit number of rows to be used for schema inferring */
      options.excerptSize.foreach { excerptSize =>
        sheetData = sheetData.modifyIterator(_.take(excerptSize))
      }

      colNames
    } catch {
      case NonFatal(t) => {
        // On any failure release every workbook opened so far before propagating.
        sheetData.close()
        throw t
      }
    }
  }
  (sheetData, colNames)
}

/** Get column name by list of cells (row)
Expand Down
26 changes: 26 additions & 0 deletions src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 Martin Mauch (@nightscape)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.crealytics.spark.v2.excel

import java.io.Closeable

/** A row iterator over one or more Excel sheets together with the underlying
  * resources (e.g. open workbooks) that back it.
  *
  * @param rowIterator      the (lazy) rows; consuming it may read from the resources
  * @param resourcesToClose resources that must be closed once iteration is done
  */
case class SheetData[T](rowIterator: Iterator[T], resourcesToClose: Seq[Closeable] = Seq.empty) extends Closeable {

  /** Returns a copy with the iterator transformed; the resources are shared, not duplicated. */
  def modifyIterator(f: Iterator[T] => Iterator[T]): SheetData[T] = SheetData(f(rowIterator), resourcesToClose)

  /** Concatenates the rows lazily and takes ownership of both resource lists. */
  def append(other: SheetData[T]): SheetData[T] =
    SheetData(rowIterator ++ other.rowIterator, resourcesToClose ++ other.resourcesToClose)

  /** Closes every resource. Unlike a plain foreach, a failing close() does not
    * prevent the remaining resources from being closed; the first failure is
    * rethrown afterwards so callers still observe the error.
    */
  override def close(): Unit = {
    var firstFailure: Option[Throwable] = None
    resourcesToClose.foreach { resource =>
      try resource.close()
      catch {
        case scala.util.control.NonFatal(t) =>
          if (firstFailure.isEmpty) firstFailure = Some(t)
      }
    }
    firstFailure.foreach(t => throw t)
  }
}
}

0 comments on commit 4ceca4f

Please sign in to comment.