Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2 streaming read (alternative approach) #653

Merged
merged 20 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,36 +256,16 @@ class ExcelDataSourceReader(
sample = if (sample < 1) 1 else sample
inputPaths.take(sample).map(_.getPath.toUri)
}
var rows = excelHelper.getRows(conf, paths.head)

if (rows.isEmpty) { /* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Prepare field names */
val colNames =
if (options.header) { /* Get column name from the first row */
val r = excelHelper.getColumnNames(rows.next)
rows = rows.drop(options.ignoreAfterHeader)
r
} else { /* Peek first row, then return back */
val headerRow = rows.next
val r = excelHelper.getColumnNames(headerRow)
rows = Iterator(headerRow) ++ rows
r
}

/* Other files are also utilized (lazily) for field types, reusing the field names
from the first file */
val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
rows = paths.tail.foldLeft(rows) { case (rs, path) =>
rs ++ excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
try {
if (sheetData.rowIterator.isEmpty) {
StructType(Seq.empty)
} else {
/* Ready to infer schema */
ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
}

/* Limit the number of rows to be used for schema inference */
rows = options.excerptSize.foldLeft(rows)(_ take _)

/* Ready to infer schema */
ExcelInferSchema(options).infer(rows, colNames)
} finally {
sheetData.close()
}
}

Expand Down Expand Up @@ -327,9 +307,9 @@ class ExcelInputPartitionReader(
private val headerChecker =
new ExcelHeaderChecker(dataSchema, options, source = s"Excel file: ${path}")
private val excelHelper = ExcelHelper(options)
private val rows = excelHelper.getRows(new Configuration(), path)
private val sheetData = excelHelper.getSheetData(new Configuration(), path)

val reader = ExcelParser.parseIterator(rows, parser, headerChecker, dataSchema)
private val reader = ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, dataSchema)

private val fullSchema = requiredSchema
.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) ++
Expand All @@ -349,7 +329,9 @@ class ExcelInputPartitionReader(

override def next: Boolean = combinedReader.hasNext
override def get: InternalRow = combinedReader.next
override def close(): Unit = {}
override def close(): Unit = {
sheetData.close()
}
}

class ExcelDataSourceWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,36 +71,17 @@ case class ExcelTable(
sample = if (sample < 1) 1 else sample
inputPaths.take(sample).map(_.getPath.toUri)
}
var rows = excelHelper.getRows(conf, paths.head)

if (rows.isEmpty) { /* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Prepare field names */
val colNames =
if (options.header) { /* Get column name from the first row */
val r = excelHelper.getColumnNames(rows.next)
rows = rows.drop(options.ignoreAfterHeader)
r
} else { /* Peek first row, then return back */
val headerRow = rows.next
val r = excelHelper.getColumnNames(headerRow)
rows = Iterator(headerRow) ++ rows
r
}

/* Other files are also utilized (lazily) for field types, reusing the field names
from the first file */
val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
paths.tail.foreach(path => {
rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
})

/* Limit the number of rows to be used for schema inference */
rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows

/* Ready to infer schema */
ExcelInferSchema(options).infer(rows, colNames)
val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
try {
if (sheetData.rowIterator.isEmpty) {
/* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Ready to infer schema */
ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
}
} finally {
sheetData.close()
}
}
}
41 changes: 11 additions & 30 deletions src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,17 @@ case class ExcelTable(
sample = if (sample < 1) 1 else sample
inputPaths.take(sample).map(_.getPath.toUri)
}
var rows = excelHelper.getRows(conf, paths.head)

