diff --git a/src/main/scala/com/high-performance-spark-examples/GoldiLocks/GoldiLocksFirstTry.scala b/src/main/scala/com/high-performance-spark-examples/GoldiLocks/GoldiLocksFirstTry.scala index 873898d..2192302 100644 --- a/src/main/scala/com/high-performance-spark-examples/GoldiLocks/GoldiLocksFirstTry.scala +++ b/src/main/scala/com/high-performance-spark-examples/GoldiLocks/GoldiLocksFirstTry.scala @@ -1,19 +1,19 @@ package com.highperformancespark.examples.goldilocks -import org.apache.spark.Partition import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame import org.apache.spark.storage.StorageLevel import scala.collection.mutable import scala.collection.mutable.ArrayBuffer - +import scala.collection.Map; +import scala.collection.mutable.MutableList; object GoldiLocksGroupByKey { //tag::groupByKey[] def findRankStatistics( pairRDD: RDD[(Int, Double)], - ranks: List[Long]): scala.collection.Map[Int, List[Double]] = { + ranks: List[Long]): Map[Int, List[Double]] = { pairRDD.groupByKey().mapValues(iter => { val ar = iter.toArray.sorted ranks.map(n => ar(n.toInt)) @@ -25,42 +25,93 @@ object GoldiLocksGroupByKey { //tag::firstTry[] object GoldiLocksFirstTry { - def findQuantiles( dataFrame: DataFrame, targetRanks: List[Long] ) = { - val n = dataFrame.schema.length - val valPairs: RDD[(Double, Int)] = getPairs(dataFrame) - val sorted = valPairs.sortByKey() - sorted.persist(StorageLevel.MEMORY_AND_DISK) - val parts : Array[Partition] = sorted.partitions - val map1 = getTotalsForeachPart(sorted, parts.length, n ) - val map2 = getLocationsOfRanksWithinEachPart(targetRanks, map1, n) - val result = findElementsIteratively(sorted, map2) + /** + * Find nth target rank for every column. 
+ * + * For example: + * + * dataframe: + * (0.0, 4.5, 7.7, 5.0) + * (1.0, 5.5, 6.7, 6.0) + * (2.0, 5.5, 1.5, 7.0) + * (3.0, 5.5, 0.5, 7.0) + * (4.0, 5.5, 0.5, 8.0) + * + * targetRanks: + * 1, 3 + * + * The output will be: + * 0 -> (0.0, 2.0) + * 1 -> (4.5, 5.5) + * 2 -> (0.5, 1.5) + * 3 -> (5.0, 7.0) + * + * @param dataFrame dataframe of doubles + * @param targetRanks the required ranks for every column + * + * @return map of (column index, values found at the target ranks) + */ + def findQuantiles(dataFrame: DataFrame, targetRanks: List[Long]): + Map[Int, Iterable[Double]] = { + + val valueColumnPairs: RDD[(Double, Int)] = getValueColumnIndexPairs(dataFrame) + val sortedValueColumnPairs = valueColumnPairs.sortByKey() + sortedValueColumnPairs.persist(StorageLevel.MEMORY_AND_DISK) + + val numOfColumns = dataFrame.schema.length + val partitionColumnsFreq = getColumnFreqPerPartition(sortedValueColumnPairs, numOfColumns) + val ranksLocations = getLocationsOfRanksWithinEachPart(targetRanks, partitionColumnsFreq, numOfColumns) + val result = findElementsIteratively(sortedValueColumnPairs, ranksLocations) result.groupByKey().collectAsMap() } /** - * Step 1. Map the rows to pairs of (value, colIndex) - * @param dataFrame of double columns to compute the rank satistics for - * @return + * Step 1. Map the rows to pairs of (value, column Index). 
+ * + * For example: + * + * dataFrame: + * 1.5, 1.25, 2.0 + * 5.25, 2.5, 1.5 + * + * The output RDD will be: + * (1.5, 0) (1.25, 1) (2.0, 2) (5.25, 0) (2.5, 1) (1.5, 2) + * + * @param dataFrame dataframe of doubles + * + * @return RDD of pairs (value, column Index) */ - private def getPairs(dataFrame : DataFrame ): RDD[(Double, Int )] ={ - dataFrame.flatMap( row => row.toSeq.zipWithIndex.map{ case (v, index ) => - (v.toString.toDouble, index )}) + private def getValueColumnIndexPairs(dataFrame : DataFrame): RDD[(Double, Int)] = { + dataFrame.flatMap(row => row.toSeq.zipWithIndex.map{ case (v, index) => + (v.toString.toDouble, index)}) } /** - * Step 2. Find the number of elements for each column in each partition - * @param sorted - the sorted (value, colIndex) pairs - * @param numPartitions - * @param n the number of columns - * @return an RDD the length of the number of partitions, where each row contains - * - the partition index - * - an array, totalsPerPart where totalsPerPart(i) = the number of elements in column - * i on this partition + * Step 2. Find the number of elements for each column in each partition. 
+ * + * For Example: + * + * sortedValueColumnPairs: + * Partition 1: (1.5, 0) (1.25, 1) (2.0, 2) (5.25, 0) + * Partition 2: (7.5, 1) (9.5, 2) + * + * numOfColumns: 3 + * + * The output will be: + * [(0, [2, 1, 1]), (1, [0, 1, 1])] + * + * @param sortedValueColumnPairs - sorted RDD of (value, column Index) pairs + * @param numOfColumns the number of columns + * + * @return Array that contains (partition index, number of elements from every column on this partition) */ - private def getTotalsForeachPart(sorted: RDD[(Double, Int)], numPartitions: Int, n : Int ) = { - val zero = Array.fill[Long](n)(0) - sorted.mapPartitionsWithIndex((partitionIndex : Int, it : Iterator[(Double, Int)]) => { - val totalsPerPart : Array[Long] = it.aggregate(zero)( + private def getColumnFreqPerPartition(sortedValueColumnPairs: RDD[(Double, Int)], numOfColumns : Int): + Array[(Int, Array[Long])] = { + + val zero = Array.fill[Long](numOfColumns)(0) + + def aggregateColumnFrequencies (partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) = { + val totalsPerPart : Array[Long] = valueColumnPairs.aggregate(zero)( (a : Array[Long], v : (Double ,Int)) => { val (value, colIndex) = v a(colIndex) = a(colIndex) + 1L @@ -70,62 +121,85 @@ object GoldiLocksFirstTry { require(a.length == b.length) a.zip(b).map{ case(aVal, bVal) => aVal + bVal} }) + Iterator((partitionIndex, totalsPerPart)) - }).collect() + } + + sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect() } + /** * Step 3: For each Partition determine the index of the elements that are desired rank statistics - * @param partitionMap- the result of the previous method - * @return an Array, the length of the number of partitions where each row contains - * - the partition index - * - a list, relevantIndexList where relevantIndexList(i) = the index of an element on this - * partition that matches one of the target ranks + * + * For Example: + * targetRanks: 5 + * partitionColumnsFreq: [(0, [2, 3]), (1, 
[4, 1]), (2, [5, 2])] + * numOfColumns: 2 + * + * The output will be: + * [(0, []), (1, [(0, 3)]), (2, [(1, 1)])] + * + * @param partitionColumnsFreq Array of (partition index, columns frequencies per this partition) + * + * @return Array that contains (partition index, relevantIndexList where relevantIndexList(i) = the index + * of an element on this partition that matches one of the target ranks) */ private def getLocationsOfRanksWithinEachPart(targetRanks : List[Long], - partitionMap : Array[(Int, Array[Long])], n : Int ) : Array[(Int, List[(Int, Long)])] = { - val runningTotal = Array.fill[Long](n)(0) - partitionMap.sortBy(_._1).map { case (partitionIndex, totals)=> - val relevantIndexList = new scala.collection.mutable.MutableList[(Int, Long)]() - totals.zipWithIndex.foreach{ case (colCount, colIndex) => { + partitionColumnsFreq : Array[(Int, Array[Long])], numOfColumns : Int) : Array[(Int, List[(Int, Long)])] = { + + val runningTotal = Array.fill[Long](numOfColumns)(0) + + partitionColumnsFreq.sortBy(_._1).map { case (partitionIndex, columnsFreq) => + val relevantIndexList = new MutableList[(Int, Long)]() + + columnsFreq.zipWithIndex.foreach{ case (colCount, colIndex) => { val runningTotalCol = runningTotal(colIndex) + val ranksHere: List[Long] = targetRanks.filter(rank => (runningTotalCol < rank && runningTotalCol + colCount >= rank)) + + // for each of the rank statistics present add this column index and the index it will be at + // on this partition (the rank - the running total) + relevantIndexList ++= ranksHere.map(rank => (colIndex, rank - runningTotalCol)) + runningTotal(colIndex) += colCount - val ranksHere = targetRanks.filter(rank => - runningTotalCol <= rank && runningTotalCol + colCount >= rank - ) - //for each of the rank statistics present add this column index and the index it will be - //at on this partition (the rank - the running total) - ranksHere.foreach(rank => { - relevantIndexList += ((colIndex, rank-runningTotalCol)) - }) }} + 
(partitionIndex, relevantIndexList.toList) } } /** - * Step4: Using the results of the previous method, scan the data and return the elements - * which correspond to the rank statistics we are looking for in each column - */ - private def findElementsIteratively(sorted : RDD[(Double, Int)], locations : Array[(Int, List[(Int, Long)])]) = { - sorted.mapPartitionsWithIndex((index : Int, it : Iterator[(Double, Int)]) => { - val targetsInThisPart = locations(index)._2 - val len = targetsInThisPart.length - if (len > 0) { - val partMap = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2)) - val keysInThisPart = targetsInThisPart.map(_._1).distinct + * Finds rank statistics elements using ranksLocations. + * + * @param sortedValueColumnPairs - sorted RDD of (value, colIndex) pairs + * @param ranksLocations Array of (partition Index, list of (column index, rank index of this column at this partition)) + * + * @return RDD of (column index, value) pairs, one for each requested rank statistic + */ + private def findElementsIteratively(sortedValueColumnPairs : RDD[(Double, Int)], + ranksLocations : Array[(Int, List[(Int, Long)])]): RDD[(Int, Double)] = { + + sortedValueColumnPairs.mapPartitionsWithIndex((partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) => { + val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2 + if (!targetsInThisPart.isEmpty) { + val columnsRelativeIndex: Map[Int, List[Long]] = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2)) + val columnsInThisPart = targetsInThisPart.map(_._1).distinct + val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap() - keysInThisPart.foreach(key => runningTotals+=((key, 0L))) - val newIt : ArrayBuffer[(Int, Double)] = new scala.collection.mutable.ArrayBuffer() - it.foreach{ case( value, colIndex) => { - if(runningTotals isDefinedAt colIndex){ + runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap + + val result : ArrayBuffer[(Int, Double)] = new scala.collection.mutable.ArrayBuffer() + + valueColumnPairs.foreach{ 
case(value, colIndex) => { + if (runningTotals isDefinedAt colIndex) { val total = runningTotals(colIndex) + 1L runningTotals.update(colIndex, total) - if(partMap(colIndex).contains(total)){ - newIt += ((colIndex,value )) - } + + if (columnsRelativeIndex(colIndex).contains(total)) + result += ((colIndex, value)) } }} - newIt.toIterator + + result.toIterator } else { Iterator.empty