Csv to parquet conversion #2

Merged · 6 commits · Jul 26, 2020
14 changes: 12 additions & 2 deletions build.sbt
@@ -2,13 +2,23 @@ name := "data-highway"

version := "0.1"

-scalaVersion := "2.13.3"
+scalaVersion := "2.12.12"

+resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq("com.github.pureconfig" %% "pureconfig" % "0.13.0",
  "org.apache.poi" % "poi" % "4.1.2",
  "org.apache.poi" % "poi-ooxml" % "4.1.2",
-  "org.scalatest" %% "scalatest" % "3.2.0",
+  "org.scalatest" %% "scalatest" % "3.2.0" % "test",
  "org.typelevel" %% "cats-core" % "2.1.1",
-  "org.typelevel" %% "cats-effect" % "2.1.3"
+  "org.typelevel" %% "cats-effect" % "2.1.1",
+  "org.apache.spark" %% "spark-core" % "2.4.6",
+  "org.apache.spark" %% "spark-sql" % "2.4.6",
+  "org.apache.spark" %% "spark-hive" % "2.4.6",
+  "org.apache.spark" %% "spark-avro" % "2.4.6",
+  "org.apache.spark" %% "spark-streaming" % "2.4.6",
+  "MrPowers" % "spark-fast-tests" % "0.20.0-s_2.12"
)

+scalacOptions += "-Ypartial-unification"
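
Note: -Ypartial-unification is what lets cats' traverse work with Either on Scala 2.12, which ParquetHandler.apply below relies on. A minimal sketch of what the flag unlocks (illustrative only, not part of this PR):

import cats.implicits._

// Without -Ypartial-unification, scalac 2.12 cannot unify
// Either[Throwable, ?] as the Applicative instance traverse needs.
val parsed: Either[Throwable, List[Int]] =
  List("1", "2", "3").traverse(s => Either.catchNonFatal(s.toInt))
// parsed == Right(List(1, 2, 3))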
10 changes: 7 additions & 3 deletions src/main/resources/application.conf
@@ -1,5 +1,9 @@
route {
-  type = xlsx-to-csv
-  in = src/test/resources/xlsx-data/input
-  out = src/test/resources/xlsx-data/output
+  // type = xlsx-to-csv
+  // in = src/test/resources/xlsx_to_csv-data/input
+  // out = src/test/resources/xlsx_to_csv-data/output
+  //
+  type = csv-to-parquet
+  in = src/test/resources/csv_to_parquet-data/input/
+  out = src/test/resources/csv_to_parquet-data/output/
}
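
Note: the "type" key is what pureconfig dispatches on when decoding the sealed Route family; by default it matches the kebab-cased subclass name, so "csv-to-parquet" selects CsvToParquet. A minimal sketch of that wiring, where the Conf case class shown here is a hypothetical stand-in for the project's own:

import pureconfig._
import pureconfig.generic.auto._
import io.oss.data.highway.model.{CsvToParquet, Route}

// Hypothetical stand-in for the project's Conf case class.
final case class Conf(route: Route)

// Reads application.conf; "type = csv-to-parquet" picks CsvToParquet
// because pureconfig kebab-cases sealed-trait subclass names by default.
val loaded: ConfigReader.Result[Conf] = ConfigSource.default.load[Conf]
// e.g. Right(Conf(CsvToParquet("src/test/resources/csv_to_parquet-data/input/", ...)))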
15 changes: 12 additions & 3 deletions src/main/scala/io/oss/data/highway/App.scala
@@ -1,16 +1,25 @@
package io.oss.data.highway

import io.oss.data.highway.configuration.ConfLoader
-import io.oss.data.highway.model.XlsxToCsv
-import io.oss.data.highway.utils.XlsxCsvConverter
+import io.oss.data.highway.model.{CsvToParquet, XlsxToCsv}
+import io.oss.data.highway.utils.Constants.{
+  SEPARATOR,
+  XLSX_EXTENSION,
+  XLS_EXTENSION
+}
+import io.oss.data.highway.utils.{ParquetHandler, XlsxCsvConverter}
+import org.apache.spark.sql.SaveMode.Overwrite

object App {

def main(args: Array[String]): Unit = {
for {
conf <- ConfLoader.loadConf()
_ <- conf.route match {
-case XlsxToCsv(in, out) => XlsxCsvConverter.apply(in, out)
+case XlsxToCsv(in, out) =>
+  XlsxCsvConverter.apply(in, out, Seq(XLS_EXTENSION, XLSX_EXTENSION))
+case CsvToParquet(in, out) =>
+  ParquetHandler.apply(in, out, SEPARATOR, Overwrite)
}
} yield ()
}
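
Note: the for-comprehension sequences Either values, so a failed config load short-circuits before any converter runs, and a conversion error surfaces as the final Left. An illustrative reduction (made-up values):

// Each <- unwraps a Right or stops at the first Left:
val outcome: Either[Throwable, Unit] = for {
  route <- (Right("csv-to-parquet"): Either[Throwable, String])
  _ <- (Left(new Exception("boom")): Either[Throwable, Unit])
} yield ()
// outcome == Left(java.lang.Exception: boom)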
src/main/scala/io/oss/data/highway/configuration/ConfLoader.scala
@@ -3,7 +3,6 @@ package io.oss.data.highway.configuration
import pureconfig.ConfigReader.Result
import pureconfig.ConfigSource
-

object ConfLoader {

def loadConf(): Result[Conf] = {
16 changes: 14 additions & 2 deletions src/main/scala/io/oss/data/highway/model/DataHighwayError.scala
@@ -10,8 +10,12 @@ trait DataHighwayError {

object DataHighwayError {

-case class CsvGenerationError(message: String, cause: Throwable, stacktrace: Array[StackTraceElement]) extends DataHighwayError {
-  override def asString: String = s"message: $message \n cause: $cause \n stacktrace: ${stacktrace.mkString("\n")}"
+case class ReadFileError(message: String,
+                         cause: Throwable,
+                         stacktrace: Array[StackTraceElement])
+    extends DataHighwayError {
+  override def asString: String =
+    s"- Message: $message \n- Cause: $cause \n- Stacktrace: ${stacktrace.mkString("\n")}"
}

@@ -20,4 +24,12 @@ object DataHighwayError {
case class PathNotFound(path: String) extends DataHighwayError {
override val stacktrace: Array[StackTraceElement] = null
override def asString: String = s"The provided path '$path' does not exist."
}

+case class ParquetError(message: String,
+                        cause: Throwable,
+                        stacktrace: Array[StackTraceElement])
+    extends DataHighwayError {
+  override def asString: String =
+    s"- Message: $message \n- Cause: $cause \n- Stacktrace: ${stacktrace.mkString("\n")}"
+}
}
1 change: 1 addition & 0 deletions src/main/scala/io/oss/data/highway/model/Route.scala
@@ -3,3 +3,4 @@ package io.oss.data.highway.model
sealed trait Route

case class XlsxToCsv(in: String, out: String) extends Route
+case class CsvToParquet(in: String, out: String) extends Route
6 changes: 3 additions & 3 deletions src/main/scala/io/oss/data/highway/utils/Constants.scala
@@ -5,8 +5,8 @@ object Constants {
val EMPTY = ""
val FORMAT = "UTF-8"
val SEPARATOR = ";"
-val CSV_EXTENSION = ".csv"
-val XLS_EXTENSION = ".xls"
-val XLSX_EXTENSION = ".xlsx"
+val CSV_EXTENSION = "csv"
+val XLS_EXTENSION = "xls"
+val XLSX_EXTENSION = "xlsx"
val PATH_WITHOUT_EXTENSION = "[.][^.]+$"
}
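
Note: the leading dots were dropped because FilesUtils.filterByExtension (below) compares these constants against the substring that follows the last '.', which never includes the dot itself:

// substring(lastIndexOf(".") + 1) starts one character past the dot:
val fileName = "report.xlsx"
fileName.substring(fileName.lastIndexOf(".") + 1) // "xlsx", not ".xlsx"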
97 changes: 97 additions & 0 deletions src/main/scala/io/oss/data/highway/utils/FilesUtils.scala
@@ -0,0 +1,97 @@
package io.oss.data.highway.utils

import java.io.File

import io.oss.data.highway.model.DataHighwayError.ReadFileError
import cats.syntax.either._

object FilesUtils {

/**
* Lists the paths of the files located under a provided path, keeping only
* those with an allowed extension
*
* @param path The provided path
* @param extensions The allowed file extensions
* @return a list of file paths, otherwise a ReadFileError
*/
private[utils] def getFilesFromPath(
path: String,
extensions: Seq[String]): Either[ReadFileError, List[String]] = {
Either
.catchNonFatal {
listFilesRecursively(new File(path), extensions).map(_.getPath).toList
}
.leftMap(thr =>
ReadFileError(thr.getMessage, thr.getCause, thr.getStackTrace))
}

/**
* Lists files recursively under a path, keeping only files with an allowed extension
*
* @param path The provided path
* @param extensions The allowed file extensions
* @return a Seq of matching files
*/
private[utils] def listFilesRecursively(
path: File,
extensions: Seq[String]): Seq[File] = {
val files = path.listFiles
val result = files
.filter(_.isFile)
.filter(file => {
filterByExtension(file.getPath, extensions)
})
result ++
files
.filter(_.isDirectory)
.flatMap(f => listFilesRecursively(f, extensions))
}

/**
* Checks that the provided file has an extension that belongs to the provided ones
*
* @param file The provided file
* @param extensions The provided extensions
* @return True if the file has a valid extension, otherwise False
*/
private[utils] def filterByExtension(file: String,
extensions: Seq[String]): Boolean = {
val fileName = file.split("/").last
extensions.contains(fileName.substring(fileName.lastIndexOf(".") + 1))
}

/**
* Lists folders recursively from a path
*
* @param path The provided path
* @return a list of folder paths, otherwise a ReadFileError
*/
private[utils] def listFoldersRecursively(
path: String): Either[ReadFileError, List[String]] = {
@scala.annotation.tailrec
def getFolders(path: List[File], results: List[File]): Seq[File] =
path match {
case head :: tail =>
val files = head.listFiles
val directories = files.filter(_.isDirectory)
val updated =
if (files.size == directories.length) results else head :: results
getFolders(tail ++ directories, updated)
case _ => results
}

Either
.catchNonFatal {
getFolders(new File(path) :: Nil, Nil).map(_.getPath).reverse.toList
}
.leftMap(thr =>
ReadFileError(thr.getMessage, thr.getCause, thr.getStackTrace))
}

/**
* Replaces each backslash by a slash
*
* @param path The provided path
* @return a path with slash as file separator
*/
def reversePathSeparator(path: String): String =
path.replace("\\", "/")
}
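
Note: a quick illustration of the helpers above, with made-up paths. These methods are private[utils], so the calls below only compile from within io.oss.data.highway.utils:

// Only files whose extension is in the provided whitelist survive:
FilesUtils.filterByExtension("input/orders.csv", Seq("csv", "tsv"))  // true
FilesUtils.filterByExtension("input/report.xlsx", Seq("csv", "tsv")) // false

// Collects, recursively, the folders that directly contain files:
val folders = FilesUtils.listFoldersRecursively("src/test/resources/csv_to_parquet-data/input")
// e.g. Right(List("src/test/resources/csv_to_parquet-data/input/dataset-1", ...))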
86 changes: 86 additions & 0 deletions src/main/scala/io/oss/data/highway/utils/ParquetHandler.scala
@@ -0,0 +1,86 @@
package io.oss.data.highway.utils

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import cats.implicits._
import io.oss.data.highway.model.DataHighwayError
import io.oss.data.highway.model.DataHighwayError.{ParquetError, ReadFileError}

object ParquetHandler {

val ss: SparkSession = SparkSession
.builder()
.appName("parquet-handler")
.master("local[*]")
.getOrCreate()
ss.sparkContext.setLogLevel("WARN")

/**
* Saves a csv file as parquet
* @param in The input csv path
* @param out The generated parquet file path
* @param columnSeparator The column separator for each line in the csv file
* @param saveMode The file saving mode
* @return Unit if successful, otherwise Error
*/
def saveCsvAsParquet(in: String,
out: String,
columnSeparator: String,
saveMode: SaveMode): Either[ParquetError, Unit] = {
Either
.catchNonFatal {
ss.read
.option("inferSchema", "true")
.option("header", "true")
.option("sep", columnSeparator)
.csv(in)
.write
.mode(saveMode)
.parquet(out)
}
.leftMap(thr =>
ParquetError(thr.getMessage, thr.getCause, thr.getStackTrace))
}

/**
* Reads a parquet file
* @param path The parquet file path
* @return DataFrame, otherwise Error
*/
def readParquet(path: String): Either[ParquetError, DataFrame] = {
Either
.catchNonFatal {
ss.read.parquet(path)
}
.leftMap(thr =>
ParquetError(thr.getMessage, thr.getCause, thr.getStackTrace))
}

/**
* Converts csv files to parquet files
*
* @param in The input csv path
* @param out The generated parquet file path
* @param columnSeparator The column separator for each line in the csv file
* @param saveMode The file saving mode
* @return List[Unit], otherwise Error
*/
def apply(in: String,
out: String,
columnSeparator: String,
saveMode: SaveMode): Either[DataHighwayError, List[Unit]] = {
for {
folders <- FilesUtils.listFoldersRecursively(in)
list <- folders
.traverse(folder => {
val suffix = FilesUtils.reversePathSeparator(folder).split("/").last
ParquetHandler
.saveCsvAsParquet(folder,
s"$out/$suffix",
columnSeparator,
saveMode)
})
.leftMap(error =>
ParquetError(error.message, error.cause, error.stacktrace))
} yield list
}
}
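
Note: a hypothetical end-to-end call, mirroring what App does for the csv-to-parquet route (paths are made up):

import org.apache.spark.sql.SaveMode.Overwrite
import io.oss.data.highway.utils.Constants.SEPARATOR

// Each folder of csv files under in/ becomes a parquet folder with the
// same name under out/, e.g. in/dataset-1 -> out/dataset-1:
val result = ParquetHandler.apply(
  "src/test/resources/csv_to_parquet-data/input",
  "src/test/resources/csv_to_parquet-data/output",
  SEPARATOR,
  Overwrite)
// result: Either[DataHighwayError, List[Unit]]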