Feature/knn #200

Open · wants to merge 2 commits into master
2 changes: 2 additions & 0 deletions pom.xml
@@ -339,6 +339,7 @@
</execution>
</executions>
</plugin>
<!-- cleanup artifact signing
Owner: What's this needed for?

Author: Somehow my local compilation was failing on signing the artifact... I need to install some plugin... will fix it.

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
@@ -357,6 +358,7 @@
<executable>gpg2</executable>
</configuration>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
170 changes: 170 additions & 0 deletions src/main/scala/com/cloudera/sparkts/models/KNNRegression.scala
@@ -0,0 +1,170 @@
/**
* Copyright (C) 2017 Verizon. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sparkts.models

import org.apache.spark.mllib.KernelType._
import org.apache.spark.mllib.Kernel
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import scala.collection.mutable.ArrayBuffer
import java.util.PriorityQueue
import java.util.Comparator
import scala.collection.JavaConverters._

Owner: Spark has a BoundedPriorityQueue implementation - would it make sense to rely on that? https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala

Author: Yeah, for BLAS I had to add the spark package, and I can use the BoundedPriorityQueue from there...

class BoundedPriorityQueue[E](k: Int, comp: Comparator[E]) extends PriorityQueue[E](k, comp) {
override def add(elem: E): Boolean = {
if (size() < k) {
offer(elem)
true
} else {
val head = peek()
if (head != null && comp.compare(elem, head) > 0) {
poll()
offer(elem)
return true
}
return false
}
}
}
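
Following up on the thread above, a minimal sketch of what leaning on Spark's own queue might look like. This assumes the caller lives under an org.apache.spark.* package, since that class is private[spark] (the Kernel classes later in this PR already do); the topk value is illustrative:

```scala
import org.apache.spark.util.BoundedPriorityQueue

val topk = 5                                        // illustrative value
val ord = Ordering.by[(Int, Double), Double](_._2)  // same ordering as NeighborOrder below
// Spark's queue retains the "largest" topk elements under its ordering, so
// passing ord.reverse keeps the topk smallest distances, matching this class.
val heap = new BoundedPriorityQueue[(Int, Double)](topk)(ord.reverse)
heap += ((42, 0.37))    // inserts while below topk, then only replaces the head if the element beats it
val kept = heap.toArray // Iterable, so no iterator().asScala conversion needed
```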

/**
 * @author debasish83, xiangzhe, santanu.das
 */

Owner: We don't use author tags in spark-ts and let readers use git blame to track down the originators of code.

Author: Two other colleagues contributed to the idea, so I wanted to include them... I can add these in log messages...
//TODO: history and query can be combined into a vector with offset giving queries
//TODO: distance and point both should be covered by Neighbor
case class Neighbor(history: Vector, query: Double)

class NeighborOrder extends Ordering[(Int, Double)] {
override def compare(x: (Int, Double), y: (Int, Double)): Int = {
x._2 compare y._2
}
}
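
For reference, NeighborOrder compares candidates purely by their distance component; nearestNeighbors below relies on its reverse so that the heap head is always the current worst match:

```scala
val ord = new NeighborOrder()
ord.compare((0, 1.0), (1, 2.0))         // negative: the smaller distance sorts first
ord.reverse.compare((0, 1.0), (1, 2.0)) // positive: reversed, so a heap evicts the largest distance first
```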

class KNNRegressionModel extends TimeSeriesModel {
override def addTimeDependentEffects(ts: Vector, dest: Vector) = ???

override def removeTimeDependentEffects(ts: Vector, dest: Vector) = ???
}

object KNNRegression {
def maxnorm(timeseries: Array[Double]): Double = {
val max = timeseries.max
var i = 0
while (i < timeseries.length) {
timeseries(i) /= max
i += 1
}
return max
}
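
maxnorm rescales the series in place and returns the scale factor, which predict uses below to undo the normalization; a small usage sketch:

```scala
val series = Array(2.0, 4.0, 8.0)
val max = KNNRegression.maxnorm(series)
// series is now [0.25, 0.5, 1.0] and max == 8.0, kept so the caller can rescale results
```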

def nearestNeighbors(timeseries: Array[Double],
                     featureDim: Int,
                     queryLength: Int,
                     kernel: Kernel,
                     topk: Int): Array[Neighbor] = {
val targetArray = new Array[Neighbor](queryLength)
var i = 0
while (i < queryLength) {
var j = i
val regressorEnd = timeseries.size - queryLength - featureDim - 1 + i
val queryStart = regressorEnd + 1
val regressor = new Array[Double](featureDim)
val query = new Array[Double](featureDim)

if (queryStart > 0) {
Array.copy(timeseries, queryStart, query, 0, featureDim)
}

val ord = new NeighborOrder()
val minHeap = new BoundedPriorityQueue[(Int, Double)](topk, ord.reverse)
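// ord.reverse keeps the largest retained distance at the heap head,
// so the current worst match is the first candidate for eviction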

while (j <= regressorEnd) {
Array.copy(timeseries, j, regressor, 0, featureDim)
// Generate feature matrix for linear model generation
val distance = kernel.compute(Vectors.dense(regressor), 0, Vectors.dense(query), 0)
val targetIndex = j + featureDim
if (minHeap.size == topk) {
if (minHeap.peek()._2 > distance) {
minHeap.poll()
minHeap.add((targetIndex, distance))
}
}
else {
minHeap.add((targetIndex, distance))
}
j = j + 1
Owner: j += 1

}
val indexArray = new Array[Int](minHeap.size)
val matchedVector = new Array[Double](minHeap.size)
val heapToArray = minHeap.iterator().asScala.toArray
var k = 0
while (k < minHeap.size) {
val index = heapToArray(k)._1
indexArray(k) = index
matchedVector(k) = timeseries(index)
k += 1
}
val matchedTarget = Vectors.dense(matchedVector)
val queryPoint = timeseries(queryStart + featureDim)
minHeap.clear()
targetArray.update(i, Neighbor(matchedTarget, queryPoint))
i = i + 1
Owner: i += 1?

}
targetArray
}

def predict(timeseries: Array[Double],
topk: Int,
featureDim: Int,
normalize: Boolean,
multiStep: Int,
metric: KernelType = Euclidean): Array[Double] = {
val kernel = Kernel(metric)
val max =
if (normalize) maxnorm(timeseries)
else 1.0

val historyBuf = new ArrayBuffer[Double](timeseries.length + multiStep)
timeseries.foreach(historyBuf += _)

val multiPredict = (0 until multiStep).toArray.map { _ =>
val neighbors = nearestNeighbors(
historyBuf.toArray,
featureDim,
queryLength = 1,
kernel,
topk)
require(neighbors.length == 1, s"expected 1 neighbor set but got ${neighbors.length}")
val point = neighbors(0).query
val history = neighbors(0).history.toArray
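// The forecast is the mean of the query point and its topk matched historical targets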
val predicted = history.foldLeft(point)(_ + _) / (history.length + 1)
historyBuf += predicted
predicted
}

var i = 0
while (i < timeseries.length) {
timeseries.update(i, timeseries(i) * max)
i += 1
}
i = 0
while (i < multiPredict.length) {
multiPredict.update(i, multiPredict(i) * max)
i += 1
}
multiPredict
}
}
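
A hypothetical end-to-end usage sketch; the parameter values are illustrative only, and note that predict mutates timeseries in place when normalize is set (it is rescaled back before returning):

```scala
import com.cloudera.sparkts.models.KNNRegression

val series = Array(1.0, 2.0, 3.0, 4.0, 3.0, 2.0, 1.0, 2.0, 3.0, 4.0, 3.0, 2.0)
val forecast = KNNRegression.predict(
  series,
  topk = 2,        // average over the 2 closest historical windows
  featureDim = 3,  // match on windows of 3 consecutive points
  normalize = true,
  multiStep = 4)   // forecast 4 steps, feeding each prediction back into the history
```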
94 changes: 94 additions & 0 deletions src/main/scala/org/apache/spark/mllib/Kernel.scala
@@ -0,0 +1,94 @@
/**
* Copyright (C) 2017 Verizon. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib

/**
* @author debasish83, santanu.das
*/

import KernelType._
import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.mllib.util._

trait Kernel {
def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double

def compute(vi: Vector, vj: Vector): Double = {
compute(vi: Vector, 0, vj: Vector, 0)
}
}
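
To make the contract concrete, a hypothetical kernel that is not part of this PR; implementations are free to ignore the row indices, which exist only so the *WithNorm variants can look up precomputed norms:

```scala
// Illustrative only: an RBF kernel sketch built on the same helpers the kernels below use
case class RBFKernel(gamma: Double) extends Kernel {
  override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
    val d2 = MLUtils.fastSquaredDistance(vi, Vectors.norm(vi, 2), vj, Vectors.norm(vj, 2))
    math.exp(-gamma * d2)
  }
}
```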

case class CosineKernelWithNorm(rowNorms: Map[Long, Double], threshold: Double) extends Kernel {
override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
val similarity = BLAS.dot(vi, vj) / rowNorms(indexi) / rowNorms(indexj)
if (similarity <= threshold) return 0.0
similarity
}
}

case class CosineKernel() extends Kernel {
override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
val similarity = BLAS.dot(vi, vj) / Vectors.norm(vi, 2) / Vectors.norm(vj, 2)
similarity
}
}

case class EuclideanKernelWithNorm(rowNorms: Map[Long, Double], threshold: Double) extends Kernel {
override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
val distanceSquare = MLUtils.fastSquaredDistance(vi, Vectors.norm(vi, 2), vj, Vectors.norm(vj, 2))
val distance = Math.sqrt(distanceSquare)
if (distance <= threshold) return 0.0
distance
}
}

case class EuclideanKernel() extends Kernel {
override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
val distanceSquare = MLUtils.fastSquaredDistance(vi, Vectors.norm(vi, 2), vj, Vectors.norm(vj, 2))
Math.sqrt(distanceSquare)
}
}

case class ProductKernel() extends Kernel {
override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
BLAS.dot(vi, vj)
}
}

case class ScaledProductKernelWithNorm(rowNorms: Map[Long, Double]) extends Kernel {
override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
BLAS.dot(vi, vj) / rowNorms(indexi)
}
}

case class ScaledProductKernel() extends Kernel {
override def compute(vi: Vector, indexi: Long, vj: Vector, indexj: Long): Double = {
BLAS.dot(vi, vj) / Vectors.norm(vi, 2)
}
}

// TODO: Add more sparse kernels, like poly2 and a neural-net kernel, for kernel factorization/classification
object Kernel {
def apply(metric: KernelType) : Kernel = {
metric match {
case Euclidean => new EuclideanKernel()
case Cosine => new CosineKernel()
case Product => new ProductKernel()
case ScaledProduct => new ScaledProductKernel()
}
}
}
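
Factory usage, for reference; the two-argument compute overload on the trait defaults both row indices to 0:

```scala
import org.apache.spark.mllib.KernelType._

val kernel = Kernel(Euclidean)
val d = kernel.compute(Vectors.dense(1.0, 2.0), Vectors.dense(2.0, 4.0)) // sqrt(1 + 4) ≈ 2.236
```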
25 changes: 25 additions & 0 deletions src/main/scala/org/apache/spark/mllib/KernelType.scala
@@ -0,0 +1,25 @@
/**
* Copyright (C) 2017 Verizon. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib

/**
 * Kernel functions supported by the Kernel factory
* @author santanu.das
*/

object KernelType extends Enumeration {
type KernelType = Value
val Cosine, Euclidean, Product, ScaledProduct = Value
}