if (rows.isEmpty) { /* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Prepare field names */
val colNames =
if (options.header) { /* Get column name from the first row */
val r = excelHelper.getColumnNames(rows.next())
rows = rows.drop(options.ignoreAfterHeader)
r
} else { /* Peek first row, then return back */
val headerRow = rows.next()
val r = excelHelper.getColumnNames(headerRow)
rows = Iterator(headerRow) ++ rows
r
}

/* Other files are also utilized (lazily) for field types, reusing the field names
from the first file */
val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
paths.tail.foreach(path => {
rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
})

/* Limit the number of rows to be used for schema inference */
rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows

/* Ready to infer schema */
ExcelInferSchema(options).infer(rows, colNames)
val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
try {
if (sheetData.rowIterator.isEmpty) {
/* If the first file is empty, not checking further */
StructType(Seq.empty)
} else {
/* Ready to infer schema */
ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
}
} finally {
sheetData.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

import java.net.URI
import scala.util.control.NonFatal

/** A factory used to create Excel readers.
*
Expand Down Expand Up @@ -64,8 +65,8 @@ case class ExcelPartitionReaderFactory(
val headerChecker =
new ExcelHeaderChecker(actualReadDataSchema, parsedOptions, source = s"Excel file: ${file.filePath}")
val iter = readFile(conf, file, parser, headerChecker, readDataSchema)
val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
new PartitionReaderWithPartitionValues(fileReader, readDataSchema, partitionSchema, file.partitionValues)
val partitionReader = new SparkExcelPartitionReaderFromIterator(iter)
new PartitionReaderWithPartitionValues(partitionReader, readDataSchema, partitionSchema, file.partitionValues)
}

private def readFile(
Expand All @@ -74,10 +75,28 @@ case class ExcelPartitionReaderFactory(
parser: ExcelParser,
headerChecker: ExcelHeaderChecker,
requiredSchema: StructType
): Iterator[InternalRow] = {
): SheetData[InternalRow] = {
val excelHelper = ExcelHelper(parsedOptions)
val rows = excelHelper.getRows(conf, URI.create(file.filePath))
ExcelParser.parseIterator(rows, parser, headerChecker, requiredSchema)
val sheetData = excelHelper.getSheetData(conf, URI.create(file.filePath))
try {
SheetData(
ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, requiredSchema),
sheetData.resourcesToClose
)
} catch {
case NonFatal(t) => {
sheetData.close()
throw t
}
}
}

}

/** Partition reader that delegates row iteration to Spark's PartitionReaderFromIterator
  * while additionally releasing the workbook resources held by `sheetData` when the
  * partition is closed.
  *
  * NOTE(review): assumes the superclass does not itself consume or close `sheetData`'s
  * iterator resources — confirm against PartitionReaderFromIterator's implementation.
  *
  * @param sheetData parsed rows plus the Closeables (open workbooks) backing them
  */
private class SparkExcelPartitionReaderFromIterator(sheetData: SheetData[InternalRow])
    extends PartitionReaderFromIterator[InternalRow](sheetData.rowIterator) {
  override def close(): Unit = {
    /* Let the superclass finish its own cleanup first, then release the workbooks. */
    super.close()
    sheetData.close()
  }
}
72 changes: 68 additions & 4 deletions src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import java.net.URI
import java.text.{FieldPosition, Format, ParsePosition}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.Try
import scala.util.control.NonFatal

/** A format that formats a double as a plain string without rounding and scientific notation. All other operations are
* unsupported.
Expand Down Expand Up @@ -130,13 +131,76 @@ class ExcelHelper private (options: ExcelOptions) {
* @param uri
* to the file, this can be on any support file system back end
* @return
* cell-row iterator
* Sheet Data with row iterator (must be closed after use)
*/
def getRows(conf: Configuration, uri: URI): Iterator[Vector[Cell]] = {
def getSheetData(conf: Configuration, uri: URI): SheetData[Vector[Cell]] = {
val workbook = getWorkbook(conf, uri)
val excelReader = DataLocator(options)
try { excelReader.readFrom(workbook) }
finally workbook.close()
try {
val rowIter = excelReader.readFrom(workbook)
SheetData(rowIter, Seq(workbook))
} catch {
case NonFatal(t) => {
workbook.close()
throw t
}
}
}

/** Get cell-row iterator for excel file in given URIs
*
* @param conf
* Hadoop configuration
* @param uris
* a seq of files, this can be on any support file system back end
* @return
* A tuple of Sheet Data with row iterator (must be closed after use) and Vector of column names
*/
def parseSheetData(conf: Configuration, uris: Seq[URI]): (SheetData[Vector[Cell]], Vector[String]) = {
  /* Open the first file; it alone determines the column names. */
  var sheetData = getSheetData(conf, uris.head)

  val colNames = if (sheetData.rowIterator.isEmpty) {
    /* If the first file is empty, not checking further */
    Vector.empty
  } else {
    try {
      /* Prepare field names */
      val colNames =
        if (options.header) {
          /* Get column names from the first (header) row, then skip the
           configured number of rows that follow the header */
          val r = getColumnNames(sheetData.rowIterator.next)
          sheetData = sheetData.modifyIterator(_.drop(options.ignoreAfterHeader))
          r
        } else {
          /* No header: peek the first row to derive names, then push it
           back so it is still treated as data */
          val headerRow = sheetData.rowIterator.next
          val r = getColumnNames(headerRow)
          sheetData = sheetData.modifyIterator(iter => Iterator(headerRow) ++ iter)
          r
        }

      /* Remaining files are also utilized for field types, reusing the field
       names from the first file. NOTE(review): getSheetData is invoked for
       every remaining URI inside this fold, so each workbook is opened
       eagerly here — confirm this matches the lazy behavior implied by the
       previous Iterator-based implementation. */
      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
      sheetData = uris.tail.foldLeft(sheetData) { case (rs, path) =>
        val newRows = getSheetData(conf, path).modifyIterator(_.drop(numberOfRowToIgnore))
        rs.append(newRows)
      }

      /* Limit the number of rows used for schema inference */
      options.excerptSize.foreach { excerptSize =>
        sheetData = sheetData.modifyIterator(_.take(excerptSize))
      }

      colNames
    } catch {
      case NonFatal(t) => {
        /* On any failure, release every workbook opened so far before rethrowing */
        sheetData.close()
        throw t
      }
    }
  }
  (sheetData, colNames)
}

/** Get column name by list of cells (row)
Expand Down
26 changes: 26 additions & 0 deletions src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 Martin Mauch (@nightscape)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.crealytics.spark.v2.excel

import java.io.Closeable

import scala.util.control.NonFatal

/** Pairs a row iterator over one or more Excel sheets with the underlying
  * resources (e.g. open workbooks) that must be released once the iterator
  * has been consumed.
  *
  * @param rowIterator      the (possibly lazy) rows parsed from the sheet(s)
  * @param resourcesToClose resources backing the iterator; released by [[close]]
  * @tparam T row type
  */
case class SheetData[T](rowIterator: Iterator[T], resourcesToClose: Seq[Closeable] = Seq.empty) extends Closeable {

  /** Returns a copy whose iterator is transformed by `f`; resources carry over unchanged. */
  def modifyIterator(f: Iterator[T] => Iterator[T]): SheetData[T] = copy(rowIterator = f(rowIterator))

  /** Concatenates this and `other`: rows are chained lazily, resource lists are merged. */
  def append(other: SheetData[T]): SheetData[T] =
    SheetData(rowIterator ++ other.rowIterator, resourcesToClose ++ other.resourcesToClose)

  /** Closes every resource even if some fail (the original `foreach(_.close())`
    * leaked all remaining resources once one `close()` threw). The first
    * non-fatal failure is rethrown after all resources have been attempted,
    * with later failures attached as suppressed exceptions — mirroring
    * try-with-resources semantics.
    */
  override def close(): Unit = {
    var firstFailure: Option[Throwable] = None
    resourcesToClose.foreach { resource =>
      try resource.close()
      catch {
        case NonFatal(t) =>
          firstFailure match {
            case Some(first) => first.addSuppressed(t)
            case None        => firstFailure = Some(t)
          }
      }
    }
    firstFailure.foreach(throw _)
  }
}
}
pjfanning marked this conversation as resolved.
Show resolved Hide resolved