From 557c253b050a982d1a5c62caedefd300b2b6af1d Mon Sep 17 00:00:00 2001
From: William Phillips <67801903+williamdphillips@users.noreply.github.com>
Date: Mon, 17 Oct 2022 16:39:26 -0500
Subject: [PATCH 01/10] Java-based Constructor for WorkbookReader

Add an additional constructor for WorkbookReader which takes a Java HashMap
as parameter for easy instantiation from Python
---
 .../scala/com/crealytics/spark/excel/WorkbookReader.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala b/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala
index c9e30f34..275ea2c4 100644
--- a/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala
+++ b/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala
@@ -26,6 +26,8 @@ import org.apache.poi.hssf.usermodel.HSSFWorkbookFactory
 import org.apache.poi.openxml4j.util.ZipInputStreamZipEntrySource
 import org.apache.poi.util.IOUtils
 import org.apache.poi.xssf.usermodel.XSSFWorkbookFactory
+import java.util
+import scala.collection.JavaConverters.mapAsScalaMapConverter
 
 trait WorkbookReader {
   protected def openWorkbook(): Workbook
@@ -53,6 +55,10 @@ object WorkbookReader {
   WorkbookFactory.addProvider(new HSSFWorkbookFactory)
   WorkbookFactory.addProvider(new XSSFWorkbookFactory)
+
+  def apply(parameters: util.HashMap[String, String], hadoopConfiguration: Configuration): WorkbookReader = {
+    apply(parameters.asScala.toMap, hadoopConfiguration)
+  }
 
   def apply(parameters: Map[String, String], hadoopConfiguration: Configuration): WorkbookReader = {
     def readFromHadoop(location: String) = {

From 019f639e0cbb0f5457257d353ac151b6001230b1 Mon Sep 17 00:00:00 2001
From: William Phillips <67801903+williamdphillips@users.noreply.github.com>
Date: Mon, 17 Oct 2022 16:49:34 -0500
Subject: [PATCH 02/10] Remove java import and add directly to constructor

---
 src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala b/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala
index 275ea2c4..b7d34020 100644
--- a/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala
+++ b/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala
@@ -26,7 +26,6 @@ import org.apache.poi.hssf.usermodel.HSSFWorkbookFactory
 import org.apache.poi.openxml4j.util.ZipInputStreamZipEntrySource
 import org.apache.poi.util.IOUtils
 import org.apache.poi.xssf.usermodel.XSSFWorkbookFactory
-import java.util
 import scala.collection.JavaConverters.mapAsScalaMapConverter
 
 trait WorkbookReader {
@@ -56,7 +55,7 @@ object WorkbookReader {
   WorkbookFactory.addProvider(new HSSFWorkbookFactory)
   WorkbookFactory.addProvider(new XSSFWorkbookFactory)
 
-  def apply(parameters: util.HashMap[String, String], hadoopConfiguration: Configuration): WorkbookReader = {
+  def apply(parameters: java.util.HashMap[String, String], hadoopConfiguration: Configuration): WorkbookReader = {
     apply(parameters.asScala.toMap, hadoopConfiguration)
   }

From b86ce1267c4831e64766f7172a63d136457a2a42 Mon Sep 17 00:00:00 2001
From: Scala Steward <43047562+scala-steward@users.noreply.github.com>
Date: Wed, 2 Nov 2022 16:30:01 +0100
Subject: [PATCH 03/10] chore: Update scalafmt-core from 3.5.9 to 3.6.1 (#678)

* chore: Update scalafmt-core from 3.5.9 to 3.6.1

* Reformat with scalafmt 3.6.1

Executed command: scalafmt --non-interactive

* Add 'Reformat with scalafmt 3.6.1' to .git-blame-ignore-revs
---
 .git-blame-ignore-revs | 2 ++
 .scalafmt.conf | 2 +-
 build.sc | 7 ++-----
 .../scala/com/crealytics/spark/excel/WorkbookReader.scala | 2 +-
 .../spark/v2/excel/DataFrameWriterApiComplianceSuite.scala | 3 ++-
 5 files changed, 8 insertions(+), 8 deletions(-)
 create mode 100644 .git-blame-ignore-revs

diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
new file mode 100644
index 00000000..5ed8b217
--- /dev/null
+++ b/.git-blame-ignore-revs
@@ -0,0 +1,2 @@
+# Scala Steward: Reformat with scalafmt 3.6.1
+a834cf94453ed2f3ab1b87818c2fd124fe87fa2a

diff --git a/.scalafmt.conf b/.scalafmt.conf
index 9b645ca2..3060f93f 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,4 +1,4 @@
-version = 3.5.9
+version = 3.6.1
 style = default
 runner.dialect=scala212
 maxColumn = 120

diff --git a/build.sc b/build.sc
index 905c819b..470349b8 100644
--- a/build.sc
+++ b/build.sc
@@ -48,11 +48,9 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
   val sparkDeps = Agg(
     ivy"org.apache.spark::spark-core:$sparkVersion",
     ivy"org.apache.spark::spark-sql:$sparkVersion",
-    ivy"org.apache.spark::spark-hive:$sparkVersion",
-  )
-  override def compileIvyDeps = sparkDeps ++ Agg(
-    ivy"org.slf4j:slf4j-api:1.7.36".excludeOrg("stax")
+    ivy"org.apache.spark::spark-hive:$sparkVersion"
   )
+  override def compileIvyDeps = sparkDeps ++ Agg(ivy"org.slf4j:slf4j-api:1.7.36".excludeOrg("stax"))
   val poiVersion = "5.2.3"
   override def ivyDeps = Agg(
     ivy"org.apache.poi:poi:$poiVersion",
@@ -93,7 +91,6 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
   }
 }
 
-
 val scala213 = "2.13.10"
 val scala212 = "2.12.17"
 val spark24 = List("2.4.1", "2.4.7", "2.4.8")

diff --git a/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala b/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala
index b7d34020..24b6b917 100644
--- a/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala
+++ b/src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala
@@ -54,7 +54,7 @@ object WorkbookReader {
   WorkbookFactory.addProvider(new HSSFWorkbookFactory)
   WorkbookFactory.addProvider(new XSSFWorkbookFactory)
-  
+
   def apply(parameters: java.util.HashMap[String, String], hadoopConfiguration: Configuration): WorkbookReader = {
     apply(parameters.asScala.toMap, hadoopConfiguration)
   }

diff --git a/src/test/scala/com/crealytics/spark/v2/excel/DataFrameWriterApiComplianceSuite.scala b/src/test/scala/com/crealytics/spark/v2/excel/DataFrameWriterApiComplianceSuite.scala
index 7139ff1e..dc9bb46a 100644
--- a/src/test/scala/com/crealytics/spark/v2/excel/DataFrameWriterApiComplianceSuite.scala
+++ b/src/test/scala/com/crealytics/spark/v2/excel/DataFrameWriterApiComplianceSuite.scala
@@ -125,7 +125,8 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
         .save()
 
       val orderedSchemaColumns = dfCsv.schema.fields.map(f => f.name).sorted
-      val expectedDf = dfCsv.union(dfCsv).select(orderedSchemaColumns.head, orderedSchemaColumns.tail.toIndexedSeq: _*)
+      val expectedDf =
+        dfCsv.union(dfCsv).select(orderedSchemaColumns.head, orderedSchemaColumns.tail.toIndexedSeq: _*)
 
       assertWrittenExcelData(expectedDf, targetDir)
     }

From d09d23294ecfcb9894a8e3d7c584c011f9c042e8 Mon Sep 17 00:00:00 2001
From: Scala Steward <43047562+scala-steward@users.noreply.github.com>
Date: Sat, 5 Nov 2022 02:26:26 +0100
Subject: [PATCH 04/10] chore: Update scalatest from 3.2.13 to 3.2.14 (#661)

* chore: Update scalatest from 3.2.13 to 3.2.14

* Revert commit(s) cca7d0e

* chore: Update scalatest from 3.2.13 to 3.2.14

* Revert commit(s) 4eab48b
* chore: Update scalatest from 3.2.13 to 3.2.14
---
 build.sbt | 2 +-
 build.sc | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/build.sbt b/build.sbt
index c96745fc..a3b2e94d 100644
--- a/build.sbt
+++ b/build.sbt
@@ -83,7 +83,7 @@ libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-hive" % testSparkVersion.value % "provided",
   "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1",
   "org.typelevel" %% "cats-core" % "2.8.0" % Test,
-  "org.scalatest" %% "scalatest" % "3.2.13" % Test,
+  "org.scalatest" %% "scalatest" % "3.2.14" % Test,
   "org.scalatestplus" %% "scalacheck-1-15" % "3.2.11.0" % Test,
   "org.scalacheck" %% "scalacheck" % "1.17.0" % Test,
   "com.github.alexarchambault" %% "scalacheck-shapeless_1.15" % "1.3.0" % Test,

diff --git a/build.sc b/build.sc
index 470349b8..6fd4c94d 100644
--- a/build.sc
+++ b/build.sc
@@ -81,7 +81,7 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
   def repositoriesTask = T.task { super.repositoriesTask() ++ Seq(MavenRepository("https://jitpack.io")) }
   def ivyDeps = sparkDeps ++ Agg(
     ivy"org.typelevel::cats-core:2.8.0",
-    ivy"org.scalatest::scalatest:3.2.13",
+    ivy"org.scalatest::scalatest:3.2.14",
     ivy"org.scalatestplus::scalacheck-1-15:3.2.11.0",
     ivy"org.scalacheck::scalacheck:1.17.0",
     ivy"com.github.alexarchambault::scalacheck-shapeless_1.15:1.3.0",

From 2344853b1f7e14e29118a66985c546f547dddc0e Mon Sep 17 00:00:00 2001
From: Scala Steward <43047562+scala-steward@users.noreply.github.com>
Date: Sat, 5 Nov 2022 02:27:06 +0100
Subject: [PATCH 05/10] chore: Update poi-shared-strings from 2.5.4 to 2.5.5 (#659)

* chore: Update poi-shared-strings from 2.5.4 to 2.5.5

* Revert commit(s) 347220b

* chore: Update poi-shared-strings from 2.5.4 to 2.5.5

* Revert commit(s) 5b4e61b

* chore: Update poi-shared-strings from 2.5.4 to 2.5.5
---
 build.sbt | 2 +-
 build.sc | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/build.sbt b/build.sbt
index a3b2e94d..7bae2010 100644
--- a/build.sbt
+++ b/build.sbt
@@ -59,7 +59,7 @@ shadedDeps ++= Seq(
   "org.apache.xmlbeans" % "xmlbeans" % "5.1.1",
   "com.norbitltd" %% "spoiwo" % "2.2.1",
   "com.github.pjfanning" % "excel-streaming-reader" % "4.0.2",
-  "com.github.pjfanning" % "poi-shared-strings" % "2.5.4",
+  "com.github.pjfanning" % "poi-shared-strings" % "2.5.5",
   "commons-io" % "commons-io" % "2.11.0",
   "org.apache.commons" % "commons-compress" % "1.21",
   "org.apache.logging.log4j" % "log4j-api" % "2.19.0",

diff --git a/build.sc b/build.sc
index 6fd4c94d..89c1bdb3 100644
--- a/build.sc
+++ b/build.sc
@@ -59,7 +59,7 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
   ivy"org.apache.xmlbeans:xmlbeans:5.1.1",
   ivy"com.norbitltd::spoiwo:2.2.1",
   ivy"com.github.pjfanning:excel-streaming-reader:4.0.2",
-  ivy"com.github.pjfanning:poi-shared-strings:2.5.4",
+  ivy"com.github.pjfanning:poi-shared-strings:2.5.5",
   ivy"commons-io:commons-io:2.11.0",
   ivy"org.apache.commons:commons-compress:1.21",
   ivy"org.apache.logging.log4j:log4j-api:2.19.0",

From 1cd676e6a14fb7e6ca0c9d0f7534526e4f7b57ae Mon Sep 17 00:00:00 2001
From: Scala Steward <43047562+scala-steward@users.noreply.github.com>
Date: Sat, 5 Nov 2022 02:28:33 +0100
Subject: [PATCH 06/10] chore: Update sbt-assembly from 1.2.0 to 2.0.0 (#665)

* chore: Update sbt-assembly from 1.2.0 to 2.0.0

* Revert commit(s) 919788e

* chore: Update sbt-assembly from 1.2.0 to 2.0.0
---
 project/plugins.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/plugins.sbt b/project/plugins.sbt
index d2852465..c6aa8ff2 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -3,6 +3,6 @@ addSbtPlugin("org.typelevel" % "sbt-typelevel" % "0.4.14")
 addSbtPlugin(
   "org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0" excludeAll (ExclusionRule(organization = "com.danieltrinh"))
 )
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
 addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
 addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")

From ed97118a1ba7df720c18971d5c2e9635d0a8e0bb Mon Sep 17 00:00:00 2001
From: Scala Steward <43047562+scala-steward@users.noreply.github.com>
Date: Mon, 7 Nov 2022 10:24:25 +0100
Subject: [PATCH 07/10] chore: Update excel-streaming-reader from 4.0.2 to 4.0.4 (#670)

---
 build.sbt | 2 +-
 build.sc | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/build.sbt b/build.sbt
index 7bae2010..d6e36c33 100644
--- a/build.sbt
+++ b/build.sbt
@@ -58,7 +58,7 @@ shadedDeps ++= Seq(
   "org.apache.poi" % "poi-ooxml-lite" % poiVersion,
   "org.apache.xmlbeans" % "xmlbeans" % "5.1.1",
   "com.norbitltd" %% "spoiwo" % "2.2.1",
-  "com.github.pjfanning" % "excel-streaming-reader" % "4.0.2",
+  "com.github.pjfanning" % "excel-streaming-reader" % "4.0.4",
   "com.github.pjfanning" % "poi-shared-strings" % "2.5.5",
   "commons-io" % "commons-io" % "2.11.0",
   "org.apache.commons" % "commons-compress" % "1.21",

diff --git a/build.sc b/build.sc
index 89c1bdb3..9218e110 100644
--- a/build.sc
+++ b/build.sc
@@ -58,7 +58,7 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
   ivy"org.apache.poi:poi-ooxml-lite:$poiVersion",
   ivy"org.apache.xmlbeans:xmlbeans:5.1.1",
   ivy"com.norbitltd::spoiwo:2.2.1",
-  ivy"com.github.pjfanning:excel-streaming-reader:4.0.2",
+  ivy"com.github.pjfanning:excel-streaming-reader:4.0.4",
   ivy"com.github.pjfanning:poi-shared-strings:2.5.5",
   ivy"commons-io:commons-io:2.11.0",
   ivy"org.apache.commons:commons-compress:1.21",

From 7c96184398cf4e190208a61c187e06353d739f2d Mon Sep 17 00:00:00 2001
From: Scala Steward <43047562+scala-steward@users.noreply.github.com>
Date: Mon, 7 Nov 2022 10:25:11 +0100
Subject: [PATCH 08/10] chore: Update commons-compress from 1.21 to 1.22 (#676)

* chore: Update commons-compress from 1.21 to 1.22

* Revert commit(s) ca4e486

* chore: Update commons-compress from 1.21 to 1.22

Co-authored-by: Martin Mauch
---
 build.sbt | 2 +-
 build.sc | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/build.sbt b/build.sbt
index d6e36c33..613b58f7 100644
--- a/build.sbt
+++ b/build.sbt
@@ -61,7 +61,7 @@ shadedDeps ++= Seq(
   "com.github.pjfanning" % "excel-streaming-reader" % "4.0.4",
   "com.github.pjfanning" % "poi-shared-strings" % "2.5.5",
   "commons-io" % "commons-io" % "2.11.0",
-  "org.apache.commons" % "commons-compress" % "1.21",
+  "org.apache.commons" % "commons-compress" % "1.22",
   "org.apache.logging.log4j" % "log4j-api" % "2.19.0",
   "com.zaxxer" % "SparseBitSet" % "1.2",
   "org.apache.commons" % "commons-collections4" % "4.4",

diff --git a/build.sc b/build.sc
index 9218e110..367908f3 100644
--- a/build.sc
+++ b/build.sc
@@ -61,7 +61,7 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
   ivy"com.github.pjfanning:excel-streaming-reader:4.0.4",
   ivy"com.github.pjfanning:poi-shared-strings:2.5.5",
   ivy"commons-io:commons-io:2.11.0",
-  ivy"org.apache.commons:commons-compress:1.21",
+  ivy"org.apache.commons:commons-compress:1.22",
ivy"org.apache.logging.log4j:log4j-api:2.19.0", ivy"com.zaxxer:SparseBitSet:1.2", ivy"org.apache.commons:commons-collections4:4.4", From 4ceca4f18434652a9ecaab076ea381ca927588d6 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 7 Nov 2022 23:04:14 +0100 Subject: [PATCH 09/10] feat: V2 streaming read (#653) * v1 streaming read test * Update MaxNumRowsSuite.scala * Update MaxNumRowsSuite.scala * Update MaxRowsReadSuite.scala * fix issue with v2 data source when streaming excel * wip * fix spark 2.4 build * Update ExcelTable.scala * fix for TODO issue * review items * Update MaxRowsReadSuite.scala * Update src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala Co-authored-by: Martin Mauch * some review comments * try to centralise some code * Update ExcelTable.scala * continue refactor * license * update tests * compile issues * delete large file test Co-authored-by: Martin Mauch --- .../spark/v2/excel/ExcelDataSource.scala | 46 ++++-------- .../spark/v2/excel/ExcelTable.scala | 41 +++-------- .../spark/v2/excel/ExcelTable.scala | 41 +++-------- .../excel/ExcelPartitionReaderFactory.scala | 29 ++++++-- .../spark/v2/excel/ExcelHelper.scala | 72 +++++++++++++++++-- .../crealytics/spark/v2/excel/SheetData.scala | 26 +++++++ 6 files changed, 154 insertions(+), 101 deletions(-) create mode 100644 src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala diff --git a/src/main/2.4/scala/com/crealytics/spark/v2/excel/ExcelDataSource.scala b/src/main/2.4/scala/com/crealytics/spark/v2/excel/ExcelDataSource.scala index 69a07ced..745a80d2 100644 --- a/src/main/2.4/scala/com/crealytics/spark/v2/excel/ExcelDataSource.scala +++ b/src/main/2.4/scala/com/crealytics/spark/v2/excel/ExcelDataSource.scala @@ -256,36 +256,16 @@ class ExcelDataSourceReader( sample = if (sample < 1) 1 else sample inputPaths.take(sample).map(_.getPath.toUri) } - var rows = excelHelper.getRows(conf, paths.head) - - if (rows.isEmpty) { /* If the first file is empty, not checking further */ - StructType(Seq.empty) - } else { - /* Prepare field names */ - val colNames = - if (options.header) { /* Get column name from the first row */ - val r = excelHelper.getColumnNames(rows.next) - rows = rows.drop(options.ignoreAfterHeader) - r - } else { /* Peek first row, then return back */ - val headerRow = rows.next - val r = excelHelper.getColumnNames(headerRow) - rows = Iterator(headerRow) ++ rows - r - } - - /* Other files also be utilized (lazily) for field types, reuse field name - from the first file */ - val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0 - rows = paths.tail.foldLeft(rows) { case (rs, path) => - rs ++ excelHelper.getRows(conf, path).drop(numberOfRowToIgnore) + val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths) + try { + if (sheetData.rowIterator.isEmpty) { + StructType(Seq.empty) + } else { + /* Ready to infer schema */ + ExcelInferSchema(options).infer(sheetData.rowIterator, colNames) } - - /* Limit numer of rows to be used for schema infering */ - rows = options.excerptSize.foldLeft(rows)(_ take _) - - /* Ready to infer schema */ - ExcelInferSchema(options).infer(rows, colNames) + } finally { + sheetData.close() } } @@ -327,9 +307,9 @@ class ExcelInputPartitionReader( private val headerChecker = new ExcelHeaderChecker(dataSchema, options, source = s"Excel file: ${path}") private val excelHelper = ExcelHelper(options) - private val rows = excelHelper.getRows(new Configuration(), path) + private val sheetData = excelHelper.getSheetData(new Configuration(), path) 
-  val reader = ExcelParser.parseIterator(rows, parser, headerChecker, dataSchema)
+  private val reader = ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, dataSchema)
 
   private val fullSchema = requiredSchema
     .map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) ++
@@ -349,7 +329,9 @@ class ExcelInputPartitionReader(
   override def next: Boolean = combinedReader.hasNext
   override def get: InternalRow = combinedReader.next
 
-  override def close(): Unit = {}
+  override def close(): Unit = {
+    sheetData.close()
+  }
 }
 
 class ExcelDataSourceWriter(

diff --git a/src/main/3.0_3.1/scala/com/crealytics/spark/v2/excel/ExcelTable.scala b/src/main/3.0_3.1/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
index 9e054689..174691be 100644
--- a/src/main/3.0_3.1/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
+++ b/src/main/3.0_3.1/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
@@ -71,36 +71,17 @@ case class ExcelTable(
       sample = if (sample < 1) 1 else sample
       inputPaths.take(sample).map(_.getPath.toUri)
     }
-    var rows = excelHelper.getRows(conf, paths.head)
-
-    if (rows.isEmpty) { /* If the first file is empty, not checking further */
-      StructType(Seq.empty)
-    } else {
-      /* Prepare field names */
-      val colNames =
-        if (options.header) { /* Get column name from the first row */
-          val r = excelHelper.getColumnNames(rows.next)
-          rows = rows.drop(options.ignoreAfterHeader)
-          r
-        } else { /* Peek first row, then return back */
-          val headerRow = rows.next
-          val r = excelHelper.getColumnNames(headerRow)
-          rows = Iterator(headerRow) ++ rows
-          r
-        }
-
-      /* Other files also be utilized (lazily) for field types, reuse field name
-         from the first file */
-      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
-      paths.tail.foreach(path => {
-        rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
-      })
-
-      /* Limit numer of rows to be used for schema infering */
-      rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows
-
-      /* Ready to infer schema */
-      ExcelInferSchema(options).infer(rows, colNames)
+    val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
+    try {
+      if (sheetData.rowIterator.isEmpty) {
+        /* If the first file is empty, not checking further */
+        StructType(Seq.empty)
+      } else {
+        /* Ready to infer schema */
+        ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
+      }
+    } finally {
+      sheetData.close()
     }
   }
 }

diff --git a/src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala b/src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
index 81a052a0..2acc3f42 100644
--- a/src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
+++ b/src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
@@ -74,36 +74,17 @@ case class ExcelTable(
       sample = if (sample < 1) 1 else sample
       inputPaths.take(sample).map(_.getPath.toUri)
     }
-    var rows = excelHelper.getRows(conf, paths.head)
-
-    if (rows.isEmpty) { /* If the first file is empty, not checking further */
-      StructType(Seq.empty)
-    } else {
-      /* Prepare field names */
-      val colNames =
-        if (options.header) { /* Get column name from the first row */
-          val r = excelHelper.getColumnNames(rows.next())
-          rows = rows.drop(options.ignoreAfterHeader)
-          r
-        } else { /* Peek first row, then return back */
-          val headerRow = rows.next()
-          val r = excelHelper.getColumnNames(headerRow)
-          rows = Iterator(headerRow) ++ rows
-          r
-        }
-
-      /* Other files also be utilized (lazily) for field name
-         from the first file */
-      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
-      paths.tail.foreach(path => {
-        rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
-      })
-
-      /* Limit numer of rows to be used for schema infering */
-      rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows
-
-      /* Ready to infer schema */
-      ExcelInferSchema(options).infer(rows, colNames)
+    val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
+    try {
+      if (sheetData.rowIterator.isEmpty) {
+        /* If the first file is empty, not checking further */
+        StructType(Seq.empty)
+      } else {
+        /* Ready to infer schema */
+        ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
+      }
+    } finally {
+      sheetData.close()
     }
   }
 }

diff --git a/src/main/3.x/scala/com/crealytics/spark/v2/excel/ExcelPartitionReaderFactory.scala b/src/main/3.x/scala/com/crealytics/spark/v2/excel/ExcelPartitionReaderFactory.scala
index 6302ec89..bab98bce 100644
--- a/src/main/3.x/scala/com/crealytics/spark/v2/excel/ExcelPartitionReaderFactory.scala
+++ b/src/main/3.x/scala/com/crealytics/spark/v2/excel/ExcelPartitionReaderFactory.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
+import scala.util.control.NonFatal
 
 /** A factory used to create Excel readers.
   *
@@ -64,8 +65,8 @@ case class ExcelPartitionReaderFactory(
     val headerChecker =
       new ExcelHeaderChecker(actualReadDataSchema, parsedOptions, source = s"Excel file: ${file.filePath}")
     val iter = readFile(conf, file, parser, headerChecker, readDataSchema)
-    val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
-    new PartitionReaderWithPartitionValues(fileReader, readDataSchema, partitionSchema, file.partitionValues)
+    val partitionReader = new SparkExcelPartitionReaderFromIterator(iter)
+    new PartitionReaderWithPartitionValues(partitionReader, readDataSchema, partitionSchema, file.partitionValues)
   }
 
   private def readFile(
@@ -74,10 +75,28 @@ case class ExcelPartitionReaderFactory(
       parser: ExcelParser,
       headerChecker: ExcelHeaderChecker,
       requiredSchema: StructType
-  ): Iterator[InternalRow] = {
+  ): SheetData[InternalRow] = {
     val excelHelper = ExcelHelper(parsedOptions)
-    val rows = excelHelper.getRows(conf, URI.create(file.filePath))
-    ExcelParser.parseIterator(rows, parser, headerChecker, requiredSchema)
+    val sheetData = excelHelper.getSheetData(conf, URI.create(file.filePath))
+    try {
+      SheetData(
+        ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, requiredSchema),
+        sheetData.resourcesToClose
+      )
+    } catch {
+      case NonFatal(t) => {
+        sheetData.close()
+        throw t
+      }
+    }
   }
 }
+
+private class SparkExcelPartitionReaderFromIterator(sheetData: SheetData[InternalRow])
+    extends PartitionReaderFromIterator[InternalRow](sheetData.rowIterator) {
+  override def close(): Unit = {
+    super.close()
+    sheetData.close()
+  }
+}

diff --git a/src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala b/src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala
index 6cee0f76..f73d0f8d 100644
--- a/src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala
+++ b/src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala
@@ -32,6 +32,7 @@ import java.net.URI
 import java.text.{FieldPosition, Format, ParsePosition}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.util.Try
+import scala.util.control.NonFatal
 
 /** A format that formats a double as a plain string without rounding and scientific notation. All other operations are
   * unsupported.
@@ -130,13 +131,76 @@ class ExcelHelper private (options: ExcelOptions) {
     * @param uri
     *   to the file, this can be on any support file system back end
     * @return
-    *   cell-row iterator
+    *   Sheet Data with row iterator (must be closed after use)
     */
-  def getRows(conf: Configuration, uri: URI): Iterator[Vector[Cell]] = {
+  def getSheetData(conf: Configuration, uri: URI): SheetData[Vector[Cell]] = {
     val workbook = getWorkbook(conf, uri)
     val excelReader = DataLocator(options)
-    try { excelReader.readFrom(workbook) }
-    finally workbook.close()
+    try {
+      val rowIter = excelReader.readFrom(workbook)
+      SheetData(rowIter, Seq(workbook))
+    } catch {
+      case NonFatal(t) => {
+        workbook.close()
+        throw t
+      }
+    }
+  }
+
+  /** Get cell-row iterator for excel file in given URIs
+    *
+    * @param conf
+    *   Hadoop configuration
+    * @param uris
+    *   a seq of files, this can be on any support file system back end
+    * @return
+    *   A tuple of Sheet Data with row iterator (must be closed after use) and Vector of column names
+    */
+  def parseSheetData(conf: Configuration, uris: Seq[URI]): (SheetData[Vector[Cell]], Vector[String]) = {
+    var sheetData = getSheetData(conf, uris.head)
+
+    val colNames = if (sheetData.rowIterator.isEmpty) {
+      Vector.empty
+    } else {
+      /* If the first file is empty, not checking further */
+      try {
+        /* Prepare field names */
+        val colNames =
+          if (options.header) {
+            /* Get column name from the first row */
+            val r = getColumnNames(sheetData.rowIterator.next)
+            sheetData = sheetData.modifyIterator(_.drop(options.ignoreAfterHeader))
+            r
+          } else {
+            /* Peek first row, then return back */
+            val headerRow = sheetData.rowIterator.next
+            val r = getColumnNames(headerRow)
+            sheetData = sheetData.modifyIterator(iter => Iterator(headerRow) ++ iter)
+            r
+          }
+
+        /* Other files also be utilized (lazily) for field types, reuse field name
+           from the first file */
+        val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
+        sheetData = uris.tail.foldLeft(sheetData) { case (rs, path) =>
+          val newRows = getSheetData(conf, path).modifyIterator(_.drop(numberOfRowToIgnore))
+          rs.append(newRows)
+        }
+
+        /* Limit numer of rows to be used for schema infering */
+        options.excerptSize.foreach { excerptSize =>
+          sheetData = sheetData.modifyIterator(_.take(excerptSize))
+        }
+
+        colNames
+      } catch {
+        case NonFatal(t) => {
+          sheetData.close()
+          throw t
+        }
+      }
+    }
+    (sheetData, colNames)
   }
 
   /** Get column name by list of cells (row)

diff --git a/src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala b/src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala
new file mode 100644
index 00000000..ea22d427
--- /dev/null
+++ b/src/main/scala/com/crealytics/spark/v2/excel/SheetData.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2022 Martin Mauch (@nightscape)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.crealytics.spark.v2.excel
+
+import java.io.Closeable
+
+case class SheetData[T](rowIterator: Iterator[T], resourcesToClose: Seq[Closeable] = Seq.empty) extends Closeable {
+  def modifyIterator(f: Iterator[T] => Iterator[T]): SheetData[T] = SheetData(f(rowIterator), resourcesToClose)
+  def append(other: SheetData[T]): SheetData[T] =
+    SheetData(rowIterator ++ other.rowIterator, resourcesToClose ++ other.resourcesToClose)
+  override def close(): Unit = resourcesToClose.foreach(_.close())
+}

From f9dd22d46ab93a228a70dc72971018cb9333c07a Mon Sep 17 00:00:00 2001
From: nightscape
Date: Mon, 7 Nov 2022 22:06:37 +0000
Subject: [PATCH 10/10] docs: update CHANGELOG.md for v0.18.4 [skip ci]

---
 CHANGELOG.md | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5e200c33..be8f647b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,19 @@ Next
 ====
 
+## [v0.18.4] - 2022-11-07
+### :sparkles: New Features
+- [`4ceca4f`](https://github.com/crealytics/spark-excel/commit/4ceca4f18434652a9ecaab076ea381ca927588d6) - V2 streaming read *(PR [#653](https://github.com/crealytics/spark-excel/pull/653) by [@pjfanning](https://github.com/pjfanning))*
+
+### :wrench: Chores
+- [`b86ce12`](https://github.com/crealytics/spark-excel/commit/b86ce1267c4831e64766f7172a63d136457a2a42) - Update scalafmt-core from 3.5.9 to 3.6.1 *(PR [#678](https://github.com/crealytics/spark-excel/pull/678) by [@scala-steward](https://github.com/scala-steward))*
+- [`d09d232`](https://github.com/crealytics/spark-excel/commit/d09d23294ecfcb9894a8e3d7c584c011f9c042e8) - Update scalatest from 3.2.13 to 3.2.14 *(PR [#661](https://github.com/crealytics/spark-excel/pull/661) by [@scala-steward](https://github.com/scala-steward))*
+- [`2344853`](https://github.com/crealytics/spark-excel/commit/2344853b1f7e14e29118a66985c546f547dddc0e) - Update poi-shared-strings from 2.5.4 to 2.5.5 *(PR [#659](https://github.com/crealytics/spark-excel/pull/659) by [@scala-steward](https://github.com/scala-steward))*
+- [`1cd676e`](https://github.com/crealytics/spark-excel/commit/1cd676e6a14fb7e6ca0c9d0f7534526e4f7b57ae) - Update sbt-assembly from 1.2.0 to 2.0.0 *(PR [#665](https://github.com/crealytics/spark-excel/pull/665) by [@scala-steward](https://github.com/scala-steward))*
+- [`ed97118`](https://github.com/crealytics/spark-excel/commit/ed97118a1ba7df720c18971d5c2e9635d0a8e0bb) - Update excel-streaming-reader from 4.0.2 to 4.0.4 *(PR [#670](https://github.com/crealytics/spark-excel/pull/670) by [@scala-steward](https://github.com/scala-steward))*
+- [`7c96184`](https://github.com/crealytics/spark-excel/commit/7c96184398cf4e190208a61c187e06353d739f2d) - Update commons-compress from 1.21 to 1.22 *(PR [#676](https://github.com/crealytics/spark-excel/pull/676) by [@scala-steward](https://github.com/scala-steward))*
+
+
 ## [v0.18.0] - 2022-08-29
 ### :wrench: Chores
 - [`64521bb`](https://github.com/crealytics/spark-excel/commit/64521bb6f4a9c763d9ed7d4ff8689dfc7c44bbf8) - Update base version *(commit by [@nightscape](https://github.com/nightscape))*
@@ -154,3 +167,5 @@
 
 [v0.18.0]: https://github.com/crealytics/spark-excel/compare/v0.18.0-beta2...v0.18.0
+
+[v0.18.4]: https://github.com/crealytics/spark-excel/compare/v0.18.3-beta1...v0.18.4
\ No newline at end of file
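
Note on [PATCH 01/10] and [PATCH 02/10] (illustrative, not part of the patch
series): the new overload exists so that WorkbookReader can be constructed with a
java.util.HashMap, which is what Py4J hands to the JVM when PySpark code passes a
Python dict. A minimal Scala sketch of how the overload might be exercised; the
file location is hypothetical, "path" is the option the Map-based apply resolves
through the Hadoop FileSystem, and the trailing sheetNames call is assumed here as
the typical follow-up for listing sheets.

    import org.apache.hadoop.conf.Configuration
    import com.crealytics.spark.excel.WorkbookReader

    // Options arrive as a mutable Java map, e.g. via Py4J; "path" selects the file.
    val params = new java.util.HashMap[String, String]()
    params.put("path", "/tmp/example.xlsx") // hypothetical location

    // The overload from PATCH 02 converts the HashMap with asScala and delegates
    // to the existing Map-based apply.
    val reader = WorkbookReader(params, new Configuration())
    println(reader.sheetNames) // assumed follow-up call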
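
Note on [PATCH 09/10] (illustrative, not part of the patch series): the refactoring
replaces the bare iterators from getRows with SheetData, which carries the row
iterator together with the Closeables (the open Workbook) backing it, so a
streaming read no longer closes the workbook before the lazy iterator is consumed.
A minimal sketch of the resulting lifecycle, assuming an ExcelOptions, a Hadoop
Configuration, and a file URI are in scope; process is a hypothetical consumer of
each row.

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.poi.ss.usermodel.Cell
    import com.crealytics.spark.v2.excel.{ExcelHelper, ExcelOptions}

    def readAll(options: ExcelOptions, conf: Configuration, uri: URI)(process: Vector[Cell] => Unit): Unit = {
      val sheetData = ExcelHelper(options).getSheetData(conf, uri)
      try {
        // The iterator is lazy; the workbook behind it must stay open while rows are consumed.
        sheetData.rowIterator.foreach(process)
      } finally {
        // close() releases every Closeable collected in resourcesToClose (here, the workbook).
        sheetData.close()
      }
    }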