-rw-r--r--  .gitignore | 2
-rwxr-xr-x  bin/run-example | 13
-rwxr-xr-x  bin/spark-shell | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 41
-rw-r--r--  docs/configuration.md | 13
-rw-r--r--  docs/mllib-guide.md | 51
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java | 1
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java | 1
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java | 1
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala | 59
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala | 1
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala | 27
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala | 29
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 189
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala | 30
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala | 15
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala | 14
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala | 15
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala | 15
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala | 158
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala | 13
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala | 16
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala | 16
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala | 15
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala | 14
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala | 23
44 files changed, 690 insertions(+), 145 deletions(-)
diff --git a/.gitignore b/.gitignore
index 39635d7eef..3d17899212 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,4 +44,4 @@ derby.log
dist/
spark-*-bin.tar.gz
unit-tests.log
-lib/
+/lib/
diff --git a/bin/run-example b/bin/run-example
index 2e9d51440b..adba7dd97a 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -76,11 +76,20 @@ else
fi
fi
+# Set JAVA_OPTS to be able to load native libraries and to set heap size
+JAVA_OPTS="$SPARK_JAVA_OPTS"
+JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
+# Load extra JAVA_OPTS from conf/java-opts, if it exists
+if [ -e "$FWDIR/conf/java-opts" ] ; then
+ JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
+fi
+export JAVA_OPTS
+
if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
echo -n "Spark Command: "
- echo "$RUNNER" -cp "$CLASSPATH" "$@"
+ echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
echo "========================================"
echo
fi
-exec "$RUNNER" -cp "$CLASSPATH" "$@"
+exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
diff --git a/bin/spark-shell b/bin/spark-shell
index e6885b51ef..05a46ee0ca 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -45,13 +45,18 @@ for o in "$@"; do
done
# Set MASTER from spark-env if possible
+DEFAULT_SPARK_MASTER_PORT=7077
if [ -z "$MASTER" ]; then
if [ -e "$FWDIR/conf/spark-env.sh" ]; then
. "$FWDIR/conf/spark-env.sh"
fi
- if [[ "x" != "x$SPARK_MASTER_IP" && "y" != "y$SPARK_MASTER_PORT" ]]; then
- MASTER="spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}"
- export MASTER
+ if [ "x" != "x$SPARK_MASTER_IP" ]; then
+ if [ "y" != "y$SPARK_MASTER_PORT" ]; then
+ SPARK_MASTER_PORT="${SPARK_MASTER_PORT}"
+ else
+ SPARK_MASTER_PORT=$DEFAULT_SPARK_MASTER_PORT
+ fi
+ export MASTER="spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}"
fi
fi
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 951bfd79d0..45d19bcbfa 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -192,7 +192,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
/** Get all akka conf variables set on this SparkConf */
- def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+ def getAkkaConf: Seq[(String, String)] =
+ /* This is currently undocumented. If we want to make this public we should consider
+ * nesting options under the spark namespace to avoid conflicts with user akka options.
+ * Otherwise users configuring their own akka code via system properties could mess up
+ * spark's akka options.
+ *
+ * E.g. spark.akka.option.x.y.x = "value"
+ */
+ getAll.filter {case (k, v) => k.startsWith("akka.")}
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
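The comment above sketches a namespacing scheme rather than current behavior: `getAkkaConf` simply passes through every key that starts with `akka.`. Below is a minimal, hypothetical sketch of the `spark.akka.option.*` prefixing idea the comment mentions; the key names are made up and this is not an existing SparkConf API.

```scala
import org.apache.spark.SparkConf

object AkkaConfNamespacingSketch {
  def main(args: Array[String]) {
    // Hypothetical: Akka options are set under a Spark-owned prefix instead of bare "akka.*".
    val conf = new SparkConf()
      .set("spark.akka.option.akka.loglevel", "DEBUG")
      .set("spark.executor.memory", "2g")

    // Strip the prefix before handing the options to Akka, so user system properties
    // that happen to start with "akka." cannot collide with Spark's own Akka settings.
    val akkaOptions: Seq[(String, String)] = conf.getAll.toSeq.collect {
      case (k, v) if k.startsWith("spark.akka.option.") =>
        (k.stripPrefix("spark.akka.option."), v)
    }

    akkaOptions.foreach { case (k, v) => println(k + " = " + v) }
  }
}
```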
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 83109d1a6f..30e578dd93 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -43,8 +43,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
val numPartitions =
// listStatus can throw exception if path does not exist.
if (fs.exists(cpath)) {
- val dirContents = fs.listStatus(cpath)
- val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+ val dirContents = fs.listStatus(cpath).map(_.getPath)
+ val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
val numPart = partitionFiles.size
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
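Why the filter changed: matching on the full path string with `contains("part-")` also matches when the checkpoint *directory* name happens to contain "part-", whereas matching on the file name with `startsWith` is unambiguous. A small illustration with made-up paths (the directory name `rdd-part-7` is hypothetical):

```scala
import org.apache.hadoop.fs.Path

object PartFileFilterSketch {
  def main(args: Array[String]) {
    val paths = Seq(
      new Path("hdfs://nn/checkpoints/rdd-part-7/_metadata"),   // directory name contains "part-"
      new Path("hdfs://nn/checkpoints/rdd-part-7/part-00000"))

    // Old check: every file under this directory matches, including _metadata.
    val oldMatches = paths.map(_.toString).filter(_.contains("part-"))

    // New check: only real partition files match.
    val newMatches = paths.filter(_.getName.startsWith("part-")).map(_.toString)

    println("old: " + oldMatches.mkString(", "))
    println("new: " + newMatches.mkString(", "))
  }
}
```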
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 5ad00a1ed1..e91470800c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(
/** Check whether a task is currently running an attempt on a given host */
private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
- !taskAttempts(taskIndex).exists(_.host == host)
+ taskAttempts(taskIndex).exists(_.host == host)
}
/**
@@ -592,7 +592,7 @@ private[spark] class TaskSetManager(
override def removeSchedulable(schedulable: Schedulable) {}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
- var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this)
+ var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
sortedTaskSetQueue += this
sortedTaskSetQueue
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 48cec4be41..530712b5df 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -138,6 +138,7 @@ private[spark] class DiskBlockObjectWriter(
fos = null
ts = null
objOut = null
+ initialized = false
}
}
@@ -145,7 +146,8 @@ private[spark] class DiskBlockObjectWriter(
override def commit(): Long = {
if (initialized) {
- // NOTE: Flush the serializer first and then the compressed/buffered output stream
+ // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
+ // serializer stream and the lower level stream.
objOut.flush()
bs.flush()
val prevPos = lastValidPosition
@@ -175,7 +177,6 @@ private[spark] class DiskBlockObjectWriter(
}
override def fileSegment(): FileSegment = {
- val bytesWritten = lastValidPosition - initialPosition
new FileSegment(file, initialPosition, bytesWritten)
}
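A self-contained sketch of the double flush noted in `commit()`, using plain JDK streams as stand-ins for the serializer layer and the buffered/compressed layer; this is not Spark's `DiskBlockObjectWriter`, just the same flush ordering.

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream, ObjectOutputStream}

object DoubleFlushSketch {
  def main(args: Array[String]) {
    val file = File.createTempFile("spill-demo", ".bin")
    val fos = new FileOutputStream(file)
    val bs = new BufferedOutputStream(fos)
    val objOut = new ObjectOutputStream(bs)

    objOut.writeObject("some record")
    // Flush the serialization layer first, then the lower-level buffered stream,
    // so that everything written so far actually reaches the file.
    objOut.flush()
    bs.flush()
    println("valid up to byte " + fos.getChannel.position())
    objOut.close()
  }
}
```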
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e2b24298a5..bb07c8cb13 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
+import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer.
*/
private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
def conf = blockManager.conf
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -106,6 +107,15 @@ class ShuffleBlockManager(blockManager: BlockManager) {
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
+ // Because of previous failures, the shuffle file may already exist on this machine.
+ // If so, remove it.
+ if (blockFile.exists) {
+ if (blockFile.delete()) {
+ logInfo(s"Removed existing shuffle file $blockFile")
+ } else {
+ logWarning(s"Failed to remove existing shuffle file $blockFile")
+ }
+ }
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b436f0..fb73636162 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
import java.io._
import java.util.Comparator
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
/**
* An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,33 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
- val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+ /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+ * closes and re-opens serialization and compression streams within each file. This makes some
+ * assumptions about the way that serialization and compression streams work, specifically:
+ *
+ * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+ *
+ * 2) Several compression streams can be opened, written to, and flushed on the write path
+ * while only one compression input stream is created on the read path
+ *
+ * In practice (1) is only true for Java, so we add a special fix below to make it work for
+ * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+ *
+ * To avoid making these assumptions we should create an intermediate stream that batches
+ * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+ * This is a bit tricky because, within each segment, you'd need to track the total number
+ * of bytes written and then re-wind and write it at the beginning of the segment. This will
+ * most likely require using the file channel API.
+ */
+
+ val shouldCompress = blockManager.shouldCompress(blockId)
+ val compressionCodec = new LZFCompressionCodec(sparkConf)
+ def wrapForCompression(outputStream: OutputStream) = {
+ if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
+ }
+
def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
- compressStream, syncWrites)
+ wrapForCompression, syncWrites)
var writer = getNewWriter
var objectsWritten = 0
@@ -168,6 +193,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten == serializerBatchSize) {
writer.commit()
+ writer.close()
+ _diskBytesSpilled += writer.bytesWritten
writer = getNewWriter
objectsWritten = 0
}
@@ -176,8 +203,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten > 0) writer.commit()
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
- _diskBytesSpilled += writer.bytesWritten
writer.close()
+ _diskBytesSpilled += writer.bytesWritten
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskMapIterator(file, blockId))
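A minimal, self-contained sketch of the batching discipline this hunk adopts: commit and close the writer after every batch so its serialization/compression streams are finalized, count the spilled bytes only after `close()`, then open a fresh writer. `FakeWriter` is a stand-in that counts writes, not Spark's `DiskBlockObjectWriter`.

```scala
object SpillBatchingSketch {
  // Stand-in for DiskBlockObjectWriter: counts "bytes" instead of writing to disk.
  class FakeWriter {
    private var written = 0L
    def write(obj: Any): Unit = { written += 1 }
    def commit(): Long = written
    def close(): Unit = ()
    def bytesWritten: Long = written
  }

  def spill(items: Iterator[Int], batchSize: Int): Long = {
    def newWriter() = new FakeWriter
    var writer = newWriter()
    var objectsWritten = 0
    var diskBytesSpilled = 0L
    try {
      for (item <- items) {
        writer.write(item)
        objectsWritten += 1
        if (objectsWritten == batchSize) {
          writer.commit()
          writer.close()
          diskBytesSpilled += writer.bytesWritten   // count only after close()
          writer = newWriter()
          objectsWritten = 0
        }
      }
      if (objectsWritten > 0) writer.commit()
    } finally {
      writer.close()
      diskBytesSpilled += writer.bytesWritten
    }
    diskBytesSpilled
  }

  def main(args: Array[String]) {
    println("spilled " + spill((1 to 10).iterator, 4) + " units")
  }
}
```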
diff --git a/docs/configuration.md b/docs/configuration.md
index 00864906b3..3bb655075f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -98,7 +98,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.default.parallelism</td>
<td>8</td>
<td>
- Default number of tasks to use for distributed shuffle operations (<code>groupByKey</code>,
+ Default number of tasks to use across the cluster for distributed shuffle operations (<code>groupByKey</code>,
<code>reduceByKey</code>, etc) when not set by user.
</td>
</tr>
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.shuffle.spill.compress</td>
<td>true</td>
<td>
- Whether to compress data spilled during shuffles.
+ Whether to compress data spilled during shuffles. If enabled, spill compression
+ always uses the `org.apache.spark.io.LZFCompressionCodec` codec,
+ regardless of the value of `spark.io.compression.codec`.
</td>
</tr>
<tr>
@@ -379,13 +381,6 @@ Apart from these, the following properties are also available, and may be useful
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>
-<tr>
- <td>akka.x.y....</td>
- <td>value</td>
- <td>
- An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that SparkContext and its assigned executors as well.
- </td>
-</tr>
<tr>
<td>spark.shuffle.consolidateFiles</td>
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index a22a22184b..0cc5505b50 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -438,3 +438,54 @@ signals), you can use the trainImplicit method to get better results.
# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, 1, 20)
{% endhighlight %}
+
+
+# Singular Value Decomposition
+Singular Value Decomposition for Tall and Skinny matrices.
+Given an *m x n* matrix *A*, we can compute matrices *U, S, V* such that
+
+*A = U * S * V^T*
+
+There is no restriction on m, but we require n^2 doubles to
+fit in memory locally on one machine.
+Further, n should be less than m.
+
+The decomposition is computed by first computing *A^TA = V S^2 V^T*,
+computing SVD locally on that (since n x n is small),
+from which we recover S and V.
+Then we compute U via easy matrix multiplication
+as *U = A * V * S^-1*
+
+Only the singular vectors associated with the largest k singular values
+are recovered. If there are k
+such values, then the dimensions of the return will be:
+
+* *S* is *k x k* and diagonal, holding the singular values on diagonal.
+* *U* is *m x k* and satisfies U^T*U = eye(k).
+* *V* is *n x k* and satisfies V^TV = eye(k).
+
+All input and output is expected in sparse matrix format, 0-indexed
+as tuples of the form ((i,j),value) all in
+SparseMatrix RDDs. Below is example usage.
+
+{% highlight scala %}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.SVD
+import org.apache.spark.mllib.linalg.SparseMatrix
+import org.apache.spark.mllib.linalg.MatrixEntry
+
+// Load and parse the data file
+val data = sc.textFile("mllib/data/als/test.data").map { line =>
+ val parts = line.split(',')
+ MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
+}
+val m = 4
+val n = 4
+val k = 1
+
+// recover largest singular vector
+val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+val s = decomposed.S.data
+
+println("singular values = " + s.toArray.mkString)
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index 7b5a243e26..f061001dd2 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -70,5 +70,6 @@ public final class JavaFlumeEventCount {
}).print();
ssc.start();
+ ssc.awaitTermination();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 04f62ee204..2ffd351b4e 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -104,5 +104,6 @@ public final class JavaKafkaWordCount {
wordCounts.print();
jssc.start();
+ jssc.awaitTermination();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 349d826ab5..7777c9832a 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -84,5 +84,6 @@ public final class JavaNetworkWordCount {
wordCounts.print();
ssc.start();
+ ssc.awaitTermination();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index e2d55f1a4e..26c44620ab 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -80,5 +80,6 @@ public final class JavaQueueStream {
reducedStream.print();
ssc.start();
+ ssc.awaitTermination();
}
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
new file mode 100644
index 0000000000..19676fcc1a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.SVD
+import org.apache.spark.mllib.linalg.MatrixEntry
+import org.apache.spark.mllib.linalg.SparseMatrix
+
+/**
+ * Compute SVD of an example matrix
+ * Input file should be comma separated, 1 indexed of the form
+ * i,j,value
+ * Where i is the row, j the column, and value is the matrix entry
+ *
+ * For example input file, see:
+ * mllib/data/als/test.data (example is 4 x 4)
+ */
+object SparkSVD {
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ System.err.println("Usage: SparkSVD <master> <file> m n")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SVD",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
+ // Load and parse the data file
+ val data = sc.textFile(args(1)).map { line =>
+ val parts = line.split(',')
+ MatrixEntry(parts(0).toInt - 1, parts(1).toInt - 1, parts(2).toDouble)
+ }
+ val m = args(2).toInt
+ val n = args(3).toInt
+
+ // recover largest singular vector
+ val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
+ val u = decomposed.U.data
+ val s = decomposed.S.data
+ val v = decomposed.V.data
+
+ println("singular values = " + s.toArray.mkString)
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 5a4aa7f3a2..a5888811cc 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -171,5 +171,6 @@ object ActorWordCount {
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index a59be7899d..11c3aaad3c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -60,5 +60,6 @@ object FlumeEventCount {
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index 704b315ef8..954bcc9b6e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -50,6 +50,7 @@ object HdfsWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 4a3d81c09a..d9cb7326bb 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -61,6 +61,7 @@ object KafkaWordCount {
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 78b49fdcf1..eb61caf8c8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -101,5 +101,6 @@ object MQTTWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 0226475712..5656d487a5 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -54,5 +54,6 @@ object NetworkWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index 99b79c3949..cdd7547d0d 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -61,5 +61,6 @@ object RawNetworkGrep {
union.filter(_.contains("the")).count().foreachRDD(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index 8c5d0bd568..aa82bf3c6b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -114,5 +114,6 @@ object RecoverableNetworkWordCount {
createContext(master, ip, port, outputPath)
})
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 1183eba846..88f1cef89b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -65,5 +65,6 @@ object StatefulNetworkWordCount {
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 483c4d3118..bbd44948b6 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -110,5 +110,6 @@ object TwitterAlgebirdCMS {
})
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 94c2bf29ac..a0094d460f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -87,5 +87,6 @@ object TwitterAlgebirdHLL {
})
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 8a70d4a978..896d010c68 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -69,5 +69,6 @@ object TwitterPopularTags {
})
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 12d2a1084f..85b4ce5e81 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -91,5 +91,6 @@ object ZeroMQWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
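All of the streaming example changes above follow the same pattern: after `ssc.start()` the driver blocks on `awaitTermination()` instead of letting `main()` return. A condensed, hedged sketch of that pattern; the socket source, host, and port here are placeholders, and the pair-DStream import mirrors what the real examples use.

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object AwaitTerminationSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "AwaitTerminationSketch", Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    ssc.start()
    // Without this call main() returns immediately and the process may exit
    // while receivers and batch jobs are still running.
    ssc.awaitTermination()
  }
}
```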
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
new file mode 100644
index 0000000000..416996fcbe
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+/**
+ * Class that represents an entry in a sparse matrix of doubles.
+ *
+ * @param i row index (0 indexing used)
+ * @param j column index (0 indexing used)
+ * @param mval value of entry in matrix
+ */
+case class MatrixEntry(val i: Int, val j: Int, val mval: Double)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
new file mode 100644
index 0000000000..319f82b449
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+/**
+ * Class that represents the SV decomposition of a matrix
+ *
+ * @param U such that A = USV^T
+ * @param S such that A = USV^T
+ * @param V such that A = USV^T
+ */
+case class MatrixSVD(val U: SparseMatrix,
+ val S: SparseMatrix,
+ val V: SparseMatrix)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
new file mode 100644
index 0000000000..e476b53450
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
+
+
+/**
+ * Class used to obtain singular value decompositions
+ */
+class SVD {
+ private var k: Int = 1
+
+ /**
+ * Set the number of top-k singular vectors to return
+ */
+ def setK(k: Int): SVD = {
+ this.k = k
+ this
+ }
+
+ /**
+ * Compute SVD using the current set parameters
+ */
+ def compute(matrix: SparseMatrix) : MatrixSVD = {
+ SVD.sparseSVD(matrix, k)
+ }
+}
+
+
+/**
+ * Top-level methods for calling Singular Value Decomposition
+ * NOTE: All matrices are in 0-indexed sparse format RDD[((int, int), value)]
+ */
+object SVD {
+/**
+ * Singular Value Decomposition for Tall and Skinny matrices.
+ * Given an m x n matrix A, this will compute matrices U, S, V such that
+ * A = U * S * V'
+ *
+ * There is no restriction on m, but we require n^2 doubles to fit in memory.
+ * Further, n should be less than m.
+ *
+ * The decomposition is computed by first computing A'A = V S^2 V',
+ * computing svd locally on that (since n x n is small),
+ * from which we recover S and V.
+ * Then we compute U via easy matrix multiplication
+ * as U = A * V * S^-1
+ *
+ * Only the k largest singular values and associated vectors are found.
+ * If there are k such values, then the dimensions of the return will be:
+ *
+ * S is k x k and diagonal, holding the singular values on diagonal
+ * U is m x k and satisfies U'U = eye(k)
+ * V is n x k and satisfies V'V = eye(k)
+ *
+ * All input and output is expected in sparse matrix format, 0-indexed
+ * as tuples of the form ((i,j),value) all in RDDs using the
+ * SparseMatrix class
+ *
+ * @param matrix sparse matrix to factorize
+ * @param k Recover k singular values and vectors
+ * @return Three sparse matrices: U, S, V such that A = USV^T
+ */
+ def sparseSVD(
+ matrix: SparseMatrix,
+ k: Int)
+ : MatrixSVD =
+ {
+ val data = matrix.data
+ val m = matrix.m
+ val n = matrix.n
+
+ if (m < n || m <= 0 || n <= 0) {
+ throw new IllegalArgumentException("Expecting a tall and skinny matrix")
+ }
+
+ if (k < 1 || k > n) {
+ throw new IllegalArgumentException("Must request up to n singular values")
+ }
+
+ // Compute A^T A, assuming rows are sparse enough to fit in memory
+ val rows = data.map(entry =>
+ (entry.i, (entry.j, entry.mval))).groupByKey()
+ val emits = rows.flatMap{ case (rowind, cols) =>
+ cols.flatMap{ case (colind1, mval1) =>
+ cols.map{ case (colind2, mval2) =>
+ ((colind1, colind2), mval1*mval2) } }
+ }.reduceByKey(_+_)
+
+ // Construct jblas A^T A locally
+ val ata = DoubleMatrix.zeros(n, n)
+ for (entry <- emits.toArray) {
+ ata.put(entry._1._1, entry._1._2, entry._2)
+ }
+
+ // Since A^T A is small, we can compute its SVD directly
+ val svd = Singular.sparseSVD(ata)
+ val V = svd(0)
+ val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)
+
+ if (sigmas.size < k) {
+ throw new Exception("Not enough singular values to return")
+ }
+
+ val sigma = sigmas.take(k)
+
+ val sc = data.sparkContext
+
+ // prepare V for returning
+ val retVdata = sc.makeRDD(
+ Array.tabulate(V.rows, sigma.length){ (i,j) =>
+ MatrixEntry(i, j, V.get(i,j)) }.flatten)
+ val retV = SparseMatrix(retVdata, V.rows, sigma.length)
+
+ val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
+ x => MatrixEntry(x, x, sigma(x))})
+
+ val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
+
+ // Compute U as U = A V S^-1
+ // turn V S^-1 into an RDD as a sparse matrix
+ val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
+ { (i,j) => ((i, j), V.get(i,j) / sigma(j)) }.flatten)
+
+ // Multiply A by VS^-1
+ val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
+ val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
+ val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
+ => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
+ .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
+ val retU = SparseMatrix(retUdata, m, sigma.length)
+
+ MatrixSVD(retU, retS, retV)
+ }
+
+
+ def main(args: Array[String]) {
+ if (args.length < 8) {
+ println("Usage: SVD <master> <matrix_file> <m> <n> " +
+ "<k> <output_U_file> <output_S_file> <output_V_file>")
+ System.exit(1)
+ }
+
+ val (master, inputFile, m, n, k, output_u, output_s, output_v) =
+ (args(0), args(1), args(2).toInt, args(3).toInt,
+ args(4).toInt, args(5), args(6), args(7))
+
+ val sc = new SparkContext(master, "SVD")
+
+ val rawdata = sc.textFile(inputFile)
+ val data = rawdata.map { line =>
+ val parts = line.split(',')
+ MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
+ }
+
+ val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+ val u = decomposed.U.data
+ val s = decomposed.S.data
+ val v = decomposed.V.data
+
+ println("Computed " + s.toArray.length + " singular values and vectors")
+ u.saveAsTextFile(output_u)
+ s.saveAsTextFile(output_s)
+ v.saveAsTextFile(output_v)
+ System.exit(0)
+ }
+}
+
+
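For intuition, here is a small local-only sketch of the same Gram-matrix approach on a dense jblas matrix: form A^T A, take its SVD locally, recover S as the square roots of its singular values and V as its singular vectors, then U = A V S^-1. The 3 x 2 matrix is made up, and this is not the distributed `SVD.sparseSVD` path above.

```scala
import org.jblas.{DoubleMatrix, MatrixFunctions, Singular}

object GramSVDSketch {
  def main(args: Array[String]) {
    // Tall and skinny: m = 3 rows, n = 2 columns.
    val a = new DoubleMatrix(Array(
      Array(1.0, 2.0),
      Array(3.0, 4.0),
      Array(5.0, 6.0)))

    val ata = a.transpose().mmul(a)          // n x n Gram matrix A^T A
    val svd = Singular.sparseSVD(ata)        // ata = svd(0) * diag(svd(1)) * svd(2)^T
    val v = svd(0)                           // for the symmetric PSD Gram matrix, U == V
    val sigma = MatrixFunctions.sqrt(svd(1)) // singular values of A

    val sInv = DoubleMatrix.diag(new DoubleMatrix(sigma.toArray.map(1.0 / _)))
    val u = a.mmul(v).mmul(sInv)             // U = A V S^-1

    println("singular values: " + sigma.toArray.mkString(", "))
    val reconstructed = u.mmul(DoubleMatrix.diag(sigma)).mmul(v.transpose())
    println("reconstruction error: " + a.sub(reconstructed).norm2())
  }
}
```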
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
new file mode 100644
index 0000000000..cbd1a2a5a4
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Class that represents a sparse matrix
+ *
+ * @param data RDD of nonzero entries
+ * @param m number of rows
+ * @param n number of columns
+ */
+case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
index 02ede71137..05322b024d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.LocalSparkContext
object LogisticRegressionSuite {
@@ -66,19 +67,7 @@ object LogisticRegressionSuite {
}
-class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
-
+class LogisticRegressionSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
prediction != expected.label
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index b615f76e66..9dd6c79ee6 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -23,7 +23,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.util.LocalSparkContext
object NaiveBayesSuite {
@@ -59,17 +59,7 @@ object NaiveBayesSuite {
}
}
-class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class NaiveBayesSuite extends FunSuite with LocalSparkContext {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOfPredictions = predictions.zip(input).count {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index 3357b86f9b..bc7abb568a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -25,8 +25,9 @@ import org.scalatest.FunSuite
import org.jblas.DoubleMatrix
-import org.apache.spark.{SparkException, SparkContext}
+import org.apache.spark.SparkException
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.LocalSparkContext
object SVMSuite {
@@ -58,17 +59,7 @@ object SVMSuite {
}
-class SVMSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class SVMSuite extends FunSuite with LocalSparkContext {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index 73657cac89..4ef1d1f64f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -21,20 +21,9 @@ package org.apache.spark.mllib.clustering
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.util.LocalSparkContext
-
-class KMeansSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class KMeansSuite extends FunSuite with LocalSparkContext {
val EPSILON = 1e-4
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
new file mode 100644
index 0000000000..32f3f141cd
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.jblas._
+
+class SVDSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ val EPSILON = 1e-4
+
+ // Return jblas matrix from sparse matrix RDD
+ def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = {
+ val data = matrix.data
+ val m = matrix.m
+ val n = matrix.n
+ val ret = DoubleMatrix.zeros(m, n)
+ matrix.data.toArray.map(x => ret.put(x.i, x.j, x.mval))
+ ret
+ }
+
+ def assertMatrixEquals(a: DoubleMatrix, b: DoubleMatrix) {
+ assert(a.rows == b.rows && a.columns == b.columns, "dimension mismatch")
+ val diff = DoubleMatrix.zeros(a.rows, a.columns)
+ Array.tabulate(a.rows, a.columns){(i, j) =>
+ diff.put(i, j,
+ Math.min(Math.abs(a.get(i, j) - b.get(i, j)),
+ Math.abs(a.get(i, j) + b.get(i, j)))) }
+ assert(diff.norm1 < EPSILON, "matrix mismatch: " + diff.norm1)
+ }
+
+ test("full rank matrix svd") {
+ val m = 10
+ val n = 3
+ val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
+ MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
+
+ val a = SparseMatrix(data, m, n)
+
+ val decomposed = SVD.sparseSVD(a, n)
+ val u = decomposed.U
+ val v = decomposed.V
+ val s = decomposed.S
+
+ val densea = getDenseMatrix(a)
+ val svd = Singular.sparseSVD(densea)
+
+ val retu = getDenseMatrix(u)
+ val rets = getDenseMatrix(s)
+ val retv = getDenseMatrix(v)
+
+ // check individual decomposition
+ assertMatrixEquals(retu, svd(0))
+ assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
+ assertMatrixEquals(retv, svd(2))
+
+ // check multiplication guarantee
+ assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+ }
+
+ test("rank one matrix svd") {
+ val m = 10
+ val n = 3
+ val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) =>
+ MatrixEntry(a, b, 1.0) }.flatten )
+ val k = 1
+
+ val a = SparseMatrix(data, m, n)
+
+ val decomposed = SVD.sparseSVD(a, k)
+ val u = decomposed.U
+ val s = decomposed.S
+ val v = decomposed.V
+ val retrank = s.data.toArray.length
+
+ assert(retrank == 1, "rank returned not one")
+
+ val densea = getDenseMatrix(a)
+ val svd = Singular.sparseSVD(densea)
+
+ val retu = getDenseMatrix(u)
+ val rets = getDenseMatrix(s)
+ val retv = getDenseMatrix(v)
+
+ // check individual decomposition
+ assertMatrixEquals(retu, svd(0).getColumn(0))
+ assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+ assertMatrixEquals(retv, svd(2).getColumn(0))
+
+ // check multiplication guarantee
+ assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+ }
+
+ test("truncated with k") {
+ val m = 10
+ val n = 3
+ val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
+ MatrixEntry(a, b, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
+ val a = SparseMatrix(data, m, n)
+
+ val k = 1 // only one svalue above this
+
+ val decomposed = SVD.sparseSVD(a, k)
+ val u = decomposed.U
+ val s = decomposed.S
+ val v = decomposed.V
+ val retrank = s.data.toArray.length
+
+ val densea = getDenseMatrix(a)
+ val svd = Singular.sparseSVD(densea)
+
+ val retu = getDenseMatrix(u)
+ val rets = getDenseMatrix(s)
+ val retv = getDenseMatrix(v)
+
+ assert(retrank == 1, "rank returned not one")
+
+ // check individual decomposition
+ assertMatrixEquals(retu, svd(0).getColumn(0))
+ assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+ assertMatrixEquals(retv, svd(2).getColumn(0))
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
index a6028a1e98..a453de6767 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.LocalSparkContext
object GradientDescentSuite {
@@ -62,17 +63,7 @@ object GradientDescentSuite {
}
}
-class GradientDescentSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
test("Assert the loss is decreasing.") {
val nPoints = 10000
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 4e8dbde658..5dcec7dc3e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -23,10 +23,10 @@ import scala.util.Random
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
-
import org.jblas._
+import org.apache.spark.mllib.util.LocalSparkContext
+
object ALSSuite {
def generateRatingsAsJavaList(
@@ -73,17 +73,7 @@ object ALSSuite {
}
-class ALSSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class ALSSuite extends FunSuite with LocalSparkContext {
test("rank-1 matrices") {
testALS(50, 100, 1, 15, 0.7, 0.3)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
index b2c8df97a8..64e4cbb860 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -22,21 +22,9 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
-import org.apache.spark.mllib.util.LinearDataGenerator
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
-
-class LassoSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class LassoSuite extends FunSuite with LocalSparkContext {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
index 406afbaa3e..281f9df36d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -20,20 +20,9 @@ package org.apache.spark.mllib.regression
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
-import org.apache.spark.mllib.util.LinearDataGenerator
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
-class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class LinearRegressionSuite extends FunSuite with LocalSparkContext {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
index 1d6a10b66e..67dd06cc0f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -22,20 +22,10 @@ import org.jblas.DoubleMatrix
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
-import org.apache.spark.mllib.util.LinearDataGenerator
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
-class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = {
predictions.zip(input).map { case (prediction, expected) =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala
new file mode 100644
index 0000000000..7d840043e5
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala
@@ -0,0 +1,23 @@
+package org.apache.spark.mllib.util
+
+import org.scalatest.Suite
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkContext
+
+trait LocalSparkContext extends BeforeAndAfterAll { self: Suite =>
+ @transient var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ super.beforeAll()
+ }
+
+ override def afterAll() {
+ if (sc != null) {
+ sc.stop()
+ }
+ System.clearProperty("spark.driver.port")
+ super.afterAll()
+ }
+}
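For reference, a hypothetical suite showing how the new trait is meant to be mixed in; `DemoSuite` is a made-up name, and the pattern mirrors the MLlib suites updated above.

```scala
import org.scalatest.FunSuite

import org.apache.spark.mllib.util.LocalSparkContext

class DemoSuite extends FunSuite with LocalSparkContext {
  test("sc is provided and torn down by the trait") {
    val rdd = sc.parallelize(1 to 10)
    assert(rdd.count() === 10L)
  }
}
```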