aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2014-12-31 11:50:53 -0800
committerXiangrui Meng <meng@databricks.com>2014-12-31 11:50:53 -0800
commit06a9aa589c518a40a3c7cc201e89d75af77ab93e (patch)
tree36b6887040603a9d3df7feab6a13cd556523d187 /mllib
parent352ed6bbe3c3b67e52e298e7c535ae414d96beca (diff)
downloadspark-06a9aa589c518a40a3c7cc201e89d75af77ab93e.tar.gz
spark-06a9aa589c518a40a3c7cc201e89d75af77ab93e.tar.bz2
spark-06a9aa589c518a40a3c7cc201e89d75af77ab93e.zip
[SPARK-4797] Replace breezeSquaredDistance
This PR replaces slow breezeSquaredDistance. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #3643 from viirya/faster_squareddistance and squashes the following commits: f28b275 [Liang-Chi Hsieh] Move the implementation to linalg.Vectors and rename as sqdist. 0bc48ee [Liang-Chi Hsieh] Merge branch 'master' into faster_squareddistance ba34422 [Liang-Chi Hsieh] Fix bug. 91849d0 [Liang-Chi Hsieh] Modified for comment. 44a65ad [Liang-Chi Hsieh] Modified for comments. 35db395 [Liang-Chi Hsieh] Fix bug and some modifications for comments. f4f5ebb [Liang-Chi Hsieh] Follow BLAS.dot pattern to replace intersect, diff with while-loop. a36e09f [Liang-Chi Hsieh] Use while-loop to replace foreach for better performance. d3e0628 [Liang-Chi Hsieh] Make the methods private. dd415bc [Liang-Chi Hsieh] Consider different cases of SparseVector and DenseVector. 13669db [Liang-Chi Hsieh] Replace breezeSquaredDistance.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala80
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala13
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala15
3 files changed, 100 insertions, 8 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 01f3f90577..6a782b079a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -312,6 +312,86 @@ object Vectors {
math.pow(sum, 1.0 / p)
}
}
+
+ /**
+ * Returns the squared distance between two Vectors.
+ * @param v1 first Vector.
+ * @param v2 second Vector.
+ * @return squared distance between two Vectors.
+ */
+ def sqdist(v1: Vector, v2: Vector): Double = {
+ var squaredDistance = 0.0
+ (v1, v2) match {
+ case (v1: SparseVector, v2: SparseVector) =>
+ val v1Values = v1.values
+ val v1Indices = v1.indices
+ val v2Values = v2.values
+ val v2Indices = v2.indices
+ val nnzv1 = v1Indices.size
+ val nnzv2 = v2Indices.size
+
+ var kv1 = 0
+ var kv2 = 0
+ while (kv1 < nnzv1 || kv2 < nnzv2) {
+ var score = 0.0
+
+ if (kv2 >= nnzv2 || (kv1 < nnzv1 && v1Indices(kv1) < v2Indices(kv2))) {
+ score = v1Values(kv1)
+ kv1 += 1
+ } else if (kv1 >= nnzv1 || (kv2 < nnzv2 && v2Indices(kv2) < v1Indices(kv1))) {
+ score = v2Values(kv2)
+ kv2 += 1
+ } else {
+ score = v1Values(kv1) - v2Values(kv2)
+ kv1 += 1
+ kv2 += 1
+ }
+ squaredDistance += score * score
+ }
+
+ case (v1: SparseVector, v2: DenseVector) if v1.indices.length / v1.size < 0.5 =>
+ squaredDistance = sqdist(v1, v2)
+
+ case (v1: DenseVector, v2: SparseVector) if v2.indices.length / v2.size < 0.5 =>
+ squaredDistance = sqdist(v2, v1)
+
+ // When a SparseVector is approximately dense, we treat it as a DenseVector
+ case (v1, v2) =>
+ squaredDistance = v1.toArray.zip(v2.toArray).foldLeft(0.0){ (distance, elems) =>
+ val score = elems._1 - elems._2
+ distance + score * score
+ }
+ }
+ squaredDistance
+ }
+
+ /**
+ * Returns the squared distance between DenseVector and SparseVector.
+ */
+ private[mllib] def sqdist(v1: SparseVector, v2: DenseVector): Double = {
+ var kv1 = 0
+ var kv2 = 0
+ val indices = v1.indices
+ var squaredDistance = 0.0
+ var iv1 = indices(kv1)
+ val nnzv2 = v2.size
+
+ while (kv2 < nnzv2) {
+ var score = 0.0
+ if (kv2 != iv1) {
+ score = v2(kv2)
+ } else {
+ score = v1.values(kv1) - v2(kv2)
+ if (kv1 < indices.length - 1) {
+ kv1 += 1
+ iv1 = indices(kv1)
+ }
+ }
+ squaredDistance += score * score
+ kv2 += 1
+ }
+ squaredDistance
+ }
}
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index da0da0a168..c7843464a7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -19,8 +19,7 @@ package org.apache.spark.mllib.util
import scala.reflect.ClassTag
-import breeze.linalg.{DenseVector => BDV, SparseVector => BSV,
- squaredDistance => breezeSquaredDistance}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
import org.apache.spark.annotation.Experimental
import org.apache.spark.SparkContext
@@ -28,7 +27,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
import org.apache.spark.util.random.BernoulliCellSampler
import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.dot
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
@@ -266,7 +265,7 @@ object MLUtils {
}
Vectors.fromBreeze(vector1)
}
-
+
/**
* Returns the squared Euclidean distance between two vectors. The following formula will be used
* if it does not introduce too much numerical error:
@@ -316,12 +315,10 @@ object MLUtils {
val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) /
(sqDist + EPSILON)
if (precisionBound2 > precision) {
- // TODO: breezeSquaredDistance is slow,
- // so we should replace it with our own implementation.
- sqDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze)
+ sqDist = Vectors.sqdist(v1, v2)
}
} else {
- sqDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze)
+ sqDist = Vectors.sqdist(v1, v2)
}
sqDist
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index df07987093..7778847f8b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -52,12 +52,27 @@ class MLUtilsSuite extends FunSuite with MLlibTestSparkContext {
val values = indices.map(i => a(i))
val v2 = Vectors.sparse(n, indices, values)
val norm2 = Vectors.norm(v2, 2.0)
+ val v3 = Vectors.sparse(n, indices, indices.map(i => a(i) + 0.5))
+ val norm3 = Vectors.norm(v3, 2.0)
val squaredDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze)
val fastSquaredDist1 = fastSquaredDistance(v1, norm1, v2, norm2, precision)
assert((fastSquaredDist1 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
val fastSquaredDist2 =
fastSquaredDistance(v1, norm1, Vectors.dense(v2.toArray), norm2, precision)
assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
+ val squaredDist2 = breezeSquaredDistance(v2.toBreeze, v3.toBreeze)
+ val fastSquaredDist3 =
+ fastSquaredDistance(v2, norm2, v3, norm3, precision)
+ assert((fastSquaredDist3 - squaredDist2) <= precision * squaredDist2, s"failed with m = $m")
+ if (m > 10) {
+ val v4 = Vectors.sparse(n, indices.slice(0, m - 10),
+ indices.map(i => a(i) + 0.5).slice(0, m - 10))
+ val norm4 = Vectors.norm(v4, 2.0)
+ val squaredDist = breezeSquaredDistance(v2.toBreeze, v4.toBreeze)
+ val fastSquaredDist =
+ fastSquaredDistance(v2, norm2, v4, norm4, precision)
+ assert((fastSquaredDist - squaredDist) <= precision * squaredDist, s"failed with m = $m")
+ }
}
}