diff --git a/src/main/2.4/scala/com/crealytics/spark/v2/excel/ExcelDataSource.scala b/src/main/2.4/scala/com/crealytics/spark/v2/excel/ExcelDataSource.scala
index 69a07ced..745a80d2 100644
--- a/src/main/2.4/scala/com/crealytics/spark/v2/excel/ExcelDataSource.scala
+++ b/src/main/2.4/scala/com/crealytics/spark/v2/excel/ExcelDataSource.scala
@@ -256,36 +256,16 @@ class ExcelDataSourceReader(
       sample = if (sample < 1) 1 else sample
       inputPaths.take(sample).map(_.getPath.toUri)
     }
-    var rows = excelHelper.getRows(conf, paths.head)
-
-    if (rows.isEmpty) { /* If the first file is empty, not checking further */
-      StructType(Seq.empty)
-    } else {
-      /* Prepare field names */
-      val colNames =
-        if (options.header) { /* Get column name from the first row */
-          val r = excelHelper.getColumnNames(rows.next)
-          rows = rows.drop(options.ignoreAfterHeader)
-          r
-        } else { /* Peek first row, then return back */
-          val headerRow = rows.next
-          val r = excelHelper.getColumnNames(headerRow)
-          rows = Iterator(headerRow) ++ rows
-          r
-        }
-
-      /* Other files also be utilized (lazily) for field types, reuse field name
-         from the first file */
-      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
-      rows = paths.tail.foldLeft(rows) { case (rs, path) =>
-        rs ++ excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
+    val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
+    try {
+      if (sheetData.rowIterator.isEmpty) {
+        StructType(Seq.empty)
+      } else {
+        /* Ready to infer schema */
+        ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
       }
-
-      /* Limit numer of rows to be used for schema infering */
-      rows = options.excerptSize.foldLeft(rows)(_ take _)
-
-      /* Ready to infer schema */
-      ExcelInferSchema(options).infer(rows, colNames)
+    } finally {
+      sheetData.close()
     }
   }
 
@@ -327,9 +307,9 @@ class ExcelInputPartitionReader(
   private val headerChecker =
     new ExcelHeaderChecker(dataSchema, options, source = s"Excel file: ${path}")
   private val excelHelper = ExcelHelper(options)
-  private val rows = excelHelper.getRows(new Configuration(), path)
+  private val sheetData = excelHelper.getSheetData(new Configuration(), path)
 
-  val reader = ExcelParser.parseIterator(rows, parser, headerChecker, dataSchema)
+  private val reader = ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, dataSchema)
 
   private val fullSchema = requiredSchema
     .map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) ++
@@ -349,7 +329,9 @@
   override def next: Boolean = combinedReader.hasNext
   override def get: InternalRow = combinedReader.next
 
-  override def close(): Unit = {}
+  override def close(): Unit = {
+    sheetData.close()
+  }
 }
 
 class ExcelDataSourceWriter(
diff --git a/src/main/3.0_3.1/scala/com/crealytics/spark/v2/excel/ExcelTable.scala b/src/main/3.0_3.1/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
index 9e054689..174691be 100644
--- a/src/main/3.0_3.1/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
+++ b/src/main/3.0_3.1/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
@@ -71,36 +71,17 @@ case class ExcelTable(
       sample = if (sample < 1) 1 else sample
       inputPaths.take(sample).map(_.getPath.toUri)
     }
-    var rows = excelHelper.getRows(conf, paths.head)
-
-    if (rows.isEmpty) { /* If the first file is empty, not checking further */
-      StructType(Seq.empty)
-    } else {
-      /* Prepare field names */
-      val colNames =
-        if (options.header) { /* Get column name from the first row */
-          val r = excelHelper.getColumnNames(rows.next)
-          rows = rows.drop(options.ignoreAfterHeader)
-          r
-        } else { /* Peek first row, then return back */
-          val headerRow = rows.next
-          val r = excelHelper.getColumnNames(headerRow)
-          rows = Iterator(headerRow) ++ rows
-          r
-        }
-
-      /* Other files also be utilized (lazily) for field types, reuse field name
-         from the first file */
-      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
-      paths.tail.foreach(path => {
-        rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
-      })
-
-      /* Limit numer of rows to be used for schema infering */
-      rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows
-
-      /* Ready to infer schema */
-      ExcelInferSchema(options).infer(rows, colNames)
+    val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
+    try {
+      if (sheetData.rowIterator.isEmpty) {
+        /* If the first file is empty, don't check further */
+        StructType(Seq.empty)
+      } else {
+        /* Ready to infer schema */
+        ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
+      }
+    } finally {
+      sheetData.close()
     }
   }
 }
diff --git a/src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala b/src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
index 81a052a0..2acc3f42 100644
--- a/src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
+++ b/src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
@@ -74,36 +74,17 @@ case class ExcelTable(
       sample = if (sample < 1) 1 else sample
       inputPaths.take(sample).map(_.getPath.toUri)
     }
-    var rows = excelHelper.getRows(conf, paths.head)
-
-    if (rows.isEmpty) { /* If the first file is empty, not checking further */
-      StructType(Seq.empty)
-    } else {
-      /* Prepare field names */
-      val colNames =
-        if (options.header) { /* Get column name from the first row */
-          val r = excelHelper.getColumnNames(rows.next())
-          rows = rows.drop(options.ignoreAfterHeader)
-          r
-        } else { /* Peek first row, then return back */
-          val headerRow = rows.next()
-          val r = excelHelper.getColumnNames(headerRow)
-          rows = Iterator(headerRow) ++ rows
-          r
-        }
-
-      /* Other files also be utilized (lazily) for field types, reuse field name
-         from the first file */
-      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
-      paths.tail.foreach(path => {
-        rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
-      })
-
-      /* Limit numer of rows to be used for schema infering */
-      rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows
-
-      /* Ready to infer schema */
-      ExcelInferSchema(options).infer(rows, colNames)
+    val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
+    try {
+      if (sheetData.rowIterator.isEmpty) {
+        /* If the first file is empty, don't check further */
+        StructType(Seq.empty)
+      } else {
+        /* Ready to infer schema */
+        ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
+      }
+    } finally {
+      sheetData.close()
     }
   }
 }
diff --git a/src/main/3.x/scala/com/crealytics/spark/v2/excel/ExcelPartitionReaderFactory.scala b/src/main/3.x/scala/com/crealytics/spark/v2/excel/ExcelPartitionReaderFactory.scala
index 6302ec89..bab98bce 100644
--- a/src/main/3.x/scala/com/crealytics/spark/v2/excel/ExcelPartitionReaderFactory.scala
+++ b/src/main/3.x/scala/com/crealytics/spark/v2/excel/ExcelPartitionReaderFactory.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
+import scala.util.control.NonFatal
 
 /** A factory used to create Excel readers.
  *
@@ -64,8 +65,8 @@
     val headerChecker =
       new ExcelHeaderChecker(actualReadDataSchema, parsedOptions, source = s"Excel file: ${file.filePath}")
     val iter = readFile(conf, file, parser, headerChecker, readDataSchema)
-    val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
-    new PartitionReaderWithPartitionValues(fileReader, readDataSchema, partitionSchema, file.partitionValues)
+    val partitionReader = new SparkExcelPartitionReaderFromIterator(iter)
+    new PartitionReaderWithPartitionValues(partitionReader, readDataSchema, partitionSchema, file.partitionValues)
   }
 
   private def readFile(
@@ -74,10 +75,28 @@
       parser: ExcelParser,
       headerChecker: ExcelHeaderChecker,
       requiredSchema: StructType
-  ): Iterator[InternalRow] = {
+  ): SheetData[InternalRow] = {
     val excelHelper = ExcelHelper(parsedOptions)
-    val rows = excelHelper.getRows(conf, URI.create(file.filePath))
-    ExcelParser.parseIterator(rows, parser, headerChecker, requiredSchema)
+    val sheetData = excelHelper.getSheetData(conf, URI.create(file.filePath))
+    try {
+      SheetData(
+        ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, requiredSchema),
+        sheetData.resourcesToClose
+      )
+    } catch {
+      case NonFatal(t) => {
+        sheetData.close()
+        throw t
+      }
+    }
   }
 }
+
+private class SparkExcelPartitionReaderFromIterator(sheetData: SheetData[InternalRow])
+    extends PartitionReaderFromIterator[InternalRow](sheetData.rowIterator) {
+  override def close(): Unit = {
+    super.close()
+    sheetData.close()
+  }
+}
diff --git a/src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala b/src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala
index 6cee0f76..f73d0f8d 100644
--- a/src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala
+++ b/src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala
@@ -32,6 +32,7 @@ import java.net.URI
 import java.text.{FieldPosition, Format, ParsePosition}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.util.Try
+import scala.util.control.NonFatal
 
 /** A format that formats a double as a plain string without rounding and scientific notation. All other operations are
  * unsupported.
@@ -130,13 +131,76 @@
    * @param uri
    *   to the file, this can be on any support file system back end
    * @return
-   *   cell-row iterator
+   *   SheetData with row iterator (must be closed after use)
    */
-  def getRows(conf: Configuration, uri: URI): Iterator[Vector[Cell]] = {
+  def getSheetData(conf: Configuration, uri: URI): SheetData[Vector[Cell]] = {
     val workbook = getWorkbook(conf, uri)
     val excelReader = DataLocator(options)
-    try { excelReader.readFrom(workbook) }
-    finally workbook.close()
+    try {
+      val rowIter = excelReader.readFrom(workbook)
+      SheetData(rowIter, Seq(workbook))
+    } catch {
+      case NonFatal(t) => {
+        workbook.close()
+        throw t
+      }
+    }
   }
 
+  /** Get a cell-row iterator over the Excel files at the given URIs
+   *
+   * @param conf
+   *   Hadoop configuration
+   * @param uris
+   *   a seq of files, these can be on any supported file system back end
+   * @return
+   *   a tuple of SheetData with row iterator (must be closed after use) and a Vector of column names
+   */
+  def parseSheetData(conf: Configuration, uris: Seq[URI]): (SheetData[Vector[Cell]], Vector[String]) = {
+    var sheetData = getSheetData(conf, uris.head)
+
+    val colNames = if (sheetData.rowIterator.isEmpty) {
+      /* If the first file is empty, don't check further */
+      Vector.empty
+    } else {
+      try {
+        /* Prepare field names */
+        val colNames =
+          if (options.header) {
+            /* Get column names from the first row */
+            val r = getColumnNames(sheetData.rowIterator.next)
+            sheetData = sheetData.modifyIterator(_.drop(options.ignoreAfterHeader))
+            r
+          } else {
+            /* Peek at the first row, then put it back */
+            val headerRow = sheetData.rowIterator.next
+            val r = getColumnNames(headerRow)
+            sheetData = sheetData.modifyIterator(iter => Iterator(headerRow) ++ iter)
+            r
+          }
+
+        /* The other files are also used (lazily) for field types; reuse the field names
+           from the first file */
+        val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
+        sheetData = uris.tail.foldLeft(sheetData) { case (rs, path) =>
+          val newRows = getSheetData(conf, path).modifyIterator(_.drop(numberOfRowToIgnore))
+          rs.append(newRows)
+        }
+
+        /* Limit the number of rows used for schema inference */
+        options.excerptSize.foreach { excerptSize =>
+          sheetData = sheetData.modifyIterator(_.take(excerptSize))
+        }
+
+        colNames
+      } catch {
+        case NonFatal(t) => {
+          sheetData.close()
+          throw t
+        }
+      }
+    }
+    (sheetData, colNames)
+  }
 
   /** Get column name by list of cells (row)
diff --git a/src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala b/src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala
new file mode 100644
index 00000000..ea22d427
--- /dev/null
+++ b/src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2022 Martin Mauch (@nightscape)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.crealytics.spark.v2.excel
+
+import java.io.Closeable
+
+case class SheetData[T](rowIterator: Iterator[T], resourcesToClose: Seq[Closeable] = Seq.empty) extends Closeable {
+  def modifyIterator(f: Iterator[T] => Iterator[T]): SheetData[T] = SheetData(f(rowIterator), resourcesToClose)
+  def append(other: SheetData[T]): SheetData[T] =
+    SheetData(rowIterator ++ other.rowIterator, resourcesToClose ++ other.resourcesToClose)
+  override def close(): Unit = resourcesToClose.foreach(_.close())
+}
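
Reviewer note: the contract this patch introduces is that whoever ends up holding the final SheetData owns the underlying workbook handles and must close it exactly once, even after composing several files via append and modifyIterator. Below is a minimal, self-contained sketch of that intended call pattern; DummyWorkbook and SheetDataSketch are illustrative stand-ins (not part of this change) so the sketch runs without Spark or POI on the classpath.

import com.crealytics.spark.v2.excel.SheetData
import java.io.Closeable

// Illustrative stand-in for a POI Workbook; a real caller holds actual workbook handles here.
final class DummyWorkbook(name: String) extends Closeable {
  override def close(): Unit = println(s"closed $name")
}

object SheetDataSketch {
  def main(args: Array[String]): Unit = {
    val first = SheetData(Iterator(Vector("a", "1")), Seq(new DummyWorkbook("first.xlsx")))
    val second = SheetData(Iterator(Vector("b", "2")), Seq(new DummyWorkbook("second.xlsx")))

    // append concatenates both the row iterators and the resource lists, and
    // modifyIterator transforms the rows while keeping the resources attached,
    // so closing the combined value releases every underlying workbook once.
    val combined = first.append(second).modifyIterator(_.take(10))
    try combined.rowIterator.foreach(row => println(row.mkString(",")))
    finally combined.close()
  }
}

This is why the schema-inference call sites above wrap the iterator consumption in try/finally rather than closing eagerly: the row iterators stay lazy, and the resources live until the last consumer is done.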