Merge branch 'main' into v2-integration-tests

nightscape authored Nov 7, 2022
2 parents 18f3823 + f9dd22d commit 00e4ab3
Showing 14 changed files with 190 additions and 117 deletions.
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
@@ -0,0 +1,2 @@
+# Scala Steward: Reformat with scalafmt 3.6.1
+a834cf94453ed2f3ab1b87818c2fd124fe87fa2a
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,4 +1,4 @@
-version = 3.5.9
+version = 3.6.1
style = default
runner.dialect=scala212
maxColumn = 120
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,19 @@ Next
====


+## [v0.18.4] - 2022-11-07
+### :sparkles: New Features
+- [`4ceca4f`](https://github.com/crealytics/spark-excel/commit/4ceca4f18434652a9ecaab076ea381ca927588d6) - V2 streaming read *(PR [#653](https://github.com/crealytics/spark-excel/pull/653) by [@pjfanning](https://github.com/pjfanning))*
+
+### :wrench: Chores
+- [`b86ce12`](https://github.com/crealytics/spark-excel/commit/b86ce1267c4831e64766f7172a63d136457a2a42) - Update scalafmt-core from 3.5.9 to 3.6.1 *(PR [#678](https://github.com/crealytics/spark-excel/pull/678) by [@scala-steward](https://github.com/scala-steward))*
+- [`d09d232`](https://github.com/crealytics/spark-excel/commit/d09d23294ecfcb9894a8e3d7c584c011f9c042e8) - Update scalatest from 3.2.13 to 3.2.14 *(PR [#661](https://github.com/crealytics/spark-excel/pull/661) by [@scala-steward](https://github.com/scala-steward))*
+- [`2344853`](https://github.com/crealytics/spark-excel/commit/2344853b1f7e14e29118a66985c546f547dddc0e) - Update poi-shared-strings from 2.5.4 to 2.5.5 *(PR [#659](https://github.com/crealytics/spark-excel/pull/659) by [@scala-steward](https://github.com/scala-steward))*
+- [`1cd676e`](https://github.com/crealytics/spark-excel/commit/1cd676e6a14fb7e6ca0c9d0f7534526e4f7b57ae) - Update sbt-assembly from 1.2.0 to 2.0.0 *(PR [#665](https://github.com/crealytics/spark-excel/pull/665) by [@scala-steward](https://github.com/scala-steward))*
+- [`ed97118`](https://github.com/crealytics/spark-excel/commit/ed97118a1ba7df720c18971d5c2e9635d0a8e0bb) - Update excel-streaming-reader from 4.0.2 to 4.0.4 *(PR [#670](https://github.com/crealytics/spark-excel/pull/670) by [@scala-steward](https://github.com/scala-steward))*
+- [`7c96184`](https://github.com/crealytics/spark-excel/commit/7c96184398cf4e190208a61c187e06353d739f2d) - Update commons-compress from 1.21 to 1.22 *(PR [#676](https://github.com/crealytics/spark-excel/pull/676) by [@scala-steward](https://github.com/scala-steward))*
+
+
## [v0.18.0] - 2022-08-29
### :wrench: Chores
- [`64521bb`](https://github.com/crealytics/spark-excel/commit/64521bb6f4a9c763d9ed7d4ff8689dfc7c44bbf8) - Update base version *(commit by [@nightscape](https://github.com/nightscape))*
@@ -154,3 +167,5 @@ Next


[v0.18.0]: https://github.com/crealytics/spark-excel/compare/v0.18.0-beta2...v0.18.0
+
+[v0.18.4]: https://github.com/crealytics/spark-excel/compare/v0.18.3-beta1...v0.18.4
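The headline change in v0.18.4 is the V2 streaming read from PR #653. As a sketch of what that enables, assuming the documented V2 "excel" format and with a hypothetical path and schema (this code is not part of the diff):

```scala
// Illustrative sketch only, not code from this diff: consuming Excel files
// with the V2 "excel" source under Structured Streaming. The path and schema
// are hypothetical; file-based streaming sources need an explicit schema.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

object ExcelStreamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("excel-streaming").getOrCreate()

    val schema = StructType(
      Seq(StructField("Name", StringType), StructField("Value", DoubleType))
    )

    val df = spark.readStream
      .format("excel")          // the V2 ExcelDataSource registered by spark-excel
      .option("header", "true") // first row holds the column names
      .schema(schema)
      .load("/data/incoming-excel/") // hypothetical watched directory

    df.writeStream.format("console").start().awaitTermination()
  }
}
```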
8 changes: 4 additions & 4 deletions build.sbt
@@ -58,10 +58,10 @@ shadedDeps ++= Seq(
  "org.apache.poi" % "poi-ooxml-lite" % poiVersion,
  "org.apache.xmlbeans" % "xmlbeans" % "5.1.1",
  "com.norbitltd" %% "spoiwo" % "2.2.1",
-  "com.github.pjfanning" % "excel-streaming-reader" % "4.0.2",
-  "com.github.pjfanning" % "poi-shared-strings" % "2.5.4",
+  "com.github.pjfanning" % "excel-streaming-reader" % "4.0.4",
+  "com.github.pjfanning" % "poi-shared-strings" % "2.5.5",
  "commons-io" % "commons-io" % "2.11.0",
-  "org.apache.commons" % "commons-compress" % "1.21",
+  "org.apache.commons" % "commons-compress" % "1.22",
"org.apache.logging.log4j" % "log4j-api" % "2.19.0",
"com.zaxxer" % "SparseBitSet" % "1.2",
"org.apache.commons" % "commons-collections4" % "4.4",
@@ -83,7 +83,7 @@ libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-hive" % testSparkVersion.value % "provided",
  "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1",
  "org.typelevel" %% "cats-core" % "2.8.0" % Test,
-  "org.scalatest" %% "scalatest" % "3.2.13" % Test,
+  "org.scalatest" %% "scalatest" % "3.2.14" % Test,
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.11.0" % Test,
"org.scalacheck" %% "scalacheck" % "1.17.0" % Test,
"com.github.alexarchambault" %% "scalacheck-shapeless_1.15" % "1.3.0" % Test,
15 changes: 6 additions & 9 deletions build.sc
@@ -48,22 +48,20 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
val sparkDeps = Agg(
ivy"org.apache.spark::spark-core:$sparkVersion",
ivy"org.apache.spark::spark-sql:$sparkVersion",
-    ivy"org.apache.spark::spark-hive:$sparkVersion",
-  )
-  override def compileIvyDeps = sparkDeps ++ Agg(
-    ivy"org.slf4j:slf4j-api:1.7.36".excludeOrg("stax")
+    ivy"org.apache.spark::spark-hive:$sparkVersion"
+  )
+  override def compileIvyDeps = sparkDeps ++ Agg(ivy"org.slf4j:slf4j-api:1.7.36".excludeOrg("stax"))
val poiVersion = "5.2.3"
override def ivyDeps = Agg(
ivy"org.apache.poi:poi:$poiVersion",
ivy"org.apache.poi:poi-ooxml:$poiVersion",
ivy"org.apache.poi:poi-ooxml-lite:$poiVersion",
ivy"org.apache.xmlbeans:xmlbeans:5.1.1",
ivy"com.norbitltd::spoiwo:2.2.1",
-    ivy"com.github.pjfanning:excel-streaming-reader:4.0.2",
-    ivy"com.github.pjfanning:poi-shared-strings:2.5.4",
+    ivy"com.github.pjfanning:excel-streaming-reader:4.0.4",
+    ivy"com.github.pjfanning:poi-shared-strings:2.5.5",
    ivy"commons-io:commons-io:2.11.0",
-    ivy"org.apache.commons:commons-compress:1.21",
+    ivy"org.apache.commons:commons-compress:1.22",
ivy"org.apache.logging.log4j:log4j-api:2.19.0",
ivy"com.zaxxer:SparseBitSet:1.2",
ivy"org.apache.commons:commons-collections4:4.4",
@@ -83,7 +81,7 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
def repositoriesTask = T.task { super.repositoriesTask() ++ Seq(MavenRepository("https://jitpack.io")) }
def ivyDeps = sparkDeps ++ Agg(
ivy"org.typelevel::cats-core:2.8.0",
-    ivy"org.scalatest::scalatest:3.2.13",
+    ivy"org.scalatest::scalatest:3.2.14",
ivy"org.scalatestplus::scalacheck-1-15:3.2.11.0",
ivy"org.scalacheck::scalacheck:1.17.0",
ivy"com.github.alexarchambault::scalacheck-shapeless_1.15:1.3.0",
@@ -93,7 +91,6 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
}
}
-

val scala213 = "2.13.10"
val scala212 = "2.12.17"
val spark24 = List("2.4.1", "2.4.7", "2.4.8")
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -3,6 +3,6 @@ addSbtPlugin("org.typelevel" % "sbt-typelevel" % "0.4.14")
addSbtPlugin(
"org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0" excludeAll (ExclusionRule(organization = "com.danieltrinh"))
)
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
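Among the chores above, sbt-assembly jumps a major version (1.2.0 to 2.0.0). As a hedged reminder rather than a statement about this repository's build, assembly settings under sbt slash syntax typically look like the following; the jar name and merge rules are placeholders:

```scala
// Hypothetical build.sbt excerpt, not taken from this repository: typical
// sbt-assembly settings written in sbt slash syntax.
assembly / assemblyJarName := "spark-excel-assembly.jar" // placeholder name
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard // drop duplicate manifest entries
  case _                             => MergeStrategy.first   // keep the first copy of everything else
}
```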
@@ -256,36 +256,16 @@ class ExcelDataSourceReader(
sample = if (sample < 1) 1 else sample
inputPaths.take(sample).map(_.getPath.toUri)
}
-    var rows = excelHelper.getRows(conf, paths.head)
-
-    if (rows.isEmpty) { /* If the first file is empty, not checking further */
-      StructType(Seq.empty)
-    } else {
-      /* Prepare field names */
-      val colNames =
-        if (options.header) { /* Get column name from the first row */
-          val r = excelHelper.getColumnNames(rows.next)
-          rows = rows.drop(options.ignoreAfterHeader)
-          r
-        } else { /* Peek first row, then return back */
-          val headerRow = rows.next
-          val r = excelHelper.getColumnNames(headerRow)
-          rows = Iterator(headerRow) ++ rows
-          r
-        }
-
-      /* Other files also be utilized (lazily) for field types, reuse field name
-         from the first file */
-      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
-      rows = paths.tail.foldLeft(rows) { case (rs, path) =>
-        rs ++ excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
+    val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
+    try {
+      if (sheetData.rowIterator.isEmpty) {
+        StructType(Seq.empty)
+      } else {
+        /* Ready to infer schema */
+        ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
      }
-
-      /* Limit numer of rows to be used for schema infering */
-      rows = options.excerptSize.foldLeft(rows)(_ take _)
-
-      /* Ready to infer schema */
-      ExcelInferSchema(options).infer(rows, colNames)
+    } finally {
+      sheetData.close()
    }
}

@@ -327,9 +307,9 @@ class ExcelInputPartitionReader(
private val headerChecker =
new ExcelHeaderChecker(dataSchema, options, source = s"Excel file: ${path}")
private val excelHelper = ExcelHelper(options)
-  private val rows = excelHelper.getRows(new Configuration(), path)
+  private val sheetData = excelHelper.getSheetData(new Configuration(), path)

-  val reader = ExcelParser.parseIterator(rows, parser, headerChecker, dataSchema)
+  private val reader = ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, dataSchema)

private val fullSchema = requiredSchema
.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) ++
@@ -349,7 +329,9 @@

override def next: Boolean = combinedReader.hasNext
override def get: InternalRow = combinedReader.next
-  override def close(): Unit = {}
+  override def close(): Unit = {
+    sheetData.close()
+  }
}

class ExcelDataSourceWriter(
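Common to this file and the Scala diffs below: `ExcelHelper.getRows` is replaced by `getSheetData`/`parseSheetData`, which return a `SheetData` wrapper so the streams backing a row iterator can be closed once reading is done. The definition of `SheetData` is in one of the 14 changed files not rendered on this page; judging only from its usage here (`rowIterator`, `resourcesToClose`, `close()`), a minimal sketch might be:

```scala
// Minimal sketch inferred from usage in this diff; the actual definition
// lives in one of the changed files not rendered on this page.
import java.io.Closeable
import scala.util.control.NonFatal

case class SheetData[T](rowIterator: Iterator[T], resourcesToClose: Seq[Closeable] = Seq.empty) {
  /** Close every backing resource; cleanup is best-effort per resource. */
  def close(): Unit = resourcesToClose.foreach { resource =>
    try resource.close()
    catch { case NonFatal(_) => () }
  }
}
```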
@@ -71,36 +71,17 @@ case class ExcelTable(
sample = if (sample < 1) 1 else sample
inputPaths.take(sample).map(_.getPath.toUri)
}
-    var rows = excelHelper.getRows(conf, paths.head)
-
-    if (rows.isEmpty) { /* If the first file is empty, not checking further */
-      StructType(Seq.empty)
-    } else {
-      /* Prepare field names */
-      val colNames =
-        if (options.header) { /* Get column name from the first row */
-          val r = excelHelper.getColumnNames(rows.next)
-          rows = rows.drop(options.ignoreAfterHeader)
-          r
-        } else { /* Peek first row, then return back */
-          val headerRow = rows.next
-          val r = excelHelper.getColumnNames(headerRow)
-          rows = Iterator(headerRow) ++ rows
-          r
-        }
-
-      /* Other files also be utilized (lazily) for field types, reuse field name
-         from the first file */
-      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
-      paths.tail.foreach(path => {
-        rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
-      })
-
-      /* Limit numer of rows to be used for schema infering */
-      rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows
-
-      /* Ready to infer schema */
-      ExcelInferSchema(options).infer(rows, colNames)
+    val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
+    try {
+      if (sheetData.rowIterator.isEmpty) {
+        /* If the first file is empty, not checking further */
+        StructType(Seq.empty)
+      } else {
+        /* Ready to infer schema */
+        ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
+      }
+    } finally {
+      sheetData.close()
+    }
}
}
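Both rewritten `inferSchema` bodies (this one and the 3.2 copy below) share one acquire/use/release shape: parse, infer, close in `finally`. A hypothetical loan-pattern helper, not something this commit adds, would factor that shape out:

```scala
// Hypothetical loan-pattern helper, not part of this commit, using the
// SheetData sketch above: acquire, use, always release.
def withSheetData[T, R](sheetData: SheetData[T])(use: SheetData[T] => R): R =
  try use(sheetData)
  finally sheetData.close()

// Usage mirroring the new inferSchema bodies:
//   withSheetData(sheetData) { sd =>
//     if (sd.rowIterator.isEmpty) StructType(Seq.empty)
//     else ExcelInferSchema(options).infer(sd.rowIterator, colNames)
//   }
```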
41 changes: 11 additions & 30 deletions src/main/3.2/scala/com/crealytics/spark/v2/excel/ExcelTable.scala
@@ -74,36 +74,17 @@ case class ExcelTable(
sample = if (sample < 1) 1 else sample
inputPaths.take(sample).map(_.getPath.toUri)
}
-    var rows = excelHelper.getRows(conf, paths.head)
-
-    if (rows.isEmpty) { /* If the first file is empty, not checking further */
-      StructType(Seq.empty)
-    } else {
-      /* Prepare field names */
-      val colNames =
-        if (options.header) { /* Get column name from the first row */
-          val r = excelHelper.getColumnNames(rows.next())
-          rows = rows.drop(options.ignoreAfterHeader)
-          r
-        } else { /* Peek first row, then return back */
-          val headerRow = rows.next()
-          val r = excelHelper.getColumnNames(headerRow)
-          rows = Iterator(headerRow) ++ rows
-          r
-        }
-
-      /* Other files also be utilized (lazily) for field types, reuse field name
-         from the first file */
-      val numberOfRowToIgnore = if (options.header) (options.ignoreAfterHeader + 1) else 0
-      paths.tail.foreach(path => {
-        rows ++= excelHelper.getRows(conf, path).drop(numberOfRowToIgnore)
-      })
-
-      /* Limit numer of rows to be used for schema infering */
-      rows = if (options.excerptSize.isDefined) rows.take(options.excerptSize.get) else rows
-
-      /* Ready to infer schema */
-      ExcelInferSchema(options).infer(rows, colNames)
+    val (sheetData, colNames) = excelHelper.parseSheetData(conf, paths)
+    try {
+      if (sheetData.rowIterator.isEmpty) {
+        /* If the first file is empty, not checking further */
+        StructType(Seq.empty)
+      } else {
+        /* Ready to infer schema */
+        ExcelInferSchema(options).infer(sheetData.rowIterator, colNames)
      }
+    } finally {
+      sheetData.close()
+    }
}
}
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

import java.net.URI
+import scala.util.control.NonFatal

/** A factory used to create Excel readers.
*
@@ -64,8 +65,8 @@ case class ExcelPartitionReaderFactory(
val headerChecker =
new ExcelHeaderChecker(actualReadDataSchema, parsedOptions, source = s"Excel file: ${file.filePath}")
val iter = readFile(conf, file, parser, headerChecker, readDataSchema)
-    val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
-    new PartitionReaderWithPartitionValues(fileReader, readDataSchema, partitionSchema, file.partitionValues)
+    val partitionReader = new SparkExcelPartitionReaderFromIterator(iter)
+    new PartitionReaderWithPartitionValues(partitionReader, readDataSchema, partitionSchema, file.partitionValues)
}

private def readFile(
@@ -74,10 +75,28 @@
parser: ExcelParser,
headerChecker: ExcelHeaderChecker,
requiredSchema: StructType
-  ): Iterator[InternalRow] = {
+  ): SheetData[InternalRow] = {
val excelHelper = ExcelHelper(parsedOptions)
-    val rows = excelHelper.getRows(conf, URI.create(file.filePath))
-    ExcelParser.parseIterator(rows, parser, headerChecker, requiredSchema)
+    val sheetData = excelHelper.getSheetData(conf, URI.create(file.filePath))
+    try {
+      SheetData(
+        ExcelParser.parseIterator(sheetData.rowIterator, parser, headerChecker, requiredSchema),
+        sheetData.resourcesToClose
+      )
+    } catch {
+      case NonFatal(t) => {
+        sheetData.close()
+        throw t
+      }
+    }
}

}

+private class SparkExcelPartitionReaderFromIterator(sheetData: SheetData[InternalRow])
+  extends PartitionReaderFromIterator[InternalRow](sheetData.rowIterator) {
+  override def close(): Unit = {
+    super.close()
+    sheetData.close()
+  }
+}
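Note the asymmetry with `inferSchema`: `readFile` must not close the `SheetData` on success, since the still-unconsumed iterator is handed to the returned `SheetData` and, through `SparkExcelPartitionReaderFromIterator.close()`, released only when Spark closes the partition reader. The `NonFatal` catch only guards the window where wiring up the parser fails. A generic sketch of that idiom, as a hypothetical helper not present in this codebase:

```scala
import java.io.Closeable
import scala.util.control.NonFatal

// Hypothetical helper, not in this codebase: close a resource only when
// handing it to its consumer throws, then rethrow the original error.
def closeOnFailure[A <: Closeable, B](resource: A)(wire: A => B): B =
  try wire(resource)
  catch {
    case NonFatal(t) =>
      resource.close()
      throw t
  }
```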
@@ -26,6 +26,7 @@ import org.apache.poi.hssf.usermodel.HSSFWorkbookFactory
import org.apache.poi.openxml4j.util.ZipInputStreamZipEntrySource
import org.apache.poi.util.IOUtils
import org.apache.poi.xssf.usermodel.XSSFWorkbookFactory
+import scala.collection.JavaConverters.mapAsScalaMapConverter

trait WorkbookReader {
protected def openWorkbook(): Workbook
@@ -54,6 +55,10 @@ object WorkbookReader {
WorkbookFactory.addProvider(new HSSFWorkbookFactory)
WorkbookFactory.addProvider(new XSSFWorkbookFactory)

+  def apply(parameters: java.util.HashMap[String, String], hadoopConfiguration: Configuration): WorkbookReader = {
+    apply(parameters.asScala.toMap, hadoopConfiguration)
+  }
+
def apply(parameters: Map[String, String], hadoopConfiguration: Configuration): WorkbookReader = {
def readFromHadoop(location: String) = {
val path = new Path(location)
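The new `WorkbookReader.apply` overload accepts a `java.util.HashMap` and converts it with `asScala`, presumably to spare Java-side callers from building an immutable Scala `Map`. A usage sketch, assuming `WorkbookReader` is in scope and with hypothetical option values:

```scala
// Usage sketch of the new overload; the path and option values are
// hypothetical.
import org.apache.hadoop.conf.Configuration

val parameters = new java.util.HashMap[String, String]()
parameters.put("path", "/tmp/example.xlsx")
parameters.put("dataAddress", "'Sheet1'!A1")

val workbookReader = WorkbookReader(parameters, new Configuration())
```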
(The remaining changed files in this commit are not rendered on this page.)
