diff options
author | Xiangrui Meng <meng@databricks.com> | 2014-03-23 17:34:02 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-03-23 17:34:02 -0700 |
commit | 80c29689ae3b589254a571da3ddb5f9c866ae534 (patch) | |
tree | 1c60763332b65c974ca042ea3306c896e8cc88e5 /mllib/src/test | |
parent | 8265dc7739caccc59bc2456b2df055ca96337fe4 (diff) | |
download | spark-80c29689ae3b589254a571da3ddb5f9c866ae534.tar.gz spark-80c29689ae3b589254a571da3ddb5f9c866ae534.tar.bz2 spark-80c29689ae3b589254a571da3ddb5f9c866ae534.zip |
[SPARK-1212] Adding sparse data support and update KMeans
Continue our discussions from https://github.com/apache/incubator-spark/pull/575
This PR is WIP because it depends on a SNAPSHOT version of breeze.
Per previous discussions and benchmarks, I switched to breeze for linear algebra operations. @dlwh and I made some improvements to breeze to keep its performance comparable to the bare-bone implementation, including norm computation and squared distance. This is why this PR needs to depend on a SNAPSHOT version of breeze.
@fommil , please find the notice of using netlib-core in `NOTICE`. This is following Apache's instructions on appropriate labeling.
I'm going to update this PR to include:
1. Fast distance computation: using `\|a\|_2^2 + \|b\|_2^2 - 2 a^T b` when it doesn't introduce too much numerical error. The squared norms are pre-computed. Otherwise, computing the distance between the center (dense) and a point (possibly sparse) always takes O(n) time.
2. Some numbers about the performance.
3. A released version of breeze. @dlwh, a minor release of breeze will help this PR get merged early. Do you mind sharing breeze's release plan? Thanks!
Author: Xiangrui Meng <meng@databricks.com>
Closes #117 from mengxr/sparse-kmeans and squashes the following commits:
67b368d [Xiangrui Meng] fix SparseVector.toArray
5eda0de [Xiangrui Meng] update NOTICE
67abe31 [Xiangrui Meng] move ArrayRDDs to mllib.rdd
1da1033 [Xiangrui Meng] remove dependency on commons-math3 and compute EPSILON directly
9bb1b31 [Xiangrui Meng] optimize SparseVector.toArray
226d2cd [Xiangrui Meng] update Java friendly methods in Vectors
238ba34 [Xiangrui Meng] add VectorRDDs with a converter from RDD[Array[Double]]
b28ba2f [Xiangrui Meng] add toArray to Vector
e69b10c [Xiangrui Meng] remove examples/JavaKMeans.java, which is replaced by mllib/examples/JavaKMeans.java
72bde33 [Xiangrui Meng] clean up code for distance computation
712cb88 [Xiangrui Meng] make Vectors.sparse Java friendly
27858e4 [Xiangrui Meng] update breeze version to 0.7
07c3cf2 [Xiangrui Meng] change Mahout to breeze in doc use a simple lower bound to avoid unnecessary distance computation
6f5cdde [Xiangrui Meng] fix a bug in filtering finished runs
42512f2 [Xiangrui Meng] Merge branch 'master' into sparse-kmeans
d6e6c07 [Xiangrui Meng] add predict(RDD[Vector]) to KMeansModel
42b4e50 [Xiangrui Meng] line feed at the end
a4ace73 [Xiangrui Meng] Merge branch 'fast-dist' into sparse-kmeans
3ed1a24 [Xiangrui Meng] add doc to BreezeVectorWithSquaredNorm
0107e19 [Xiangrui Meng] update NOTICE
87bc755 [Xiangrui Meng] tuned the KMeans code: changed some for loops to while, use view to avoid copying arrays
0ff8046 [Xiangrui Meng] update KMeans to use fastSquaredDistance
f355411 [Xiangrui Meng] add BreezeVectorWithSquaredNorm case class
ab74f67 [Xiangrui Meng] add fastSquaredDistance for KMeans
4e7d5ca [Xiangrui Meng] minor style update
07ffaf2 [Xiangrui Meng] add dense/sparse vector data models and conversions to/from breeze vectors use breeze to implement KMeans in order to support both dense and sparse data
Diffstat (limited to 'mllib/src/test')
8 files changed, 427 insertions, 125 deletions
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java index 33b99f4bd3..49a614bd90 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java @@ -18,16 +18,19 @@ package org.apache.spark.mllib.clustering; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; + +import com.google.common.collect.Lists; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; public class JavaKMeansSuite implements Serializable { private transient JavaSparkContext sc; @@ -44,72 +47,45 @@ public class JavaKMeansSuite implements Serializable { System.clearProperty("spark.driver.port"); } - // L1 distance between two points - double distance1(double[] v1, double[] v2) { - double distance = 0.0; - for (int i = 0; i < v1.length; ++i) { - distance = Math.max(distance, Math.abs(v1[i] - v2[i])); - } - return distance; - } - - // Assert that two sets of points are equal, within EPSILON tolerance - void assertSetsEqual(double[][] v1, double[][] v2) { - double EPSILON = 1e-4; - Assert.assertTrue(v1.length == v2.length); - for (int i = 0; i < v1.length; ++i) { - double minDistance = Double.MAX_VALUE; - for (int j = 0; j < v2.length; ++j) { - minDistance = Math.min(minDistance, distance1(v1[i], v2[j])); - } - Assert.assertTrue(minDistance <= EPSILON); - } - - for (int i = 0; i < v2.length; ++i) { - double minDistance = Double.MAX_VALUE; - for (int j = 0; j < v1.length; ++j) { - minDistance = Math.min(minDistance, distance1(v2[i], v1[j])); - } - Assert.assertTrue(minDistance <= EPSILON); - } - } - - @Test public void runKMeansUsingStaticMethods() { - List<double[]> points = new ArrayList<double[]>(); - points.add(new double[]{1.0, 2.0, 6.0}); - points.add(new double[]{1.0, 3.0, 0.0}); - points.add(new double[]{1.0, 4.0, 6.0}); + List<Vector> points = Lists.newArrayList( + Vectors.dense(1.0, 2.0, 6.0), + Vectors.dense(1.0, 3.0, 0.0), + Vectors.dense(1.0, 4.0, 6.0) + ); - double[][] expectedCenter = { {1.0, 3.0, 4.0} }; + Vector expectedCenter = Vectors.dense(1.0, 3.0, 4.0); - JavaRDD<double[]> data = sc.parallelize(points, 2); - KMeansModel model = KMeans.train(data.rdd(), 1, 1); - assertSetsEqual(model.clusterCenters(), expectedCenter); + JavaRDD<Vector> data = sc.parallelize(points, 2); + KMeansModel model = KMeans.train(data.rdd(), 1, 1, 1, KMeans.K_MEANS_PARALLEL()); + assertEquals(1, model.clusterCenters().length); + assertEquals(expectedCenter, model.clusterCenters()[0]); model = KMeans.train(data.rdd(), 1, 1, 1, KMeans.RANDOM()); - assertSetsEqual(model.clusterCenters(), expectedCenter); + assertEquals(expectedCenter, model.clusterCenters()[0]); } @Test public void runKMeansUsingConstructor() { - List<double[]> points = new ArrayList<double[]>(); - points.add(new double[]{1.0, 2.0, 6.0}); - points.add(new double[]{1.0, 3.0, 0.0}); - points.add(new double[]{1.0, 4.0, 6.0}); + List<Vector> points = Lists.newArrayList( + Vectors.dense(1.0, 2.0, 6.0), + Vectors.dense(1.0, 3.0, 0.0), + Vectors.dense(1.0, 4.0, 6.0) + ); - double[][] expectedCenter = { {1.0, 3.0, 4.0} }; + Vector expectedCenter = Vectors.dense(1.0, 3.0, 4.0); - JavaRDD<double[]> data = sc.parallelize(points, 2); + JavaRDD<Vector> data = sc.parallelize(points, 2); KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd()); - assertSetsEqual(model.clusterCenters(), expectedCenter); - - model = new KMeans().setK(1) - .setMaxIterations(1) - .setRuns(1) - .setInitializationMode(KMeans.RANDOM()) - .run(data.rdd()); - assertSetsEqual(model.clusterCenters(), expectedCenter); + assertEquals(1, model.clusterCenters().length); + assertEquals(expectedCenter, model.clusterCenters()[0]); + + model = new KMeans() + .setK(1) + .setMaxIterations(1) + .setInitializationMode(KMeans.RANDOM()) + .run(data.rdd()); + assertEquals(expectedCenter, model.clusterCenters()[0]); } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java new file mode 100644 index 0000000000..2c4d795f96 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg; + +import java.io.Serializable; + +import com.google.common.collect.Lists; + +import scala.Tuple2; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class JavaVectorsSuite implements Serializable { + + @Test + public void denseArrayConstruction() { + Vector v = Vectors.dense(1.0, 2.0, 3.0); + assertArrayEquals(new double[]{1.0, 2.0, 3.0}, v.toArray(), 0.0); + } + + @Test + public void sparseArrayConstruction() { + Vector v = Vectors.sparse(3, Lists.newArrayList( + new Tuple2<Integer, Double>(0, 2.0), + new Tuple2<Integer, Double>(2, 3.0))); + assertArrayEquals(new double[]{2.0, 0.0, 3.0}, v.toArray(), 0.0); + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index 4ef1d1f64f..560a4ad71a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -17,127 +17,139 @@ package org.apache.spark.mllib.clustering - -import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.linalg.Vectors class KMeansSuite extends FunSuite with LocalSparkContext { - val EPSILON = 1e-4 - import KMeans.{RANDOM, K_MEANS_PARALLEL} - def prettyPrint(point: Array[Double]): String = point.mkString("(", ", ", ")") - - def prettyPrint(points: Array[Array[Double]]): String = { - points.map(prettyPrint).mkString("(", "; ", ")") - } - - // L1 distance between two points - def distance1(v1: Array[Double], v2: Array[Double]): Double = { - v1.zip(v2).map{ case (a, b) => math.abs(a-b) }.max - } - - // Assert that two vectors are equal within tolerance EPSILON - def assertEqual(v1: Array[Double], v2: Array[Double]) { - def errorMessage = prettyPrint(v1) + " did not equal " + prettyPrint(v2) - assert(v1.length == v2.length, errorMessage) - assert(distance1(v1, v2) <= EPSILON, errorMessage) - } - - // Assert that two sets of points are equal, within EPSILON tolerance - def assertSetsEqual(set1: Array[Array[Double]], set2: Array[Array[Double]]) { - def errorMessage = prettyPrint(set1) + " did not equal " + prettyPrint(set2) - assert(set1.length == set2.length, errorMessage) - for (v <- set1) { - val closestDistance = set2.map(w => distance1(v, w)).min - if (closestDistance > EPSILON) { - fail(errorMessage) - } - } - for (v <- set2) { - val closestDistance = set1.map(w => distance1(v, w)).min - if (closestDistance > EPSILON) { - fail(errorMessage) - } - } - } - test("single cluster") { val data = sc.parallelize(Array( - Array(1.0, 2.0, 6.0), - Array(1.0, 3.0, 0.0), - Array(1.0, 4.0, 6.0) + Vectors.dense(1.0, 2.0, 6.0), + Vectors.dense(1.0, 3.0, 0.0), + Vectors.dense(1.0, 4.0, 6.0) )) + val center = Vectors.dense(1.0, 3.0, 4.0) + // No matter how many runs or iterations we use, we should get one cluster, // centered at the mean of the points var model = KMeans.train(data, k=1, maxIterations=1) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=2) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=5) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=1, runs=5) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=1, runs=5) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train( data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) } test("single cluster with big dataset") { val smallData = Array( - Array(1.0, 2.0, 6.0), - Array(1.0, 3.0, 0.0), - Array(1.0, 4.0, 6.0) + Vectors.dense(1.0, 2.0, 6.0), + Vectors.dense(1.0, 3.0, 0.0), + Vectors.dense(1.0, 4.0, 6.0) ) val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4) // No matter how many runs or iterations we use, we should get one cluster, // centered at the mean of the points + val center = Vectors.dense(1.0, 3.0, 4.0) + var model = KMeans.train(data, k=1, maxIterations=1) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.size === 1) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=2) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=5) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=1, runs=5) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=1, runs=5) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) - assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + assert(model.clusterCenters.head === center) + } + + test("single cluster with sparse data") { + + val n = 10000 + val data = sc.parallelize((1 to 100).flatMap { i => + val x = i / 1000.0 + Array( + Vectors.sparse(n, Seq((0, 1.0 + x), (1, 2.0), (2, 6.0))), + Vectors.sparse(n, Seq((0, 1.0 - x), (1, 2.0), (2, 6.0))), + Vectors.sparse(n, Seq((0, 1.0), (1, 3.0 + x))), + Vectors.sparse(n, Seq((0, 1.0), (1, 3.0 - x))), + Vectors.sparse(n, Seq((0, 1.0), (1, 4.0), (2, 6.0 + x))), + Vectors.sparse(n, Seq((0, 1.0), (1, 4.0), (2, 6.0 - x))) + ) + }, 4) + + data.persist() + + // No matter how many runs or iterations we use, we should get one cluster, + // centered at the mean of the points + + val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0))) + + var model = KMeans.train(data, k=1, maxIterations=1) + assert(model.clusterCenters.head === center) + + model = KMeans.train(data, k=1, maxIterations=2) + assert(model.clusterCenters.head === center) + + model = KMeans.train(data, k=1, maxIterations=5) + assert(model.clusterCenters.head === center) + + model = KMeans.train(data, k=1, maxIterations=1, runs=5) + assert(model.clusterCenters.head === center) + + model = KMeans.train(data, k=1, maxIterations=1, runs=5) + assert(model.clusterCenters.head === center) + + model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) + assert(model.clusterCenters.head === center) + + model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) + assert(model.clusterCenters.head === center) + + data.unpersist() } test("k-means|| initialization") { - val points = Array( - Array(1.0, 2.0, 6.0), - Array(1.0, 3.0, 0.0), - Array(1.0, 4.0, 6.0), - Array(1.0, 0.0, 1.0), - Array(1.0, 1.0, 1.0) + val points = Seq( + Vectors.dense(1.0, 2.0, 6.0), + Vectors.dense(1.0, 3.0, 0.0), + Vectors.dense(1.0, 4.0, 6.0), + Vectors.dense(1.0, 0.0, 1.0), + Vectors.dense(1.0, 1.0, 1.0) ) val rdd = sc.parallelize(points) @@ -146,14 +158,39 @@ class KMeansSuite extends FunSuite with LocalSparkContext { // unselected point as long as it hasn't yet selected all of them var model = KMeans.train(rdd, k=5, maxIterations=1) - assertSetsEqual(model.clusterCenters, points) + assert(Set(model.clusterCenters: _*) === Set(points: _*)) // Iterations of Lloyd's should not change the answer either model = KMeans.train(rdd, k=5, maxIterations=10) - assertSetsEqual(model.clusterCenters, points) + assert(Set(model.clusterCenters: _*) === Set(points: _*)) // Neither should more runs model = KMeans.train(rdd, k=5, maxIterations=10, runs=5) - assertSetsEqual(model.clusterCenters, points) + assert(Set(model.clusterCenters: _*) === Set(points: _*)) + } + + test("two clusters") { + val points = Seq( + Vectors.dense(0.0, 0.0), + Vectors.dense(0.0, 0.1), + Vectors.dense(0.1, 0.0), + Vectors.dense(9.0, 0.0), + Vectors.dense(9.0, 0.2), + Vectors.dense(9.2, 0.0) + ) + val rdd = sc.parallelize(points, 3) + + for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) { + // Two iterations are sufficient no matter where the initial centers are. + val model = KMeans.train(rdd, k = 2, maxIterations = 2, runs = 1, initMode) + + val predicts = model.predict(rdd).collect() + + assert(predicts(0) === predicts(1)) + assert(predicts(0) === predicts(2)) + assert(predicts(3) === predicts(4)) + assert(predicts(3) === predicts(5)) + assert(predicts(0) != predicts(3)) + } } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeVectorConversionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeVectorConversionSuite.scala new file mode 100644 index 0000000000..aacaa30084 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeVectorConversionSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.scalatest.FunSuite + +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV} + +/** + * Test Breeze vector conversions. + */ +class BreezeVectorConversionSuite extends FunSuite { + + val arr = Array(0.1, 0.2, 0.3, 0.4) + val n = 20 + val indices = Array(0, 3, 5, 10, 13) + val values = Array(0.1, 0.5, 0.3, -0.8, -1.0) + + test("dense to breeze") { + val vec = Vectors.dense(arr) + assert(vec.toBreeze === new BDV[Double](arr)) + } + + test("sparse to breeze") { + val vec = Vectors.sparse(n, indices, values) + assert(vec.toBreeze === new BSV[Double](indices, values, n)) + } + + test("dense breeze to vector") { + val breeze = new BDV[Double](arr) + val vec = Vectors.fromBreeze(breeze).asInstanceOf[DenseVector] + assert(vec.size === arr.length) + assert(vec.values.eq(arr), "should not copy data") + } + + test("sparse breeze to vector") { + val breeze = new BSV[Double](indices, values, n) + val vec = Vectors.fromBreeze(breeze).asInstanceOf[SparseVector] + assert(vec.size === n) + assert(vec.indices.eq(indices), "should not copy data") + assert(vec.values.eq(values), "should not copy data") + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala new file mode 100644 index 0000000000..8a200310e0 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.scalatest.FunSuite + +class VectorsSuite extends FunSuite { + + val arr = Array(0.1, 0.0, 0.3, 0.4) + val n = 4 + val indices = Array(0, 2, 3) + val values = Array(0.1, 0.3, 0.4) + + test("dense vector construction with varargs") { + val vec = Vectors.dense(arr).asInstanceOf[DenseVector] + assert(vec.size === arr.length) + assert(vec.values.eq(arr)) + } + + test("dense vector construction from a double array") { + val vec = Vectors.dense(arr).asInstanceOf[DenseVector] + assert(vec.size === arr.length) + assert(vec.values.eq(arr)) + } + + test("sparse vector construction") { + val vec = Vectors.sparse(n, indices, values).asInstanceOf[SparseVector] + assert(vec.size === n) + assert(vec.indices.eq(indices)) + assert(vec.values.eq(values)) + } + + test("sparse vector construction with unordered elements") { + val vec = Vectors.sparse(n, indices.zip(values).reverse).asInstanceOf[SparseVector] + assert(vec.size === n) + assert(vec.indices === indices) + assert(vec.values === values) + } + + test("dense to array") { + val vec = Vectors.dense(arr).asInstanceOf[DenseVector] + assert(vec.toArray.eq(arr)) + } + + test("sparse to array") { + val vec = Vectors.sparse(n, indices, values).asInstanceOf[SparseVector] + assert(vec.toArray === arr) + } + + test("vector equals") { + val dv1 = Vectors.dense(arr.clone()) + val dv2 = Vectors.dense(arr.clone()) + val sv1 = Vectors.sparse(n, indices.clone(), values.clone()) + val sv2 = Vectors.sparse(n, indices.clone(), values.clone()) + + val vectors = Seq(dv1, dv2, sv1, sv2) + + for (v <- vectors; u <- vectors) { + assert(v === u) + assert(v.## === u.##) + } + + val another = Vectors.dense(0.1, 0.2, 0.3, 0.4) + + for (v <- vectors) { + assert(v != another) + assert(v.## != another.##) + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala new file mode 100644 index 0000000000..692f025e95 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDsSuite.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.LocalSparkContext + +class VectorRDDsSuite extends FunSuite with LocalSparkContext { + + test("from array rdd") { + val data = Seq(Array(1.0, 2.0), Array(3.0, 4.0)) + val arrayRdd = sc.parallelize(data, 2) + val vectorRdd = VectorRDDs.fromArrayRDD(arrayRdd) + assert(arrayRdd.collect().map(v => Vectors.dense(v)) === vectorRdd.collect()) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala index 7d840043e5..212fbe9288 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.util import org.scalatest.Suite diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala new file mode 100644 index 0000000000..60f053b381 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.util + +import org.scalatest.FunSuite + +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm, + squaredDistance => breezeSquaredDistance} + +import org.apache.spark.mllib.util.MLUtils._ + +class MLUtilsSuite extends FunSuite { + + test("epsilon computation") { + assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.") + assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.") + } + + test("fast squared distance") { + val a = (30 to 0 by -1).map(math.pow(2.0, _)).toArray + val n = a.length + val v1 = new BDV[Double](a) + val norm1 = breezeNorm(v1, 2.0) + val precision = 1e-6 + for (m <- 0 until n) { + val indices = (0 to m).toArray + val values = indices.map(i => a(i)) + val v2 = new BSV[Double](indices, values, n) + val norm2 = breezeNorm(v2, 2.0) + val squaredDist = breezeSquaredDistance(v1, v2) + val fastSquaredDist1 = fastSquaredDistance(v1, norm1, v2, norm2, precision) + assert((fastSquaredDist1 - squaredDist) <= precision * squaredDist, s"failed with m = $m") + val fastSquaredDist2 = fastSquaredDistance(v1, norm1, v2.toDenseVector, norm2, precision) + assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with m = $m") + } + } +} |