Merge pull request #2 from ghazi-naceur/csv-to-parquet
CSV to Parquet conversion
ghazi-naceur authored Jul 26, 2020
2 parents 447b2f2 + 8f91ccf commit ce76bc6
Showing 51 changed files with 619 additions and 190 deletions.
14 changes: 12 additions & 2 deletions build.sbt
@@ -2,13 +2,23 @@ name := "data-highway"

version := "0.1"

scalaVersion := "2.13.3"
scalaVersion := "2.12.12"

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq("com.github.pureconfig" %% "pureconfig" % "0.13.0",
"org.apache.poi" % "poi" % "4.1.2",
"org.apache.poi" % "poi-ooxml" % "4.1.2",
"org.scalatest" %% "scalatest" % "3.2.0",
"org.scalatest" %% "scalatest" % "3.2.0" % "test",
"org.typelevel" %% "cats-core" % "2.1.1",
"org.typelevel" %% "cats-effect" % "2.1.3"
"org.typelevel" %% "cats-effect" % "2.1.1",
"org.apache.spark" %% "spark-core" % "2.4.6",
"org.apache.spark" %% "spark-sql" % "2.4.6",
"org.apache.spark" %% "spark-hive" % "2.4.6",
"org.apache.spark" %% "spark-avro" % "2.4.6",
"org.apache.spark" %% "spark-streaming" % "2.4.6",
"MrPowers" % "spark-fast-tests" % "0.20.0-s_2.12"
)

scalacOptions += "-Ypartial-unification"
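A side note on that last flag: on Scala 2.12, -Ypartial-unification is what lets the compiler unify Either[E, *] as a type constructor, which the cats-based error handling in this PR relies on (on 2.13 the behavior is the default and the flag no longer exists). A minimal sketch, not taken from this repo:

// Only compiles on 2.12 with -Ypartial-unification enabled: traversing
// a List with an Either-returning function requires unifying Either[E, *].
import cats.implicits._

val parsed: Either[NumberFormatException, List[Int]] =
  List("1", "2", "3").traverse(s =>
    Either.catchOnly[NumberFormatException](s.toInt))
// parsed == Right(List(1, 2, 3))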
10 changes: 7 additions & 3 deletions src/main/resources/application.conf
@@ -1,5 +1,9 @@
route {
type = xlsx-to-csv
in = src/test/resources/xlsx-data/input
out = src/test/resources/xlsx-data/output
// type = xlsx-to-csv
// in = src/test/resources/xlsx_to_csv-data/input
// out = src/test/resources/xlsx_to_csv-data/output
//
type = csv-to-parquet
in = src/test/resources/csv_to_parquet-data/input/
out = src/test/resources/csv_to_parquet-data/output/
}
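For context on how this file selects a route: pureconfig's auto-derived reader for a sealed family reads the type field and matches it against the kebab-cased case class name, so csv-to-parquet resolves to CsvToParquet(in, out). A hedged sketch of the loading side, assuming Conf is a case class wrapping a single route field (as ConfLoader below suggests):

// A hedged sketch; Conf here is a stand-in for the repo's own Conf class.
import pureconfig._
import pureconfig.generic.auto._
import io.oss.data.highway.model.Route

final case class Conf(route: Route)

// "type = csv-to-parquet" selects the kebab-cased case class CsvToParquet
val conf: ConfigReader.Result[Conf] = ConfigSource.default.load[Conf]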
15 changes: 12 additions & 3 deletions src/main/scala/io/oss/data/highway/App.scala
@@ -1,16 +1,25 @@
package io.oss.data.highway

import io.oss.data.highway.configuration.ConfLoader
import io.oss.data.highway.model.XlsxToCsv
import io.oss.data.highway.utils.XlsxCsvConverter
import io.oss.data.highway.model.{CsvToParquet, XlsxToCsv}
import io.oss.data.highway.utils.Constants.{
SEPARATOR,
XLSX_EXTENSION,
XLS_EXTENSION
}
import io.oss.data.highway.utils.{ParquetHandler, XlsxCsvConverter}
import org.apache.spark.sql.SaveMode.Overwrite

object App {

def main(args: Array[String]): Unit = {
for {
conf <- ConfLoader.loadConf()
_ <- conf.route match {
case XlsxToCsv(in, out) => XlsxCsvConverter.apply(in, out)
case XlsxToCsv(in, out) =>
XlsxCsvConverter.apply(in, out, Seq(XLS_EXTENSION, XLSX_EXTENSION))
case CsvToParquet(in, out) =>
ParquetHandler.apply(in, out, SEPARATOR, Overwrite)
}
} yield ()
}
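The for-comprehension threads Either end to end: loadConf() and both converters return Either, so the first Left short-circuits everything after it and becomes the result. (As written, main then discards that Either, so a Left is silently dropped.) A toy illustration, not from this repo:

// The first Left short-circuits the rest of the comprehension.
val result: Either[String, Unit] = for {
  _ <- Right(()): Either[String, Unit]    // stands in for ConfLoader.loadConf()
  _ <- Left("boom"): Either[String, Unit] // stands in for a failing converter
} yield ()
// result == Left("boom")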
1 change: 0 additions & 1 deletion src/main/scala/io/oss/data/highway/configuration/ConfLoader.scala
@@ -3,7 +3,6 @@ package io.oss.data.highway.configuration
import pureconfig.ConfigReader.Result
import pureconfig.ConfigSource


object ConfLoader {

def loadConf(): Result[Conf] = {
16 changes: 14 additions & 2 deletions src/main/scala/io/oss/data/highway/model/DataHighwayError.scala
@@ -10,8 +10,12 @@ trait DataHighwayError extends Throwable {

object DataHighwayError {

case class CsvGenerationError(message: String, cause: Throwable, stacktrace: Array[StackTraceElement]) extends DataHighwayError {
override def asString: String = s"message: $message \n cause: $cause \n stacktrace: ${stacktrace.mkString("\n")}"
case class ReadFileError(message: String,
cause: Throwable,
stacktrace: Array[StackTraceElement])
extends DataHighwayError {
override def asString: String =
s"- Message: $message \n- Cause: $cause \n- Stacktrace: ${stacktrace.mkString("\n")}"
}

case class PathNotFound(path: String) extends DataHighwayError {
@@ -20,4 +24,12 @@ object DataHighwayError {
override val stacktrace: Array[StackTraceElement] = null
override def asString: String = s"The provided path '$path' does not exist."
}

case class ParquetError(message: String,
cause: Throwable,
stacktrace: Array[StackTraceElement])
extends DataHighwayError {
override def asString: String =
s"- Message: $message \n- Cause: $cause \n- Stacktrace: ${stacktrace.mkString("\n")}"
}
}
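Both new error cases render through asString; a quick hypothetical of the output shape (values invented):

// Hypothetical values, only to show the asString format.
import io.oss.data.highway.model.DataHighwayError.ReadFileError

val err = ReadFileError("file not found",
                        new java.io.IOException("disk error"),
                        Array.empty)
println(err.asString)
// - Message: file not found
// - Cause: java.io.IOException: disk error
// - Stacktrace: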
1 change: 1 addition & 0 deletions src/main/scala/io/oss/data/highway/model/Route.scala
@@ -3,3 +3,4 @@ package io.oss.data.highway.model
sealed trait Route

case class XlsxToCsv(in: String, out: String) extends Route
case class CsvToParquet(in: String, out: String) extends Route
6 changes: 3 additions & 3 deletions src/main/scala/io/oss/data/highway/utils/Constants.scala
@@ -5,8 +5,8 @@ object Constants {
val EMPTY = ""
val FORMAT = "UTF-8"
val SEPARATOR = ";"
val CSV_EXTENSION = ".csv"
val XLS_EXTENSION = ".xls"
val XLSX_EXTENSION = ".xlsx"
val CSV_EXTENSION = "csv"
val XLS_EXTENSION = "xls"
val XLSX_EXTENSION = "xlsx"
val PATH_WITHOUT_EXTENSION = "[.][^.]+$"
}
97 changes: 97 additions & 0 deletions src/main/scala/io/oss/data/highway/utils/FilesUtils.scala
@@ -0,0 +1,97 @@
package io.oss.data.highway.utils

import java.io.File

import io.oss.data.highway.model.DataHighwayError.ReadFileError
import cats.syntax.either._

object FilesUtils {

/**
* Gets the files located under a provided path
*
* @param path The provided path
* @param extensions The accepted file extensions
* @return a list of file paths, otherwise a ReadFileError
*/
private[utils] def getFilesFromPath(
path: String,
extensions: Seq[String]): Either[ReadFileError, List[String]] = {
Either
.catchNonFatal {
listFilesRecursively(new File(path), extensions).map(_.getPath).toList
}
.leftMap(thr =>
ReadFileError(thr.getMessage, thr.getCause, thr.getStackTrace))
}

/**
* Lists files recursively under a path, keeping only files with the accepted extensions
*
* @param path The provided path
* @param extensions The accepted file extensions
* @return a Seq of files
*/
private[utils] def listFilesRecursively(
path: File,
extensions: Seq[String]): Seq[File] = {
val files = path.listFiles
val result = files
.filter(_.isFile)
.filter(file => {
filterByExtension(file.getPath, extensions)
})
result ++
files
.filter(_.isDirectory)
.flatMap(f => listFilesRecursively(f, extensions))
}

/**
* Checks whether the provided file has one of the provided extensions
*
* @param file The provided file
* @param extensions The provided extensions
* @return True if the file has a valid extension, otherwise False
*/
private[utils] def filterByExtension(file: String,
extensions: Seq[String]): Boolean = {
val fileName = file.split("/").last
extensions.contains(fileName.substring(fileName.lastIndexOf(".") + 1))
}

/**
* Lists folders recursively under a path, keeping only folders that contain at least one file
*
* @param path The provided path
* @return a list of folder paths, otherwise a ReadFileError
*/
private[utils] def listFoldersRecursively(
path: String): Either[ReadFileError, List[String]] = {
@scala.annotation.tailrec
def getFolders(path: List[File], results: List[File]): Seq[File] =
path match {
case head :: tail =>
val files = head.listFiles
val directories = files.filter(_.isDirectory)
val updated =
if (files.size == directories.length) results else head :: results
getFolders(tail ++ directories, updated)
case _ => results
}

Either
.catchNonFatal {
getFolders(new File(path) :: Nil, Nil).map(_.getPath).reverse.toList
}
.leftMap(thr =>
ReadFileError(thr.getMessage, thr.getCause, thr.getStackTrace))
}

/**
* Replaces each backslash with a slash
*
* @param path The provided path
* @return a path with slash as file separator
*/
def reversePathSeparator(path: String): String =
path.replace("\\", "/")
}
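This is also why the extension constants in Constants.scala dropped their leading dots: filterByExtension compares the substring after the last dot, so bare csv/xls/xlsx is the form that matches. A quick sketch of the helpers, called from inside io.oss.data.highway.utils since two of them are private[utils] (paths invented):

// Inside the io.oss.data.highway.utils package; paths are invented.
FilesUtils.filterByExtension("input/data.xlsx", Seq("xls", "xlsx")) // true
FilesUtils.filterByExtension("input/data.csv", Seq("xls", "xlsx"))  // false
FilesUtils.reversePathSeparator("input\\nested\\data.csv")          // "input/nested/data.csv"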
86 changes: 86 additions & 0 deletions src/main/scala/io/oss/data/highway/utils/ParquetHandler.scala
@@ -0,0 +1,86 @@
package io.oss.data.highway.utils

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import cats.implicits._
import io.oss.data.highway.model.DataHighwayError
import io.oss.data.highway.model.DataHighwayError.{ParquetError, ReadFileError}

object ParquetHandler {

val ss: SparkSession = SparkSession
.builder()
.appName("parquet-handler")
.master("local[*]")
.getOrCreate()
ss.sparkContext.setLogLevel("WARN")

/**
* Saves a csv file as parquet
*
* @param in The input csv path
* @param out The generated parquet file path
* @param columnSeparator The column separator for each line in the csv file
* @param saveMode The file saving mode
* @return Unit if successful, otherwise Error
*/
def saveCsvAsParquet(in: String,
out: String,
columnSeparator: String,
saveMode: SaveMode): Either[ParquetError, Unit] = {
Either
.catchNonFatal {
ss.read
.option("inferSchema", "true")
.option("header", "true")
.option("sep", columnSeparator)
.csv(in)
.write
.mode(saveMode)
.parquet(out)
}
.leftMap(thr =>
ParquetError(thr.getMessage, thr.getCause, thr.getStackTrace))
}

/**
* Reads a parquet file
*
* @param path The parquet file path
* @return DataFrame, otherwise Error
*/
def readParquet(path: String): Either[ParquetError, DataFrame] = {
Either
.catchNonFatal {
ss.read.parquet(path)
}
.leftMap(thr =>
ParquetError(thr.getMessage, thr.getCause, thr.getStackTrace))
}

/**
* Converts csv files to parquet files
*
* @param in The input csv path
* @param out The generated parquet file path
* @param columnSeparator The column separator for each line in the csv file
* @param saveMode The file saving mode
* @return a List[Unit], otherwise Error
*/
def apply(in: String,
out: String,
columnSeparator: String,
saveMode: SaveMode): Either[DataHighwayError, List[Unit]] = {
for {
folders <- FilesUtils.listFoldersRecursively(in)
list <- folders
.traverse(folder => {
val suffix = FilesUtils.reversePathSeparator(folder).split("/").last
ParquetHandler
.saveCsvAsParquet(folder,
s"$out/$suffix",
columnSeparator,
saveMode)
})
.leftMap(error =>
ParquetError(error.message, error.cause, error.stacktrace))
} yield list
}
}
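End to end, apply lists every folder under in (via FilesUtils.listFoldersRecursively) and mirrors each one as a parquet folder of the same name under out. A hedged usage sketch, reusing the paths from application.conf:

// A hedged usage sketch; paths come from application.conf above.
import org.apache.spark.sql.SaveMode.Overwrite
import io.oss.data.highway.utils.Constants.SEPARATOR

val result = ParquetHandler.apply(
  "src/test/resources/csv_to_parquet-data/input/",
  "src/test/resources/csv_to_parquet-data/output/",
  SEPARATOR,
  Overwrite
)
// Right(List((), ...)) on success, Left(ParquetError(...)) on failure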