From 06a9aa589c518a40a3c7cc201e89d75af77ab93e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 31 Dec 2014 11:50:53 -0800 Subject: [SPARK-4797] Replace breezeSquaredDistance This PR replaces slow breezeSquaredDistance. Author: Liang-Chi Hsieh Closes #3643 from viirya/faster_squareddistance and squashes the following commits: f28b275 [Liang-Chi Hsieh] Move the implementation to linalg.Vectors and rename as sqdist. 0bc48ee [Liang-Chi Hsieh] Merge branch 'master' into faster_squareddistance ba34422 [Liang-Chi Hsieh] Fix bug. 91849d0 [Liang-Chi Hsieh] Modified for comment. 44a65ad [Liang-Chi Hsieh] Modified for comments. 35db395 [Liang-Chi Hsieh] Fix bug and some modifications for comments. f4f5ebb [Liang-Chi Hsieh] Follow BLAS.dot pattern to replace intersect, diff with while-loop. a36e09f [Liang-Chi Hsieh] Use while-loop to replace foreach for better performance. d3e0628 [Liang-Chi Hsieh] Make the methods private. dd415bc [Liang-Chi Hsieh] Consider different cases of SparseVector and DenseVector. 13669db [Liang-Chi Hsieh] Replace breezeSquaredDistance. --- .../org/apache/spark/mllib/linalg/Vectors.scala | 80 ++++++++++++++++++++++ .../org/apache/spark/mllib/util/MLUtils.scala | 13 ++-- 2 files changed, 85 insertions(+), 8 deletions(-) (limited to 'mllib/src/main') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 01f3f90577..6a782b079a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -312,6 +312,86 @@ object Vectors { math.pow(sum, 1.0 / p) } } + + /** + * Returns the squared distance between two Vectors. + * @param v1 first Vector. + * @param v2 second Vector. + * @return squared distance between two Vectors. + */ + def sqdist(v1: Vector, v2: Vector): Double = { + var squaredDistance = 0.0 + (v1, v2) match { + case (v1: SparseVector, v2: SparseVector) => + val v1Values = v1.values + val v1Indices = v1.indices + val v2Values = v2.values + val v2Indices = v2.indices + val nnzv1 = v1Indices.size + val nnzv2 = v2Indices.size + + var kv1 = 0 + var kv2 = 0 + while (kv1 < nnzv1 || kv2 < nnzv2) { + var score = 0.0 + + if (kv2 >= nnzv2 || (kv1 < nnzv1 && v1Indices(kv1) < v2Indices(kv2))) { + score = v1Values(kv1) + kv1 += 1 + } else if (kv1 >= nnzv1 || (kv2 < nnzv2 && v2Indices(kv2) < v1Indices(kv1))) { + score = v2Values(kv2) + kv2 += 1 + } else { + score = v1Values(kv1) - v2Values(kv2) + kv1 += 1 + kv2 += 1 + } + squaredDistance += score * score + } + + case (v1: SparseVector, v2: DenseVector) if v1.indices.length / v1.size < 0.5 => + squaredDistance = sqdist(v1, v2) + + case (v1: DenseVector, v2: SparseVector) if v2.indices.length / v2.size < 0.5 => + squaredDistance = sqdist(v2, v1) + + // When a SparseVector is approximately dense, we treat it as a DenseVector + case (v1, v2) => + squaredDistance = v1.toArray.zip(v2.toArray).foldLeft(0.0){ (distance, elems) => + val score = elems._1 - elems._2 + distance + score * score + } + } + squaredDistance + } + + /** + * Returns the squared distance between DenseVector and SparseVector. + */ + private[mllib] def sqdist(v1: SparseVector, v2: DenseVector): Double = { + var kv1 = 0 + var kv2 = 0 + val indices = v1.indices + var squaredDistance = 0.0 + var iv1 = indices(kv1) + val nnzv2 = v2.size + + while (kv2 < nnzv2) { + var score = 0.0 + if (kv2 != iv1) { + score = v2(kv2) + } else { + score = v1.values(kv1) - v2(kv2) + if (kv1 < indices.length - 1) { + kv1 += 1 + iv1 = indices(kv1) + } + } + squaredDistance += score * score + kv2 += 1 + } + squaredDistance + } } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index da0da0a168..c7843464a7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -19,8 +19,7 @@ package org.apache.spark.mllib.util import scala.reflect.ClassTag -import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, - squaredDistance => breezeSquaredDistance} +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV} import org.apache.spark.annotation.Experimental import org.apache.spark.SparkContext @@ -28,7 +27,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PartitionwiseSampledRDD import org.apache.spark.util.random.BernoulliCellSampler import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} +import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.dot import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext @@ -266,7 +265,7 @@ object MLUtils { } Vectors.fromBreeze(vector1) } - + /** * Returns the squared Euclidean distance between two vectors. The following formula will be used * if it does not introduce too much numerical error: @@ -316,12 +315,10 @@ object MLUtils { val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) / (sqDist + EPSILON) if (precisionBound2 > precision) { - // TODO: breezeSquaredDistance is slow, - // so we should replace it with our own implementation. - sqDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze) + sqDist = Vectors.sqdist(v1, v2) } } else { - sqDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze) + sqDist = Vectors.sqdist(v1, v2) } sqDist } -- cgit v1.2.3