aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDB Tsai <dbtsai@alpinenow.com>2014-12-02 11:40:43 +0800
committerXiangrui Meng <meng@databricks.com>2014-12-02 11:40:43 +0800
commit64f3175bf976f5a28e691cedc7a4b333709e0c58 (patch)
tree5e9f414bb51f79f7de184909c82fbc7c90e5d2ae
parentb0a46d899541ec17db090aac6f9ea1b287ee9331 (diff)
downloadspark-64f3175bf976f5a28e691cedc7a4b333709e0c58.tar.gz
spark-64f3175bf976f5a28e691cedc7a4b333709e0c58.tar.bz2
spark-64f3175bf976f5a28e691cedc7a4b333709e0c58.zip
[SPARK-4611][MLlib] Implement the efficient vector norm
The vector norm in breeze is implemented by `activeIterator` which is known to be very slow. In this PR, an efficient vector norm is implemented, and with this API, `Normalizer` and `k-means` have big performance improvement. Here is the benchmark against mnist8m dataset. a) `Normalizer` Before DenseVector: 68.25secs SparseVector: 17.01secs With this PR DenseVector: 12.71secs SparseVector: 2.73secs b) `k-means` Before DenseVector: 83.46secs SparseVector: 61.60secs With this PR DenseVector: 70.04secs SparseVector: 59.05secs Author: DB Tsai <dbtsai@alpinenow.com> Closes #3462 from dbtsai/norm and squashes the following commits: 63c7165 [DB Tsai] typo 0c3637f [DB Tsai] add import org.apache.spark.SparkContext._ back 6fa616c [DB Tsai] address feedback 9b7cb56 [DB Tsai] move norm to static method 0b632e6 [DB Tsai] kmeans dbed124 [DB Tsai] style c1a877c [DB Tsai] first commit
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala4
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala51
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala24
4 files changed, 79 insertions, 6 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 34ea0de706..0f8dee58d8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.clustering
import scala.collection.mutable.ArrayBuffer
-import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
+import breeze.linalg.{DenseVector => BDV, Vector => BV}
import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
@@ -125,7 +125,7 @@ class KMeans private (
}
// Compute squared norms and cache them.
- val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
+ val norms = data.map(Vectors.norm(_, 2.0))
norms.persist()
val breezeData = data.map(_.toBreeze).zip(norms).map { case (v, norm) =>
new BreezeVectorWithNorm(v, norm)
@@ -425,7 +425,7 @@ object KMeans {
private[clustering]
class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable {
- def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0))
+ def this(vector: BV[Double]) = this(vector, Vectors.norm(Vectors.fromBreeze(vector), 2.0))
def this(array: Array[Double]) = this(new BDV[Double](array))
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
index a9c2e23717..1ced26a9b7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
@@ -17,8 +17,6 @@
package org.apache.spark.mllib.feature
-import breeze.linalg.{norm => brzNorm}
-
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
@@ -47,7 +45,7 @@ class Normalizer(p: Double) extends VectorTransformer {
* @return normalized vector. If the norm of the input is zero, it will return the input vector.
*/
override def transform(vector: Vector): Vector = {
- val norm = brzNorm(vector.toBreeze, p)
+ val norm = Vectors.norm(vector, p)
if (norm != 0.0) {
// For dense vector, we've to allocate new memory for new output vector.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index c6d5fe5bc6..47d1a76fa3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -261,6 +261,57 @@ object Vectors {
sys.error("Unsupported Breeze vector type: " + v.getClass.getName)
}
}
+
+ /**
+ * Returns the p-norm of this vector.
+ * @param vector input vector.
+ * @param p norm.
+ * @return norm in L^p^ space.
+ */
+ private[spark] def norm(vector: Vector, p: Double): Double = {
+ require(p >= 1.0)
+ val values = vector match {
+ case dv: DenseVector => dv.values
+ case sv: SparseVector => sv.values
+ case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+ }
+ val size = values.size
+
+ if (p == 1) {
+ var sum = 0.0
+ var i = 0
+ while (i < size) {
+ sum += math.abs(values(i))
+ i += 1
+ }
+ sum
+ } else if (p == 2) {
+ var sum = 0.0
+ var i = 0
+ while (i < size) {
+ sum += values(i) * values(i)
+ i += 1
+ }
+ math.sqrt(sum)
+ } else if (p == Double.PositiveInfinity) {
+ var max = 0.0
+ var i = 0
+ while (i < size) {
+ val value = math.abs(values(i))
+ if (value > max) max = value
+ i += 1
+ }
+ max
+ } else {
+ var sum = 0.0
+ var i = 0
+ while (i < size) {
+ sum += math.pow(math.abs(values(i)), p)
+ i += 1
+ }
+ math.pow(sum, 1.0 / p)
+ }
+ }
}
/**
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 9492f604af..f99f014509 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -21,6 +21,7 @@ import breeze.linalg.{DenseMatrix => BDM}
import org.scalatest.FunSuite
import org.apache.spark.SparkException
+import org.apache.spark.mllib.util.TestingUtils._
class VectorsSuite extends FunSuite {
@@ -197,4 +198,27 @@ class VectorsSuite extends FunSuite {
assert(svMap.get(2) === Some(3.1))
assert(svMap.get(3) === Some(0.0))
}
+
+ test("vector p-norm") {
+ val dv = Vectors.dense(0.0, -1.2, 3.1, 0.0, -4.5, 1.9)
+ val sv = Vectors.sparse(6, Seq((1, -1.2), (2, 3.1), (3, 0.0), (4, -4.5), (5, 1.9)))
+
+ assert(Vectors.norm(dv, 1.0) ~== dv.toArray.foldLeft(0.0)((a, v) =>
+ a + math.abs(v)) relTol 1E-8)
+ assert(Vectors.norm(sv, 1.0) ~== sv.toArray.foldLeft(0.0)((a, v) =>
+ a + math.abs(v)) relTol 1E-8)
+
+ assert(Vectors.norm(dv, 2.0) ~== math.sqrt(dv.toArray.foldLeft(0.0)((a, v) =>
+ a + v * v)) relTol 1E-8)
+ assert(Vectors.norm(sv, 2.0) ~== math.sqrt(sv.toArray.foldLeft(0.0)((a, v) =>
+ a + v * v)) relTol 1E-8)
+
+ assert(Vectors.norm(dv, Double.PositiveInfinity) ~== dv.toArray.map(math.abs).max relTol 1E-8)
+ assert(Vectors.norm(sv, Double.PositiveInfinity) ~== sv.toArray.map(math.abs).max relTol 1E-8)
+
+ assert(Vectors.norm(dv, 3.7) ~== math.pow(dv.toArray.foldLeft(0.0)((a, v) =>
+ a + math.pow(math.abs(v), 3.7)), 1.0 / 3.7) relTol 1E-8)
+ assert(Vectors.norm(sv, 3.7) ~== math.pow(sv.toArray.foldLeft(0.0)((a, v) =>
+ a + math.pow(math.abs(v), 3.7)), 1.0 / 3.7) relTol 1E-8)
+ }
}