Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add documentation and examples to GoldiLocksFirstTry #7

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package com.highperformancespark.examples.goldilocks

import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import scala.collection.Map;
import scala.collection.mutable.MutableList;

object GoldiLocksGroupByKey {
//tag::groupByKey[]
def findRankStatistics(
pairRDD: RDD[(Int, Double)],
ranks: List[Long]): scala.collection.Map[Int, List[Double]] = {
ranks: List[Long]): Map[Int, List[Double]] = {
pairRDD.groupByKey().mapValues(iter => {
val ar = iter.toArray.sorted
ranks.map(n => ar(n.toInt))
Expand All @@ -25,42 +25,93 @@ object GoldiLocksGroupByKey {
//tag::firstTry[]
object GoldiLocksFirstTry {

def findQuantiles( dataFrame: DataFrame, targetRanks: List[Long] ) = {
val n = dataFrame.schema.length
val valPairs: RDD[(Double, Int)] = getPairs(dataFrame)
val sorted = valPairs.sortByKey()
sorted.persist(StorageLevel.MEMORY_AND_DISK)
val parts : Array[Partition] = sorted.partitions
val map1 = getTotalsForeachPart(sorted, parts.length, n )
val map2 = getLocationsOfRanksWithinEachPart(targetRanks, map1, n)
val result = findElementsIteratively(sorted, map2)
/**
* Find nth target rank for every column.
*
* For example:
*
* dataframe:
* (0.0, 4.5, 7.7, 5.0)
* (1.0, 5.5, 6.7, 6.0)
* (2.0, 5.5, 1.5, 7.0)
* (3.0, 5.5, 0.5, 7.0)
* (4.0, 5.5, 0.5, 8.0)
*
* targetRanks:
* 1, 3
*
* The output will be:
* 0 -> (0.0, 2.0)
* 1 -> (4.5, 5.5)
* 2 -> (7.7, 1.5)
* 3 -> (5.0, 7.0)
*
* @param dataFrame dataframe of doubles
* @param targetRanks the required ranks for every column
*
* @return map of (column index, list of target ranks)
*/
def findQuantiles(dataFrame: DataFrame, targetRanks: List[Long]):
Map[Int, Iterable[Double]] = {

val valueColumnPairs: RDD[(Double, Int)] = getValueColumnIndexPairs(dataFrame)
val sortedValueColumnPairs = valueColumnPairs.sortByKey()
sortedValueColumnPairs.persist(StorageLevel.MEMORY_AND_DISK)

val numOfColumns = dataFrame.schema.length
val partitionColumnsFreq = getColumnFreqPerPartition(sortedValueColumnPairs, numOfColumns)
val ranksLocations = getLocationsOfRanksWithinEachPart(targetRanks, partitionColumnsFreq, numOfColumns)
val result = findElementsIteratively(sortedValueColumnPairs, ranksLocations)
result.groupByKey().collectAsMap()
}

/**
* Step 1. Map the rows to pairs of (value, colIndex)
* @param dataFrame of double columns to compute the rank satistics for
* @return
* Step 1. Map the rows to pairs of (value, column Index).
*
* For example:
*
* dataFrame:
* 1.5, 1.25, 2.0
* 5.25, 2.5, 1.5
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe get these two rows to line up

* The output RDD will be:
* (1.5, 0) (1.25, 1) (2.0, 2) (5.25, 0) (2.5, 1) (1.5, 2)
*
* @param dataFrame dateframe of doubles
*
* @return RDD of pairs (value, column Index)
*/
private def getPairs(dataFrame : DataFrame ): RDD[(Double, Int )] ={
dataFrame.flatMap( row => row.toSeq.zipWithIndex.map{ case (v, index ) =>
(v.toString.toDouble, index )})
private def getValueColumnIndexPairs(dataFrame : DataFrame): RDD[(Double, Int)] = {
dataFrame.flatMap(row => row.toSeq.zipWithIndex.map{ case (v, index) =>
(v.toString.toDouble, index)})
}

/**
* Step 2. Find the number of elements for each column in each partition
* @param sorted - the sorted (value, colIndex) pairs
* @param numPartitions
* @param n the number of columns
* @return an RDD the length of the number of partitions, where each row contains
* - the partition index
* - an array, totalsPerPart where totalsPerPart(i) = the number of elements in column
* i on this partition
* Step 2. Find the number of elements for each column in each partition.
*
* For Example:
*
* sortedValueColumnPairs:
* Partition 1: (1.5, 0) (1.25, 1) (2.0, 2) (5.25, 0)
* Partition 2: (7.5, 1) (9.5, 2)
*
* numOfColumns: 3
*
* The output will be:
* [(0, [2, 1, 1]), (1, [0, 1, 1])]
*
* @param sortedValueColumnPairs - sorted RDD of (value, column Index) pairs
* @param numOfColumns the number of columns
*
* @return Array that contains (partition index, number of elements from every column on this partition)
*/
private def getTotalsForeachPart(sorted: RDD[(Double, Int)], numPartitions: Int, n : Int ) = {
val zero = Array.fill[Long](n)(0)
sorted.mapPartitionsWithIndex((partitionIndex : Int, it : Iterator[(Double, Int)]) => {
val totalsPerPart : Array[Long] = it.aggregate(zero)(
private def getColumnFreqPerPartition(sortedValueColumnPairs: RDD[(Double, Int)], numOfColumns : Int):
Array[(Int, Array[Long])] = {

val zero = Array.fill[Long](numOfColumns)(0)

def aggregateColumnFrequencies (partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) = {
val totalsPerPart : Array[Long] = valueColumnPairs.aggregate(zero)(
(a : Array[Long], v : (Double ,Int)) => {
val (value, colIndex) = v
a(colIndex) = a(colIndex) + 1L
Expand All @@ -70,62 +121,85 @@ object GoldiLocksFirstTry {
require(a.length == b.length)
a.zip(b).map{ case(aVal, bVal) => aVal + bVal}
})

Iterator((partitionIndex, totalsPerPart))
}).collect()
}

sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()
}

/**
* Step 3: For each Partition determine the index of the elements that are desired rank statistics
* @param partitionMap- the result of the previous method
* @return an Array, the length of the number of partitions where each row contains
* - the partition index
* - a list, relevantIndexList where relevantIndexList(i) = the index of an element on this
* partition that matches one of the target ranks
*
* For Example:
* targetRanks: 5
* partitionColumnsFreq: [(0, [2, 3]), (1, [4, 1]), (2, [5, 2])]
* numOfColumns: 2
*
* The output will be:
* [(0, []), (1, [(0, 3)]), (2, [(1, 1)])]
*
* @param partitionColumnsFreq Array of (partition index, columns frequencies per this partition)
*
* @return Array that contains (partition index, relevantIndexList where relevantIndexList(i) = the index
* of an element on this partition that matches one of the target ranks)
*/
private def getLocationsOfRanksWithinEachPart(targetRanks : List[Long],
partitionMap : Array[(Int, Array[Long])], n : Int ) : Array[(Int, List[(Int, Long)])] = {
val runningTotal = Array.fill[Long](n)(0)
partitionMap.sortBy(_._1).map { case (partitionIndex, totals)=>
val relevantIndexList = new scala.collection.mutable.MutableList[(Int, Long)]()
totals.zipWithIndex.foreach{ case (colCount, colIndex) => {
partitionColumnsFreq : Array[(Int, Array[Long])], numOfColumns : Int) : Array[(Int, List[(Int, Long)])] = {

val runningTotal = Array.fill[Long](numOfColumns)(0)

partitionColumnsFreq.sortBy(_._1).map { case (partitionIndex, columnsFreq) =>
val relevantIndexList = new MutableList[(Int, Long)]()

columnsFreq.zipWithIndex.foreach{ case (colCount, colIndex) => {
val runningTotalCol = runningTotal(colIndex)
val ranksHere: List[Long] = targetRanks.filter(rank => (runningTotalCol < rank && runningTotalCol + colCount >= rank))

// for each of the rank statistics present add this column index and the index it will be at
// on this partition (the rank - the running total)
relevantIndexList ++= ranksHere.map(rank => (colIndex, rank - runningTotalCol))

runningTotal(colIndex) += colCount
val ranksHere = targetRanks.filter(rank =>
runningTotalCol <= rank && runningTotalCol + colCount >= rank
)
//for each of the rank statistics present add this column index and the index it will be
//at on this partition (the rank - the running total)
ranksHere.foreach(rank => {
relevantIndexList += ((colIndex, rank-runningTotalCol))
})
}}

(partitionIndex, relevantIndexList.toList)
}
}

/**
* Step4: Using the results of the previous method, scan the data and return the elements
* which correspond to the rank statistics we are looking for in each column
*/
private def findElementsIteratively(sorted : RDD[(Double, Int)], locations : Array[(Int, List[(Int, Long)])]) = {
sorted.mapPartitionsWithIndex((index : Int, it : Iterator[(Double, Int)]) => {
val targetsInThisPart = locations(index)._2
val len = targetsInThisPart.length
if (len > 0) {
val partMap = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
val keysInThisPart = targetsInThisPart.map(_._1).distinct
* Finds rank statistics elements using ranksLocations.
*
* @param sortedValueColumnPairs - sorted RDD of (value, colIndex) pairs
* @param ranksLocations Array of (partition Index, list of (column index, rank index of this column at this partition))
*
* @return
*/
private def findElementsIteratively(sortedValueColumnPairs : RDD[(Double, Int)],
ranksLocations : Array[(Int, List[(Int, Long)])]): RDD[(Int, Double)] = {

sortedValueColumnPairs.mapPartitionsWithIndex((partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) => {
val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2
if (!targetsInThisPart.isEmpty) {
val columnsRelativeIndex: Map[Int, List[Long]] = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
val columnsInThisPart = targetsInThisPart.map(_._1).distinct

val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
keysInThisPart.foreach(key => runningTotals+=((key, 0L)))
val newIt : ArrayBuffer[(Int, Double)] = new scala.collection.mutable.ArrayBuffer()
it.foreach{ case( value, colIndex) => {
if(runningTotals isDefinedAt colIndex){
runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap

val result : ArrayBuffer[(Int, Double)] = new scala.collection.mutable.ArrayBuffer()

valueColumnPairs.foreach{ case(value, colIndex) => {
if (runningTotals isDefinedAt colIndex) {
val total = runningTotals(colIndex) + 1L
runningTotals.update(colIndex, total)
if(partMap(colIndex).contains(total)){
newIt += ((colIndex,value ))
}

if (columnsRelativeIndex(colIndex).contains(total))
result += ((colIndex, value))
}
}}
newIt.toIterator

result.toIterator
}
else {
Iterator.empty
Expand Down